From 09e25ba285e65bb07575fbc571f047ee9b9a9b78 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 26 Sep 2017 00:13:47 -0400 Subject: [PATCH 1/8] add reader, for BlobFile, change original reader to v0 --- lbrynet/blob/blob_file.py | 4 ++-- lbrynet/blob/reader.py | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index f1a2010ee..56e3a4ed6 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -9,7 +9,7 @@ from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.utils import is_valid_blobhash from lbrynet.blob.writer import HashBlobWriter -from lbrynet.blob.reader import HashBlobReader +from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0 log = logging.getLogger(__name__) @@ -155,7 +155,7 @@ class BlobFile(object): return args[0] file_sender = FileSender() - reader = HashBlobReader(write_func) + reader = HashBlobReader_v0(write_func) file_handle = self.open_for_reading() if file_handle is not None: d = file_sender.beginFileTransfer(file_handle, reader) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index c85cc38f3..fb4724d3a 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -5,7 +5,7 @@ from zope.interface import implements log = logging.getLogger(__name__) -class HashBlobReader(object): +class HashBlobReader_v0(object): implements(interfaces.IConsumer) def __init__(self, write_func): @@ -28,3 +28,24 @@ class HashBlobReader(object): self.write_func(data) if self.streaming is False: reactor.callLater(0, self.producer.resumeProducing) + +class HashBlobReader(object): + def __init__(self, file_path, finished_cb): + self.finished_cb = finished_cb + self.finished_cb_d = None + self.read_handle = open(file_path, 'rb') + + def __del__(self): + self.close() + + def read(self, size=-1): + return self.read_handle.read(size) + + def close(self): + # if we've already closed and called finished_cb, do nothing + if self.finished_cb_d is not None: + return + self.read_handle.close() + self.finished_cb_d = self.finished_cb(self) + + From 3b5690614c1794f5c53e6d3893ac027e0b79a429 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:00:21 -0400 Subject: [PATCH 2/8] Return HashBlobReader instead of file handle in open_for_reading() --- lbrynet/blob/blob_file.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 56e3a4ed6..a1095d528 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -75,19 +75,17 @@ class BlobFile(object): """ open blob for reading - returns a file handle that can be read() from. - once finished with the file handle, user must call close_read_handle() - otherwise blob cannot be deleted. + returns a file like object that can be read() from, and closed() when + finished """ if self._verified is True: - file_handle = None try: - file_handle = open(self.file_path, 'rb') + reader = HashBlobReader(self.file_path, self.reader_finished) self.readers += 1 - return file_handle + return reader except IOError: log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) + reader.close() return None def delete(self): @@ -164,6 +162,10 @@ class BlobFile(object): d = defer.fail(IOError("Could not read the blob")) return d + def reader_finished(self, reader): + self.readers -= 1 + return defer.succeed(True) + def writer_finished(self, writer, err=None): def fire_finished_deferred(): self._verified = True From f6da00cbaba33c4ad05862f9aee8d5fca95ec5a2 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 16:53:31 -0400 Subject: [PATCH 3/8] no need to catch IOError exception here, let it propagage --- lbrynet/blob/blob_file.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index a1095d528..c73f14ca1 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -79,13 +79,9 @@ class BlobFile(object): finished """ if self._verified is True: - try: - reader = HashBlobReader(self.file_path, self.reader_finished) - self.readers += 1 - return reader - except IOError: - log.exception('Failed to open %s', self.file_path) - reader.close() + reader = HashBlobReader(self.file_path, self.reader_finished) + self.readers += 1 + return reader return None def delete(self): From aab43c8d6cc78a83d6e26535bbc3551dd3a14d9c Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:00:51 -0400 Subject: [PATCH 4/8] close the reader directly instead of calling close_read_handle() --- lbrynet/core/StreamDescriptor.py | 2 +- lbrynet/core/server/BlobRequestHandler.py | 4 ++-- lbrynet/daemon/Daemon.py | 4 ++-- lbrynet/reflector/client/blob.py | 3 ++- lbrynet/reflector/client/client.py | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index b3fb714cb..d0bd28310 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -55,7 +55,7 @@ class BlobStreamDescriptorReader(StreamDescriptorReader): f = self.blob.open_for_reading() if f is not None: raw_data = f.read() - self.blob.close_read_handle(f) + f.close() return raw_data else: raise ValueError("Could not open the blob for reading") diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 308d0c822..e075fe086 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -89,7 +89,7 @@ class BlobRequestHandler(object): def cancel_send(self, err): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None return err @@ -225,7 +225,7 @@ class BlobRequestHandler(object): def set_not_uploading(reason=None): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None self.file_sender = None diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index f77f183c0..a56e3d910 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2375,7 +2375,7 @@ class Daemon(AuthJSONRPCServer): if encoding and encoding in decoders: blob_file = blob.open_for_reading() result = decoders[encoding](blob_file.read()) - blob.close_read_handle(blob_file) + blob_file.close() else: result = "Downloaded blob %s" % blob_hash @@ -2624,7 +2624,7 @@ class Daemon(AuthJSONRPCServer): def read_sd_blob(sd_blob): sd_blob_file = sd_blob.open_for_reading() decoded_sd_blob = json.loads(sd_blob_file.read()) - sd_blob.close_read_handle(sd_blob_file) + sd_blob_file.close() return decoded_sd_blob resolved_result = yield self.session.wallet.resolve(uri) diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 1f1c540a2..d2c41ce30 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -94,7 +94,7 @@ class BlobReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None self.file_sender = None @@ -105,6 +105,7 @@ class BlobReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index ebf605b02..660abbd39 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -179,7 +179,7 @@ class EncryptedFileReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: log.debug("Close %s", self.next_blob_to_send) - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None if self.file_sender is not None: From a4ea49cf6588c4079b11465e8416d4b378512476 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:02:17 -0400 Subject: [PATCH 5/8] read handle was not being closed after file transfer --- lbrynet/reflector/client/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 660abbd39..7e0060f93 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -191,6 +191,7 @@ class EncryptedFileReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): From 533835a7321bf034d05f04d8a4104211630b5b3b Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 16:56:29 -0400 Subject: [PATCH 6/8] add comments and docstrings --- lbrynet/blob/blob_file.py | 18 +++++++++++++----- lbrynet/blob/reader.py | 8 ++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index c73f14ca1..993a77400 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -144,6 +144,10 @@ class BlobFile(object): return False def read(self, write_func): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ def close_self(*args): self.close_read_handle(file_handle) return args[0] @@ -158,6 +162,15 @@ class BlobFile(object): d = defer.fail(IOError("Could not read the blob")) return d + def close_read_handle(self, file_handle): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ + if file_handle is not None: + file_handle.close() + self.readers -= 1 + def reader_finished(self, reader): self.readers -= 1 return defer.succeed(True) @@ -206,11 +219,6 @@ class BlobFile(object): d.addBoth(lambda _: writer.close_handle()) return d - def close_read_handle(self, file_handle): - if file_handle is not None: - file_handle.close() - self.readers -= 1 - @defer.inlineCallbacks def _save_verified_blob(self, writer): with self.setting_verified_blob_lock: diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index fb4724d3a..292516a02 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -6,6 +6,10 @@ log = logging.getLogger(__name__) class HashBlobReader_v0(object): + """ + This is a class that is only used in StreamBlobDecryptor + and should be deprecated + """ implements(interfaces.IConsumer) def __init__(self, write_func): @@ -30,6 +34,10 @@ class HashBlobReader_v0(object): reactor.callLater(0, self.producer.resumeProducing) class HashBlobReader(object): + """ + This is a file like reader class that supports + read(size) and close() + """ def __init__(self, file_path, finished_cb): self.finished_cb = finished_cb self.finished_cb_d = None From cf3a0e2e9d93dc4dbd8ddaa862c20cb80db698b7 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 26 Sep 2017 10:54:20 -0400 Subject: [PATCH 7/8] add some more tests for deleting blob files --- tests/unit/core/test_HashBlob.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d1c282478..93a19ef19 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -44,9 +44,15 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) self.assertTrue(blob_file.verified) f = blob_file.open_for_reading() + self.assertEqual(1, blob_file.readers) c = f.read() self.assertEqual(c, self.fake_content) + # close reader + f.close() + self.assertEqual(0, blob_file.readers) + + @defer.inlineCallbacks def test_delete(self): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) @@ -58,6 +64,21 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash) self.assertFalse(blob_file.verified) + @defer.inlineCallbacks + def test_delete_fail(self): + # deletes should fail if being written to + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + yield self.assertFailure(blob_file.delete(), ValueError) + writer.write(self.fake_content) + writer.close() + + # deletes should fail if being read and not closed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + yield self.assertFailure(blob_file.delete(), ValueError) + @defer.inlineCallbacks def test_too_much_write(self): # writing too much data should result in failure From 8c41b9ecdea9065d74142a825905fae596285147 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 27 Sep 2017 17:41:52 -0400 Subject: [PATCH 8/8] added changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ceff6db7e..7aaafa3a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer + * Fixed https://github.com/lbryio/lbry/issues/923 ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.