From 8de6bd7c7a2fecc4d786f70b28ccb8d870d5c41d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Feb 2017 13:21:31 -0500 Subject: [PATCH] convert EncryptedFileDownloader to inlineCallbacks --- .../EncryptedFileDownloader.py | 154 +++++++----------- 1 file changed, 56 insertions(+), 98 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 85bc46dd2..4c5f1938d 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -48,57 +48,36 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def saving_status(self): return self._saving_status + @defer.inlineCallbacks def restore(self): - d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) - - def _save_stream_info(sd_hash): - if sd_hash: - self.sd_hash = sd_hash[0] - d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1], r[2])) - return d - else: - return None - - def _save_claim_id(claim_id): - self.claim_id = claim_id - return defer.succeed(None) - - def _notify_bad_claim(name, txid, nout): - err_msg = "Error loading name claim for lbry file: \ - lbry://%s, tx %s output %i does not contain a valid claim, deleting it" - log.error(err_msg, name, txid, nout) - return self.lbry_file_manager.delete_lbry_file(self) - - def _save_claim(name, txid, nout): - self.uri = name - self.txid = txid - self.nout = nout - d = self.wallet.get_claimid(name, txid, nout) - d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout)) - return d - - d.addCallback(_save_stream_info) - d.addCallback(lambda _: self._reupload()) - d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) - - def restore_status(status): - if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: - return self.start() - elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - return defer.succeed(False) - elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: - self.completed = True - return defer.succeed(True) - - d.addCallback(restore_status) - return d + sd_hash = yield self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) + if sd_hash: + self.sd_hash = sd_hash[0] + else: + raise Exception("No sd hash for stream hash %s", self.stream_hash) + claim_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) + if claim_metadata is None: + raise Exception("A claim doesn't exist for sd %s" % self.sd_hash) + self.uri, self.txid, self.nout = claim_metadata + self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout) + status = yield self.lbry_file_manager.get_lbry_file_status(self) + if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: + yield self.start() + elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: + defer.returnValue(False) + elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: + self.completed = True + defer.returnValue(True) + else: + raise Exception("Unknown status for stream %s: %s", self.stream_hash, status) + @defer.inlineCallbacks def _reupload(self): if not conf.settings['reflector_reupload']: - return - reflector_server = random.choice(conf.settings['reflector_servers']) - return reupload.check_and_restore_availability(self, reflector_server) + defer.returnValue(None) + else: + reflector_server = random.choice(conf.settings['reflector_servers']) + yield reupload.check_and_restore_availability(self, reflector_server) @defer.inlineCallbacks def stop(self, err=None, change_status=True): @@ -107,34 +86,24 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: status = yield self._save_status() + defer.returnValue(status) + @defer.inlineCallbacks def status(self): - def find_completed_blobhashes(blobs): - blobhashes = [b[0] for b in blobs if b[0] is not None] + blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes) + num_blobs_completed = len(completed_blobs) + num_blobs_known = len(blob_hashes) - def get_num_completed(completed_blobs): - return len(completed_blobs), len(blobhashes) - - inner_d = self.blob_manager.completed_blobs(blobhashes) - inner_d.addCallback(get_num_completed) - return inner_d - - def make_full_status(progress): - num_completed = progress[0] - num_known = progress[1] - if self.completed is True: - s = "completed" - elif self.stopped is True: - s = "stopped" - else: - s = "running" - status = EncryptedFileStatusReport(self.file_name, num_completed, num_known, s) - return status - - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(find_completed_blobhashes) - d.addCallback(make_full_status) - return d + if self.completed: + status = "completed" + elif self.stopped: + status = "stopped" + else: + status = "running" + defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed, + num_blobs_known, status)) @defer.inlineCallbacks def _start(self): @@ -166,8 +135,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): status = ManagedEncryptedFileDownloader.STATUS_STOPPED else: status = ManagedEncryptedFileDownloader.STATUS_RUNNING - yield self.lbry_file_manager.change_lbry_file_status(self, status) + status = yield self.lbry_file_manager.change_lbry_file_status(self, status) self._saving_status = False + defer.returnValue(status) def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, @@ -181,35 +151,23 @@ class ManagedEncryptedFileDownloaderFactory(object): self.lbry_file_manager = lbry_file_manager def can_download(self, sd_validator): + # TODO: add a sd_validator for non live streams, use it return True - def make_downloader(self, metadata, options, payment_rate_manager, - download_directory=None, file_name=None): + @defer.inlineCallbacks + def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, + file_name=None): data_rate = options[0] upload_allowed = options[1] - - def save_source_if_blob(stream_hash): - if metadata.metadata_source == StreamMetadata.FROM_BLOB: - # TODO: should never have to dig this deep into a another classes - # members. lbry_file_manager should have a - # save_sd_blob_hash_to_stream method - d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream( - stream_hash, metadata.source_blob_hash) - else: - d = defer.succeed(True) - d.addCallback(lambda _: stream_hash) - return d - - d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info) - d.addCallback(save_source_if_blob) - d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file( - stream_hash, - payment_rate_manager, - data_rate, - upload_allowed, - download_directory=download_directory, - file_name=file_name)) - return d + stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager, + metadata.validator.raw_info) + if metadata.metadata_source == StreamMetadata.FROM_BLOB: + yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash, + metadata.source_blob_hash) + lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager, + data_rate, upload_allowed, + download_directory, file_name) + defer.returnValue(lbry_file) @staticmethod def get_description():