Source code for nash.api

"""
NashApi - main API class
"""
import os
import time
from collections import defaultdict

from nash import settings
from nash.helpers import login, market_precision_to_num_decimals, upload_wallet_and_keys
import nash.nash_core as nash_core
from sgqlc.endpoint.http import HTTPEndpoint

from nash.decorators import load_markets
from nash.logger import logger
from nash.exceptions import GraphQlError, LoginFailedError
from nash.graphql_signed import GraphQlSignedRequestsMixin
from nash.graphql_unsigned import GraphQlUnsignedRequestsMixin
from nash.graphql_mutating import GraphQlMutatingRequestsMixin
from nash.graphql_accounts import GraphQlAccountsRequestsMixin
from urllib.request import HTTPCookieProcessor
from http.cookiejar import MozillaCookieJar
from urllib.request import build_opener
import json


UA_STRING = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; it; rv:1.8.1.11) Gecko/20071127 Firefox/2.0.0.11'


[docs]class NashApi(GraphQlSignedRequestsMixin, GraphQlUnsignedRequestsMixin, GraphQlMutatingRequestsMixin, GraphQlAccountsRequestsMixin): """ Nash API Class """ cas_api_cookie: str = None account: dict = None chain_indices: dict = None chain_indices_core: dict = None # nash_core requires different identifiers for the chains nash_core_config: dict = None nash_core_init_params: dict = None asset_nonces: dict = None signing_pubkey: str = None gql_endpoint = None auto_relogin = None last_login_time = None # timestamp of last login. used for auto-relogin delay cookie_jar = MozillaCookieJar() _markets_raw = None # [ schema.Market ] markets = None # { name: schema.Market } market_assets = None # { name: schema.Asset } def __init__(self): self.asset_nonces = {} url = '%s/graphql' % settings.ENDPOINT logger.debug("GraphQL HTTP endpoint: %s", url) self.gql_endpoint = HTTPEndpoint(url, urlopen=self.api_urlopen) def _derive_passwords(self, password, salt=settings.SALT): """ Derives the passwords for login and key decryption from the initial password Returns: auth_key (str), crypto_key (str): Derived keys""" auth_key, crypto_key = nash_core.derive_hkdf_keys_from_password(password, salt) return auth_key, crypto_key @property def is_logged_in(self): """ Returns whether the API is currently logged in or not. Returns: is_logged_in (bool) """ return self.nash_core_config is not None
[docs] def create_account(self, username: str, password: str, country_code='US', state_code='MT', full_name='John Nasher', do_login=True): """Create an account. Note that this is not usable in production environments. Args: username (str): username to use password (str): password to use country_code (str, optional): country code of user. Defaults to 'US'. state_code (str, optional): state or region of user. Defaults to 'MT'. full_name (str, optional): full name of user. Defaults to 'John Nasher'. do_login (bool, optional): whether to login after creating account. Defaults to True. """ # Derive passwords auth_key, crypto_key = self._derive_passwords(password) # logger.debug("Auth, crypto key: %s %s" % (auth_key, crypto_key)) resp = self.do_signup(username, auth_key, country_code, full_name, state_code) if resp: logger.debug("Created account for %s " % username) if do_login: self.verify_account(username, password) logger.debug("Logged in as %s " % username)
[docs] def verify_account(self, username, password, do_login=True): """Auto-verifies an account. Note that this will not work in production. Args: username (str): username to verify password (str): password for user.. do_login (bool, optional): whether to login after verifying. Defaults to True. """ resp = self.do_verify_account(username) if resp: logger.debug("verify account for %s " % username) if do_login: self.login(username, password) logger.debug("Logged in as %s " % username)
[docs] def login(self, username, password, twofa_code=None, auto_relogin=True): """ Login at the exchange. Receive the user key and chain indices, derive the private keys If login fails for any reason, raises LoginFailedError. Args: username (str): email address password (str): password twofa_code (str, optional): If 2FA is enabled, the current code auto_relogin (bool): Whether to automatically relogin after a session expiry. Defaults to `True` Returns: None Raises: nash.exceptions.LoginFailedError """ if twofa_code and auto_relogin: raise LoginFailedError("Cannot use auto_relogin with 2FA code. Please either disable 2FA or set auto_relogin to False") # Derive passwords auth_key, crypto_key = self._derive_passwords(password) # logger.debug("Auth, crypto key: %s %s" % (auth_key, crypto_key)) # Login self.cas_api_cookie, response = login(username, auth_key) self.chain_indices = {} # if 2fa required, we have to login with that too if twofa_code: two_factor_response = self.do_two_factor_auth(twofa_code) self.account = two_factor_response.account for wallet in self.account.wallets: self.chain_indices[wallet.blockchain] = wallet.chain_index else: # Save login response items for chain_index in response["chain_index"]: self.chain_indices[chain_index["blockchain"]] = chain_index["height"] self.account = response["account"] encrypted_secret_key = self.account["encrypted_secret_key"] encrypted_secret_key_nonce = self.account["encrypted_secret_key_nonce"] encrypted_secret_key_tag = self.account["encrypted_secret_key_tag"] # If encrypted secret key is not present, we have to create it if not encrypted_secret_key or encrypted_secret_key is None: if settings.AUTO_CREATE_KEYS_IF_NOT_EXISTS: self._core_create_keys_and_upload(password, crypto_key) return else: raise Exception("Login successful but user does not yet have wallet/keys!") # If keys came back from the CAS, use them to initialise the API self._core_init({ "passphrase": '', "secretKey": encrypted_secret_key, "secretTag": encrypted_secret_key_tag, "secretNonce": encrypted_secret_key_nonce, "encryptionKey": crypto_key, "enginePubkey": "dummy", "chainIndices": self.chain_indices }) self.last_login_time = time.time() # Finally, if auto-relogin is enabled, remember the login details self.auto_relogin = auto_relogin self.username = username self.password = password
def _core_create_keys_and_upload(self, passphrase: str, encryption_key: str): """ Create initial set of keys and upload them """ # Create new initial key res = nash_core.new_encryped_secret_key(passphrase, encryption_key) init_params = { "passphrase": '', "secretKey": res["encryptedKey"], "secretTag": res["tag"], "secretNonce": res["nonce"], "encryptionKey": encryption_key, "enginePubkey": "dummy", "chainIndices": {"neo": 1, "eth": 1} } logger.debug("Created new encrypted key set, uploading to CAS") self._core_init(init_params) res = self.do_add_wallets_and_keys(self.nash_core_config, res, self.cas_api_cookie) logger.debug("Uploaded encrypted key set to CAS") def _core_init(self, nash_core_init_params): """ Initialise the core library and save the signing pubkey for easy reuse. Does a list-markets GQL requests to receive asset and market data. """ self.nash_core_init_params = nash_core_init_params # we need to gather asset and market data # for initilaizing the nash core go library self._load_markets() self.nash_core_init_params['assetData'] = self._format_core_asset_data() self.nash_core_init_params['marketData'] = self._format_core_market_data() self.nash_core_config = nash_core.initialize(nash_core_init_params) self.signing_pubkey = self.nash_core_config["PayloadSigning"]["PublicKey"] self.get_all_asset_nonces()
[docs] def logout(self): """ Logs the user out. Currently only clears local variables and tokens. Perhaps later we'd also issue a logout request to the CAS Returns: None """ self.account = None self.chain_indices = None self.chain_indices_core = None self.nash_core_config = None self.nash_core_init_params = None self.cas_api_cookie = None
def _exec_gql_query(self, op): headers = {} if self.cas_api_cookie: headers['Cookie'] = self.cas_api_cookie if settings.DEBUG_GQL_REQUEST: print(op) data = self.gql_endpoint(op, extra_headers=headers) if settings.DEBUG_GQL_RESPONSE: print(data) errors = data.get('errors') if errors: # Check if error is "unauthorized", which means token is invalid/expired if len(errors) == 1 and "message" in errors[0] and errors[0]["message"].lower() == "unauthorized": print("Detected GQL error 'unauthorized'...") # If auto_relogin is set, retry login once. On success: retry query, on error: raise Exception if self.auto_relogin and self.is_logged_in and time.time() - self.last_login_time > 10: print("Trying RELOGIN...") self.login(self.username, self.password, twofa_code=None, auto_relogin=True) return self._exec_gql_query(op) # Normal GraphQl error path: raise GraphQlError(errors) return (op + data) def api_urlopen(self, req, timeout=None): req.headers['User-Agent'] = UA_STRING return build_opener(HTTPCookieProcessor(cookiejar=self.cookie_jar)).open(req, timeout=timeout) def address_for_currency(self, currency: str) -> str: for assetname, asset in self.market_assets.items(): if asset.symbol == currency.lower(): blockchain = asset.blockchain return self.nash_core_config["Wallets"][blockchain]["Address"] raise Exception("No wallet found for currency %s " % currency)
[docs] def get_tokens_and_scripthashes(self) -> dict: """ Return a list of tokens with info about chain and scripthash """ # Build a list of all markets self._load_markets() market_tokens = self._get_traded_assets() # Create the resulting dictionary blockchains_and_tokens = defaultdict(dict) # Traverse all tokens and add those which have markets to the result dict for balance in self.list_account_balances(): asset = balance.asset asset_symbol = asset.symbol.lower() if asset_symbol in market_tokens: blockchains_and_tokens[asset.blockchain][asset_symbol] = asset.hash return dict(blockchains_and_tokens)
[docs] def get_all_asset_nonces(self): """Updates internal list of asset/nonce sets for use in trading and adding movements. """ traded_assets = self._get_traded_assets() try: new_nonces = {} nonces = self.get_assets_nonces(traded_assets) for asset_nonce in nonces: new_nonces[asset_nonce.asset] = asset_nonce.nonces self.asset_nonces = new_nonces except Exception as error: logger.warn("Could not get asset nonces %s " % error)
def _get_traded_assets(self): market_tokens = set() for market in self._markets_raw: market_tokens.add(market.a_unit.lower()) market_tokens.add(market.b_unit.lower()) return list(market_tokens) def _load_markets(self, force_update=False): """ In order to initialize the Go library, the API needs to know about the markets (e.g. for the precisions) """ # If already loaded, skip if not force_update if self._markets_raw and self.market_assets and not force_update: return self._markets_raw = self.list_markets(with_assets=True) self.markets = {} self.market_assets = {} for market in self._markets_raw: self.markets[market.name] = market if market.a_asset.name not in self.market_assets: self.market_assets[market.a_asset.name] = market.a_asset if market.b_asset.name not in self.market_assets: self.market_assets[market.b_asset.name] = market.b_asset def _format_core_market_data(self) -> dict: if not self._markets_raw: self._load_markets() output = {} for name, market in self.markets.items(): output[name] = { 'minTickSize': market_precision_to_num_decimals(market.min_tick_size), 'minTradeSize': market_precision_to_num_decimals(market.min_trade_size), 'minTradeIncrement': market_precision_to_num_decimals(market.min_trade_increment), 'minTradeIncrementB': market_precision_to_num_decimals(market.min_trade_increment_b) } return output def _format_core_asset_data(self) -> dict: if not self.market_assets: self._load_markets() tradeable_output = {} for asset in self.market_assets.values(): tradeable_output[asset.symbol.lower()] = { "name": asset.name, "hash": asset.hash, "precision": asset.blockchain_precision, "blockchain": asset.blockchain } return tradeable_output