From 09e25ba285e65bb07575fbc571f047ee9b9a9b78 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 26 Sep 2017 00:13:47 -0400 Subject: [PATCH 01/30] add reader, for BlobFile, change original reader to v0 --- lbrynet/blob/blob_file.py | 4 ++-- lbrynet/blob/reader.py | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index f1a2010ee..56e3a4ed6 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -9,7 +9,7 @@ from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.utils import is_valid_blobhash from lbrynet.blob.writer import HashBlobWriter -from lbrynet.blob.reader import HashBlobReader +from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0 log = logging.getLogger(__name__) @@ -155,7 +155,7 @@ class BlobFile(object): return args[0] file_sender = FileSender() - reader = HashBlobReader(write_func) + reader = HashBlobReader_v0(write_func) file_handle = self.open_for_reading() if file_handle is not None: d = file_sender.beginFileTransfer(file_handle, reader) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index c85cc38f3..fb4724d3a 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -5,7 +5,7 @@ from zope.interface import implements log = logging.getLogger(__name__) -class HashBlobReader(object): +class HashBlobReader_v0(object): implements(interfaces.IConsumer) def __init__(self, write_func): @@ -28,3 +28,24 @@ class HashBlobReader(object): self.write_func(data) if self.streaming is False: reactor.callLater(0, self.producer.resumeProducing) + +class HashBlobReader(object): + def __init__(self, file_path, finished_cb): + self.finished_cb = finished_cb + self.finished_cb_d = None + self.read_handle = open(file_path, 'rb') + + def __del__(self): + self.close() + + def read(self, size=-1): + return self.read_handle.read(size) + + def 
close(self): + # if we've already closed and called finished_cb, do nothing + if self.finished_cb_d is not None: + return + self.read_handle.close() + self.finished_cb_d = self.finished_cb(self) + + From 3b5690614c1794f5c53e6d3893ac027e0b79a429 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:00:21 -0400 Subject: [PATCH 02/30] Return HashBlobReader instead of file handle in open_for_reading() --- lbrynet/blob/blob_file.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 56e3a4ed6..a1095d528 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -75,19 +75,17 @@ class BlobFile(object): """ open blob for reading - returns a file handle that can be read() from. - once finished with the file handle, user must call close_read_handle() - otherwise blob cannot be deleted. + returns a file like object that can be read() from, and closed() when + finished """ if self._verified is True: - file_handle = None try: - file_handle = open(self.file_path, 'rb') + reader = HashBlobReader(self.file_path, self.reader_finished) self.readers += 1 - return file_handle + return reader except IOError: log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) + reader.close() return None def delete(self): @@ -164,6 +162,10 @@ class BlobFile(object): d = defer.fail(IOError("Could not read the blob")) return d + def reader_finished(self, reader): + self.readers -= 1 + return defer.succeed(True) + def writer_finished(self, writer, err=None): def fire_finished_deferred(): self._verified = True From f6da00cbaba33c4ad05862f9aee8d5fca95ec5a2 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 16:53:31 -0400 Subject: [PATCH 03/30] no need to catch IOError exception here, let it propagage --- lbrynet/blob/blob_file.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lbrynet/blob/blob_file.py 
b/lbrynet/blob/blob_file.py index a1095d528..c73f14ca1 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -79,13 +79,9 @@ class BlobFile(object): finished """ if self._verified is True: - try: - reader = HashBlobReader(self.file_path, self.reader_finished) - self.readers += 1 - return reader - except IOError: - log.exception('Failed to open %s', self.file_path) - reader.close() + reader = HashBlobReader(self.file_path, self.reader_finished) + self.readers += 1 + return reader return None def delete(self): From aab43c8d6cc78a83d6e26535bbc3551dd3a14d9c Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:00:51 -0400 Subject: [PATCH 04/30] close the reader directly instead of calling close_read_handle() --- lbrynet/core/StreamDescriptor.py | 2 +- lbrynet/core/server/BlobRequestHandler.py | 4 ++-- lbrynet/daemon/Daemon.py | 4 ++-- lbrynet/reflector/client/blob.py | 3 ++- lbrynet/reflector/client/client.py | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index b3fb714cb..d0bd28310 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -55,7 +55,7 @@ class BlobStreamDescriptorReader(StreamDescriptorReader): f = self.blob.open_for_reading() if f is not None: raw_data = f.read() - self.blob.close_read_handle(f) + f.close() return raw_data else: raise ValueError("Could not open the blob for reading") diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 308d0c822..e075fe086 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -89,7 +89,7 @@ class BlobRequestHandler(object): def cancel_send(self, err): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None return err @@ -225,7 +225,7 
@@ class BlobRequestHandler(object): def set_not_uploading(reason=None): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None self.file_sender = None diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index f77f183c0..a56e3d910 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2375,7 +2375,7 @@ class Daemon(AuthJSONRPCServer): if encoding and encoding in decoders: blob_file = blob.open_for_reading() result = decoders[encoding](blob_file.read()) - blob.close_read_handle(blob_file) + blob_file.close() else: result = "Downloaded blob %s" % blob_hash @@ -2624,7 +2624,7 @@ class Daemon(AuthJSONRPCServer): def read_sd_blob(sd_blob): sd_blob_file = sd_blob.open_for_reading() decoded_sd_blob = json.loads(sd_blob_file.read()) - sd_blob.close_read_handle(sd_blob_file) + sd_blob_file.close() return decoded_sd_blob resolved_result = yield self.session.wallet.resolve(uri) diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 1f1c540a2..d2c41ce30 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -94,7 +94,7 @@ class BlobReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None self.file_sender = None @@ -105,6 +105,7 @@ class BlobReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index ebf605b02..660abbd39 100644 --- 
a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -179,7 +179,7 @@ class EncryptedFileReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: log.debug("Close %s", self.next_blob_to_send) - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None if self.file_sender is not None: From a4ea49cf6588c4079b11465e8416d4b378512476 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:02:17 -0400 Subject: [PATCH 05/30] read handle was not being closed after file transfer --- lbrynet/reflector/client/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 660abbd39..7e0060f93 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -191,6 +191,7 @@ class EncryptedFileReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): From 533835a7321bf034d05f04d8a4104211630b5b3b Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 16:56:29 -0400 Subject: [PATCH 06/30] add comments and docstrings --- lbrynet/blob/blob_file.py | 18 +++++++++++++----- lbrynet/blob/reader.py | 8 ++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index c73f14ca1..993a77400 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -144,6 +144,10 @@ class BlobFile(object): return False def read(self, write_func): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ def close_self(*args): 
self.close_read_handle(file_handle) return args[0] @@ -158,6 +162,15 @@ class BlobFile(object): d = defer.fail(IOError("Could not read the blob")) return d + def close_read_handle(self, file_handle): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ + if file_handle is not None: + file_handle.close() + self.readers -= 1 + def reader_finished(self, reader): self.readers -= 1 return defer.succeed(True) @@ -206,11 +219,6 @@ class BlobFile(object): d.addBoth(lambda _: writer.close_handle()) return d - def close_read_handle(self, file_handle): - if file_handle is not None: - file_handle.close() - self.readers -= 1 - @defer.inlineCallbacks def _save_verified_blob(self, writer): with self.setting_verified_blob_lock: diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index fb4724d3a..292516a02 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -6,6 +6,10 @@ log = logging.getLogger(__name__) class HashBlobReader_v0(object): + """ + This is a class that is only used in StreamBlobDecryptor + and should be deprecated + """ implements(interfaces.IConsumer) def __init__(self, write_func): @@ -30,6 +34,10 @@ class HashBlobReader_v0(object): reactor.callLater(0, self.producer.resumeProducing) class HashBlobReader(object): + """ + This is a file like reader class that supports + read(size) and close() + """ def __init__(self, file_path, finished_cb): self.finished_cb = finished_cb self.finished_cb_d = None From cf3a0e2e9d93dc4dbd8ddaa862c20cb80db698b7 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 26 Sep 2017 10:54:20 -0400 Subject: [PATCH 07/30] add some more tests for deleting blob files --- tests/unit/core/test_HashBlob.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d1c282478..93a19ef19 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ 
-44,9 +44,15 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) self.assertTrue(blob_file.verified) f = blob_file.open_for_reading() + self.assertEqual(1, blob_file.readers) c = f.read() self.assertEqual(c, self.fake_content) + # close reader + f.close() + self.assertEqual(0, blob_file.readers) + + @defer.inlineCallbacks def test_delete(self): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) @@ -58,6 +64,21 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash) self.assertFalse(blob_file.verified) + @defer.inlineCallbacks + def test_delete_fail(self): + # deletes should fail if being written to + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + yield self.assertFailure(blob_file.delete(), ValueError) + writer.write(self.fake_content) + writer.close() + + # deletes should fail if being read and not closed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + yield self.assertFailure(blob_file.delete(), ValueError) + @defer.inlineCallbacks def test_too_much_write(self): # writing too much data should result in failure From 8c41b9ecdea9065d74142a825905fae596285147 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:41:52 -0400 Subject: [PATCH 08/30] added changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ceff6db7e..7aaafa3a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer + * Fixed https://github.com/lbryio/lbry/issues/923 ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. 
Use `blob_announce` instead. From b0a3771ccfbc7726f3c14271daa141361995c887 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:23:41 -0400 Subject: [PATCH 09/30] better download errors --- lbrynet/core/Error.py | 15 +++++++++++++++ lbrynet/core/client/StandaloneBlobDownloader.py | 4 ++-- lbrynet/daemon/Downloader.py | 17 +++++++++++++---- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index d1e8bb785..4822b7342 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception): class DownloadCanceledError(Exception): pass + +class DownloadSDTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download)) + self.download = download + + class DownloadTimeoutError(Exception): def __init__(self, download): Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) self.download = download + +class DownloadDataTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download data blobs for sd hash ' + '{} within timeout'.format(download)) + self.download = download + + class RequestCanceledError(Exception): pass diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index f7a108c65..10509fd27 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.DownloadManager import DownloadManager -from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError +from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout from lbrynet.core.utils import is_valid_blobhash, 
safe_start_looping_call, safe_stop_looping_call from twisted.python.failure import Failure from twisted.internet import defer @@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object): def _download_timedout(self): self.stop() if not self.finished_deferred.called: - self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) + self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash)) def insufficient_funds(self, err): self.stop() diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 488f02886..f801ee474 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,7 +5,8 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -68,7 +69,7 @@ class GetStream(object): if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) else: - log.info("Downloading stream data (%i seconds)", self.timeout_counter) + log.info("Waiting for stream data (%i seconds)", self.timeout_counter) def check_status(self): """ @@ -77,7 +78,8 @@ class GetStream(object): self.timeout_counter += 1 if self.timeout_counter >= self.timeout: if not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name)) + self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) + safe_stop_looping_call(self.checker) else: d = self.downloader.status() @@ -150,6 +152,12 @@ class GetStream(object): self._check_status(status) 
defer.returnValue(self.download_path) + def fail(self, err): + safe_stop_looping_call(self.checker) + if not err.check(DownloadDataTimeout): + raise err + return DownloadCanceledError() + @defer.inlineCallbacks def _initialize(self, stream_info): # Set sd_hash and return key_fee from stream_info @@ -179,7 +187,8 @@ class GetStream(object): log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() - self.finished_deferred.addCallback(self.finish, name) + self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), + self.fail) @defer.inlineCallbacks def start(self, stream_info, name): From 250855d45db08b6876038440c4b09c623abc6be7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:25:21 -0400 Subject: [PATCH 10/30] stop failed downloader outside of GetStream --- lbrynet/daemon/Downloader.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index f801ee474..3bfe4bf08 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -76,7 +76,7 @@ class GetStream(object): Check if we've got the first data blob in the stream yet """ self.timeout_counter += 1 - if self.timeout_counter >= self.timeout: + if self.timeout_counter > self.timeout: if not self.data_downloading_deferred.called: self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) @@ -213,9 +213,8 @@ class GetStream(object): try: yield self.data_downloading_deferred - except Exception as err: - self.downloader.stop() + except DownloadDataTimeout as err: safe_stop_looping_call(self.checker) - raise + raise err defer.returnValue((self.downloader, self.finished_deferred)) From 9de4657a4dffd040a78269cf1d16ffc43c24b782 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:30:03 -0400 Subject: [PATCH 11/30] fix blob history and callback from _download_succeeded previously 
_download_succeeded did not block on blob_completed, presumably because even longer ago it did not block on a deriving immediate_announce call and thus took a long time to return --- lbrynet/core/client/BlobRequester.py | 9 +++++---- lbrynet/core/server/BlobRequestHandler.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 1ce4f7205..37185bbf0 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -516,8 +516,6 @@ class DownloadRequest(RequestHelper): def _pay_or_cancel_payment(self, arg, reserved_points, blob): if self._can_pay_peer(blob, arg): self._pay_peer(blob.length, reserved_points) - d = self.requestor.blob_manager.add_blob_to_download_history( - str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol])) else: self._cancel_points(reserved_points) return arg @@ -567,8 +565,11 @@ class DownloadRequest(RequestHelper): self.peer.update_stats('blobs_downloaded', 1) self.peer.update_score(5.0) should_announce = blob.blob_hash == self.head_blob_hash - self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) - return arg + d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) + d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history( + blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol])) + d.addCallback(lambda _: arg) + return d def _download_failed(self, reason): if not reason.check(DownloadCanceledError, PriceDisagreementError): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 308d0c822..de98cf898 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -162,7 +162,7 @@ class BlobRequestHandler(object): def record_transaction(self, blob): d = self.blob_manager.add_blob_to_upload_history( - str(blob), 
self.peer.host, self.blob_data_payment_rate) + blob.blob_hash, self.peer.host, self.blob_data_payment_rate) return d def _reply_to_send_request(self, response, incoming): From af99edc764102116f2fd9ee22bef54560699880f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:30:22 -0400 Subject: [PATCH 12/30] add get_host_downloaded_from --- lbrynet/core/BlobManager.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index f6c329dc1..6293db4ff 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -128,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier): d = self._add_blob_to_download_history(blob_hash, host, rate) return d + @defer.inlineCallbacks + def get_host_downloaded_from(self, blob_hash): + query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1" + host = yield self.db_conn.runQuery(query_str, (blob_hash,)) + if host: + result = host[0][0] + else: + result = None + defer.returnValue(result) + def add_blob_to_upload_history(self, blob_hash, host, rate): d = self._add_blob_to_upload_history(blob_hash, host, rate) return d From c9ae251d60d8f7aee5b0acb27d1c119eefadb7d2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:02:36 -0400 Subject: [PATCH 13/30] download analytics --- lbrynet/analytics.py | 40 ++++++++++++++++----------- lbrynet/daemon/Daemon.py | 58 ++++++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 02885fb6c..855bcfcee 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -57,15 +57,14 @@ class Manager(object): self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) ) - def send_download_errored(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) - ) + def 
send_download_errored(self, err, id_, name, claim_dict, report): + download_error_properties = self._download_error_properties(err, id_, name, claim_dict, + report) + self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) - def send_download_finished(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_finished(self, id_, name, report, claim_dict=None): + download_properties = self._download_properties(id_, name, claim_dict, report) + self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) def send_claim_action(self, action): self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) @@ -159,18 +158,27 @@ class Manager(object): return properties @staticmethod - def _download_properties(id_, name, claim_dict=None): - sd_hash = None - if claim_dict: - try: - sd_hash = claim_dict.source_hash - except (KeyError, TypeError, ValueError): - log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True) - return { + def _download_properties(id_, name, claim_dict=None, report=None): + sd_hash = None if not claim_dict else claim_dict.source_hash + p = { 'download_id': id_, 'name': name, 'stream_info': sd_hash } + if report: + p['report'] = report + return p + + @staticmethod + def _download_error_properties(error, id_, name, claim_dict, report): + return { + 'download_id': id_, + 'name': name, + 'stream_info': claim_dict.source_hash, + 'error': str(type(error)), + 'reason': error.message, + 'report': report + } @staticmethod def _make_context(platform, wallet): diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index f77f183c0..521134f87 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from 
lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash -from lbrynet.core.Error import NoSuchStreamHash +from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError log = logging.getLogger(__name__) @@ -600,7 +600,40 @@ class Daemon(AuthJSONRPCServer): return download_sd_blob(self.session, blob_hash, rate_manager, timeout) @defer.inlineCallbacks - def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): + def _get_stream_analytics_report(self, claim_dict): + sd_hash = claim_dict.source_hash + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + stream_hash = None + report = { + "sd_hash": sd_hash, + "stream_hash": stream_hash, + } + blobs = {} + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + report["sd_blob"] = sd_host + if stream_hash: + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + report["known_blobs"] = len(blob_infos) + else: + blob_infos = [] + report["known_blobs"] = 0 + for blob_hash, blob_num, iv, length in blob_infos: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + if host: + blobs[blob_num] = host + report["blobs"] = blobs + defer.returnValue(report) + + @defer.inlineCallbacks + def _download_finished(self, download_id, name, claim_dict): + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None, + delete_on_timeout=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. 
If it already exists in the file manager, return the existing lbry file @@ -621,17 +654,20 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished(download_id, - name, - claim_dict)) + finished_deferred.addCallback(lambda _: self._download_finished(download_id, name, + claim_dict)) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - del self.streams[claim_id] - except Exception as err: - log.warning('Failed to get %s: %s', name, err) - self.analytics_manager.send_download_errored(download_id, name, claim_dict) - del self.streams[claim_id] + except (DownloadDataTimeout, DownloadSDTimeout) as err: + log.warning('Failed to get %s (%s)', name, err) + report = yield self._get_stream_analytics_report(claim_dict) + if isinstance(err, DownloadDataTimeout) and delete_on_timeout: + yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader) + elif self.streams[claim_id].downloader: + yield self.streams[claim_id].downloader.stop(err) + self.analytics_manager.send_download_errored(err, download_id, name, claim_dict, + report) result = {'error': err.message} + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks From fa83a4f67159acf71933662f9ab2f90bdf1b013c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:47:36 -0400 Subject: [PATCH 14/30] fix stop condition --- lbrynet/daemon/Downloader.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 3bfe4bf08..626c79947 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -10,7 +10,6 @@ from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor 
import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet import conf INITIALIZING_CODE = 'initializing' @@ -62,9 +61,7 @@ class GetStream(object): return os.path.join(self.download_directory, self.downloader.file_name) def _check_status(self, status): - stop_condition = (status.num_completed > 0 or - status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED) - if stop_condition and not self.data_downloading_deferred.called: + if status.num_completed > 0 and not self.data_downloading_deferred.called: self.data_downloading_deferred.callback(True) if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) From 81b584a35a029591b120f3ba49ccfe27d1f85773 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:47:50 -0400 Subject: [PATCH 15/30] update tests and scripts --- scripts/query_available_blobs.py | 2 +- tests/unit/core/test_BlobManager.py | 4 +- tests/unit/lbrynet_daemon/test_Downloader.py | 92 +++++++++----------- 3 files changed, 46 insertions(+), 52 deletions(-) diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 727c019c8..39e1f406f 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -59,7 +59,7 @@ def main(args=None): use_upnp=False, wallet=wallet ) - api = analytics.Api.new_instance() + api = analytics.Api.new_instance(conf.settings['share_usage_data']) run(args, session, api) reactor.run() finally: diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index f6b4a1f04..ddeee7c0c 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -50,10 +50,10 @@ class BlobManagerTest(unittest.TestCase): writer, finished_d = yield blob.open_for_writing(self.peer) yield writer.write(data) yield self.bm.blob_completed(blob) - 
yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) + yield self.bm.add_blob_to_upload_history(blob_hash, 'test', len(data)) # check to see if blob is there - self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash))) + self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash))) blobs = yield self.bm.get_all_verified_blobs() self.assertTrue(blob_hash in blobs) defer.returnValue(blob_hash) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index df3e4315c..791e28c4d 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -1,26 +1,22 @@ import types import mock -import json from twisted.trial import unittest from twisted.internet import defer, task -from lbryschema.claim import ClaimDict from lbrynet.core import Session, PaymentRateManager, Wallet -from lbrynet.core.Error import DownloadTimeoutError +from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.daemon import Downloader -from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager -from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker from tests.mocks import ExchangeRateManager as DummyExchangeRateManager -from tests.mocks import BTCLBCFeed, USDBTCFeed from tests.mocks import mock_conf_settings + class MocDownloader(object): def __init__(self): self.finish_deferred 
= defer.Deferred(None) @@ -106,7 +102,7 @@ class GetStreamTests(unittest.TestCase): DownloadTimeoutError is raised """ def download_sd_blob(self): - raise DownloadTimeoutError(self.file_name) + raise DownloadSDTimeout(self.file_name) getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream) @@ -115,31 +111,30 @@ class GetStreamTests(unittest.TestCase): getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) name='test' stream_info = None - with self.assertRaises(DownloadTimeoutError): + with self.assertRaises(DownloadSDTimeout): yield getstream.start(stream_info,name) self.assertFalse(getstream.pay_key_fee_called) - - @defer.inlineCallbacks - def test_timeout(self): - """ - test that timeout (set to 2 here) exception is raised - when download times out while downloading first blob, and key fee is paid - """ - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) - name='test' - stream_info = None - start = getstream.start(stream_info,name) - self.clock.advance(1) - self.clock.advance(1) - with self.assertRaises(DownloadTimeoutError): - yield start - self.assertTrue(getstream.downloader.stop_called) - self.assertTrue(getstream.pay_key_fee_called) + # @defer.inlineCallbacks + # def test_timeout(self): + # """ + # test that timeout (set to 2 here) exception is raised + # when download times out while downloading first blob, and key fee is paid + # """ + # getstream = self.init_getstream_with_mocs() + # getstream._initialize = types.MethodType(moc_initialize, getstream) + # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + # getstream._download = types.MethodType(moc_download, getstream) + # 
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + # name='test' + # stream_info = None + # start = getstream.start(stream_info,name) + # self.clock.advance(1) + # self.clock.advance(1) + # with self.assertRaises(DownloadDataTimeout): + # yield start + # self.assertTrue(getstream.downloader.stop_called) + # self.assertTrue(getstream.pay_key_fee_called) @defer.inlineCallbacks def test_finish_one_blob(self): @@ -163,21 +158,20 @@ class GetStreamTests(unittest.TestCase): downloader, f_deferred = yield start self.assertTrue(getstream.pay_key_fee_called) - - @defer.inlineCallbacks - def test_finish_stopped_downloader(self): - """ - test that if we have a stopped downloader, beforfe a blob is downloaded, - start() returns - """ - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - name='test' - stream_info = None - start = getstream.start(stream_info,name) - - getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED - self.clock.advance(1) - downloader, f_deferred = yield start + # @defer.inlineCallbacks + # def test_finish_stopped_downloader(self): + # """ + # test that if we have a stopped downloader, before a blob is downloaded, + # start() returns + # """ + # getstream = self.init_getstream_with_mocs() + # getstream._initialize = types.MethodType(moc_initialize, getstream) + # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + # getstream._download = types.MethodType(moc_download, getstream) + # name='test' + # stream_info = None + # start = getstream.start(stream_info,name) + # + # getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED + # self.clock.advance(1) + # downloader, f_deferred = yield start From d2b91d5f51175c5d13351946a2a3ea08e2ee3905 Mon
Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:55:58 -0400 Subject: [PATCH 16/30] changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ceff6db7e..21ceeeff9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer + * Fixed blob download history ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. @@ -27,6 +28,8 @@ at anytime. ### Changed * Announcing by head blob is turned on by default * Updated reflector server dns + * Improved download analytics + * Improved download errors by distinguishing a data timeout from a sd timeout ### Added * Added WAL pragma to sqlite3 From 89ebed570ec091db797d814bd67caf4b141de39f Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 28 Sep 2017 11:04:36 -0400 Subject: [PATCH 17/30] fix test_timeout in test_Downloader --- tests/unit/lbrynet_daemon/test_Downloader.py | 40 ++++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index 791e28c4d..231e277f4 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -115,26 +115,26 @@ class GetStreamTests(unittest.TestCase): yield getstream.start(stream_info,name) self.assertFalse(getstream.pay_key_fee_called) - # @defer.inlineCallbacks - # def test_timeout(self): - # """ - # test that timeout (set to 2 here) exception is raised - # when download times out while downloading first blob, and key fee is paid - # """ - # getstream = self.init_getstream_with_mocs() - # getstream._initialize = types.MethodType(moc_initialize, getstream) - # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - # getstream._download = types.MethodType(moc_download, 
getstream) - # getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) - # name='test' - # stream_info = None - # start = getstream.start(stream_info,name) - # self.clock.advance(1) - # self.clock.advance(1) - # with self.assertRaises(DownloadDataTimeout): - # yield start - # self.assertTrue(getstream.downloader.stop_called) - # self.assertTrue(getstream.pay_key_fee_called) + @defer.inlineCallbacks + def test_timeout(self): + """ + test that timeout (set to 3 here) exception is raised + when download times out while downloading first blob, and key fee is paid + """ + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + name='test' + stream_info = None + start = getstream.start(stream_info,name) + self.clock.advance(1) + self.clock.advance(1) + self.clock.advance(1) + with self.assertRaises(DownloadDataTimeout): + yield start + self.assertTrue(getstream.pay_key_fee_called) @defer.inlineCallbacks def test_finish_one_blob(self): From 50b51569a3d5b919f206a9c2bbe2365cdaba79c2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:51:20 -0400 Subject: [PATCH 18/30] cancel streams on shutdown --- lbrynet/daemon/Daemon.py | 8 ++++++++ lbrynet/daemon/Downloader.py | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 521134f87..1155f0cab 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer): def _already_shutting_down(sig_num, frame): log.info("Already shutting down") + def _stop_streams(self): + """stop pending GetStream downloads""" + for claim_id, stream in self.streams.iteritems(): + 
stream.cancel(reason="daemon shutdown") + def _shutdown(self): # ignore INT/TERM signals once shutdown has started signal.signal(signal.SIGINT, self._already_shutting_down) @@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer): log.info("Closing lbrynet session") log.info("Status at time of shutdown: " + self.startup_status[0]) + + self._stop_streams() + self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 626c79947..53a164d58 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -151,9 +151,7 @@ class GetStream(object): def fail(self, err): safe_stop_looping_call(self.checker) - if not err.check(DownloadDataTimeout): - raise err - return DownloadCanceledError() + raise err @defer.inlineCallbacks def _initialize(self, stream_info): @@ -184,8 +182,7 @@ class GetStream(object): log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() - self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), - self.fail) + self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @defer.inlineCallbacks def start(self, stream_info, name): @@ -215,3 +212,13 @@ class GetStream(object): raise err defer.returnValue((self.downloader, self.finished_deferred)) + + def cancel(self, reason=None): + if reason: + msg = "download stream cancelled: %s" % reason + else: + msg = "download stream cancelled" + if self.finished_deferred and not self.finished_deferred.called: + self.finished_deferred.errback(DownloadCanceledError(msg)) + if self.data_downloading_deferred and not self.data_downloading_deferred.called: + self.data_downloading_deferred.errback(DownloadCanceledError(msg)) From 82a2805aaf663293e96e143915c69803871e2160 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:52:30 -0400 Subject: [PATCH 
19/30] json blobs --- lbrynet/daemon/Daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 1155f0cab..5aba3c7d2 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -631,7 +631,7 @@ class Daemon(AuthJSONRPCServer): host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) if host: blobs[blob_num] = host - report["blobs"] = blobs + report["blobs"] = json.dumps(blobs) defer.returnValue(report) @defer.inlineCallbacks From 9fd60c823f9169039ff37b379cbb3ea71f9d243b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:53:12 -0400 Subject: [PATCH 20/30] add _download_failed errback --- lbrynet/daemon/Daemon.py | 43 ++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 5aba3c7d2..0bb3338bd 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -635,18 +635,25 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(report) @defer.inlineCallbacks - def _download_finished(self, download_id, name, claim_dict): - report = yield self._get_stream_analytics_report(claim_dict) - self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) - - @defer.inlineCallbacks - def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None, - delete_on_timeout=True): + def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. 
If it already exists in the file manager, return the existing lbry file """ + @defer.inlineCallbacks + def _download_finished(download_id, name, claim_dict): + log.info("Finished: %s", name) + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_failed(error, download_id, name, claim_dict): + log.warning("Failed %s: %s", name, error) + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, + report) + if claim_id in self.streams: downloader = self.streams[claim_id] result = yield downloader.finished_deferred @@ -662,18 +669,20 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback(lambda _: self._download_finished(download_id, name, - claim_dict)) + finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, + claim_dict), + lambda e: _download_failed(e, download_id, name, + claim_dict)) + result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - except (DownloadDataTimeout, DownloadSDTimeout) as err: - log.warning('Failed to get %s (%s)', name, err) - report = yield self._get_stream_analytics_report(claim_dict) - if isinstance(err, DownloadDataTimeout) and delete_on_timeout: - yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader) - elif self.streams[claim_id].downloader: + except Exception as err: + if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)): + log.warning('Failed to get %s (%s)', name, err) + else: + log.error('Failed to get %s (%s)', name, err) + if self.streams[claim_id].downloader: yield self.streams[claim_id].downloader.stop(err) - self.analytics_manager.send_download_errored(err, download_id, name, claim_dict, - report) + yield 
_download_failed(err, download_id, name, claim_dict) result = {'error': err.message} del self.streams[claim_id] defer.returnValue(result) From b7bfb259e527eb7b08fa43a80768ce9703c8ea49 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:54:01 -0400 Subject: [PATCH 21/30] fix download exceptions --- lbrynet/daemon/Daemon.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0bb3338bd..6ab358bcf 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -612,14 +612,17 @@ class Daemon(AuthJSONRPCServer): sd_hash = claim_dict.source_hash try: stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) - except NoSuchSDHash: + except Exception: stream_hash = None report = { "sd_hash": sd_hash, "stream_hash": stream_hash, } blobs = {} - sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + try: + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + except Exception: + sd_host = None report["sd_blob"] = sd_host if stream_hash: blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) @@ -628,7 +631,10 @@ class Daemon(AuthJSONRPCServer): blob_infos = [] report["known_blobs"] = 0 for blob_hash, blob_num, iv, length in blob_infos: - host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + try: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + except Exception: + host = None if host: blobs[blob_num] = host report["blobs"] = json.dumps(blobs) @@ -643,13 +649,11 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _download_finished(download_id, name, claim_dict): - log.info("Finished: %s", name) report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) @defer.inlineCallbacks def _download_failed(error, download_id, name, 
claim_dict): - log.warning("Failed %s: %s", name, error) report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, report) @@ -684,7 +688,8 @@ class Daemon(AuthJSONRPCServer): yield self.streams[claim_id].downloader.stop(err) yield _download_failed(err, download_id, name, claim_dict) result = {'error': err.message} - del self.streams[claim_id] + finally: + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks From c6db4b187aa1a41a330310dccf1720cb4b5a8e51 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 15:45:24 -0400 Subject: [PATCH 22/30] fix error name --- lbrynet/analytics.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 855bcfcee..4b41e0804 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -171,11 +171,15 @@ class Manager(object): @staticmethod def _download_error_properties(error, id_, name, claim_dict, report): + def error_name(err): + if not hasattr(type(err), "__name__"): + return str(type(err)) + return type(err).__name__ return { 'download_id': id_, 'name': name, 'stream_info': claim_dict.source_hash, - 'error': str(type(error)), + 'error': error_name(error), 'reason': error.message, 'report': report } From 2ebb9da10833f8e67424ae31ce1558107a33d699 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 15:46:02 -0400 Subject: [PATCH 23/30] move download_failed --- lbrynet/daemon/Daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 6ab358bcf..63d7d1461 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -680,13 +680,13 @@ class Daemon(AuthJSONRPCServer): result = yield self._get_lbry_file_dict(lbry_file, full_status=True) except Exception as err: + yield _download_failed(err, download_id, name, claim_dict) if isinstance(err, 
(DownloadDataTimeout, DownloadSDTimeout)): log.warning('Failed to get %s (%s)', name, err) else: log.error('Failed to get %s (%s)', name, err) if self.streams[claim_id].downloader: yield self.streams[claim_id].downloader.stop(err) - yield _download_failed(err, download_id, name, claim_dict) result = {'error': err.message} finally: del self.streams[claim_id] From 89ef5620a4f88723fd393df6cb402922b6dca7e7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 16:08:15 -0400 Subject: [PATCH 24/30] Bump version 0.17.0rc5 --> 0.17.0rc6 Signed-off-by: Jack Robison --- lbrynet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 71b24a30f..8cb0f0961 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc5" +__version__ = "0.17.0rc6" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) From b85a813aaff636400d8089b21f3a74f31735152b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 17:36:55 -0400 Subject: [PATCH 25/30] fix sd timeout --- lbrynet/daemon/Downloader.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 53a164d58..e3820521d 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -6,7 +6,7 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed -from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError +from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import 
ManagedEncryptedFileDownloaderFactory @@ -66,7 +66,7 @@ class GetStream(object): if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) else: - log.info("Waiting for stream data (%i seconds)", self.timeout_counter) + log.debug("Waiting for stream data (%i seconds)", self.timeout_counter) def check_status(self): """ @@ -75,12 +75,17 @@ class GetStream(object): self.timeout_counter += 1 if self.timeout_counter > self.timeout: if not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) - + if self.downloader: + err = DownloadDataTimeout(self.sd_hash) + else: + err = DownloadSDTimeout(self.sd_hash) + self.data_downloading_deferred.errback(err) safe_stop_looping_call(self.checker) - else: + elif self.downloader: d = self.downloader.status() d.addCallback(self._check_status) + else: + log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter) def convert_max_fee(self): currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] @@ -179,7 +184,6 @@ class GetStream(object): def _download(self, sd_blob, name, key_fee): self.downloader = yield self._create_downloader(sd_blob) yield self.pay_key_fee(key_fee, name) - log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @@ -198,12 +202,12 @@ class GetStream(object): self.set_status(INITIALIZING_CODE, name) key_fee = yield self._initialize(stream_info) + safe_start_looping_call(self.checker, 1) self.set_status(DOWNLOAD_METADATA_CODE, name) sd_blob = yield self._download_sd_blob() yield self._download(sd_blob, name, key_fee) self.set_status(DOWNLOAD_RUNNING_CODE, name) - safe_start_looping_call(self.checker, 1) try: yield self.data_downloading_deferred @@ -218,7 +222,5 @@ class GetStream(object): msg = "download stream cancelled: %s" % 
reason else: msg = "download stream cancelled" - if self.finished_deferred and not self.finished_deferred.called: - self.finished_deferred.errback(DownloadCanceledError(msg)) if self.data_downloading_deferred and not self.data_downloading_deferred.called: self.data_downloading_deferred.errback(DownloadCanceledError(msg)) From 5ac60cf2c215f2c26fe92b385da7483415e17a82 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 17:46:47 -0400 Subject: [PATCH 26/30] Bump version 0.17.0rc6 --> 0.17.0rc7 Signed-off-by: Jack Robison --- lbrynet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 8cb0f0961..82d5cfb40 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc6" +__version__ = "0.17.0rc7" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) From 4c1c3592b7ed9d9972f16f001e45d473ff44f71b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 29 Sep 2017 12:31:34 -0400 Subject: [PATCH 27/30] changelog --- CHANGELOG.md | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21ceeeff9..57d5a3662 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,6 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer - * Fixed blob download history ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. @@ -28,8 +27,6 @@ at anytime. ### Changed * Announcing by head blob is turned on by default * Updated reflector server dns - * Improved download analytics - * Improved download errors by distinguishing a data timeout from a sd timeout ### Added * Added WAL pragma to sqlite3 @@ -47,6 +44,20 @@ at anytime. 
* Removed unused `EncryptedFileOpener` +## [0.16.3] - 2017-09-28 +### Fixed + * Fixed blob download history + +### Changed + * Improved download analytics + * Improved download errors by distinguishing a data timeout from a sd timeout + + +## [0.16.2] - 2017-09-26 +### Fixed + * Fixed https://github.com/lbryio/lbry/issues/771 (handle when a certificate is missing for a signed claim in `claim_list_mine`) + + ## [0.16.1] - 2017-09-20 ### Fixed * Fixed `transaction_list` doc string @@ -55,6 +66,7 @@ at anytime. ### Changed * Bumped `lbryum` requirement to 3.1.8 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#318---2017-09-20) + ## [0.16.0] - 2017-09-18 ### Fixed * Fixed uncaught error when shutting down after a failed daemon startup From 6a7982022a9ada254f54b5a55b3d42091ed56e4a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 29 Sep 2017 12:37:57 -0400 Subject: [PATCH 28/30] Bump version 0.17.0rc7 --> 0.17.0rc8 Signed-off-by: Jack Robison --- lbrynet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 82d5cfb40..2e08aadf6 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc7" +__version__ = "0.17.0rc8" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) From aa3982f0e2043d2c0b0523fe8cdb98c6179ef6cc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 2 Oct 2017 11:21:51 -0400 Subject: [PATCH 29/30] temporarily disable blob analytics --- lbrynet/daemon/Daemon.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 48124050d..2555a5821 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -630,14 +630,14 @@ class Daemon(AuthJSONRPCServer): else: blob_infos = [] report["known_blobs"] = 0 - for blob_hash, blob_num, iv, length in blob_infos: - try: - 
host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) - except Exception: - host = None - if host: - blobs[blob_num] = host - report["blobs"] = json.dumps(blobs) + # for blob_hash, blob_num, iv, length in blob_infos: + # try: + # host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + # except Exception: + # host = None + # if host: + # blobs[blob_num] = host + # report["blobs"] = json.dumps(blobs) defer.returnValue(report) @defer.inlineCallbacks From a0817840c53f676cc0ae211d53eddae24ef09d52 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 2 Oct 2017 11:22:44 -0400 Subject: [PATCH 30/30] Bump version 0.17.0rc8 --> 0.17.0rc9 Signed-off-by: Jack Robison --- lbrynet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 2e08aadf6..601dacab0 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc8" +__version__ = "0.17.0rc9" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler())