From f4b2a05fffae3e69f9be9d0232343c951a8d0141 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 23 Aug 2018 18:35:47 -0400 Subject: [PATCH 1/3] fix a typo --- lbrynet/lbry_file/client/EncryptedFileDownloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index 9e5d218a0..2c230ec7f 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -25,7 +25,7 @@ class EncryptedFileDownloader(CryptStreamDownloader): payment_rate_manager, wallet, key, stream_name) self.stream_hash = stream_hash self.storage = storage - self.file_name = binascii.unhexlify(os.path.basename(file_name)) + self.file_name = os.path.basename(binascii.unhexlify(file_name)) self._calculated_total_bytes = None @defer.inlineCallbacks From a7a53f9381ed93340bec58aa45595605191dabf1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 23 Aug 2018 18:36:08 -0400 Subject: [PATCH 2/3] fix mirror downloader looping call --- lbrynet/core/HTTPBlobDownloader.py | 126 ++++++++++++------ .../unit/core/test_HTTPBlobDownloader.py | 3 +- 2 files changed, 84 insertions(+), 45 deletions(-) diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index f5f706b5a..0c36f232c 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -1,6 +1,7 @@ from random import choice import logging from twisted.internet import defer, task +from twisted.internet.error import ConnectingCancelledError import treq from lbrynet.core.utils import DeferredDict from lbrynet.core.Error import DownloadCanceledError @@ -16,69 +17,105 @@ class HTTPBlobDownloader(object): to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between different types of downloaders. ''' - def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True): + def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True, + clock=None): + if not clock: + from twisted.internet import reactor + self.clock = reactor + else: + self.clock = clock self.blob_manager = blob_manager self.servers = servers or [] self.client = client or treq self.blob_hashes = blob_hashes or [] self.missing_blob_hashes = [] + self.downloaded_blob_hashes = [] self.sd_hashes = sd_hashes or [] self.head_blob_hashes = [] self.max_failures = 3 - self.running = False self.semaphore = defer.DeferredSemaphore(2) self.deferreds = [] self.writers = [] self.retry = retry - self.looping_call = task.LoopingCall(self._download_and_retry) + self.looping_call = task.LoopingCall(self._download_lc) + self.looping_call.clock = self.clock self.finished_deferred = defer.Deferred() - self.last_missing = 100000000 + self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + self.short_delay = 30 + self.long_delay = 600 + self.delay = self.short_delay + self.last_missing = 10000000 + self.lc_deferred = None @defer.inlineCallbacks - def _download_and_retry(self): - if not self.running and self.blob_hashes and self.servers: - yield self._download_blobs() - if self.retry and self.missing_blob_hashes: - if len(self.missing_blob_hashes) < self.last_missing: - self.last_missing = len(self.missing_blob_hashes) - log.info("queueing retry of %i blobs", len(self.missing_blob_hashes)) - while self.missing_blob_hashes: - self.blob_hashes.append(self.missing_blob_hashes.pop()) - defer.returnValue(None) - if self.looping_call.running: - self.looping_call.stop() - if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing: - log.info("mirror not making progress, trying less frequently") - self.looping_call.start(600, now=False) - elif not self.finished_deferred.called: - self.finished_deferred.callback(None) - log.info("mirror finished") - def start(self): - if not self.running: - self.looping_call.start(30) - self.running = True - return self.finished_deferred + if not self.looping_call.running: + self.lc_deferred = self.looping_call.start(self.short_delay, now=True) + self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + yield self.finished_deferred def stop(self): - if self.running: - for d in reversed(self.deferreds): - d.cancel() - while self.writers: - writer = self.writers.pop() - writer.close(DownloadCanceledError()) - self.running = False - self.blob_hashes = [] + for d in reversed(self.deferreds): + d.cancel() + while self.writers: + writer = self.writers.pop() + writer.close(DownloadCanceledError()) + self.blob_hashes = [] if self.looping_call.running: self.looping_call.stop() + if self.lc_deferred and not self.lc_deferred.called: + self.lc_deferred.cancel() + if not self.finished_deferred.called: + self.finished_deferred.cancel() @defer.inlineCallbacks - def _download_blobs(self): - blobs = yield DeferredDict( - {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} - ) - self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] - yield defer.DeferredList(self.deferreds) + def _download_lc(self): + delay = yield self._download_and_get_retry_delay() + log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes), + len(self.downloaded_blob_hashes)) + while self.missing_blob_hashes: + self.blob_hashes.append(self.missing_blob_hashes.pop()) + if not delay: + if self.looping_call.running: + self.looping_call.stop() + if not self.finished_deferred.called: + log.debug("mirror finished") + self.finished_deferred.callback(None) + elif delay and delay != self.delay: + if delay == self.long_delay: + log.debug("mirror not making progress, trying less frequently") + elif delay == self.short_delay: + log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes)) + if self.looping_call.running: + self.looping_call.stop() + self.delay = delay + self.looping_call = task.LoopingCall(self._download_lc) + self.looping_call.clock = self.clock + self.lc_deferred = self.looping_call.start(self.delay, now=False) + self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + yield self.finished_deferred + + @defer.inlineCallbacks + def _download_and_get_retry_delay(self): + if self.blob_hashes and self.servers: + if self.sd_hashes: + log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8]) + else: + log.debug("trying to download %i blobs from mirror", len(self.blob_hashes)) + blobs = yield DeferredDict( + {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} + ) + self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] + yield defer.DeferredList(self.deferreds) + if self.retry and self.missing_blob_hashes: + if not self.downloaded_blob_hashes: + defer.returnValue(self.long_delay) + if len(self.missing_blob_hashes) < self.last_missing: + self.last_missing = len(self.missing_blob_hashes) + defer.returnValue(self.short_delay) + if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing: + defer.returnValue(self.long_delay) + defer.returnValue(None) @defer.inlineCallbacks def _download_blob(self, blob): @@ -93,9 +130,12 @@ class HTTPBlobDownloader(object): log.info('Mirror completed download for %s', blob.blob_hash) should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes yield self.blob_manager.blob_completed(blob, should_announce=should_announce) + self.downloaded_blob_hashes.append(blob.blob_hash) break - except (IOError, Exception, defer.CancelledError) as e: - if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e): + except (IOError, Exception, defer.CancelledError, ConnectingCancelledError) as e: + if isinstance( + e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError) + ) or 'closed file' in str(e): # some other downloader finished first or it was simply cancelled log.info("Mirror download cancelled: %s", blob.blob_hash) break diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py index c6b0efcee..3c40e997a 100644 --- a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py +++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py @@ -75,11 +75,10 @@ class HTTPBlobDownloaderTest(unittest.TestCase): self.assertEqual(self.blob.get_is_verified(), False) self.assertEqual(self.blob.writers, {}) - @defer.inlineCallbacks def test_stop(self): self.client.collect.side_effect = lambda response, write: defer.Deferred() self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop - yield self.downloader.stop() + self.downloader.stop() self.blob_manager.get_blob.assert_called_with(self.blob_hash) self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) self.client.collect.assert_called() From 18db2aa97040e54777ab07e33a9c4e847dfcb32e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 23 Aug 2018 22:38:55 -0400 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c95e2548c..6c9fe3b66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ at anytime. ### Fixed * issue in dht ping queue where enqueued pings that aren't yet due wouldn't be rescheduled - * blob mirror downloader not finishing streams that were partially uploaded + * blob mirror downloader not finishing streams that were partially uploaded at the time of the download attempt (https://github.com/lbryio/lbry/issues/1376) ### Deprecated *