From a4880c1cf0a1cbc03ab2c3b57e73805e8d1c21b6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 31 Jan 2022 15:36:15 -0500 Subject: [PATCH] flush in advance/rollback methods --- lbry/wallet/server/block_processor.py | 45 +++++++++++++-------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e8c9a410c..a7ac6a91a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -252,9 +252,10 @@ class BlockProcessor: total_start = time.perf_counter() try: for block in blocks: + if self._stopping: + return start = time.perf_counter() - await self.run_in_thread(self.advance_block, block) - await self.flush() + await self.run_in_thread_with_lock(self.advance_block, block) self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) if self.height == self.coin.nExtendedClaimExpirationForkHeight: @@ -289,7 +290,7 @@ class BlockProcessor: try: assert count > 0, count for _ in range(count): - await self.backup_block() + await self.run_in_thread_with_lock(self.backup_block) self.logger.info(f'backed up to height {self.height:,d}') if self.env.cache_all_claim_txos: @@ -311,19 +312,6 @@ class BlockProcessor: 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) - async def flush(self): - save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit - - def flush(): - self.db.write_db_state() - if save_undo: - self.db.prefix_db.commit(self.height, self.tip) - else: - self.db.prefix_db.unsafe_commit() - self.clear_after_advance_or_reorg() - self.db.assert_db_state() - await self.run_in_thread_with_lock(flush) - def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): try: @@ -1415,7 +1403,6 @@ class BlockProcessor: self.db.headers.append(block.header) self.tip = self.coin.header_hash(block.header) - self.db.fs_height = self.height self.db.fs_tx_count = self.tx_count self.db.hist_flush_count += 1 @@ -1430,6 +1417,17 @@ class BlockProcessor: self.db.last_flush = now self.db.write_db_state() + # flush the changes + save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit + + self.db.write_db_state() + if save_undo: + self.db.prefix_db.commit(self.height, self.tip) + else: + self.db.prefix_db.unsafe_commit() + self.clear_after_advance_or_reorg() + self.db.assert_db_state() + def clear_after_advance_or_reorg(self): self.txo_to_claim.clear() self.claim_hash_to_txo.clear() @@ -1463,7 +1461,7 @@ class BlockProcessor: self.pending_support_amount_change.clear() self.touched_hashXs.clear() - async def backup_block(self): + def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 touched_and_deleted = self.db.prefix_db.touched_or_deleted.get(self.height) self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims) @@ -1514,13 +1512,12 @@ class BlockProcessor: self.db.last_flush = now self.db.last_flush_tx_count = self.db.fs_tx_count - def rollback(): - self.db.prefix_db.rollback(self.height + 1, reverted_block_hash) - self.db.es_sync_height = self.height - self.db.write_db_state() - self.db.prefix_db.unsafe_commit() + # rollback + self.db.prefix_db.rollback(self.height + 1, reverted_block_hash) + self.db.es_sync_height = self.height + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() - await self.run_in_thread_with_lock(rollback) self.clear_after_advance_or_reorg() self.db.assert_db_state()