From f59adef282a61ffba6f3371d6157c61befe57ea4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Mar 2022 12:17:47 -0400 Subject: [PATCH] block `scribe-hub` and `scribe-elastic-sync` startup on `scribe` initially catching up fixes https://github.com/lbryio/scribe/issues/1 --- scribe/blockchain/service.py | 44 +++++++++++++++++++++------------ scribe/db/db.py | 14 +++++------ scribe/db/prefixes.py | 10 ++++---- scribe/elasticsearch/service.py | 17 +++++++++++++ scribe/hub/service.py | 3 ++- scribe/service.py | 3 +++ 6 files changed, 62 insertions(+), 29 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index ba0c816..baa63ca 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -1540,10 +1540,12 @@ class BlockchainProcessorService(BlockchainService): """Loop forever processing blocks as they arrive.""" self._caught_up_event = caught_up_event try: + if self.height != self.daemon.cached_height() and not self.db.catching_up: + await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd while not self._stopping: if self.height == self.daemon.cached_height(): if not self._caught_up_event.is_set(): - await self._first_caught_up() + await self._finished_initial_catch_up() self._caught_up_event.set() try: await asyncio.wait_for(self.blocks_event.wait(), self.wait_for_blocks_duration) @@ -1558,25 +1560,27 @@ class BlockchainProcessorService(BlockchainService): await self.refresh_mempool() except asyncio.CancelledError: raise - except Exception: - self.log.exception("error while updating mempool txs") - raise + except Exception as err: + self.log.exception("error while updating mempool txs: %s", err) + raise err else: try: await self.check_and_advance_blocks(blocks) except asyncio.CancelledError: raise - except Exception: - self.log.exception("error while processing txs") - raise + except Exception as err: + self.log.exception("error while processing txs: %s", err) + raise err + except asyncio.CancelledError: + raise + except Exception as err: + self.log.exception("error in block processor loop: %s", err) + raise err finally: self._ready_to_stop.set() - async def _first_caught_up(self): - self.log.info(f'caught up to height {self.height}') - # Flush everything but with first_sync->False state. - first_sync = self.db.first_sync - self.db.first_sync = False + async def _need_catch_up(self): + self.db.catching_up = True def flush(): assert len(self.db.prefix_db._op_stack) == 0 @@ -1586,10 +1590,18 @@ class BlockchainProcessorService(BlockchainService): await self.run_in_thread_with_lock(flush) - if first_sync: - self.log.info(f'{__version__} synced to ' - f'height {self.height:,d}, halting here.') - self.shutdown_event.set() + async def _finished_initial_catch_up(self): + self.log.info(f'caught up to height {self.height}') + # Flush everything but with catching_up->False state. + self.db.catching_up = False + + def flush(): + assert len(self.db.prefix_db._op_stack) == 0 + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.db.assert_db_state() + + await self.run_in_thread_with_lock(flush) def _iter_start_tasks(self): self.height = self.db.db_height diff --git a/scribe/db/db.py b/scribe/db/db.py index a45814c..fa097c0 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -1045,10 +1045,10 @@ class HubDB: if self.db_height > 0: self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get()) self.prefix_db.db_state.stage_put((), ( - self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, - self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version, - self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, - self.es_sync_height + self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, + self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version, + self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, + self.es_sync_height ) ) @@ -1062,7 +1062,7 @@ class HubDB: self.db_version = max(self.DB_VERSIONS) self.utxo_flush_count = 0 self.wall_time = 0 - self.first_sync = True + self.catching_up = True self.hist_flush_count = 0 self.hist_comp_flush_count = -1 self.hist_comp_cursor = -1 @@ -1083,7 +1083,7 @@ class HubDB: self.db_tip = state.tip self.utxo_flush_count = state.utxo_flush_count self.wall_time = state.wall_time - self.first_sync = state.first_sync + self.catching_up = state.catching_up self.hist_flush_count = state.hist_flush_count self.hist_comp_flush_count = state.comp_flush_count self.hist_comp_cursor = state.comp_cursor @@ -1097,7 +1097,7 @@ class HubDB: assert self.db_height == state.height, f"{self.db_height} != {state.height}" assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}" assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}" - assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}" + assert self.catching_up == state.catching_up, f"{self.catching_up} != {state.catching_up}" assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}" async def all_utxos(self, hashX): diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index dc47013..6543330 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -422,7 +422,7 @@ class DBState(typing.NamedTuple): tip: bytes utxo_flush_count: int wall_time: int - first_sync: bool + catching_up: bool db_version: int hist_flush_count: int comp_flush_count: int @@ -1439,11 +1439,11 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, - first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, + catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, comp_cursor: int, es_sync_height: int) -> bytes: return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, - wall_time, 1 if first_sync else 0, db_version, hist_flush_count, + wall_time, 1 if catching_up else 0, db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height ) @@ -1457,10 +1457,10 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, - first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, + catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, comp_cursor: int, es_sync_height: int): return cls.pack_key(), cls.pack_value( - genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count, + genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height ) diff --git a/scribe/elasticsearch/service.py b/scribe/elasticsearch/service.py index 6b471ce..2ec0fa6 100644 --- a/scribe/elasticsearch/service.py +++ b/scribe/elasticsearch/service.py @@ -336,7 +336,24 @@ class ElasticSyncService(BlockchainReaderService): self.log.info("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height) await self._reindex() + async def block_bulk_sync_on_writer_catchup(self): + def _check_if_catching_up(): + self.db.prefix_db.try_catch_up_with_primary() + state = self.db.prefix_db.db_state.get() + return state.catching_up + + loop = asyncio.get_event_loop() + + catching_up = True + while catching_up: + catching_up = await loop.run_in_executor(self._executor, _check_if_catching_up) + if catching_up: + await asyncio.sleep(1) + else: + return + def _iter_start_tasks(self): + yield self.block_bulk_sync_on_writer_catchup() yield self.read_es_height() yield self.start_index() yield self.start_cancellable(self.run_es_notifier) diff --git a/scribe/hub/service.py b/scribe/hub/service.py index 6caef20..01e6b99 100644 --- a/scribe/hub/service.py +++ b/scribe/hub/service.py @@ -92,8 +92,9 @@ class HubServerService(BlockchainReaderService): def _iter_start_tasks(self): yield self.start_status_server() yield self.start_cancellable(self.es_notification_client.maintain_connection) - yield self.start_cancellable(self.receive_es_notifications) yield self.start_cancellable(self.refresh_blocks_forever) + yield self.finished_initial_catch_up.wait() + yield self.start_cancellable(self.receive_es_notifications) yield self.session_manager.search_index.start() yield self.start_cancellable(self.session_manager.serve, self.mempool) diff --git a/scribe/service.py b/scribe/service.py index bdac2b1..62faf8b 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -140,6 +140,7 @@ class BlockchainReaderService(BlockchainService): super().__init__(env, secondary_name, thread_workers, thread_prefix) self._refresh_interval = 0.1 self.prometheus_server: typing.Optional[PrometheusServer] = None + self.finished_initial_catch_up = asyncio.Event() async def poll_for_changes(self): """ @@ -233,6 +234,8 @@ class BlockchainReaderService(BlockchainService): try: async with self.lock: await self.poll_for_changes() + if not self.db.catching_up and not self.finished_initial_catch_up.is_set(): + self.finished_initial_catch_up.set() except asyncio.CancelledError: raise except: