diff --git a/scribe/reader/elastic_sync.py b/scribe/reader/elastic_sync.py index ee4acc1..64017d9 100644 --- a/scribe/reader/elastic_sync.py +++ b/scribe/reader/elastic_sync.py @@ -292,6 +292,42 @@ class ElasticWriter(BaseBlockchainReader): def last_synced_height(self) -> int: return self._last_wrote_height + async def catch_up(self): + last_state = self.db.prefix_db.db_state.get() + db_height = last_state.height + if last_state and self._last_wrote_height and db_height > self._last_wrote_height: + self.log.info( + "syncing ES from block %i to rocksdb height of %i", self._last_wrote_height, last_state.height + ) + for height in range(self._last_wrote_height + 1, last_state.height + 1): + self.advance(height) + else: + return + success = 0 + cnt = 0 + if self._touched_claims or self._deleted_claims or self._trending: + async for ok, item in async_streaming_bulk( + self.sync_client, self._claim_producer(), + raise_on_error=False): + cnt += 1 + if not ok: + self.log.warning("indexing failed for an item: %s", item) + else: + success += 1 + await self.sync_client.indices.refresh(self.index) + await self.apply_filters( + self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, + self.db.filtered_channels + ) + self.write_es_height(db_height, last_state.tip[::-1].hex()) + self._touched_claims.clear() + self._deleted_claims.clear() + self._removed_during_undo.clear() + self._trending.clear() + self._advanced = False + self.notify_es_notification_listeners(self._last_wrote_height, last_state.tip) + self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) + async def reindex(self, force=False): if force or self._last_wrote_height == 0 and self.db.db_height > 0: if self._last_wrote_height == 0: @@ -305,6 +341,7 @@ class ElasticWriter(BaseBlockchainReader): yield self.start_index() yield self._start_cancellable(self.run_es_notifier) yield self.reindex(force=self._force_reindex) + yield self.catch_up() yield self._start_cancellable(self.refresh_blocks_forever) def _iter_stop_tasks(self):