From 2190f4ac85737b4587730a3e8f41db9c59c5f880 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 12 Feb 2017 20:57:23 -0500 Subject: [PATCH] refactor GetStream -convert to inline callbacks -return more errors than just timeouts -delete on timeout -have a more understandable return condition (first data blob having downloaded) --- .../EncryptedFileDownloader.py | 2 +- lbrynet/lbrynet_daemon/Daemon.py | 48 ++-- lbrynet/lbrynet_daemon/Downloader.py | 263 +++++++++--------- 3 files changed, 167 insertions(+), 146 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 83676c1b3..5b69d982d 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -47,7 +47,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): @defer.inlineCallbacks def restore(self): - sd_hash = yield self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) + sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) if sd_hash: self.sd_hash = sd_hash[0] else: diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 3dad9195c..eb94e2e0d 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -763,8 +763,12 @@ class Daemon(AuthJSONRPCServer): """ timeout = timeout if timeout is not None else conf.settings['download_timeout'] - helper = _DownloadNameHelper( - self, name, timeout, download_directory, file_name, wait_for_write) + try: + helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name, + wait_for_write) + except Exception as err: + log.exception(err) + raise err if not stream_info: self.waiting_on[name] = True @@ -806,8 +810,7 @@ class Daemon(AuthJSONRPCServer): timeout=timeout, download_directory=download_directory, file_name=file_name) - d = self.streams[name].start(stream_info, name) - return d + return self.streams[name].start(stream_info, name) def _get_long_count_timestamp(self): dt = utils.utcnow() - utils.datetime_obj(year=2012, month=12, day=21) @@ -842,7 +845,7 @@ class Daemon(AuthJSONRPCServer): if stream_count == 0: yield self.stream_info_manager.delete_stream(stream_hash) else: - log.warning("Can't delete stream info for %s", stream_hash) + log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count) if delete_file: if os.path.isfile(filename): os.remove(filename) @@ -1490,8 +1493,7 @@ class Daemon(AuthJSONRPCServer): max_tries = 3 while tries <= max_tries: try: - log.info( - 'Making try %s / %s to start download of %s', tries, max_tries, name) + log.info('Making try %s / %s to start download of %s', tries, max_tries, name) new_sd_hash, file_path = yield self._download_name( name=name, timeout=timeout, @@ -1502,10 +1504,10 @@ class Daemon(AuthJSONRPCServer): ) break except Exception as e: - log.exception('Failed to get %s', name) + log.warning('Failed to get %s', name) if tries == max_tries: self.analytics_manager.send_download_errored(download_id, name, stream_info) - response = yield self._render_response(str(e)) + response = yield self._render_response(e.message) defer.returnValue(response) tries += 1 # TODO: should stream_hash key be changed to sd_hash? @@ -1515,7 +1517,7 @@ class Daemon(AuthJSONRPCServer): } stream = self.streams.get(name) if stream: - stream.downloader.finished_deferred.addCallback( + stream.finished_deferred.addCallback( lambda _: self.analytics_manager.send_download_finished( download_id, name, stream_info) ) @@ -2326,9 +2328,7 @@ def get_sd_hash(stream_info): class _DownloadNameHelper(object): - def __init__(self, daemon, name, - timeout=None, - download_directory=None, file_name=None, + def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None, wait_for_write=True): self.daemon = daemon self.name = name @@ -2378,16 +2378,24 @@ class _DownloadNameHelper(object): @defer.inlineCallbacks def _get_stream(self, stream_info): - was_successful, sd_hash, download_path = yield self.daemon.add_stream( - self.name, self.timeout, self.download_directory, self.file_name, stream_info) - if not was_successful: - log.warning("lbry://%s timed out, removing from streams", self.name) + try: + download_path = yield self.daemon.add_stream( + self.name, self.timeout, self.download_directory, self.file_name, stream_info) + except (InsufficientFundsError, Exception) as err: + if Failure(err).check(InsufficientFundsError): + log.warning("Insufficient funds to download lbry://%s", self.name) + self.remove_from_wait("Insufficient funds") + else: + log.warning("lbry://%s timed out, removing from streams", self.name) + self.remove_from_wait("Timed out") + if self.daemon.streams[self.name].downloader is not None: + yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader) del self.daemon.streams[self.name] - self.remove_from_wait("Timed out") - raise Exception("Timed out") + raise err + if self.wait_for_write: yield self._wait_for_write() - defer.returnValue((sd_hash, download_path)) + defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path)) def _wait_for_write(self): d = defer.succeed(None) diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index af2beb4bd..abcc351ac 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -1,7 +1,5 @@ import logging import os - -from copy import deepcopy from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -15,7 +13,6 @@ INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' -# TODO: is this ever used? DOWNLOAD_STOPPED_CODE = 'stopped' STREAM_STAGES = [ (INITIALIZING_CODE, 'Initializing'), @@ -29,144 +26,160 @@ STREAM_STAGES = [ log = logging.getLogger(__name__) +def safe_start(looping_call): + if not looping_call.running: + looping_call.start(1) + + +def safe_stop(looping_call): + if looping_call.running: + looping_call.stop() + + class GetStream(object): - def __init__(self, sd_identifier, session, wallet, - lbry_file_manager, exchange_rate_manager, - max_key_fee, data_rate=0.5, timeout=None, - download_directory=None, file_name=None): - if timeout is None: - timeout = conf.settings['download_timeout'] - self.wallet = wallet - self.resolved_name = None - self.description = None - self.fee = None - self.data_rate = data_rate + def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager, + max_key_fee, data_rate=None, timeout=None, download_directory=None, + file_name=None): + self.timeout = timeout or conf.settings['download_timeout'] + self.data_rate = data_rate or conf.settings['data_rate'] + self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] + self.download_directory = download_directory or conf.settings['download_directory'] self.file_name = file_name + self.timeout_counter = 0 + self.code = None + self.sd_hash = None + self.wallet = wallet self.session = session self.exchange_rate_manager = exchange_rate_manager self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.sd_hash = None - self.max_key_fee = max_key_fee - self.stream_info = None - self.stream_info_manager = None - self._d = defer.Deferred(None) - self.timeout = timeout - self.timeout_counter = 0 - self.download_directory = download_directory - self.download_path = None self.downloader = None - # fired after the metadata has been downloaded and the - # actual file has been started - self.finished = defer.Deferred(None) self.checker = LoopingCall(self.check_status) - self.code = STREAM_STAGES[0] + + # fired when the download is complete + self.finished_deferred = defer.Deferred(None) + # fired after the metadata and the first data blob have been downloaded + self.data_downloading_deferred = defer.Deferred(None) + + @property + def download_path(self): + return os.path.join(self.download_directory, self.downloader.file_name) + + def _check_status(self, status): + if status.num_completed and not self.data_downloading_deferred.called: + self.data_downloading_deferred.callback(True) + if self.data_downloading_deferred.called: + safe_stop(self.checker) + else: + log.info("Downloading stream data (%i seconds)", self.timeout_counter) def check_status(self): + """ + Check if we've got the first data blob in the stream yet + """ + self.timeout_counter += 1 + if self.timeout_counter >= self.timeout: + if not self.data_downloading_deferred.called: + self.data_downloading_deferred.errback(Exception("Timeout")) + safe_stop(self.checker) + elif self.downloader: + d = self.downloader.status() + d.addCallback(self._check_status) + else: + log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) - # download_path is set after the sd blob has been downloaded - if self.download_path: - self.checker.stop() - self.finished.callback((True, self.sd_hash, self.download_path)) - - elif self.timeout_counter >= self.timeout: - log.info("Timeout downloading lbry://%s", self.resolved_name) - self.checker.stop() - self._d.cancel() - self.code = STREAM_STAGES[4] - self.finished.callback((False, None, None)) - - def _convert_max_fee(self): + def convert_max_fee(self): max_fee = FeeValidator(self.max_key_fee) if max_fee.currency_symbol == "LBC": return max_fee.amount return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount + def set_status(self, status, name): + log.info("Download lbry://%s status changed to %s" % (name, status)) + self.code = next(s for s in STREAM_STAGES if s[0] == status) + + def check_fee(self, fee): + validated_fee = FeeValidator(fee) + max_key_fee = self.convert_max_fee() + converted_fee = self.exchange_rate_manager.to_lbc(validated_fee).amount + if converted_fee > self.wallet.get_balance(): + raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee) + if converted_fee > max_key_fee: + raise KeyFeeAboveMaxAllowed('Key fee %s above max allowed %s' % (converted_fee, + max_key_fee)) + return validated_fee + + def get_downloader_factory(self, factories): + for factory in factories: + if isinstance(factory, ManagedEncryptedFileDownloaderFactory): + return factory + raise Exception('No suitable factory was found in {}'.format(factories)) + + @defer.inlineCallbacks + def get_downloader(self, factory, stream_metadata): + downloader_options = [self.data_rate, True] + downloader = yield factory.make_downloader(stream_metadata, downloader_options, + self.payment_rate_manager, + download_directory=self.download_directory, + file_name=self.file_name) + defer.returnValue(downloader) + + def _pay_key_fee(self, address, fee_lbc, name): + log.info("Pay key fee %f --> %s", fee_lbc, address) + reserved_points = self.wallet.reserve_points(address, fee_lbc) + if reserved_points is None: + raise InsufficientFundsError('Unable to pay the key fee of %s for %s' % (fee_lbc, name)) + return self.wallet.send_points_to_address(reserved_points, fee_lbc) + + @defer.inlineCallbacks + def pay_key_fee(self, fee, name): + if fee is not None: + fee_lbc = self.exchange_rate_manager.to_lbc(fee).amount + yield self._pay_key_fee(fee.address, fee_lbc, name) + else: + defer.returnValue(None) + + @defer.inlineCallbacks + def finish(self, results, name): + self.set_status(DOWNLOAD_STOPPED_CODE, name) + log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], + self.download_path) + safe_stop(self.checker) + status = yield self.downloader.status() + self._check_status(status) + defer.returnValue(self.download_path) + + @defer.inlineCallbacks + def download(self, stream_info, name): + self.set_status(INITIALIZING_CODE, name) + self.sd_hash = stream_info['sources']['lbry_sd_hash'] + if 'fee' in stream_info: + fee = self.check_fee(stream_info['fee']) + else: + fee = None + + self.set_status(DOWNLOAD_METADATA_CODE, name) + sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) + stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) + factory = self.get_downloader_factory(stream_metadata.factories) + self.downloader = yield self.get_downloader(factory, stream_metadata) + + self.set_status(DOWNLOAD_RUNNING_CODE, name) + if fee: + yield self.pay_key_fee(fee, name) + log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) + self.finished_deferred = self.downloader.start() + self.finished_deferred.addCallback(self.finish, name) + yield self.data_downloading_deferred + + @defer.inlineCallbacks def start(self, stream_info, name): - def _cancel(err): - # this callback sequence gets cancelled in check_status if - # it takes too long when that happens, we want the logic - # to live in check_status - if err.check(defer.CancelledError): - return - if self.checker: - self.checker.stop() - self.finished.errback(err) - - def _set_status(x, status): - log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status)) - self.code = next(s for s in STREAM_STAGES if s[0] == status) - return x - - def get_downloader_factory(metadata): - for factory in metadata.factories: - if isinstance(factory, ManagedEncryptedFileDownloaderFactory): - return factory, metadata - raise Exception('No suitable factory was found in {}'.format(metadata.factories)) - - def make_downloader(args): - factory, metadata = args - return factory.make_downloader(metadata, - [self.data_rate, True], - self.payment_rate_manager, - download_directory=self.download_directory, - file_name=self.file_name) - - self.resolved_name = name - self.stream_info = deepcopy(stream_info) - self.description = self.stream_info['description'] - self.sd_hash = self.stream_info['sources']['lbry_sd_hash'] - - if 'fee' in self.stream_info: - self.fee = FeeValidator(self.stream_info['fee']) - max_key_fee = self._convert_max_fee() - converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount - if converted_fee > self.wallet.get_balance(): - msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format( - self.resolved_name, converted_fee, self.wallet.get_balance()) - raise InsufficientFundsError(msg) - if converted_fee > max_key_fee: - msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format( - converted_fee, max_key_fee, self.resolved_name) - raise KeyFeeAboveMaxAllowed(msg) - log.info( - "Key fee %f below limit of %f, downloading lbry://%s", - converted_fee, max_key_fee, self.resolved_name) - - self.checker.start(1) - - self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) - self._d.addCallback(lambda _: download_sd_blob( - self.session, self.sd_hash, self.payment_rate_manager)) - self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) - self._d.addCallback(get_downloader_factory) - self._d.addCallback(make_downloader) - self._d.addCallbacks(self._start_download, _cancel) - self._d.callback(None) - - return self.finished - - def _start_download(self, downloader): - log.info('Starting download for %s', self.resolved_name) - self.downloader = downloader - self.download_path = os.path.join(downloader.download_directory, downloader.file_name) - - d = self._pay_key_fee() - d.addCallback(lambda _: log.info( - "Downloading %s --> %s", self.sd_hash, self.downloader.file_name)) - d.addCallback(lambda _: self.downloader.start()) - - def _pay_key_fee(self): - if self.fee is not None: - fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount - reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) - if reserved_points is None: - log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name) - # TODO: If we get here, nobody will know that there was an error - # as nobody actually cares about self._d - return defer.fail(InsufficientFundsError()) - return self.wallet.send_points_to_address(reserved_points, fee_lbc) - return defer.succeed(None) + try: + safe_start(self.checker) + yield self.download(stream_info, name) + defer.returnValue(self.download_path) + except Exception as err: + safe_stop(self.checker) + raise err