Module nempy.sym.network

Expand source code
import asyncio
import datetime
import json
import logging
import multiprocessing
import threading
import time
import re
from base64 import b32encode
from binascii import unhexlify
from http import HTTPStatus
from typing import Optional, Union, List, Callable, Dict
from urllib.parse import urlparse
from requests.exceptions import RequestException

import requests
import websockets
from nempy.sym.constants import BlockchainStatuses, EPOCH_TIME_TESTNET, EPOCH_TIME_MAINNET, NetworkType, \
    TransactionTypes, AccountValidationState
from pydantic import BaseModel, StrictInt, StrictFloat
from symbolchain.core.CryptoTypes import Hash256
from symbolchain.core.facade.SymFacade import SymFacade
from tabulate import tabulate
from websockets import exceptions

from . import ed25519, constants, config
from .constants import TransactionStatus

logger = logging.getLogger(__name__)


class SymbolNetworkException(Exception):
    """Is one exception for the convenience of working with the blockchain network"""
    codes = {
        'ResourceNotFound': 404,
        'InvalidAddress': 409,
        'InvalidArgument': 409,
        'InvalidContent': 400,
        'Internal': 500,
    }

    def __init__(self, code, message):
        self.code = self.codes.get(code)
        self.name = code
        self.message = message
        super(SymbolNetworkException, self).__init__(f'{self.code} - {self.name}', self.message)


def url_validation(url):
    """django URL validation regex
    Raise an exception if the url is not valid"""
    regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    if re.match(regex, url) is None:
        raise ValueError(f'`{url}` is not a valid URL')


def mosaic_id_to_name_n_real(mosaic_id: str, amount: int) -> Dict[str, float]:
    """
    Converts mosaic identifiers to names and integer numbers to real numbers.

    Parameters
    ----------
    mosaic_id
        Mosaic ID as string
    amount
        Mosaic units in Symbol are defined as absolute amounts. To get an absolute amount,
        multiply the amount of assets you want to create or send by 10^divisibility.
        For example, if the mosaic has divisibility 2, to create or send 10 units (relative)
        you should define 1,000 (absolute) instead.
    Returns
    -------
    Dict[str, float]
        A dictionary with a name and a real amount value. For example
    ```py
    {'id': 'symbol.xym', 'amount': 1.1}
    ```
    """
    if not isinstance(amount, int):
        raise TypeError('To avoid confusion, automatic conversion to integer is prohibited')
    divisibility = get_divisibility(mosaic_id)
    divider = 10 ** int(divisibility)
    mn = get_mosaic_names(mosaic_id)
    name = mosaic_id
    names = mn['mosaicNames'][0]['names']
    if len(names) > 0:
        name = names[0]
    return {'id': name, 'amount': float(amount / divider)}


class Meta(BaseModel):
    """Transaction meta information"""
    height: int
    hash: str
    merkleComponentHash: str
    index: int


class MosaicInfo(BaseModel):
    """Mosaic information in a transaction"""
    id: str
    amount: Union[StrictInt, StrictFloat]

    def __str__(self):
        return f'{self.amount}({self.id})'


class TransactionInfo(BaseModel):
    """Contains information about transactions of the blockchain network"""
    size: int
    signature: str
    signerPublicKey: str
    version: int
    network: int
    type: Union[int, str]
    maxFee: int
    deadline: Union[int, datetime.datetime]
    recipientAddress: str
    message: Optional[str]
    signer_address: Optional[str]
    mosaics: List[MosaicInfo]

    def humanization(self):
        """Converts information from the blockchain into a readable form"""
        self.deadline = Timing().deadline_to_date(self.deadline)
        if self.message is not None:
            self.message = unhexlify(self.message)[1:].decode('utf-8')
        self.recipientAddress = b32encode(unhexlify(self.recipientAddress)).decode('utf-8')[:-1]
        self.mosaics = [MosaicInfo(**mosaic_id_to_name_n_real(mosaic.id, mosaic.amount)) for mosaic in self.mosaics]
        self.type = TransactionTypes.get_type_by_id(self.type).name
        facade = SymFacade(node_selector.network_type.value)
        self.signer_address = str(facade.network.public_key_to_address(Hash256(self.signerPublicKey)))


class TransactionResponse(BaseModel):
    id: str
    meta: Meta
    transaction: TransactionInfo
    status: Optional[str]

    def __str__(self):
        if self.transaction.signer_address.startswith('T'):
            test_net_explorer = 'http://explorer.testnet.symboldev.network/transactions/'
        else:
            test_net_explorer = 'http://explorer.symbolblockchain.io/transactions/'
        prepare = list()
        mosaics = [str(mosaic) for mosaic in self.transaction.mosaics]
        mosaics = '\n'.join(mosaics)
        prepare.append(['Type:', self.transaction.type.title()])
        prepare.append(['Status:', self.status.title()])
        prepare.append(['Hash:', f'{test_net_explorer}{self.meta.hash}'])
        prepare.append(['Paid Fee:', f'{self.transaction.maxFee / 1000000}(XYM)'])
        prepare.append(['Height:', self.meta.height])
        prepare.append(['Deadline:', self.transaction.deadline])
        prepare.append(['Signature:', self.transaction.signature])
        prepare.append(['Signer Public Key:', self.transaction.signerPublicKey])
        prepare.append(['From:', self.transaction.signer_address])
        prepare.append(['To:', self.transaction.recipientAddress])
        prepare.append(['Mosaic:', mosaics])
        prepare.append(['Message:', self.transaction.message])
        table = tabulate(prepare, headers=['Property', 'Value'], tablefmt='grid')
        return table


def send_transaction(payload: bytes) -> bool:
    """Announces a transaction to the network"""
    try:
        headers = {'Content-type': 'application/json'}
        answer = requests.put(f'{node_selector.url}/transactions', data=payload, headers=headers, timeout=10)
        if answer.status_code != HTTPStatus.ACCEPTED:
            raise SymbolNetworkException(**answer.json())
    except (RequestException, SymbolNetworkException) as e:
        logger.exception(e)
        return False
    else:
        return True


def get_mosaic_names(mosaics_ids: Union[list, str]) -> Optional[dict]:
    """
    Get readable names for a set of mosaics.

    Parameters
    ----------
    mosaics_ids
        IDs of mosaic as list or str if there is only one mosaic
    Returns
    -------
    Optional[Dict[str, list]]
        dict of mosaics. For example:
    ```py
    {"mosaicNames": [{"mosaicId": "091F837E059AE13C", "names": ["symbol.xym"]}]}
    ```
    """
    if isinstance(mosaics_ids, str):
        mosaics_ids = [mosaics_ids]
    try:
        for mosaic_id in mosaics_ids:
            if not ed25519.check_hex(mosaic_id, constants.HexSequenceSizes.MOSAIC_ID):
                raise SymbolNetworkException('InvalidArgument', f'mosaicId `{mosaic_id}` has an invalid format')
        payload = {'mosaicIds': mosaics_ids}
        headers = {'Content-type': 'application/json'}
        answer = requests.post(f'{node_selector.url}/namespaces/mosaic/names', json=payload, headers=headers, timeout=10)
        if answer.status_code != HTTPStatus.OK:
            raise SymbolNetworkException(**answer.json())
    except (RequestException, SymbolNetworkException) as e:
        logger.exception(e)
        raise
    else:
        return answer.json()


def get_accounts_info(address: str) -> Optional[dict]:
    try:
        if (avs := ed25519.check_address(address)) != AccountValidationState.OK:
            raise SymbolNetworkException('InvalidAddress', f'Incorrect account address: `{address}`: {avs}')
        endpoint = f'{node_selector.url}/accounts/{address}'
        answer = requests.get(endpoint)
        if answer.status_code != HTTPStatus.OK:
            return None
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    else:
        return answer.json()


