From c6a3c05a0c64a43aa0bfd01f2b5a13bf441f3807 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 01:32:23 -0300 Subject: [PATCH 1/6] add missing migrator line --- lbrynet/extras/daemon/migrator/dbmigrator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/extras/daemon/migrator/dbmigrator.py b/lbrynet/extras/daemon/migrator/dbmigrator.py index db759a1bd..47fa080f9 100644 --- a/lbrynet/extras/daemon/migrator/dbmigrator.py +++ b/lbrynet/extras/daemon/migrator/dbmigrator.py @@ -22,6 +22,8 @@ def migrate_db(conf, start, end): from .migrate7to8 import do_migration elif current == 8: from .migrate8to9 import do_migration + elif current == 9: + from .migrate9to10 import do_migration else: raise Exception("DB migration of version {} to {} is not available".format(current, current+1)) From 84b471d4866561c444cbebcd502055eae51e7523 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 01:32:55 -0300 Subject: [PATCH 2/6] limit batch size on announcer test so it covers batching logic --- tests/unit/dht/test_blob_announcer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 85dcd0946..0b5415b76 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -85,7 +85,7 @@ class TestBlobAnnouncer(AsyncioTestCase): ) to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(2, len(to_announce)) - self.blob_announcer.start() + self.blob_announcer.start(batch_size=1) # so it covers batching logic await self.advance(61.0) to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(0, len(to_announce)) From dc4560cc9a9c62a667f77120aa4355a9b2e99e2b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 01:33:15 -0300 Subject: [PATCH 3/6] refactor announcer --- lbrynet/dht/blob_announcer.py | 75 +++++++++++++++-------------------- 1 file changed, 31 insertions(+), 44 deletions(-) diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index 130a06626..64fb3ffe2 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -13,55 +13,42 @@ class BlobAnnouncer: self.loop = loop self.node = node self.storage = storage - self.pending_call: asyncio.Handle = None self.announce_task: asyncio.Task = None - self.running = False self.announce_queue: typing.List[str] = [] - async def _announce(self, batch_size: typing.Optional[int] = 10): - if not batch_size: - return - if not self.node.joined.is_set(): - await self.node.joined.wait() - blob_hashes = await self.storage.get_blobs_to_announce() - if blob_hashes: - self.announce_queue.extend(blob_hashes) - log.info("%i blobs to announce", len(blob_hashes)) - batch = [] - while len(self.announce_queue): - cnt = 0 - announced = [] - while self.announce_queue and cnt < batch_size: - blob_hash = self.announce_queue.pop() - announced.append(blob_hash) - batch.append(self.node.announce_blob(blob_hash)) - cnt += 1 - to_await = [] - while batch: - to_await.append(batch.pop()) - if to_await: - await asyncio.gather(*tuple(to_await), loop=self.loop) - await self.storage.update_last_announced_blobs(announced) - log.info("announced %i blobs", len(announced)) - if self.running: - self.pending_call = self.loop.call_later(60, self.announce, batch_size) + async def _submit_announcement(self, blob_hash): + try: + peers = len(await self.node.announce_blob(blob_hash)) + if peers > 4: + return blob_hash + else: + log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) + except Exception as err: + log.warning("error announcing %s: %s", blob_hash[:8], str(err)) - def announce(self, batch_size: typing.Optional[int] = 10): - self.announce_task = self.loop.create_task(self._announce(batch_size)) + + async def _announce(self, batch_size: typing.Optional[int] = 10): + while batch_size: + if not self.node.joined.is_set(): + await self.node.joined.wait() + self.announce_queue.extend(await self.storage.get_blobs_to_announce()) + log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) + while len(self.announce_queue): + log.warning("%i blobs to announce", len(self.announce_queue)) + announced = await asyncio.gather(*[ + self._submit_announcement( + self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue + ], loop=self.loop) + announced = list(filter(None, announced)) + if announced: + await self.storage.update_last_announced_blobs(announced) + log.info("announced %i blobs", len(announced)) + await asyncio.sleep(60) def start(self, batch_size: typing.Optional[int] = 10): - if self.running: - raise Exception("already running") - self.running = True - self.announce(batch_size) + assert not self.announce_task or self.announce_task.done(), "already running" + self.announce_task = self.loop.create_task(self._announce(batch_size)) def stop(self): - self.running = False - if self.pending_call: - if not self.pending_call.cancelled(): - self.pending_call.cancel() - self.pending_call = None - if self.announce_task: - if not (self.announce_task.done() or self.announce_task.cancelled()): - self.announce_task.cancel() - self.announce_task = None + if self.announce_task and not self.announce_task.done(): + self.announce_task.cancel() From b91f27219dea7042541ff81b6f8a3e367a6e4f4f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 01:54:09 -0300 Subject: [PATCH 4/6] use fetchall as there is an update while iterating --- lbrynet/extras/daemon/migrator/migrate9to10.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/extras/daemon/migrator/migrate9to10.py b/lbrynet/extras/daemon/migrator/migrate9to10.py index ac08e77bc..97a48cc6f 100644 --- a/lbrynet/extras/daemon/migrator/migrate9to10.py +++ b/lbrynet/extras/daemon/migrator/migrate9to10.py @@ -8,7 +8,7 @@ def do_migration(conf): cursor = connection.cursor() query = "select stream_hash, sd_hash from main.stream" - for stream_hash, sd_hash in cursor.execute(query): + for stream_hash, sd_hash in cursor.execute(query).fetchall(): head_blob_hash = cursor.execute( "select blob_hash from stream_blob where position = 0 and stream_hash = ?", (stream_hash,) From 1bc4e4b702fb3545c3694d63e3116fe77bbd8ea4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 01:56:13 -0300 Subject: [PATCH 5/6] typo on logging, announcements logging is info, not warning --- lbrynet/dht/blob_announcer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index 64fb3ffe2..f965aa7b1 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -34,7 +34,7 @@ class BlobAnnouncer: self.announce_queue.extend(await self.storage.get_blobs_to_announce()) log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) while len(self.announce_queue): - log.warning("%i blobs to announce", len(self.announce_queue)) + log.info("%i blobs to announce", len(self.announce_queue)) announced = await asyncio.gather(*[ self._submit_announcement( self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue From 007dd4386180d7bd46d91981fbcb6084024440fd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 May 2019 10:59:33 -0300 Subject: [PATCH 6/6] re-raise cancelled errors --- lbrynet/dht/blob_announcer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index f965aa7b1..33ab4f70e 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -24,6 +24,8 @@ class BlobAnnouncer: else: log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) except Exception as err: + if isinstance(err, asyncio.CancelledError): + raise err log.warning("error announcing %s: %s", blob_hash[:8], str(err))