diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 3052fdce8..eba48ed0f 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -4,27 +4,23 @@ import datetime import logging from decimal import Decimal -import treq from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure from twisted.internet.error import ConnectionAborted -from hashlib import sha256 from lbryum import wallet as lbryum_wallet from lbryum.network import Network from lbryum.simple_config import SimpleConfig from lbryum.constants import COIN from lbryum.commands import Commands from lbryum.errors import InvalidPassword -from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbryschema.uri import parse_lbry_uri from lbryschema.claim import ClaimDict from lbryschema.error import DecodeError from lbryschema.decode import smart_decode -from lbrynet.txlbryum.factory import StratumClient from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.core.utils import DeferredDict from lbrynet.core.client.ClientRequest import ClientRequest @@ -92,107 +88,8 @@ class Wallet(object): self._batch_count = 20 self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) - @defer.inlineCallbacks - def fetch_headers_from_s3(self): - local_header_size = self.local_header_file_size() - resume_header = {"Range": "bytes={}-".format(local_header_size)} - response = yield treq.get(HEADERS_URL, headers=resume_header) - got_406 = response.code == 406 # our file is bigger - final_size_after_download = response.length + local_header_size - if got_406: - log.warning("s3 is more out of date than we are") - # should have something to download and a final length divisible by the header size - elif final_size_after_download and not final_size_after_download % HEADER_SIZE: - s3_height = (final_size_after_download / HEADER_SIZE) - 1 - local_height = self.local_header_file_height() - if s3_height > local_height: - if local_header_size: - log.info("Resuming download of %i bytes from s3", response.length) - with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file: - yield treq.collect(response, headers_file.write) - else: - with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: - yield treq.collect(response, headers_file.write) - log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height) - self._check_header_file_integrity() - else: - log.warning("s3 is more out of date than we are") - else: - log.error("invalid size for headers from s3") - - def local_header_file_height(self): - return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0) - - def local_header_file_size(self): - headers_path = os.path.join(self.config.path, "blockchain_headers") - if os.path.isfile(headers_path): - return os.stat(headers_path).st_size - return 0 - - @defer.inlineCallbacks - def get_remote_height(self, server, port): - connected = defer.Deferred() - connected.addTimeout(3, reactor, lambda *_: None) - client = StratumClient(connected) - reactor.connectTCP(server, port, client) - yield connected - remote_height = yield client.blockchain_block_get_server_height() - client.client.transport.loseConnection() - defer.returnValue(remote_height) - - @defer.inlineCallbacks - def should_download_headers_from_s3(self): - from lbrynet import conf - if conf.settings['blockchain_name'] != "lbrycrd_main": - defer.returnValue(False) - self._check_header_file_integrity() - s3_headers_depth = conf.settings['s3_headers_depth'] - if not s3_headers_depth: - defer.returnValue(False) - local_height = self.local_header_file_height() - for server_url in self.config.get('default_servers'): - port = int(self.config.get('default_servers')[server_url]['t']) - try: - remote_height = yield self.get_remote_height(server_url, port) - log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height) - if remote_height > (local_height + s3_headers_depth): - defer.returnValue(True) - except Exception as err: - log.warning("error requesting remote height from %s:%i - %s", server_url, port, err) - defer.returnValue(False) - - def _check_header_file_integrity(self): - # TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity - from lbrynet import conf - if conf.settings['blockchain_name'] != "lbrycrd_main": - return - hashsum = sha256() - checksum_height, checksum = conf.settings['HEADERS_FILE_SHA256_CHECKSUM'] - checksum_length_in_bytes = checksum_height * HEADER_SIZE - if self.local_header_file_size() < checksum_length_in_bytes: - return - headers_path = os.path.join(self.config.path, "blockchain_headers") - with open(headers_path, "rb") as headers_file: - hashsum.update(headers_file.read(checksum_length_in_bytes)) - current_checksum = hashsum.hexdigest() - if current_checksum != checksum: - msg = "Expected checksum {}, got {}".format(checksum, current_checksum) - log.warning("Wallet file corrupted, checksum mismatch. " + msg) - log.warning("Deleting header file so it can be downloaded again.") - os.unlink(headers_path) - elif (self.local_header_file_size() % HEADER_SIZE) != 0: - log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.") - with open(headers_path, "rb+") as headers_file: - headers_file.truncate(checksum_length_in_bytes) - @defer.inlineCallbacks def start(self): - should_download_headers = yield self.should_download_headers_from_s3() - if should_download_headers: - try: - yield self.fetch_headers_from_s3() - except Exception as err: - log.error("failed to fetch headers from s3: %s", err) log.info("Starting wallet.") yield self._start() self.stopped = False diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py index 620ad9a20..19183411f 100644 --- a/lbrynet/daemon/Components.py +++ b/lbrynet/daemon/Components.py @@ -1,8 +1,12 @@ import os import logging +from hashlib import sha256 import miniupnpc +import treq +import math from twisted.internet import defer, threads, reactor, error - +from lbryum.simple_config import SimpleConfig +from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbrynet import conf from lbrynet.core.Session import Session from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType @@ -17,6 +21,7 @@ from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.reflector import ServerFactory as reflector_server_factory +from lbrynet.txlbryum.factory import StratumClient from lbrynet.core.utils import generate_id @@ -25,6 +30,7 @@ log = logging.getLogger(__name__) # settings must be initialized before this file is imported DATABASE_COMPONENT = "database" +HEADERS_COMPONENT = "blockchain_headers" WALLET_COMPONENT = "wallet" SESSION_COMPONENT = "session" DHT_COMPONENT = "dht" @@ -35,6 +41,24 @@ PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" REFLECTOR_COMPONENT = "reflector" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" +def get_wallet_config(): + wallet_type = GCS('wallet') + if wallet_type == conf.LBRYCRD_WALLET: + raise ValueError('LBRYcrd Wallet is no longer supported') + elif wallet_type != conf.LBRYUM_WALLET: + raise ValueError('Wallet Type {} is not valid'.format(wallet_type)) + lbryum_servers = {address: {'t': str(port)} + for address, port in GCS('lbryum_servers')} + config = { + 'auto_connect': True, + 'chain': GCS('blockchain_name'), + 'default_servers': lbryum_servers + } + if 'use_keyring' in conf.settings: + config['use_keyring'] = GCS('use_keyring') + if conf.settings['lbryum_wallet_dir']: + config['lbryum_path'] = GCS('lbryum_wallet_dir') + return config class ConfigSettings(object): @@ -138,9 +162,142 @@ class DatabaseComponent(Component): self.storage = None +class HeadersComponent(Component): + component_name = HEADERS_COMPONENT + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.config = SimpleConfig(get_wallet_config()) + self._downloading_headers = None + self._headers_progress_percent = None + + @property + def component(self): + return self + + def get_status(self): + if self._downloading_headers is None: + return {} + return { + 'downloading_headers': self._downloading_headers, + 'download_progress': self._headers_progress_percent + } + + @defer.inlineCallbacks + def fetch_headers_from_s3(self): + def collector(data, h_file): + h_file.write(data) + local_size = float(h_file.tell()) + final_size = float(final_size_after_download) + self._headers_progress_percent = math.ceil(local_size / final_size * 100) + + local_header_size = self.local_header_file_size() + resume_header = {"Range": "bytes={}-".format(local_header_size)} + response = yield treq.get(HEADERS_URL, headers=resume_header) + got_406 = response.code == 406 # our file is bigger + final_size_after_download = response.length + local_header_size + if got_406: + log.warning("s3 is more out of date than we are") + # should have something to download and a final length divisible by the header size + elif final_size_after_download and not final_size_after_download % HEADER_SIZE: + s3_height = (final_size_after_download / HEADER_SIZE) - 1 + local_height = self.local_header_file_height() + if s3_height > local_height: + if local_header_size: + log.info("Resuming download of %i bytes from s3", response.length) + with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file: + yield treq.collect(response, lambda d: collector(d, headers_file)) + else: + with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: + yield treq.collect(response, lambda d: collector(d, headers_file)) + log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height) + self._check_header_file_integrity() + else: + log.warning("s3 is more out of date than we are") + else: + log.error("invalid size for headers from s3") + + def local_header_file_height(self): + return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0) + + def local_header_file_size(self): + headers_path = os.path.join(self.config.path, "blockchain_headers") + if os.path.isfile(headers_path): + return os.stat(headers_path).st_size + return 0 + + @defer.inlineCallbacks + def get_remote_height(self, server, port): + connected = defer.Deferred() + connected.addTimeout(3, reactor, lambda *_: None) + client = StratumClient(connected) + reactor.connectTCP(server, port, client) + yield connected + remote_height = yield client.blockchain_block_get_server_height() + client.client.transport.loseConnection() + defer.returnValue(remote_height) + + @defer.inlineCallbacks + def should_download_headers_from_s3(self): + from lbrynet import conf + if conf.settings['blockchain_name'] != "lbrycrd_main": + defer.returnValue(False) + self._check_header_file_integrity() + s3_headers_depth = conf.settings['s3_headers_depth'] + if not s3_headers_depth: + defer.returnValue(False) + local_height = self.local_header_file_height() + for server_url in self.config.get('default_servers'): + port = int(self.config.get('default_servers')[server_url]['t']) + try: + remote_height = yield self.get_remote_height(server_url, port) + log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height) + if remote_height > (local_height + s3_headers_depth): + defer.returnValue(True) + except Exception as err: + log.warning("error requesting remote height from %s:%i - %s", server_url, port, err) + defer.returnValue(False) + + def _check_header_file_integrity(self): + # TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity + from lbrynet import conf + if conf.settings['blockchain_name'] != "lbrycrd_main": + return + hashsum = sha256() + checksum_height, checksum = conf.settings['HEADERS_FILE_SHA256_CHECKSUM'] + checksum_length_in_bytes = checksum_height * HEADER_SIZE + if self.local_header_file_size() < checksum_length_in_bytes: + return + headers_path = os.path.join(self.config.path, "blockchain_headers") + with open(headers_path, "rb") as headers_file: + hashsum.update(headers_file.read(checksum_length_in_bytes)) + current_checksum = hashsum.hexdigest() + if current_checksum != checksum: + msg = "Expected checksum {}, got {}".format(checksum, current_checksum) + log.warning("Wallet file corrupted, checksum mismatch. " + msg) + log.warning("Deleting header file so it can be downloaded again.") + os.unlink(headers_path) + elif (self.local_header_file_size() % HEADER_SIZE) != 0: + log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.") + with open(headers_path, "rb+") as headers_file: + headers_file.truncate(checksum_length_in_bytes) + + @defer.inlineCallbacks + def start(self): + self._downloading_headers = yield self.should_download_headers_from_s3() + if self._downloading_headers: + try: + yield self.fetch_headers_from_s3() + except Exception as err: + log.error("failed to fetch headers from s3: %s", err) + + def stop(self): + return defer.succeed(None) + + class WalletComponent(Component): component_name = WALLET_COMPONENT - depends_on = [DATABASE_COMPONENT] + depends_on = [DATABASE_COMPONENT, HEADERS_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) @@ -150,34 +307,26 @@ class WalletComponent(Component): def component(self): return self.wallet + @defer.inlineCallbacks + def get_status(self): + if not self.wallet: + return + local_height = self.wallet.network.get_local_height() + remote_height = self.wallet.network.get_server_height() + best_hash = yield self.wallet.get_best_blockhash() + defer.returnValue({ + 'blocks': local_height, + 'blocks_behind': remote_height - local_height, + 'best_blockhash': best_hash, + 'is_encrypted': self.wallet.wallet.use_encryption + }) + @defer.inlineCallbacks def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) - wallet_type = GCS('wallet') - - if wallet_type == conf.LBRYCRD_WALLET: - raise ValueError('LBRYcrd Wallet is no longer supported') - elif wallet_type == conf.LBRYUM_WALLET: - - log.info("Using lbryum wallet") - - lbryum_servers = {address: {'t': str(port)} - for address, port in GCS('lbryum_servers')} - - config = { - 'auto_connect': True, - 'chain': GCS('blockchain_name'), - 'default_servers': lbryum_servers - } - - if 'use_keyring' in conf.settings: - config['use_keyring'] = GCS('use_keyring') - if conf.settings['lbryum_wallet_dir']: - config['lbryum_path'] = GCS('lbryum_wallet_dir') - self.wallet = LBRYumWallet(storage, config) - yield self.wallet.start() - else: - raise ValueError('Wallet Type {} is not valid'.format(wallet_type)) + config = get_wallet_config() + self.wallet = LBRYumWallet(storage, config) + yield self.wallet.start() @defer.inlineCallbacks def stop(self): diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index e551ab6cc..6b99a101b 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -727,29 +727,12 @@ class Daemon(AuthJSONRPCServer): 'code': connection_code, 'message': CONNECTION_MESSAGES[connection_code], }, - 'wallet_is_encrypted': wallet_is_encrypted, - 'blocks_behind': remote_height - local_height, # deprecated. remove from UI, then here - 'blockchain_status': { - 'blocks': local_height, - 'blocks_behind': remote_height - local_height, - 'best_blockhash': best_hash, - }, - 'dht_node_status': { - 'node_id': conf.settings.node_id.encode('hex'), - 'peers_in_routing_table': 0 if not self.component_manager.all_components_running(DHT_COMPONENT) else - len(self.dht_node.contacts) - } } - if session_status: - blobs = yield self.session.blob_manager.get_all_verified_blobs() - announce_queue_size = self.session.hash_announcer.hash_queue_size() - should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs() - response['session_status'] = { - 'managed_blobs': len(blobs), - 'managed_streams': len(self.file_manager.lbry_files), - 'announce_queue_size': announce_queue_size, - 'should_announce_blobs': should_announce_blobs, - } + for component in self.component_manager.components: + status = yield defer.maybeDeferred(component.get_status) + if status: + response[component.component_name] = status + defer.returnValue(response) def jsonrpc_version(self):