def search_transactions(address: Optional[str] = None,
                        recipient_address: Optional[str] = None,
                        signer_public_key: Optional[str] = None,
                        height: Optional[int] = None,
                        from_height: Optional[int] = None,
                        to_height: Optional[str] = None,
                        from_transfer_amount: Optional[str] = None,
                        to_transfer_amount: Optional[str] = None,
                        type: int = 16724,
                        embedded: bool = False,
                        transfer_mosaic_id: Optional[str] = None,
                        page_size: int = 10,
                        page_number: int = 1,
                        offset: Optional[str] = None,
                        order: str = 'desc',
                        transaction_status: TransactionStatus = TransactionStatus.CONFIRMED_ADDED
                        ) -> Optional[list]:
    params = {
        'address': address,
        'recipientAddress': recipient_address,
        'signerPublicKey': signer_public_key,
        'height': height,
        'fromHeight': from_height,
        'toHeight': to_height,
        'fromTransferAmount': from_transfer_amount,
        'toTransferAmount': to_transfer_amount,
        'type': type,
        'embedded': str(embedded).lower(),
        'transferMosaicId': transfer_mosaic_id,
        'pageSize': page_size,
        'pageNumber': page_number,
        'offset': offset,
        'order': order
    }
    payload = {key: val for key, val in params.items() if val is not None}
    endpoint = f'{node_selector.url}/transactions/{transaction_status.value}'
    try:
        answer = requests.get(endpoint, params=payload)
        if answer.status_code != HTTPStatus.OK:
            raise SymbolNetworkException(**answer.json())
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    transactions = answer.json()
    transactions_response = []
    for transaction in transactions['data']:
        mosaics = [MosaicInfo(id=mosaic['id'], amount=int(mosaic['amount'])) for mosaic in transaction['transaction']['mosaics']]
        del(transaction['transaction']['mosaics'])
        _transaction = TransactionResponse(id=transaction['id'],
                                           meta=Meta(**transaction['meta']),
                                           transaction=TransactionInfo(mosaics=mosaics, **transaction['transaction'])
                                           )
        _transaction.status = transaction_status.value
        transactions_response.append(_transaction)
        _transaction.transaction.humanization()
    return transactions_response


def get_namespace_info(namespace_id: str) -> Optional[dict]:
    endpoint = f'{node_selector.url}/namespaces/{namespace_id}'
    try:
        answer = requests.get(endpoint)
    except Exception as e:
        logger.error(e)
        return None
    if answer.status_code != HTTPStatus.OK:
        logger.error(answer.text)
        if answer.status_code == HTTPStatus.NOT_FOUND:
            logger.error(f'Invalid namespace ID `{namespace_id}`')
            return {}
        return None
    namespace_info = answer.json()
    return namespace_info


def check_transaction_state(transaction_hash):
    timeout = 10
    check_order = ['confirmed', 'unconfirmed', 'partial']
    status = TransactionStatus.NOT_FOUND
    for checker in check_order:
        endpoint = f'{node_selector.url}/transactions/{checker}/{transaction_hash}'
        try:
            answer = requests.get(endpoint, timeout=timeout)
            if answer.status_code != 200:
                raise SymbolNetworkException(**answer.json())
        except (RequestException, SymbolNetworkException) as e:
            if isinstance(e, SymbolNetworkException) and e.code == 404:
                return TransactionStatus.NOT_FOUND
            logger.exception(e)
            raise
        else:
            if checker == 'confirmed':
                status = TransactionStatus.CONFIRMED_ADDED
            elif checker == 'unconfirmed':
                status = TransactionStatus.UNCONFIRMED_ADDED
            elif checker == 'partial':
                status = TransactionStatus.PARTIAL_ADDED
        return status


def get_network_properties():
    answer = requests.get(f'{node_selector.url}/network/properties')
    if answer.status_code == HTTPStatus.OK:
        network_properties = answer.json()
        return network_properties
    answer.raise_for_status()


def get_node_network():
    try:
        answer = requests.get(f'{node_selector.url}/node/info')
    except RequestException as e:
        logger.exception(e)
        raise
    if answer.status_code == HTTPStatus.OK:
        fee_info = answer.json()
        network_generation_hash_seed = fee_info['networkGenerationHashSeed']
        if network_generation_hash_seed == constants.NETWORK_GENERATION_HASH_SEED_TEST:
            return NetworkType.TEST_NET
        elif network_generation_hash_seed == constants.NETWORK_GENERATION_HASH_SEED_PUBLIC:
            return NetworkType.MAIN_NET
        else:
            return None
    answer.raise_for_status()


def get_block_information(height: int):
    answer = requests.get(f'{node_selector.url}/blocks/{height}')
    if answer.status_code == HTTPStatus.OK:
        block_info = answer.json()
        return block_info
    answer.raise_for_status()


def get_fee_multipliers():
    try:
        answer = requests.get(f'{node_selector.url}/network/fees/transaction')
    except RequestException as e:
        logger.exception(e)
        return None
    if answer.status_code == HTTPStatus.OK:
        fee_multipliers = answer.json()
        return fee_multipliers
    return None


def get_divisibility(mosaic_id: str) -> Optional[int]:
    try:
        if not ed25519.check_hex(mosaic_id, constants.HexSequenceSizes.MOSAIC_ID):
            raise SymbolNetworkException('InvalidArgument', f'mosaicId `{mosaic_id}` has an invalid format')
        answer = requests.get(f'{node_selector.url}/mosaics/{mosaic_id}')
        if answer.status_code == HTTPStatus.OK:
            node_info = answer.json()
            divisibility = int(node_info['mosaic']['divisibility'])
        else:
            raise SymbolNetworkException(**answer.json())
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    else:
        return divisibility


def get_divisibilities(n_pages: int = 0):
    mosaics = {}
    payload = {'pageSize': 100}

    page_count = 1
    while True:
        try:
            answer = requests.get(f'{node_selector.url}/mosaics', params=payload)
        except Exception as e:
            logger.error(e)
            return None
        if answer.status_code == HTTPStatus.OK:
            mosaics_pages = answer.json()['data']
            if len(mosaics_pages) == 0:
                return mosaics
            last_page = None
            for page in mosaics_pages:
                mosaic_id = page['mosaic']['id']
                divisibility = page['mosaic']['divisibility']
                mosaics[mosaic_id] = divisibility
                last_page = page
            payload['offset'] = last_page['id']
            page_count = page_count + 1 if n_pages else page_count
            if page_count > n_pages:
                return mosaics


def get_balance(address: str) -> Optional[dict]:
    try:
        address_info = get_accounts_info(address)
        if address_info is None:
            return {}
        mosaics = address_info['account']['mosaics']
        balance = {mosaic['id']: int(mosaic['amount']) / 10 ** get_divisibility(mosaic['id']) for mosaic in mosaics}
    except (SymbolNetworkException, RequestException) as e:
        if isinstance(e, SymbolNetworkException) and e.code == 404:
            return {}
        raise
    else:
        return balance


class Monitor:
    """Allows you to subscribe to events on the blockchain network"""
    where_to_subscribe = {
            'confirmedAdded': 'address',
            'unconfirmedAdded': 'address',
            'unconfirmedRemoved': 'address',
            'partialAdded': 'address',
            'partialRemoved': 'address',
            'cosignature': 'address',
            'status': 'address',
            'block': None,
            'finalizedBlock': None
    }

    def __init__(self,
                 url: str,
                 subscribers: List[str],
                 formatting: bool = False,
                 log: str = '',
                 callback: Optional[Callable] = None):
        self.url = url
        self.subscribers = subscribers
        self.formatting = formatting
        self.log = log
        self.callback = callback
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.monitoring())

    async def monitoring(self):
        result = urlparse(self.url)
        url = f"ws://{result.hostname}:{result.port}/ws"
        print(f'MONITORING: {url}')
        try:
            async with websockets.connect(url) as ws:
                response = json.loads(await ws.recv())
                print(f'UID: {response["uid"]}')
                if 'uid' in response:
                    prepare = []
                    for subscriber in self.subscribers:
                        added = json.dumps({"uid": response["uid"], "subscribe": f"{subscriber}"})
                        await ws.send(added)
                        # print(f'Subscribed to: {subscriber}')
                        prepare.append([subscriber])
                    table = tabulate(prepare, headers=['Subscribers'], tablefmt='grid')
                    print(table)
                    print('Listening... `Ctrl+C` for abort')
                    while True:
                        res = await ws.recv()
                        if self.formatting:
                            res = json.dumps(json.loads(res), indent=4)
                        if self.callback is not None:
                            self.callback(json.loads(res))
                            continue
                        print(res)
                        if self.log:
                            with open(self.log, 'a+') as f:
                                res += '\n'
                                f.write(res)
        except exceptions.WebSocketException as e:
            logger.exception(e)
            raise


