convert EncryptedFileDownloader to inlineCallbacks

This commit is contained in:
Jack Robison 2017-02-09 13:21:31 -05:00
parent bb9b221cf6
commit 8de6bd7c7a

View file

@ -48,57 +48,36 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def saving_status(self): def saving_status(self):
return self._saving_status return self._saving_status
@defer.inlineCallbacks
def restore(self): def restore(self):
d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) sd_hash = yield self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
def _save_stream_info(sd_hash): self.sd_hash = sd_hash[0]
if sd_hash: else:
self.sd_hash = sd_hash[0] raise Exception("No sd hash for stream hash %s", self.stream_hash)
d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) claim_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
d.addCallback(lambda r: _save_claim(r[0], r[1], r[2])) if claim_metadata is None:
return d raise Exception("A claim doesn't exist for sd %s" % self.sd_hash)
else: self.uri, self.txid, self.nout = claim_metadata
return None self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout)
status = yield self.lbry_file_manager.get_lbry_file_status(self)
def _save_claim_id(claim_id): if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
self.claim_id = claim_id yield self.start()
return defer.succeed(None) elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
defer.returnValue(False)
def _notify_bad_claim(name, txid, nout): elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
err_msg = "Error loading name claim for lbry file: \ self.completed = True
lbry://%s, tx %s output %i does not contain a valid claim, deleting it" defer.returnValue(True)
log.error(err_msg, name, txid, nout) else:
return self.lbry_file_manager.delete_lbry_file(self) raise Exception("Unknown status for stream %s: %s", self.stream_hash, status)
def _save_claim(name, txid, nout):
self.uri = name
self.txid = txid
self.nout = nout
d = self.wallet.get_claimid(name, txid, nout)
d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout))
return d
d.addCallback(_save_stream_info)
d.addCallback(lambda _: self._reupload())
d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self))
def restore_status(status):
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
return self.start()
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
return defer.succeed(False)
elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
self.completed = True
return defer.succeed(True)
d.addCallback(restore_status)
return d
@defer.inlineCallbacks
def _reupload(self): def _reupload(self):
if not conf.settings['reflector_reupload']: if not conf.settings['reflector_reupload']:
return defer.returnValue(None)
reflector_server = random.choice(conf.settings['reflector_servers']) else:
return reupload.check_and_restore_availability(self, reflector_server) reflector_server = random.choice(conf.settings['reflector_servers'])
yield reupload.check_and_restore_availability(self, reflector_server)
@defer.inlineCallbacks @defer.inlineCallbacks
def stop(self, err=None, change_status=True): def stop(self, err=None, change_status=True):
@ -107,34 +86,24 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
yield EncryptedFileDownloader.stop(self, err=err) yield EncryptedFileDownloader.stop(self, err=err)
if change_status is True: if change_status is True:
status = yield self._save_status() status = yield self._save_status()
defer.returnValue(status)
@defer.inlineCallbacks
def status(self): def status(self):
def find_completed_blobhashes(blobs): blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
blobhashes = [b[0] for b in blobs if b[0] is not None] blob_hashes = [b[0] for b in blobs if b[0] is not None]
completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes)
num_blobs_completed = len(completed_blobs)
num_blobs_known = len(blob_hashes)
def get_num_completed(completed_blobs): if self.completed:
return len(completed_blobs), len(blobhashes) status = "completed"
elif self.stopped:
inner_d = self.blob_manager.completed_blobs(blobhashes) status = "stopped"
inner_d.addCallback(get_num_completed) else:
return inner_d status = "running"
defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed,
def make_full_status(progress): num_blobs_known, status))
num_completed = progress[0]
num_known = progress[1]
if self.completed is True:
s = "completed"
elif self.stopped is True:
s = "stopped"
else:
s = "running"
status = EncryptedFileStatusReport(self.file_name, num_completed, num_known, s)
return status
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
d.addCallback(find_completed_blobhashes)
d.addCallback(make_full_status)
return d
@defer.inlineCallbacks @defer.inlineCallbacks
def _start(self): def _start(self):
@ -166,8 +135,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
status = ManagedEncryptedFileDownloader.STATUS_STOPPED status = ManagedEncryptedFileDownloader.STATUS_STOPPED
else: else:
status = ManagedEncryptedFileDownloader.STATUS_RUNNING status = ManagedEncryptedFileDownloader.STATUS_RUNNING
yield self.lbry_file_manager.change_lbry_file_status(self, status) status = yield self.lbry_file_manager.change_lbry_file_status(self, status)
self._saving_status = False self._saving_status = False
defer.returnValue(status)
def _get_progress_manager(self, download_manager): def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, return FullStreamProgressManager(self._finished_downloading,
@ -181,35 +151,23 @@ class ManagedEncryptedFileDownloaderFactory(object):
self.lbry_file_manager = lbry_file_manager self.lbry_file_manager = lbry_file_manager
def can_download(self, sd_validator): def can_download(self, sd_validator):
# TODO: add a sd_validator for non live streams, use it
return True return True
def make_downloader(self, metadata, options, payment_rate_manager, @defer.inlineCallbacks
download_directory=None, file_name=None): def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None,
file_name=None):
data_rate = options[0] data_rate = options[0]
upload_allowed = options[1] upload_allowed = options[1]
stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager,
def save_source_if_blob(stream_hash): metadata.validator.raw_info)
if metadata.metadata_source == StreamMetadata.FROM_BLOB: if metadata.metadata_source == StreamMetadata.FROM_BLOB:
# TODO: should never have to dig this deep into a another classes yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash,
# members. lbry_file_manager should have a metadata.source_blob_hash)
# save_sd_blob_hash_to_stream method lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager,
d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream( data_rate, upload_allowed,
stream_hash, metadata.source_blob_hash) download_directory, file_name)
else: defer.returnValue(lbry_file)
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(
stream_hash,
payment_rate_manager,
data_rate,
upload_allowed,
download_directory=download_directory,
file_name=file_name))
return d
@staticmethod @staticmethod
def get_description(): def get_description():