diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e5303185..3d711f009 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ at anytime. ### Fixed * loggly error reporting not following `share_usage_data` * improper error handling when data is not valid JSON + * edge cases of http mirrored download of blobs ### Deprecated * automatic claim renew, this is no longer needed diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py index dc4d3d77a..e30a6d417 100644 --- a/lbrynet/blob/writer.py +++ b/lbrynet/blob/writer.py @@ -27,7 +27,7 @@ class HashBlobWriter(object): def write(self, data): if self.write_handle is None: - log.exception("writer has already been closed") + log.warning("writer has already been closed") raise IOError('I/O operation on closed file') self._hashsum.update(data) diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index b01621aaa..cf616d16b 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -3,7 +3,8 @@ import logging from twisted.internet import defer import treq -from twisted.internet.task import LoopingCall + +from lbrynet.core.Error import DownloadCanceledError log = logging.getLogger(__name__) @@ -21,65 +22,78 @@ class HTTPBlobDownloader(object): self.servers = servers or [] self.client = client or treq self.blob_hashes = blob_hashes or [] - self.looping_call = LoopingCall(self._download_next_blob_hash_for_file) - self.failures = 0 self.max_failures = 3 - self.interval = 1 - - @property - def running(self): - return self.looping_call.running + self.running = False + self.semaphore = defer.DeferredSemaphore(2) + self.deferreds = [] + self.writers = [] def start(self): if not self.running and self.blob_hashes and self.servers: - return self.looping_call.start(self.interval, now=True) + return self._start() defer.succeed(None) def stop(self): if self.running: + for d in reversed(self.deferreds): + d.cancel() + for writer in self.writers: + writer.close(DownloadCanceledError()) + self.running = False self.blob_hashes = [] - return self.looping_call.stop() @defer.inlineCallbacks - def _download_next_blob_hash_for_file(self): + def _start(self): + self.running = True + dl = [] for blob_hash in self.blob_hashes: blob = yield self.blob_manager.get_blob(blob_hash) if not blob.verified: - self.download_blob(blob) - return - self.stop() + d = self.semaphore.run(self.download_blob, blob) + d.addErrback(lambda err: err.check(defer.TimeoutError, defer.CancelledError)) + dl.append(d) + self.deferreds = dl + yield defer.DeferredList(dl) @defer.inlineCallbacks def download_blob(self, blob): - try: - yield self._download_blob(blob) - self.failures = 0 - except Exception as exception: - self.failures += 1 - log.exception('Mirror failed downloading') - if self.failures >= self.max_failures: - self.stop() - self.failures = 0 + for _ in range(self.max_failures): + writer, finished_deferred = blob.open_for_writing('mirror') + self.writers.append(writer) + try: + downloaded = yield self._write_blob(writer, blob) + if downloaded: + yield finished_deferred # yield for verification errors, so we log them + if blob.verified: + log.info('Mirror completed download for %s', blob.blob_hash) + break + except (IOError, Exception) as e: + if isinstance(e, DownloadCanceledError) or 'closed file' in str(e): + # some other downloader finished first or it was simply cancelled + log.info("Mirror download cancelled: %s", blob.blob_hash) + break + else: + log.exception('Mirror failed downloading') + finally: + finished_deferred.addBoth(lambda _: None) # suppress echoed errors + if 'mirror' in blob.writers: + writer.close() + self.writers.remove(writer) + @defer.inlineCallbacks - def _download_blob(self, blob): - if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers: - response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) - if response.code != 200: - log.debug('[Mirror] Missing a blob: %s', blob.blob_hash) - if blob.blob_hash in self.blob_hashes: - self.blob_hashes.remove(blob.blob_hash) - defer.returnValue(blob.blob_hash) - log.debug('[Mirror] Download started: %s', blob.blob_hash) - blob.set_length(response.length) - writer, finished_deferred = blob.open_for_writing('mirror') - try: - yield self.client.collect(response, writer.write) - log.info('Mirror completed download for %s', blob.blob_hash) - except Exception as e: - writer.close(e) - yield finished_deferred - defer.returnValue(blob.blob_hash) + def _write_blob(self, writer, blob): + response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) + if response.code != 200: + log.debug('Missing a blob: %s', blob.blob_hash) + if blob.blob_hash in self.blob_hashes: + self.blob_hashes.remove(blob.blob_hash) + defer.returnValue(False) + + log.debug('Download started: %s', blob.blob_hash) + blob.set_length(response.length) + yield self.client.collect(response, writer.write) + defer.returnValue(True) def url_for(server, blob_hash=''): diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py index 6020dbea0..9187b55d9 100644 --- a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py +++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py @@ -36,6 +36,24 @@ class HTTPBlobDownloaderTest(unittest.TestCase): self.assertEqual(self.blob.get_is_verified(), True) self.assertEqual(self.blob.writers, {}) + @defer.inlineCallbacks + def test_download_invalid_content(self): + self.client.collect.side_effect = bad_collect + yield self.downloader.start() + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + + @defer.inlineCallbacks + def test_peer_finished_first_causing_a_write_on_closed_handle(self): + self.client.collect.side_effect = lambda response, write: defer.fail(IOError('I/O operation on closed file')) + yield self.downloader.start() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_called() + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.writers, {}) + @defer.inlineCallbacks def test_download_transfer_failed(self): self.client.collect.side_effect = lambda response, write: defer.fail(Exception()) @@ -56,7 +74,22 @@ class HTTPBlobDownloaderTest(unittest.TestCase): self.assertEqual(self.blob.get_is_verified(), False) self.assertEqual(self.blob.writers, {}) + @defer.inlineCallbacks + def test_stop(self): + self.client.collect.side_effect = lambda response, write: defer.Deferred() + self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop + yield self.downloader.stop() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_called() + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + def collect(response, write): write('f' * response.length) - defer.succeed(None) + + +def bad_collect(response, write): + write('0' * response.length)