class Timing:
    """Works with network time"""
    def __init__(self, network_type: Optional[NetworkType] = None):
        if network_type is None:
            network_type = node_selector.network_type
        if network_type == NetworkType.TEST_NET:
            self.epoch_time = EPOCH_TIME_TESTNET
        elif network_type == NetworkType.MAIN_NET:
            self.epoch_time = EPOCH_TIME_MAINNET
        else:
            raise EnvironmentError('It is not possible to determine the type of network')

    def calc_deadline(self, days: float = 0, seconds: float = 0, milliseconds: float = 0,
                      minutes: float = 0, hours: float = 0, weeks: float = 0) -> int:

        if days + seconds + milliseconds + minutes + hours + weeks <= 0:
            raise TimeoutError('Added time must be positive otherwise the transaction will not have time to process')
        # perhaps this code will be needed if you need to get time from a node
        # node_info = json.loads(requests.get(endpoint).text)
        # receive_timestamp = int(node_info['communicationTimestamps']['receiveTimestamp'])
        # td = datetime.timedelta(milliseconds=receive_timestamp)
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        td = now - self.epoch_time
        td += datetime.timedelta(days=days, seconds=seconds,
                                 milliseconds=milliseconds, minutes=minutes,
                                 hours=hours, weeks=weeks)
        deadline = int(td.total_seconds() * 1000)
        return deadline

    def deadline_to_date(self, deadline: int, is_local: bool = False) -> datetime:
        def utc2local(utc):
            utc_epoch = time.mktime(utc.timetuple())
            offset = datetime.datetime.fromtimestamp(utc_epoch) - datetime.datetime.utcfromtimestamp(utc_epoch)
            return utc + offset

        deadline = int(deadline)
        epoch_timestamp = datetime.datetime.timestamp(self.epoch_time)
        deadline_date_utc = datetime.datetime.utcfromtimestamp(epoch_timestamp + deadline / 1000)
        if is_local:
            local_deadline_date = utc2local(deadline_date_utc)
            return local_deadline_date
        return deadline_date_utc


class Thread:
    """A helper class for working with a thread, starting and stopping it by signals"""
    def __init__(self):
        self.stop_event: Optional[threading.Event] = None
        self.thread: Optional[threading.Thread] = None
        self.is_started = False
        self.updated = threading.Event()

    def stop(self):
        if self.thread is not None and self.thread.is_alive():
            self.stop_event.set()
            self.thread.join()
            self.is_started = False
            logger.debug(f'The node actualization thread {self.thread.name} has been stopped.')

    def start(self, func: Callable, interval: int = 3600):
        self.is_started = True
        self.stop_event = threading.Event()
        self.updated = threading.Event()
        params = {'interval': interval, 'stop_event': self.stop_event, 'updated': self.updated}
        self.thread = threading.Thread(target=func, kwargs=params, daemon=True)
        self.thread.start()
        logger.debug(f'New actualizer thread started: {self.thread.name}')
        return self

    def wait(self):
        updated_is_set = self.updated.wait(60)
        if not updated_is_set:
            raise RuntimeError('Very long waiting time for node selection')


