diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e7076a5a7..df40d7ea6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -248,7 +248,7 @@ class BlockProcessor: self.logger.info('chain reorg detected') else: self.logger.info(f'faking a reorg of {count:,d} blocks') - await self.flush(True) + async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) @@ -265,17 +265,28 @@ class BlockProcessor: self.touched.discard(None) self.db.flush_backup(self.flush_data(), self.touched) - start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.run_in_thread_with_lock(flush_backup) - last -= len(raw_blocks) - await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) - await self.prefetcher.reset_height(self.height) - self.reorg_count_metric.inc() + try: + await self.flush(True) + + start, last, hashes = await self.reorg_hashes(count) + # Reverse and convert to hex strings. + hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + self.logger.info("reorg %i block hashes", len(hashes)) + for hex_hashes in chunks(hashes, 50): + raw_blocks = await get_raw_blocks(last, hex_hashes) + self.logger.info("got %i raw blocks", len(raw_blocks)) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(flush_backup) + last -= len(raw_blocks) + + await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) + await self.prefetcher.reset_height(self.height) + self.reorg_count_metric.inc() + except: + self.logger.exception("boom") + raise + finally: + self.logger.info("done with reorg") async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a @@ -465,6 +476,7 @@ class BlockProcessor: coin = self.coin for raw_block in raw_blocks: + self.logger.info("backup block %i", self.height) # Check and update self.tip block = coin.block(raw_block, self.height) header_hash = coin.header_hash(block.header) @@ -493,7 +505,6 @@ class BlockProcessor: undo_entry_len = 12 + HASHX_LEN for tx, tx_hash in reversed(txs): - self.db.total_transactions.pop() for idx, txout in enumerate(tx.outputs): # Spend the TX outputs. Be careful with unspendable # outputs - we didn't save those in the first place. @@ -511,6 +522,8 @@ class BlockProcessor: self.utxo_cache[txin.prev_hash + s_pack(' 1: + tx_num, = unpack('