From 34bd9e5cb408d1ce78ac4f7f47d7c4c7f09c6a66 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 21 Mar 2022 04:26:27 -0300
Subject: [PATCH 1/6] exclude sd blobs from calculation and make them be
 picked last on removal

---
 lbry/extras/daemon/storage.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index e5c9ec67c..c53f81a66 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -449,7 +449,8 @@ class SQLiteStorage(SQLiteMixin):
         return await self.db.execute_fetchall(
             "select blob.blob_hash, blob.blob_length, blob.added_on "
             "from blob left join stream_blob using (blob_hash) "
-            "where stream_blob.stream_hash is null and blob.is_mine=? order by blob.added_on asc",
+            "where stream_blob.stream_hash is null and blob.is_mine=?"
+            "order by blob.blob_length desc, blob.added_on asc",
             (is_mine,)
         )
 
@@ -479,7 +480,7 @@ class SQLiteStorage(SQLiteMixin):
             coalesce(sum(case when is_mine=1 then blob_length else 0 end), 0) as private_storage
-            from blob left join stream_blob using (blob_hash)
+            from blob left join stream_blob using (blob_hash) where blob_hash not in (select sd_hash from stream)
         """)
         return {
             'network_storage': network_size,

From c5e2f19dde86e130b4fbdb9d1267a807ddeb3823 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 21 Mar 2022 04:38:51 -0300
Subject: [PATCH 2/6] fix bug where added_on is always 0 for downloads

---
 lbry/blob/blob_info.py                              | 3 ++-
 tests/integration/datanetwork/test_file_commands.py | 6 ++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/lbry/blob/blob_info.py b/lbry/blob/blob_info.py
index 7d4bc71dd..1058d1285 100644
--- a/lbry/blob/blob_info.py
+++ b/lbry/blob/blob_info.py
@@ -1,3 +1,4 @@
+import time
 import typing
 
 
@@ -18,7 +19,7 @@ class BlobInfo:
         self.blob_num = blob_num
         self.length = length
         self.iv = iv
-        self.added_on = added_on
+        self.added_on = added_on or time.time()
         self.is_mine = is_mine
 
     def as_dict(self) -> typing.Dict:

diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index a57718f6b..7472527e5 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -573,6 +573,12 @@ class DiskSpaceManagement(CommandTestCase):
         self.assertTrue(blobs2.issubset(blobs))
         self.assertFalse(blobs3.issubset(blobs))
         self.assertTrue(blobs4.issubset(blobs))
+        # check that added_on gets set on downloads (was a bug)
+        self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob"))
+        await self.daemon.jsonrpc_file_delete(delete_all=True)
+        await self.daemon.jsonrpc_get("foo4", save_file=False)
+        self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob"))
+
 
 class TestBackgroundDownloaderComponent(CommandTestCase):

From aac72fa51296c6e7b1fbb35645697cafb6d8cab1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 21 Mar 2022 21:33:33 -0300
Subject: [PATCH 3/6] fix bug where recovery doesn't update blob status

---
 lbry/blob/blob_manager.py                | 12 +++++++++---
 lbry/stream/stream_manager.py            |  4 ++++
 tests/unit/stream/test_stream_manager.py | 25 +++++++++++++++++++++---
 3 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py
index 52441ecfb..15709be08 100644
--- a/lbry/blob/blob_manager.py
+++ b/lbry/blob/blob_manager.py
@@ -113,9 +113,15 @@ class BlobManager:
                 (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
         )
 
-    def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
-        """Returns of the blobhashes_to_check, which are valid"""
-        return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)]
+    def ensure_completed_blobs_status(self, blob_hashes: typing.List[str]) -> asyncio.Task:
+        """Ensures that completed blobs from a given list of blob hashes are set as 'finished' in the database."""
+        to_add = []
+        for blob_hash in blob_hashes:
+            if not self.is_blob_verified(blob_hash):
+                continue
+            blob = self.get_blob(blob_hash)
+            to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
+        return self.loop.create_task(self.storage.add_blobs(*to_add, finished=True))
 
     def delete_blob(self, blob_hash: str):
         if not is_valid_blobhash(blob_hash):

diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py
index 72fd1414a..6b4f705ff 100644
--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -70,6 +70,7 @@ class StreamManager(SourceManager):
 
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
+        to_check = []
 
         async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
                                  suggested_file_name: str, key: str,
@@ -82,6 +83,7 @@ class StreamManager(SourceManager):
             if not descriptor:
                 return
             to_restore.append((descriptor, sd_blob, content_fee))
+            to_check.extend([sd_blob.blob_hash] + [blob.blob_hash for blob in descriptor.blobs[:-1]])
 
         await asyncio.gather(*[
             recover_stream(
@@ -93,6 +95,8 @@ class StreamManager(SourceManager):
 
         if to_restore:
             await self.storage.recover_streams(to_restore, self.config.download_dir)
+        if to_check:
+            await self.blob_manager.ensure_completed_blobs_status(to_check)
 
         # if self.blob_manager._save_blobs:
         #     log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))

diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index a6767509f..76e2a5c04 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -451,16 +451,16 @@ class TestStreamManager(BlobExchangeTestBase):
         await asyncio.sleep(0, loop=self.loop)
         self.stream_manager.stop()
         self.client_blob_manager.stop()
+        # partial removal, only sd blob is missing.
+        # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
         os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
-        for blob in stream.descriptor.blobs[:-1]:
-            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
         await self.client_blob_manager.setup()
         await self.stream_manager.start()
         self.assertEqual(1, len(self.stream_manager.streams))
         self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
         for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
             blob_status = await self.client_storage.get_blob_status(blob_hash)
-            self.assertEqual('pending', blob_status)
+            self.assertEqual('finished', blob_status)
         self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
 
         sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
@@ -468,5 +468,24 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertTrue(sd_blob.get_is_verified())
         self.assertListEqual(expected_analytics_events, received_events)
 
+        # full removal, check that status is preserved (except sd blob, which was written)
+        self.client_blob_manager.stop()
+        os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
+        for blob in stream.descriptor.blobs[:-1]:
+            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
+        await self.client_blob_manager.setup()
+        await self.stream_manager.start()
+        for blob_hash in [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
+            blob_status = await self.client_storage.get_blob_status(blob_hash)
+            self.assertEqual('pending', blob_status)
+        # sd blob was recovered
+        sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
+        self.assertTrue(sd_blob.file_exists)
+        self.assertTrue(sd_blob.get_is_verified())
+        self.assertListEqual(expected_analytics_events, received_events)
+        # db reflects that too
+        blob_status = await self.client_storage.get_blob_status(stream.sd_hash)
+        self.assertEqual('finished', blob_status)
+
     def test_download_then_recover_old_sort_stream_on_startup(self):
         return self.test_download_then_recover_stream_on_startup(old_sort=True)

From c9c2495611ff05bc2d585de82c7a53c63588d188 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 21 Mar 2022 21:58:36 -0300
Subject: [PATCH 4/6] if a blob file exists but is pending on db, fix on
 startup

---
 lbry/blob/blob_manager.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py
index 15709be08..566306945 100644
--- a/lbry/blob/blob_manager.py
+++ b/lbry/blob/blob_manager.py
@@ -83,6 +83,8 @@ class BlobManager:
         to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
         if to_add:
             self.completed_blob_hashes.update(to_add)
+        # check blobs that aren't set as finished but were seen on disk
+        await self.ensure_completed_blobs_status(in_blobfiles_dir - to_add)
         if self.config.track_bandwidth:
             self.connection_manager.start()
         return True
@@ -113,7 +115,7 @@ class BlobManager:
                 (blob.blob_hash, blob.length, blob.added_on, blob.is_mine), finished=False)
         )
 
-    def ensure_completed_blobs_status(self, blob_hashes: typing.List[str]) -> asyncio.Task:
+    async def ensure_completed_blobs_status(self, blob_hashes: typing.Iterable[str]):
         """Ensures that completed blobs from a given list of blob hashes are set as 'finished' in the database."""
         to_add = []
         for blob_hash in blob_hashes:
             if not self.is_blob_verified(blob_hash):
                 continue
             blob = self.get_blob(blob_hash)
             to_add.append((blob.blob_hash, blob.length, blob.added_on, blob.is_mine))
-        return self.loop.create_task(self.storage.add_blobs(*to_add, finished=True))
+            if len(to_add) > 500:
+                await self.storage.add_blobs(*to_add, finished=True)
+                to_add.clear()
+        return await self.storage.add_blobs(*to_add, finished=True)
 
     def delete_blob(self, blob_hash: str):
         if not is_valid_blobhash(blob_hash):

From cb78e95e3dfc985bb5cf52737ea62a7a02d936b2 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 23 Mar 2022 13:39:56 -0300
Subject: [PATCH 5/6] add missing space on query, typo

---
 lbry/extras/daemon/storage.py                       | 2 +-
 tests/integration/datanetwork/test_file_commands.py | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index c53f81a66..f12a53616 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -449,7 +449,7 @@ class SQLiteStorage(SQLiteMixin):
         return await self.db.execute_fetchall(
             "select blob.blob_hash, blob.blob_length, blob.added_on "
             "from blob left join stream_blob using (blob_hash) "
-            "where stream_blob.stream_hash is null and blob.is_mine=?"
+            "where stream_blob.stream_hash is null and blob.is_mine=? "
             "order by blob.blob_length desc, blob.added_on asc",
             (is_mine,)
         )

diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 7472527e5..6772a1ae5 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -580,7 +580,6 @@ class DiskSpaceManagement(CommandTestCase):
         self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob"))
-
 
 class TestBackgroundDownloaderComponent(CommandTestCase):
     async def get_blobs_from_sd_blob(self, sd_blob):
         descriptor = await StreamDescriptor.from_stream_descriptor_blob(

From 200761ff139d10fb512465f2e48b4406862fde75 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 23 Mar 2022 13:41:02 -0300
Subject: [PATCH 6/6] make added_on a required parameter on BlobInfo, fix
 callers

---
 lbry/blob/blob_file.py                     | 2 +-
 lbry/blob/blob_info.py                     | 7 +++----
 lbry/extras/daemon/migrator/migrate8to9.py | 2 +-
 lbry/extras/daemon/storage.py              | 8 +++++---
 lbry/stream/descriptor.py                  | 5 +++--
 tests/unit/database/test_SQLiteStorage.py  | 4 ++--
 6 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index 8b7b9ee49..62ae64ca5 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -201,7 +201,7 @@ class AbstractBlob:
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
-        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine)
+        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), added_on, blob_hash, is_mine)
 
     def save_verified_blob(self, verified_bytes: bytes):
         if self.verified.is_set():

diff --git a/lbry/blob/blob_info.py b/lbry/blob/blob_info.py
index 1058d1285..09ca6c546 100644
--- a/lbry/blob/blob_info.py
+++ b/lbry/blob/blob_info.py
@@ -1,4 +1,3 @@
-import time
 import typing
 
 
@@ -13,13 +12,13 @@ class BlobInfo:
     ]
 
     def __init__(
-            self, blob_num: int, length: int, iv: str,
-            blob_hash: typing.Optional[str] = None, added_on=0, is_mine=False):
+            self, blob_num: int, length: int, iv: str, added_on,
+            blob_hash: typing.Optional[str] = None, is_mine=False):
         self.blob_hash = blob_hash
         self.blob_num = blob_num
         self.length = length
         self.iv = iv
-        self.added_on = added_on or time.time()
+        self.added_on = added_on
         self.is_mine = is_mine
 
     def as_dict(self) -> typing.Dict:

diff --git a/lbry/extras/daemon/migrator/migrate8to9.py b/lbry/extras/daemon/migrator/migrate8to9.py
index c9aee0a37..888370ace 100644
--- a/lbry/extras/daemon/migrator/migrate8to9.py
+++ b/lbry/extras/daemon/migrator/migrate8to9.py
@@ -20,7 +20,7 @@ def do_migration(conf):
                          "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
     blobs_by_stream = {}
     for stream_hash, position, iv, blob_hash, blob_length in blobs:
-        blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, blob_hash))
+        blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, 0, blob_hash))
 
     for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
         sd = StreamDescriptor(None, blob_dir, stream_name, stream_key, suggested_filename,

diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index f12a53616..eaac3301e 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -532,7 +532,8 @@ class SQLiteStorage(SQLiteMixin):
         def _get_blobs_for_stream(transaction):
             crypt_blob_infos = []
             stream_blobs = transaction.execute(
-                "select blob_hash, position, iv from stream_blob where stream_hash=? "
+                "select s.blob_hash, s.position, s.iv, b.added_on "
+                "from stream_blob s left outer join blob b on b.blob_hash=s.blob_hash where stream_hash=? "
                 "order by position asc", (stream_hash, )
             ).fetchall()
             if only_completed:
@@ -552,9 +553,10 @@ class SQLiteStorage(SQLiteMixin):
             for blob_hash, length in lengths:
                 blob_length_dict[blob_hash] = length
 
-            for blob_hash, position, iv in stream_blobs:
+            current_time = time.time()
+            for blob_hash, position, iv, added_on in stream_blobs:
                 blob_length = blob_length_dict.get(blob_hash, 0)
-                crypt_blob_infos.append(BlobInfo(position, blob_length, iv, blob_hash))
+                crypt_blob_infos.append(BlobInfo(position, blob_length, iv, added_on or current_time, blob_hash))
                 if not blob_hash:
                     break
             return crypt_blob_infos

diff --git a/lbry/stream/descriptor.py b/lbry/stream/descriptor.py
index b68184433..45397e4cb 100644
--- a/lbry/stream/descriptor.py
+++ b/lbry/stream/descriptor.py
@@ -194,12 +194,13 @@ class StreamDescriptor:
             raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
         if any(i != blob_info['blob_num'] for i, blob_info in enumerate(decoded['blobs'])):
             raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
+        added_on = time.time()
         descriptor = cls(
             loop, blob_dir,
             binascii.unhexlify(decoded['stream_name']).decode(),
             decoded['key'],
             binascii.unhexlify(decoded['suggested_file_name']).decode(),
-            [BlobInfo(info['blob_num'], info['length'], info['iv'], info.get('blob_hash'))
+            [BlobInfo(info['blob_num'], info['length'], info['iv'], added_on, info.get('blob_hash'))
             for info in decoded['blobs']],
             decoded['stream_hash'],
             blob.blob_hash
@@ -266,7 +267,7 @@ class StreamDescriptor:
             blobs.append(blob_info)
         blobs.append(
             # add the stream terminator
-            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True)
+            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), added_on, None, True)
         )
         file_name = os.path.basename(file_path)
         suggested_file_name = sanitize_file_name(file_name)

diff --git a/tests/unit/database/test_SQLiteStorage.py b/tests/unit/database/test_SQLiteStorage.py
index 8f4d42703..0d85e5651 100644
--- a/tests/unit/database/test_SQLiteStorage.py
+++ b/tests/unit/database/test_SQLiteStorage.py
@@ -84,7 +84,7 @@ class StorageTest(AsyncioTestCase):
             await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True)
 
     async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
-        blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())]
+        blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", 0, random_lbry_hash())]
         descriptor = StreamDescriptor(
             asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
         )
@@ -95,7 +95,7 @@ class StorageTest(AsyncioTestCase):
     async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None):
         stream_hash = stream_hash or random_lbry_hash()
         blobs = [
-            BlobInfo(i + 1, 100, "DEADBEEF", random_lbry_hash())
+            BlobInfo(i + 1, 100, "DEADBEEF", 0, random_lbry_hash())
             for i in range(blob_count)
         ]
         await self.store_fake_stream(stream_hash, blobs)
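
The eviction query changed by PATCH 1/6 (and fixed up in PATCH 5/6) orders removal candidates by `blob_length desc, added_on asc`, so large, old blobs are reclaimed first and tiny sd blobs fall to the end of the list. A minimal sketch of that behaviour against a throwaway SQLite database (toy schema and values; only the columns the query touches are modelled):

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.executescript("""
        create table blob (blob_hash text primary key, blob_length integer,
                           added_on integer, is_mine integer);
        create table stream_blob (stream_hash text, blob_hash text);
    """)
    # three orphaned blobs: a tiny sd blob and two full-size data blobs
    con.executemany("insert into blob values (?, ?, ?, ?)", [
        ("sd_blob", 120, 100, 0),       # small, so picked last
        ("data_old", 2097152, 100, 0),  # large and oldest, so picked first
        ("data_new", 2097152, 200, 0),
    ])
    rows = con.execute(
        "select blob.blob_hash, blob.blob_length, blob.added_on "
        "from blob left join stream_blob using (blob_hash) "
        "where stream_blob.stream_hash is null and blob.is_mine=? "
        "order by blob.blob_length desc, blob.added_on asc", (0,)
    ).fetchall()
    print([r[0] for r in rows])  # ['data_old', 'data_new', 'sd_blob']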
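
Throughout PATCH 3/6, a stream's blob list is sliced with `[:-1]` because the final entry is the zero-length stream terminator, which has no blob hash and no file on disk to verify. A toy illustration of how `recover_streams` assembles `to_check` (hypothetical hashes, with `SimpleNamespace` standing in for the real descriptor and blob objects):

    from types import SimpleNamespace

    # a descriptor lists its data blobs plus a zero-length terminator
    # whose blob_hash is None -- nothing on disk corresponds to it
    descriptor = SimpleNamespace(blobs=[
        SimpleNamespace(blob_hash="blob1", length=2097152),
        SimpleNamespace(blob_hash="blob2", length=1024),
        SimpleNamespace(blob_hash=None, length=0),  # stream terminator
    ])
    sd_blob = SimpleNamespace(blob_hash="sdhash")

    to_check = [sd_blob.blob_hash] + [blob.blob_hash for blob in descriptor.blobs[:-1]]
    print(to_check)  # ['sdhash', 'blob1', 'blob2'] -- terminator excluded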
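
PATCH 4/6 also makes `ensure_completed_blobs_status` flush `to_add` whenever it grows past 500 entries, so scanning a large blob directory on startup issues several bounded writes instead of one enormous statement. The same batching pattern, reduced to a self-contained sketch (plain `sqlite3` and a toy table stand in for the daemon's storage layer):

    import sqlite3

    BATCH = 500  # flush threshold, mirroring the patch

    def mark_finished(con, rows):
        # write rows in bounded batches instead of one giant statement
        pending = []
        for row in rows:  # row = (blob_hash, blob_length, added_on, is_mine)
            pending.append(row)
            if len(pending) > BATCH:
                con.executemany("insert or replace into blob values (?, ?, ?, ?, 'finished')", pending)
                pending.clear()
        con.executemany("insert or replace into blob values (?, ?, ?, ?, 'finished')", pending)
        con.commit()

    con = sqlite3.connect(":memory:")
    con.execute("create table blob (blob_hash text primary key, blob_length integer,"
                " added_on integer, is_mine integer, status text)")
    mark_finished(con, ((f"hash{i}", 2097152, i, 0) for i in range(1200)))
    print(con.execute("select count(*) from blob where status='finished'").fetchone()[0])  # 1200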
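
PATCH 2/6 first papered over missing timestamps with `added_on or time.time()` inside `BlobInfo`, and PATCH 6/6 then made `added_on` a required positional parameter, moving the fallback out to the call sites that read old rows (as in `_get_blobs_for_stream`). The `or` idiom works because the legacy "unset" value is the falsy `0`. A small standalone illustration (a sketch class with the same parameter order as the patched `BlobInfo`, not the real one):

    import time
    import typing

    class BlobInfoSketch:
        # added_on now precedes the optional blob_hash, so every caller
        # must pass it explicitly instead of silently inheriting a default
        def __init__(self, blob_num: int, length: int, iv: str, added_on,
                     blob_hash: typing.Optional[str] = None, is_mine=False):
            self.blob_num, self.length, self.iv = blob_num, length, iv
            self.added_on = added_on
            self.blob_hash, self.is_mine = blob_hash, is_mine

    # caller-side fallback: rows created before the added_on column existed
    # hold 0 (falsy), so `or` substitutes the current time for them only
    current_time = time.time()
    for stored in (0, 1647900000):
        info = BlobInfoSketch(1, 100, "DEADBEEF", stored or current_time, "abc")
        print(info.added_on)  # current_time for the legacy row, 1647900000 otherwise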