class NodeSelector:
    """Works with a list of nodes in both the main and test networks.
       Offline finds the best connection options and makes adjustments if conditions change.
       Also allows you to add connections manually.
    """
    _URL: Optional[str] = None
    _URLs: Optional[list] = None
    is_elections: bool = False
    _network_type: NetworkType = NetworkType.TEST_NET

    def __init__(self, node_urls: Union[List[str], str]):
        self.thread = Thread()
        self.url = node_urls

    @property
    def url(self):
        while self.is_elections:
            time.sleep(0.1)
        return self._URL

    @url.setter
    def url(self, urls: Union[list, str]):
        self.is_elections = True
        self.thread.stop()
        if isinstance(urls, str):
            urls = [urls]
        for url in urls:
            url_validation(url)
        self._URLs = urls
        if len(self._URLs) == 1:
            self._URL = self._URLs[0]  # setting a single URL value
            logger.debug(f'Installed node: {self._URL}')
        else:
            self.thread.start(self.node_actualizer, interval=3600).wait()
        self.is_elections = False

    def node_actualizer(self, interval, stop_event, updated):
        while True:
            self.reelection_node()
            updated.set()
            event_is_set = stop_event.wait(interval)
            if event_is_set:
                break

    def reelection_node(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        logger.debug('Node reselecting...')
        heights = [NodeSelector.get_height(url) for url in self._URLs]
        max_height = max(heights)
        heights_filter = [True if height >= max_height * 0.97 else False for height in heights]
        # filtered by block height - 97%
        filtered_by_height = [url for i, url in enumerate(self._URLs) if heights_filter[i]]
        urls_p_h = {url: (NodeSelector.ping(url), NodeSelector.simple_health(url)) for url in filtered_by_height}
        # Remove non-working nodes from the dict
        working = {key: val for key, val in urls_p_h.items() if val[1]}
        _sorted_URLs = [k for k, v in sorted(working.items(), key=lambda item: item[1][0])]
        new_url = _sorted_URLs[0] if len(_sorted_URLs) > 0 else None
        if new_url != self._URL and self._URL is not None:
            logger.warning(f'Reselection node: {self._URL} -> {new_url}')
        if new_url is None:
            logger.error('It was not possible to select the current node from the list of available ones')
        self._URL = new_url
        logger.debug(f'Selected node: {self._URL}')

    @property
    def network_type(self):
        return self._network_type

    @network_type.setter
    def network_type(self, network_type):
        if network_type == self.network_type:
            return
        self._network_type = network_type
        if self._network_type == NetworkType.MAIN_NET:
            logger.debug('Switch to MAIN network')
            self.url = config.MAIN_NODE_URLs
        elif self._network_type == NetworkType.TEST_NET:
            logger.debug('Switch to TEST network')
            self.url = config.TEST_NODE_URLs
        else:
            raise TypeError('Unknown network type')

    @staticmethod
    def health(url) -> BlockchainStatuses:
        """
        Returns the statuses of node services
        Parameters
        ----------
        url
            URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000
        Returns
        -------
        BlockchainStatuses
            The statuses of node services
        ```py
        BlockchainStatuses.DB_FAILURE
        BlockchainStatuses.NO_NODES_AVAILABLE
        BlockchainStatuses.NOT_INITIALIZED
        BlockchainStatuses.REST_FAILURE
        BlockchainStatuses.OK
        BlockchainStatuses.UNKNOWN
        ```
        """
        if url is None:
            return BlockchainStatuses.NO_NODES_AVAILABLE
        try:
            answer = requests.get(f'{url}/node/health', timeout=1)
        except Exception as e:
            logger.exception(e)
            return BlockchainStatuses.REST_FAILURE
        if answer.status_code == HTTPStatus.OK:
            node_info = answer.json()
            if node_info['status']['apiNode'] == 'up' and node_info['status']['db'] == 'up':
                return BlockchainStatuses.OK
            if node_info['status']['apiNode'] == 'down':
                return BlockchainStatuses.NODE_FAILURE
            if node_info['status']['db'] == 'down':
                return BlockchainStatuses.DB_FAILURE
        return BlockchainStatuses.UNKNOWN

    @staticmethod
    def simple_health(url) -> bool:
        health_status = NodeSelector.health(url)
        if health_status == BlockchainStatuses.OK:
            return True
        return False

    @staticmethod
    def get_height(url) -> int:
        """
        Returns the last block known to the node
        Parameters
        ----------
        url
            URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000

        Returns
        -------

        """
        try:
            answer = requests.get(f'{url}/chain/info', timeout=1)
        except Exception:
            return 0
        node_info = answer.json()
        height = node_info['height']
        return int(height)

    @staticmethod
    def ping(url) -> Optional[float]:
        """Calculate and return a latency point using sockets"""
        if multiprocessing.current_process().daemon:
            asyncio.set_event_loop(asyncio.new_event_loop())
        parse_result = urlparse(url)
        loop = asyncio.get_event_loop()
        latency = loop.run_until_complete(NodeSelector.measure_latency(host=parse_result.hostname, port=parse_result.port, runs=3))
        if (result := len(list(filter(None, latency)))) == 0:
            return None
        average = sum(filter(None, latency)) / result
        return average

    @staticmethod
    async def measure_latency(
            host: str,
            port: int = 443,
            timeout: float = 5,
            runs: int = 1,
            wait: float = 0,
    ) -> list:
        """
        Builds a list composed of latency_points
        Parameters
        ----------
        host
            Host name
        port
            Port
        timeout
            Server response timeout
        runs
            Number of attempts
        wait
            Delay before request
        Returns
        -------
        list
            list of latency for all runs
        """
        tasks = []
        latency_points = []
        for i in range(runs):
            await asyncio.sleep(wait)
            tasks.append(asyncio.create_task(NodeSelector.latency_point(host=host, port=port, timeout=timeout)))
            # last_latency_point = await latency_point(host=host, port=port, timeout=timeout)
        for i in range(runs):
            latency_points.append(await tasks[i])
        return latency_points

    @staticmethod
    async def latency_point(host: str, port: int = 443, timeout: float = 5) -> Optional[float]:
        """
        Calculate a latency point using sockets. If something bad happens the point returned is None
        Parameters
        ----------
        host
            Host name
        port
            Port
        timeout
            Server response timeout
        Returns
        -------
        Optional[float]
            Returns float if possible
        """
        # New Socket and Time out
        # Start a timer
        s_start = time.time()

        # Try to Connect
        uri = f"ws://{host}:{port}"
        try:
            async with websockets.connect(uri, close_timeout=timeout):
                pass
        except exceptions.InvalidMessage:
            pass
        except exceptions.InvalidStatusCode:
            pass
        except Exception as e:
            logger.debug(str(e))
            return None

        # Stop Timer
        s_runtime = (time.time() - s_start) * 1000
        return float(s_runtime)


# singleton for background work with the list of nodes
node_selector = NodeSelector(config.TEST_NODE_URLs)

Functions

def check_transaction_state(transaction_hash)
Expand source code
def check_transaction_state(transaction_hash):
    timeout = 10
    check_order = ['confirmed', 'unconfirmed', 'partial']
    status = TransactionStatus.NOT_FOUND
    for checker in check_order:
        endpoint = f'{node_selector.url}/transactions/{checker}/{transaction_hash}'
        try:
            answer = requests.get(endpoint, timeout=timeout)
            if answer.status_code != 200:
                raise SymbolNetworkException(**answer.json())
        except (RequestException, SymbolNetworkException) as e:
            if isinstance(e, SymbolNetworkException) and e.code == 404:
                return TransactionStatus.NOT_FOUND
            logger.exception(e)
            raise
        else:
            if checker == 'confirmed':
                status = TransactionStatus.CONFIRMED_ADDED
            elif checker == 'unconfirmed':
                status = TransactionStatus.UNCONFIRMED_ADDED
            elif checker == 'partial':
                status = TransactionStatus.PARTIAL_ADDED
        return status
def get_accounts_info(address: str) ‑> Optional[dict]
Expand source code
def get_accounts_info(address: str) -> Optional[dict]:
    try:
        if (avs := ed25519.check_address(address)) != AccountValidationState.OK:
            raise SymbolNetworkException('InvalidAddress', f'Incorrect account address: `{address}`: {avs}')
        endpoint = f'{node_selector.url}/accounts/{address}'
        answer = requests.get(endpoint)
        if answer.status_code != HTTPStatus.OK:
            return None
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    else:
        return answer.json()
def get_balance(address: str) ‑> Optional[dict]
Expand source code
def get_balance(address: str) -> Optional[dict]:
    try:
        address_info = get_accounts_info(address)
        if address_info is None:
            return {}
        mosaics = address_info['account']['mosaics']
        balance = {mosaic['id']: int(mosaic['amount']) / 10 ** get_divisibility(mosaic['id']) for mosaic in mosaics}
    except (SymbolNetworkException, RequestException) as e:
        if isinstance(e, SymbolNetworkException) and e.code == 404:
            return {}
        raise
    else:
        return balance
def get_block_information(height: int)
Expand source code
def get_block_information(height: int):
    answer = requests.get(f'{node_selector.url}/blocks/{height}')
    if answer.status_code == HTTPStatus.OK:
        block_info = answer.json()
        return block_info
    answer.raise_for_status()
def get_divisibilities(n_pages: int = 0)
Expand source code
def get_divisibilities(n_pages: int = 0):
    mosaics = {}
    payload = {'pageSize': 100}

    page_count = 1
    while True:
        try:
            answer = requests.get(f'{node_selector.url}/mosaics', params=payload)
        except Exception as e:
            logger.error(e)
            return None
        if answer.status_code == HTTPStatus.OK:
            mosaics_pages = answer.json()['data']
            if len(mosaics_pages) == 0:
                return mosaics
            last_page = None
            for page in mosaics_pages:
                mosaic_id = page['mosaic']['id']
                divisibility = page['mosaic']['divisibility']
                mosaics[mosaic_id] = divisibility
                last_page = page
            payload['offset'] = last_page['id']
            page_count = page_count + 1 if n_pages else page_count
            if page_count > n_pages:
                return mosaics
def get_divisibility(mosaic_id: str) ‑> Optional[int]
Expand source code
def get_divisibility(mosaic_id: str) -> Optional[int]:
    try:
        if not ed25519.check_hex(mosaic_id, constants.HexSequenceSizes.MOSAIC_ID):
            raise SymbolNetworkException('InvalidArgument', f'mosaicId `{mosaic_id}` has an invalid format')
        answer = requests.get(f'{node_selector.url}/mosaics/{mosaic_id}')
        if answer.status_code == HTTPStatus.OK:
            node_info = answer.json()
            divisibility = int(node_info['mosaic']['divisibility'])
        else:
            raise SymbolNetworkException(**answer.json())
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    else:
        return divisibility
def get_fee_multipliers()
Expand source code
def get_fee_multipliers():
    try:
        answer = requests.get(f'{node_selector.url}/network/fees/transaction')
    except RequestException as e:
        logger.exception(e)
        return None
    if answer.status_code == HTTPStatus.OK:
        fee_multipliers = answer.json()
        return fee_multipliers
    return None
def get_mosaic_names(mosaics_ids: Union[list, str]) ‑> Optional[dict]

Get readable names for a set of mosaics.

Parameters

mosaics_ids
IDs of mosaic as list or str if there is only one mosaic

Returns

Optional[Dict[str, list]]
dict of mosaics. For example:
{"mosaicNames": [{"mosaicId": "091F837E059AE13C", "names": ["symbol.xym"]}]}
Expand source code
def get_mosaic_names(mosaics_ids: Union[list, str]) -> Optional[dict]:
    """
    Get readable names for a set of mosaics.

    Parameters
    ----------
    mosaics_ids
        IDs of mosaic as list or str if there is only one mosaic
    Returns
    -------
    Optional[Dict[str, list]]
        dict of mosaics. For example:
    ```py
    {"mosaicNames": [{"mosaicId": "091F837E059AE13C", "names": ["symbol.xym"]}]}
    ```
    """
    if isinstance(mosaics_ids, str):
        mosaics_ids = [mosaics_ids]
    try:
        for mosaic_id in mosaics_ids:
            if not ed25519.check_hex(mosaic_id, constants.HexSequenceSizes.MOSAIC_ID):
                raise SymbolNetworkException('InvalidArgument', f'mosaicId `{mosaic_id}` has an invalid format')
        payload = {'mosaicIds': mosaics_ids}
        headers = {'Content-type': 'application/json'}
        answer = requests.post(f'{node_selector.url}/namespaces/mosaic/names', json=payload, headers=headers, timeout=10)
        if answer.status_code != HTTPStatus.OK:
            raise SymbolNetworkException(**answer.json())
    except (RequestException, SymbolNetworkException) as e:
        logger.exception(e)
        raise
    else:
        return answer.json()
def get_namespace_info(namespace_id: str) ‑> Optional[dict]
Expand source code
def get_namespace_info(namespace_id: str) -> Optional[dict]:
    endpoint = f'{node_selector.url}/namespaces/{namespace_id}'
    try:
        answer = requests.get(endpoint)
    except Exception as e:
        logger.error(e)
        return None
    if answer.status_code != HTTPStatus.OK:
        logger.error(answer.text)
        if answer.status_code == HTTPStatus.NOT_FOUND:
            logger.error(f'Invalid namespace ID `{namespace_id}`')
            return {}
        return None
    namespace_info = answer.json()
    return namespace_info
def get_network_properties()
Expand source code
def get_network_properties():
    answer = requests.get(f'{node_selector.url}/network/properties')
    if answer.status_code == HTTPStatus.OK:
        network_properties = answer.json()
        return network_properties
    answer.raise_for_status()
def get_node_network()
Expand source code
def get_node_network():
    try:
        answer = requests.get(f'{node_selector.url}/node/info')
    except RequestException as e:
        logger.exception(e)
        raise
    if answer.status_code == HTTPStatus.OK:
        fee_info = answer.json()
        network_generation_hash_seed = fee_info['networkGenerationHashSeed']
        if network_generation_hash_seed == constants.NETWORK_GENERATION_HASH_SEED_TEST:
            return NetworkType.TEST_NET
        elif network_generation_hash_seed == constants.NETWORK_GENERATION_HASH_SEED_PUBLIC:
            return NetworkType.MAIN_NET
        else:
            return None
    answer.raise_for_status()
def mosaic_id_to_name_n_real(mosaic_id: str, amount: int) ‑> Dict[str, float]

Converts mosaic identifiers to names and integer numbers to real numbers.

Parameters

mosaic_id
Mosaic ID as string
amount
Mosaic units in Symbol are defined as absolute amounts. To get an absolute amount, multiply the amount of assets you want to create or send by 10^divisibility. For example, if the mosaic has divisibility 2, to create or send 10 units (relative) you should define 1,000 (absolute) instead.

Returns

Dict[str, float]
A dictionary with a name and a real amount value. For example
{'id': 'symbol.xym', 'amount': 1.1}
Expand source code
def mosaic_id_to_name_n_real(mosaic_id: str, amount: int) -> Dict[str, float]:
    """
    Converts mosaic identifiers to names and integer numbers to real numbers.

    Parameters
    ----------
    mosaic_id
        Mosaic ID as string
    amount
        Mosaic units in Symbol are defined as absolute amounts. To get an absolute amount,
        multiply the amount of assets you want to create or send by 10^divisibility.
        For example, if the mosaic has divisibility 2, to create or send 10 units (relative)
        you should define 1,000 (absolute) instead.
    Returns
    -------
    Dict[str, float]
        A dictionary with a name and a real amount value. For example
    ```py
    {'id': 'symbol.xym', 'amount': 1.1}
    ```
    """
    if not isinstance(amount, int):
        raise TypeError('To avoid confusion, automatic conversion to integer is prohibited')
    divisibility = get_divisibility(mosaic_id)
    divider = 10 ** int(divisibility)
    mn = get_mosaic_names(mosaic_id)
    name = mosaic_id
    names = mn['mosaicNames'][0]['names']
    if len(names) > 0:
        name = names[0]
    return {'id': name, 'amount': float(amount / divider)}
def search_transactions(address: Optional[str] = None, recipient_address: Optional[str] = None, signer_public_key: Optional[str] = None, height: Optional[int] = None, from_height: Optional[int] = None, to_height: Optional[str] = None, from_transfer_amount: Optional[str] = None, to_transfer_amount: Optional[str] = None, type: int = 16724, embedded: bool = False, transfer_mosaic_id: Optional[str] = None, page_size: int = 10, page_number: int = 1, offset: Optional[str] = None, order: str = 'desc', transaction_status: TransactionStatus = TransactionStatus.CONFIRMED_ADDED) ‑> Optional[list]
Expand source code
def search_transactions(address: Optional[str] = None,
                        recipient_address: Optional[str] = None,
                        signer_public_key: Optional[str] = None,
                        height: Optional[int] = None,
                        from_height: Optional[int] = None,
                        to_height: Optional[str] = None,
                        from_transfer_amount: Optional[str] = None,
                        to_transfer_amount: Optional[str] = None,
                        type: int = 16724,
                        embedded: bool = False,
                        transfer_mosaic_id: Optional[str] = None,
                        page_size: int = 10,
                        page_number: int = 1,
                        offset: Optional[str] = None,
                        order: str = 'desc',
                        transaction_status: TransactionStatus = TransactionStatus.CONFIRMED_ADDED
                        ) -> Optional[list]:
    params = {
        'address': address,
        'recipientAddress': recipient_address,
        'signerPublicKey': signer_public_key,
        'height': height,
        'fromHeight': from_height,
        'toHeight': to_height,
        'fromTransferAmount': from_transfer_amount,
        'toTransferAmount': to_transfer_amount,
        'type': type,
        'embedded': str(embedded).lower(),
        'transferMosaicId': transfer_mosaic_id,
        'pageSize': page_size,
        'pageNumber': page_number,
        'offset': offset,
        'order': order
    }
    payload = {key: val for key, val in params.items() if val is not None}
    endpoint = f'{node_selector.url}/transactions/{transaction_status.value}'
    try:
        answer = requests.get(endpoint, params=payload)
        if answer.status_code != HTTPStatus.OK:
            raise SymbolNetworkException(**answer.json())
    except RequestException as e:
        logger.exception(e)
        raise
    except SymbolNetworkException as e:
        logger.exception(e)
        raise
    transactions = answer.json()
    transactions_response = []
    for transaction in transactions['data']:
        mosaics = [MosaicInfo(id=mosaic['id'], amount=int(mosaic['amount'])) for mosaic in transaction['transaction']['mosaics']]
        del(transaction['transaction']['mosaics'])
        _transaction = TransactionResponse(id=transaction['id'],
                                           meta=Meta(**transaction['meta']),
                                           transaction=TransactionInfo(mosaics=mosaics, **transaction['transaction'])
                                           )
        _transaction.status = transaction_status.value
        transactions_response.append(_transaction)
        _transaction.transaction.humanization()
    return transactions_response
def send_transaction(payload: bytes) ‑> bool

Announces a transaction to the network

Expand source code
def send_transaction(payload: bytes) -> bool:
    """Announces a transaction to the network"""
    try:
        headers = {'Content-type': 'application/json'}
        answer = requests.put(f'{node_selector.url}/transactions', data=payload, headers=headers, timeout=10)
        if answer.status_code != HTTPStatus.ACCEPTED:
            raise SymbolNetworkException(**answer.json())
    except (RequestException, SymbolNetworkException) as e:
        logger.exception(e)
        return False
    else:
        return True
def url_validation(url)

django URL validation regex Raise an exception if the url is not valid

Expand source code
def url_validation(url):
    """django URL validation regex
    Raise an exception if the url is not valid"""
    regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    if re.match(regex, url) is None:
        raise ValueError(f'`{url}` is not a valid URL')

Classes

class Meta (**data: Any)

Transaction meta information

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class Meta(BaseModel):
    """Transaction meta information"""
    height: int
    hash: str
    merkleComponentHash: str
    index: int

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var hash : str
var height : int
var index : int
var merkleComponentHash : str
class Monitor (url: str, subscribers: List[str], formatting: bool = False, log: str = '', callback: Optional[Callable] = None)

Allows you to subscribe to events on the blockchain network

Expand source code
class Monitor:
    """Allows you to subscribe to events on the blockchain network"""
    where_to_subscribe = {
            'confirmedAdded': 'address',
            'unconfirmedAdded': 'address',
            'unconfirmedRemoved': 'address',
            'partialAdded': 'address',
            'partialRemoved': 'address',
            'cosignature': 'address',
            'status': 'address',
            'block': None,
            'finalizedBlock': None
    }

    def __init__(self,
                 url: str,
                 subscribers: List[str],
                 formatting: bool = False,
                 log: str = '',
                 callback: Optional[Callable] = None):
        self.url = url
        self.subscribers = subscribers
        self.formatting = formatting
        self.log = log
        self.callback = callback
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.monitoring())

    async def monitoring(self):
        result = urlparse(self.url)
        url = f"ws://{result.hostname}:{result.port}/ws"
        print(f'MONITORING: {url}')
        try:
            async with websockets.connect(url) as ws:
                response = json.loads(await ws.recv())
                print(f'UID: {response["uid"]}')
                if 'uid' in response:
                    prepare = []
                    for subscriber in self.subscribers:
                        added = json.dumps({"uid": response["uid"], "subscribe": f"{subscriber}"})
                        await ws.send(added)
                        # print(f'Subscribed to: {subscriber}')
                        prepare.append([subscriber])
                    table = tabulate(prepare, headers=['Subscribers'], tablefmt='grid')
                    print(table)
                    print('Listening... `Ctrl+C` for abort')
                    while True:
                        res = await ws.recv()
                        if self.formatting:
                            res = json.dumps(json.loads(res), indent=4)
                        if self.callback is not None:
                            self.callback(json.loads(res))
                            continue
                        print(res)
                        if self.log:
                            with open(self.log, 'a+') as f:
                                res += '\n'
                                f.write(res)
        except exceptions.WebSocketException as e:
            logger.exception(e)
            raise

Class variables

var where_to_subscribe

Methods

async def monitoring(self)
Expand source code
async def monitoring(self):
    result = urlparse(self.url)
    url = f"ws://{result.hostname}:{result.port}/ws"
    print(f'MONITORING: {url}')
    try:
        async with websockets.connect(url) as ws:
            response = json.loads(await ws.recv())
            print(f'UID: {response["uid"]}')
            if 'uid' in response:
                prepare = []
                for subscriber in self.subscribers:
                    added = json.dumps({"uid": response["uid"], "subscribe": f"{subscriber}"})
                    await ws.send(added)
                    # print(f'Subscribed to: {subscriber}')
                    prepare.append([subscriber])
                table = tabulate(prepare, headers=['Subscribers'], tablefmt='grid')
                print(table)
                print('Listening... `Ctrl+C` for abort')
                while True:
                    res = await ws.recv()
                    if self.formatting:
                        res = json.dumps(json.loads(res), indent=4)
                    if self.callback is not None:
                        self.callback(json.loads(res))
                        continue
                    print(res)
                    if self.log:
                        with open(self.log, 'a+') as f:
                            res += '\n'
                            f.write(res)
    except exceptions.WebSocketException as e:
        logger.exception(e)
        raise
class MosaicInfo (**data: Any)

Mosaic information in a transaction

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class MosaicInfo(BaseModel):
    """Mosaic information in a transaction"""
    id: str
    amount: Union[StrictInt, StrictFloat]

    def __str__(self):
        return f'{self.amount}({self.id})'

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var amount : Union[pydantic.types.StrictInt, pydantic.types.StrictFloat]
var id : str
class NodeSelector (node_urls: Union[List[str], str])

Works with a list of nodes in both the main and test networks. Offline finds the best connection options and makes adjustments if conditions change. Also allows you to add connections manually.

Expand source code
class NodeSelector:
    """Works with a list of nodes in both the main and test networks.
       Offline finds the best connection options and makes adjustments if conditions change.
       Also allows you to add connections manually.
    """
    _URL: Optional[str] = None
    _URLs: Optional[list] = None
    is_elections: bool = False
    _network_type: NetworkType = NetworkType.TEST_NET

    def __init__(self, node_urls: Union[List[str], str]):
        self.thread = Thread()
        self.url = node_urls

    @property
    def url(self):
        while self.is_elections:
            time.sleep(0.1)
        return self._URL

    @url.setter
    def url(self, urls: Union[list, str]):
        self.is_elections = True
        self.thread.stop()
        if isinstance(urls, str):
            urls = [urls]
        for url in urls:
            url_validation(url)
        self._URLs = urls
        if len(self._URLs) == 1:
            self._URL = self._URLs[0]  # setting a single URL value
            logger.debug(f'Installed node: {self._URL}')
        else:
            self.thread.start(self.node_actualizer, interval=3600).wait()
        self.is_elections = False

    def node_actualizer(self, interval, stop_event, updated):
        while True:
            self.reelection_node()
            updated.set()
            event_is_set = stop_event.wait(interval)
            if event_is_set:
                break

    def reelection_node(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        logger.debug('Node reselecting...')
        heights = [NodeSelector.get_height(url) for url in self._URLs]
        max_height = max(heights)
        heights_filter = [True if height >= max_height * 0.97 else False for height in heights]
        # filtered by block height - 97%
        filtered_by_height = [url for i, url in enumerate(self._URLs) if heights_filter[i]]
        urls_p_h = {url: (NodeSelector.ping(url), NodeSelector.simple_health(url)) for url in filtered_by_height}
        # Remove non-working nodes from the dict
        working = {key: val for key, val in urls_p_h.items() if val[1]}
        _sorted_URLs = [k for k, v in sorted(working.items(), key=lambda item: item[1][0])]
        new_url = _sorted_URLs[0] if len(_sorted_URLs) > 0 else None
        if new_url != self._URL and self._URL is not None:
            logger.warning(f'Reselection node: {self._URL} -> {new_url}')
        if new_url is None:
            logger.error('It was not possible to select the current node from the list of available ones')
        self._URL = new_url
        logger.debug(f'Selected node: {self._URL}')

    @property
    def network_type(self):
        return self._network_type

    @network_type.setter
    def network_type(self, network_type):
        if network_type == self.network_type:
            return
        self._network_type = network_type
        if self._network_type == NetworkType.MAIN_NET:
            logger.debug('Switch to MAIN network')
            self.url = config.MAIN_NODE_URLs
        elif self._network_type == NetworkType.TEST_NET:
            logger.debug('Switch to TEST network')
            self.url = config.TEST_NODE_URLs
        else:
            raise TypeError('Unknown network type')

    @staticmethod
    def health(url) -> BlockchainStatuses:
        """
        Returns the statuses of node services
        Parameters
        ----------
        url
            URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000
        Returns
        -------
        BlockchainStatuses
            The statuses of node services
        ```py
        BlockchainStatuses.DB_FAILURE
        BlockchainStatuses.NO_NODES_AVAILABLE
        BlockchainStatuses.NOT_INITIALIZED
        BlockchainStatuses.REST_FAILURE
        BlockchainStatuses.OK
        BlockchainStatuses.UNKNOWN
        ```
        """
        if url is None:
            return BlockchainStatuses.NO_NODES_AVAILABLE
        try:
            answer = requests.get(f'{url}/node/health', timeout=1)
        except Exception as e:
            logger.exception(e)
            return BlockchainStatuses.REST_FAILURE
        if answer.status_code == HTTPStatus.OK:
            node_info = answer.json()
            if node_info['status']['apiNode'] == 'up' and node_info['status']['db'] == 'up':
                return BlockchainStatuses.OK
            if node_info['status']['apiNode'] == 'down':
                return BlockchainStatuses.NODE_FAILURE
            if node_info['status']['db'] == 'down':
                return BlockchainStatuses.DB_FAILURE
        return BlockchainStatuses.UNKNOWN

    @staticmethod
    def simple_health(url) -> bool:
        health_status = NodeSelector.health(url)
        if health_status == BlockchainStatuses.OK:
            return True
        return False

    @staticmethod
    def get_height(url) -> int:
        """
        Returns the last block known to the node
        Parameters
        ----------
        url
            URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000

        Returns
        -------

        """
        try:
            answer = requests.get(f'{url}/chain/info', timeout=1)
        except Exception:
            return 0
        node_info = answer.json()
        height = node_info['height']
        return int(height)

    @staticmethod
    def ping(url) -> Optional[float]:
        """Calculate and return a latency point using sockets"""
        if multiprocessing.current_process().daemon:
            asyncio.set_event_loop(asyncio.new_event_loop())
        parse_result = urlparse(url)
        loop = asyncio.get_event_loop()
        latency = loop.run_until_complete(NodeSelector.measure_latency(host=parse_result.hostname, port=parse_result.port, runs=3))
        if (result := len(list(filter(None, latency)))) == 0:
            return None
        average = sum(filter(None, latency)) / result
        return average

    @staticmethod
    async def measure_latency(
            host: str,
            port: int = 443,
            timeout: float = 5,
            runs: int = 1,
            wait: float = 0,
    ) -> list:
        """
        Builds a list composed of latency_points
        Parameters
        ----------
        host
            Host name
        port
            Port
        timeout
            Server response timeout
        runs
            Number of attempts
        wait
            Delay before request
        Returns
        -------
        list
            list of latency for all runs
        """
        tasks = []
        latency_points = []
        for i in range(runs):
            await asyncio.sleep(wait)
            tasks.append(asyncio.create_task(NodeSelector.latency_point(host=host, port=port, timeout=timeout)))
            # last_latency_point = await latency_point(host=host, port=port, timeout=timeout)
        for i in range(runs):
            latency_points.append(await tasks[i])
        return latency_points

    @staticmethod
    async def latency_point(host: str, port: int = 443, timeout: float = 5) -> Optional[float]:
        """
        Calculate a latency point using sockets. If something bad happens the point returned is None
        Parameters
        ----------
        host
            Host name
        port
            Port
        timeout
            Server response timeout
        Returns
        -------
        Optional[float]
            Returns float if possible
        """
        # New Socket and Time out
        # Start a timer
        s_start = time.time()

        # Try to Connect
        uri = f"ws://{host}:{port}"
        try:
            async with websockets.connect(uri, close_timeout=timeout):
                pass
        except exceptions.InvalidMessage:
            pass
        except exceptions.InvalidStatusCode:
            pass
        except Exception as e:
            logger.debug(str(e))
            return None

        # Stop Timer
        s_runtime = (time.time() - s_start) * 1000
        return float(s_runtime)

Class variables

var is_elections : bool

Static methods

def get_height(url) ‑> int

Returns the last block known to the node Parameters


url
URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000

Returns

Expand source code
@staticmethod
def get_height(url) -> int:
    """
    Returns the last block known to the node
    Parameters
    ----------
    url
        URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000

    Returns
    -------

    """
    try:
        answer = requests.get(f'{url}/chain/info', timeout=1)
    except Exception:
        return 0
    node_info = answer.json()
    height = node_info['height']
    return int(height)
def health(url) ‑> BlockchainStatuses

Returns the statuses of node services Parameters


url
URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000

Returns

BlockchainStatuses
The statuses of node services
BlockchainStatuses.DB_FAILURE
BlockchainStatuses.NO_NODES_AVAILABLE
BlockchainStatuses.NOT_INITIALIZED
BlockchainStatuses.REST_FAILURE
BlockchainStatuses.OK
BlockchainStatuses.UNKNOWN
Expand source code
@staticmethod
def health(url) -> BlockchainStatuses:
    """
    Returns the statuses of node services
    Parameters
    ----------
    url
        URL node in the form of http://ngl-dual-001.testnet.symboldev.network:3000
    Returns
    -------
    BlockchainStatuses
        The statuses of node services
    ```py
    BlockchainStatuses.DB_FAILURE
    BlockchainStatuses.NO_NODES_AVAILABLE
    BlockchainStatuses.NOT_INITIALIZED
    BlockchainStatuses.REST_FAILURE
    BlockchainStatuses.OK
    BlockchainStatuses.UNKNOWN
    ```
    """
    if url is None:
        return BlockchainStatuses.NO_NODES_AVAILABLE
    try:
        answer = requests.get(f'{url}/node/health', timeout=1)
    except Exception as e:
        logger.exception(e)
        return BlockchainStatuses.REST_FAILURE
    if answer.status_code == HTTPStatus.OK:
        node_info = answer.json()
        if node_info['status']['apiNode'] == 'up' and node_info['status']['db'] == 'up':
            return BlockchainStatuses.OK
        if node_info['status']['apiNode'] == 'down':
            return BlockchainStatuses.NODE_FAILURE
        if node_info['status']['db'] == 'down':
            return BlockchainStatuses.DB_FAILURE
    return BlockchainStatuses.UNKNOWN
async def latency_point(host: str, port: int = 443, timeout: float = 5) ‑> Optional[float]

Calculate a latency point using sockets. If something bad happens the point returned is None Parameters


host
Host name
port
Port
timeout
Server response timeout

Returns

Optional[float]
Returns float if possible
Expand source code
@staticmethod
async def latency_point(host: str, port: int = 443, timeout: float = 5) -> Optional[float]:
    """
    Calculate a latency point using sockets. If something bad happens the point returned is None
    Parameters
    ----------
    host
        Host name
    port
        Port
    timeout
        Server response timeout
    Returns
    -------
    Optional[float]
        Returns float if possible
    """
    # New Socket and Time out
    # Start a timer
    s_start = time.time()

    # Try to Connect
    uri = f"ws://{host}:{port}"
    try:
        async with websockets.connect(uri, close_timeout=timeout):
            pass
    except exceptions.InvalidMessage:
        pass
    except exceptions.InvalidStatusCode:
        pass
    except Exception as e:
        logger.debug(str(e))
        return None

    # Stop Timer
    s_runtime = (time.time() - s_start) * 1000
    return float(s_runtime)
async def measure_latency(host: str, port: int = 443, timeout: float = 5, runs: int = 1, wait: float = 0) ‑> list

Builds a list composed of latency_points Parameters


host
Host name
port
Port
timeout
Server response timeout
runs
Number of attempts
wait
Delay before request

Returns

list
list of latency for all runs
Expand source code
@staticmethod
async def measure_latency(
        host: str,
        port: int = 443,
        timeout: float = 5,
        runs: int = 1,
        wait: float = 0,
) -> list:
    """
    Builds a list composed of latency_points
    Parameters
    ----------
    host
        Host name
    port
        Port
    timeout
        Server response timeout
    runs
        Number of attempts
    wait
        Delay before request
    Returns
    -------
    list
        list of latency for all runs
    """
    tasks = []
    latency_points = []
    for i in range(runs):
        await asyncio.sleep(wait)
        tasks.append(asyncio.create_task(NodeSelector.latency_point(host=host, port=port, timeout=timeout)))
        # last_latency_point = await latency_point(host=host, port=port, timeout=timeout)
    for i in range(runs):
        latency_points.append(await tasks[i])
    return latency_points
def ping(url) ‑> Optional[float]

Calculate and return a latency point using sockets

Expand source code
@staticmethod
def ping(url) -> Optional[float]:
    """Calculate and return a latency point using sockets"""
    if multiprocessing.current_process().daemon:
        asyncio.set_event_loop(asyncio.new_event_loop())
    parse_result = urlparse(url)
    loop = asyncio.get_event_loop()
    latency = loop.run_until_complete(NodeSelector.measure_latency(host=parse_result.hostname, port=parse_result.port, runs=3))
    if (result := len(list(filter(None, latency)))) == 0:
        return None
    average = sum(filter(None, latency)) / result
    return average
def simple_health(url) ‑> bool
Expand source code
@staticmethod
def simple_health(url) -> bool:
    health_status = NodeSelector.health(url)
    if health_status == BlockchainStatuses.OK:
        return True
    return False

Instance variables

var network_type
Expand source code
@property
def network_type(self):
    return self._network_type
var url
Expand source code
@property
def url(self):
    while self.is_elections:
        time.sleep(0.1)
    return self._URL

Methods

def node_actualizer(self, interval, stop_event, updated)
Expand source code
def node_actualizer(self, interval, stop_event, updated):
    while True:
        self.reelection_node()
        updated.set()
        event_is_set = stop_event.wait(interval)
        if event_is_set:
            break
def reelection_node(self)
Expand source code
def reelection_node(self):
    asyncio.set_event_loop(asyncio.new_event_loop())
    logger.debug('Node reselecting...')
    heights = [NodeSelector.get_height(url) for url in self._URLs]
    max_height = max(heights)
    heights_filter = [True if height >= max_height * 0.97 else False for height in heights]
    # filtered by block height - 97%
    filtered_by_height = [url for i, url in enumerate(self._URLs) if heights_filter[i]]
    urls_p_h = {url: (NodeSelector.ping(url), NodeSelector.simple_health(url)) for url in filtered_by_height}
    # Remove non-working nodes from the dict
    working = {key: val for key, val in urls_p_h.items() if val[1]}
    _sorted_URLs = [k for k, v in sorted(working.items(), key=lambda item: item[1][0])]
    new_url = _sorted_URLs[0] if len(_sorted_URLs) > 0 else None
    if new_url != self._URL and self._URL is not None:
        logger.warning(f'Reselection node: {self._URL} -> {new_url}')
    if new_url is None:
        logger.error('It was not possible to select the current node from the list of available ones')
    self._URL = new_url
    logger.debug(f'Selected node: {self._URL}')
class SymbolNetworkException (code, message)

Is one exception for the convenience of working with the blockchain network

Expand source code
class SymbolNetworkException(Exception):
    """Is one exception for the convenience of working with the blockchain network"""
    codes = {
        'ResourceNotFound': 404,
        'InvalidAddress': 409,
        'InvalidArgument': 409,
        'InvalidContent': 400,
        'Internal': 500,
    }

    def __init__(self, code, message):
        self.code = self.codes.get(code)
        self.name = code
        self.message = message
        super(SymbolNetworkException, self).__init__(f'{self.code} - {self.name}', self.message)

Ancestors

  • builtins.Exception
  • builtins.BaseException

Class variables

var codes
class Thread

A helper class for working with a thread, starting and stopping it by signals

Expand source code
class Thread:
    """A helper class for working with a thread, starting and stopping it by signals"""
    def __init__(self):
        self.stop_event: Optional[threading.Event] = None
        self.thread: Optional[threading.Thread] = None
        self.is_started = False
        self.updated = threading.Event()

    def stop(self):
        if self.thread is not None and self.thread.is_alive():
            self.stop_event.set()
            self.thread.join()
            self.is_started = False
            logger.debug(f'The node actualization thread {self.thread.name} has been stopped.')

    def start(self, func: Callable, interval: int = 3600):
        self.is_started = True
        self.stop_event = threading.Event()
        self.updated = threading.Event()
        params = {'interval': interval, 'stop_event': self.stop_event, 'updated': self.updated}
        self.thread = threading.Thread(target=func, kwargs=params, daemon=True)
        self.thread.start()
        logger.debug(f'New actualizer thread started: {self.thread.name}')
        return self

    def wait(self):
        updated_is_set = self.updated.wait(60)
        if not updated_is_set:
            raise RuntimeError('Very long waiting time for node selection')

Methods

def start(self, func: Callable, interval: int = 3600)
Expand source code
def start(self, func: Callable, interval: int = 3600):
    self.is_started = True
    self.stop_event = threading.Event()
    self.updated = threading.Event()
    params = {'interval': interval, 'stop_event': self.stop_event, 'updated': self.updated}
    self.thread = threading.Thread(target=func, kwargs=params, daemon=True)
    self.thread.start()
    logger.debug(f'New actualizer thread started: {self.thread.name}')
    return self
def stop(self)
Expand source code
def stop(self):
    if self.thread is not None and self.thread.is_alive():
        self.stop_event.set()
        self.thread.join()
        self.is_started = False
        logger.debug(f'The node actualization thread {self.thread.name} has been stopped.')
def wait(self)
Expand source code
def wait(self):
    updated_is_set = self.updated.wait(60)
    if not updated_is_set:
        raise RuntimeError('Very long waiting time for node selection')
class Timing (network_type: Optional[NetworkType] = None)

Works with network time

Expand source code
class Timing:
    """Works with network time"""
    def __init__(self, network_type: Optional[NetworkType] = None):
        if network_type is None:
            network_type = node_selector.network_type
        if network_type == NetworkType.TEST_NET:
            self.epoch_time = EPOCH_TIME_TESTNET
        elif network_type == NetworkType.MAIN_NET:
            self.epoch_time = EPOCH_TIME_MAINNET
        else:
            raise EnvironmentError('It is not possible to determine the type of network')

    def calc_deadline(self, days: float = 0, seconds: float = 0, milliseconds: float = 0,
                      minutes: float = 0, hours: float = 0, weeks: float = 0) -> int:

        if days + seconds + milliseconds + minutes + hours + weeks <= 0:
            raise TimeoutError('Added time must be positive otherwise the transaction will not have time to process')
        # perhaps this code will be needed if you need to get time from a node
        # node_info = json.loads(requests.get(endpoint).text)
        # receive_timestamp = int(node_info['communicationTimestamps']['receiveTimestamp'])
        # td = datetime.timedelta(milliseconds=receive_timestamp)
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        td = now - self.epoch_time
        td += datetime.timedelta(days=days, seconds=seconds,
                                 milliseconds=milliseconds, minutes=minutes,
                                 hours=hours, weeks=weeks)
        deadline = int(td.total_seconds() * 1000)
        return deadline

    def deadline_to_date(self, deadline: int, is_local: bool = False) -> datetime:
        def utc2local(utc):
            utc_epoch = time.mktime(utc.timetuple())
            offset = datetime.datetime.fromtimestamp(utc_epoch) - datetime.datetime.utcfromtimestamp(utc_epoch)
            return utc + offset

        deadline = int(deadline)
        epoch_timestamp = datetime.datetime.timestamp(self.epoch_time)
        deadline_date_utc = datetime.datetime.utcfromtimestamp(epoch_timestamp + deadline / 1000)
        if is_local:
            local_deadline_date = utc2local(deadline_date_utc)
            return local_deadline_date
        return deadline_date_utc

Methods

def calc_deadline(self, days: float = 0, seconds: float = 0, milliseconds: float = 0, minutes: float = 0, hours: float = 0, weeks: float = 0) ‑> int
Expand source code
def calc_deadline(self, days: float = 0, seconds: float = 0, milliseconds: float = 0,
                  minutes: float = 0, hours: float = 0, weeks: float = 0) -> int:

    if days + seconds + milliseconds + minutes + hours + weeks <= 0:
        raise TimeoutError('Added time must be positive otherwise the transaction will not have time to process')
    # perhaps this code will be needed if you need to get time from a node
    # node_info = json.loads(requests.get(endpoint).text)
    # receive_timestamp = int(node_info['communicationTimestamps']['receiveTimestamp'])
    # td = datetime.timedelta(milliseconds=receive_timestamp)
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    td = now - self.epoch_time
    td += datetime.timedelta(days=days, seconds=seconds,
                             milliseconds=milliseconds, minutes=minutes,
                             hours=hours, weeks=weeks)
    deadline = int(td.total_seconds() * 1000)
    return deadline
def deadline_to_date(self, deadline: int, is_local: bool = False) ‑> 
Expand source code
def deadline_to_date(self, deadline: int, is_local: bool = False) -> datetime:
    def utc2local(utc):
        utc_epoch = time.mktime(utc.timetuple())
        offset = datetime.datetime.fromtimestamp(utc_epoch) - datetime.datetime.utcfromtimestamp(utc_epoch)
        return utc + offset

    deadline = int(deadline)
    epoch_timestamp = datetime.datetime.timestamp(self.epoch_time)
    deadline_date_utc = datetime.datetime.utcfromtimestamp(epoch_timestamp + deadline / 1000)
    if is_local:
        local_deadline_date = utc2local(deadline_date_utc)
        return local_deadline_date
    return deadline_date_utc
class TransactionInfo (**data: Any)

Contains information about transactions of the blockchain network

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class TransactionInfo(BaseModel):
    """Contains information about transactions of the blockchain network"""
    size: int
    signature: str
    signerPublicKey: str
    version: int
    network: int
    type: Union[int, str]
    maxFee: int
    deadline: Union[int, datetime.datetime]
    recipientAddress: str
    message: Optional[str]
    signer_address: Optional[str]
    mosaics: List[MosaicInfo]

    def humanization(self):
        """Converts information from the blockchain into a readable form"""
        self.deadline = Timing().deadline_to_date(self.deadline)
        if self.message is not None:
            self.message = unhexlify(self.message)[1:].decode('utf-8')
        self.recipientAddress = b32encode(unhexlify(self.recipientAddress)).decode('utf-8')[:-1]
        self.mosaics = [MosaicInfo(**mosaic_id_to_name_n_real(mosaic.id, mosaic.amount)) for mosaic in self.mosaics]
        self.type = TransactionTypes.get_type_by_id(self.type).name
        facade = SymFacade(node_selector.network_type.value)
        self.signer_address = str(facade.network.public_key_to_address(Hash256(self.signerPublicKey)))

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var deadline : Union[int, datetime.datetime]
var maxFee : int
var message : Optional[str]
var mosaics : List[MosaicInfo]
var network : int
var recipientAddress : str
var signature : str
var signerPublicKey : str
var signer_address : Optional[str]
var size : int
var type : Union[int, str]
var version : int

Methods

def humanization(self)

Converts information from the blockchain into a readable form

Expand source code
def humanization(self):
    """Converts information from the blockchain into a readable form"""
    self.deadline = Timing().deadline_to_date(self.deadline)
    if self.message is not None:
        self.message = unhexlify(self.message)[1:].decode('utf-8')
    self.recipientAddress = b32encode(unhexlify(self.recipientAddress)).decode('utf-8')[:-1]
    self.mosaics = [MosaicInfo(**mosaic_id_to_name_n_real(mosaic.id, mosaic.amount)) for mosaic in self.mosaics]
    self.type = TransactionTypes.get_type_by_id(self.type).name
    facade = SymFacade(node_selector.network_type.value)
    self.signer_address = str(facade.network.public_key_to_address(Hash256(self.signerPublicKey)))
class TransactionResponse (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class TransactionResponse(BaseModel):
    id: str
    meta: Meta
    transaction: TransactionInfo
    status: Optional[str]

    def __str__(self):
        if self.transaction.signer_address.startswith('T'):
            test_net_explorer = 'http://explorer.testnet.symboldev.network/transactions/'
        else:
            test_net_explorer = 'http://explorer.symbolblockchain.io/transactions/'
        prepare = list()
        mosaics = [str(mosaic) for mosaic in self.transaction.mosaics]
        mosaics = '\n'.join(mosaics)
        prepare.append(['Type:', self.transaction.type.title()])
        prepare.append(['Status:', self.status.title()])
        prepare.append(['Hash:', f'{test_net_explorer}{self.meta.hash}'])
        prepare.append(['Paid Fee:', f'{self.transaction.maxFee / 1000000}(XYM)'])
        prepare.append(['Height:', self.meta.height])
        prepare.append(['Deadline:', self.transaction.deadline])
        prepare.append(['Signature:', self.transaction.signature])
        prepare.append(['Signer Public Key:', self.transaction.signerPublicKey])
        prepare.append(['From:', self.transaction.signer_address])
        prepare.append(['To:', self.transaction.recipientAddress])
        prepare.append(['Mosaic:', mosaics])
        prepare.append(['Message:', self.transaction.message])
        table = tabulate(prepare, headers=['Property', 'Value'], tablefmt='grid')
        return table

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var id : str
var metaMeta
var status : Optional[str]
var transactionTransactionInfo