diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index a94a7bc0a..83436eb2d 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -55,7 +55,8 @@ class Conductor: async def start_blockchain(self): if not self.blockchain_started: - await self.blockchain_node.start() + asyncio.create_task(self.blockchain_node.start()) + await self.blockchain_node.running.wait() await self.blockchain_node.generate(200) self.blockchain_started = True @@ -255,6 +256,10 @@ class BlockchainNode: self.rpcport = 9245 + 2 # avoid conflict with default rpc port self.rpcuser = 'rpcuser' self.rpcpassword = 'rpcpassword' + self.stopped = False + self.restart_ready = asyncio.Event() + self.restart_ready.set() + self.running = asyncio.Event() @property def rpc_url(self): @@ -315,13 +320,27 @@ class BlockchainNode: f'-port={self.peerport}' ] self.log.info(' '.join(command)) - self.transport, self.protocol = await loop.subprocess_exec( - BlockchainProcess, *command - ) - await self.protocol.ready.wait() - assert not self.protocol.stopped.is_set() + while not self.stopped: + if self.running.is_set(): + await asyncio.sleep(1) + continue + await self.restart_ready.wait() + try: + self.transport, self.protocol = await loop.subprocess_exec( + BlockchainProcess, *command + ) + await self.protocol.ready.wait() + assert not self.protocol.stopped.is_set() + self.running.set() + except asyncio.CancelledError: + self.running.clear() + raise + except Exception as e: + self.running.clear() + log.exception('failed to start lbrycrdd', exc_info=e) async def stop(self, cleanup=True): + self.stopped = True try: self.transport.terminate() await self.protocol.stopped.wait() @@ -330,6 +349,16 @@ class BlockchainNode: if cleanup: self.cleanup() + async def clear_mempool(self): + self.restart_ready.clear() + self.transport.terminate() + await self.protocol.stopped.wait() + self.transport.close() + self.running.clear() + os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat')) + self.restart_ready.set() + await self.running.wait() + def cleanup(self): shutil.rmtree(self.data_path, ignore_errors=True) @@ -361,6 +390,12 @@ class BlockchainNode: def get_block_hash(self, block): return self._cli_cmnd('getblockhash', str(block)) + def sendrawtransaction(self, tx): + return self._cli_cmnd('sendrawtransaction', tx) + + async def get_block(self, block_hash): + return json.loads(await self._cli_cmnd('getblock', block_hash, '1')) + def get_raw_change_address(self): return self._cli_cmnd('getrawchangeaddress') diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 9c6f10682..44eba7d1a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -2,7 +2,7 @@ import time import asyncio from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor - +from typing import Optional import lbry from lbry.schema.claim import Claim from lbry.wallet.server.db.writer import SQLDB @@ -10,7 +10,7 @@ from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.leveldb import FlushData -from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES +from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES, REORG_COUNT class Prefetcher: @@ -219,7 +219,7 @@ class BlockProcessor: 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) - async def reorg_chain(self, count=None): + async def reorg_chain(self, count: Optional[int] = None): """Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for @@ -253,7 +253,9 @@ class BlockProcessor: await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) await self.run_in_thread_with_lock(flush_backup) last -= len(raw_blocks) + await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) await self.prefetcher.reset_height(self.height) + REORG_COUNT.inc() async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a @@ -270,7 +272,7 @@ class BlockProcessor: return start, last, await self.db.fs_block_hashes(start, count) - async def calc_reorg_range(self, count): + async def calc_reorg_range(self, count: Optional[int]): """Calculate the reorg range""" def diff_pos(hashes1, hashes2): diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index cb9ad07ea..988b7b266 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -433,6 +433,15 @@ class SQLDB: return {r.channel_hash for r in affected_channels} return set() + def delete_claims_above_height(self, height: int): + claim_hashes = [x[0] for x in self.execute( + "SELECT claim_hash FROM claim WHERE height>?", (height, ) + ).fetchall()] + while claim_hashes: + batch = set(claim_hashes[:500]) + claim_hashes = claim_hashes[500:] + self.delete_claims(batch) + def _clear_claim_metadata(self, claim_hashes: Set[bytes]): if claim_hashes: for table in ('tag',): # 'language', 'location', etc diff --git a/lbry/wallet/server/prometheus.py b/lbry/wallet/server/prometheus.py index a2f82f21e..13359980a 100644 --- a/lbry/wallet/server/prometheus.py +++ b/lbry/wallet/server/prometheus.py @@ -51,7 +51,9 @@ BLOCK_COUNT = Gauge( "block_count", "Number of processed blocks", namespace=NAMESPACE ) BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE) - +REORG_COUNT = Gauge( + "reorg_count", "Number of reorgs", namespace=NAMESPACE +) class PrometheusServer: def __init__(self): diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index b271966df..216030839 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -1,8 +1,11 @@ import logging -from lbry.testcase import IntegrationTestCase +import asyncio +from binascii import hexlify +from lbry.testcase import CommandTestCase +from lbry.wallet.server.prometheus import REORG_COUNT -class BlockchainReorganizationTests(IntegrationTestCase): +class BlockchainReorganizationTests(CommandTestCase): VERBOSITY = logging.WARN @@ -13,21 +16,105 @@ class BlockchainReorganizationTests(IntegrationTestCase): ) async def test_reorg(self): + REORG_COUNT.set(0) # invalidate current block, move forward 2 - self.assertEqual(self.ledger.headers.height, 200) - await self.assertBlockHash(200) - await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode()) + self.assertEqual(self.ledger.headers.height, 206) + await self.assertBlockHash(206) + await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) await self.blockchain.generate(2) - await self.ledger.on_header.where(lambda e: e.height == 201) - self.assertEqual(self.ledger.headers.height, 201) - await self.assertBlockHash(200) - await self.assertBlockHash(201) + await self.ledger.on_header.where(lambda e: e.height == 207) + self.assertEqual(self.ledger.headers.height, 207) + await self.assertBlockHash(206) + await self.assertBlockHash(207) + self.assertEqual(1, REORG_COUNT._samples()[0][2]) # invalidate current block, move forward 3 - await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode()) + await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode()) await self.blockchain.generate(3) - await self.ledger.on_header.where(lambda e: e.height == 202) - self.assertEqual(self.ledger.headers.height, 202) - await self.assertBlockHash(200) - await self.assertBlockHash(201) - await self.assertBlockHash(202) + await self.ledger.on_header.where(lambda e: e.height == 208) + self.assertEqual(self.ledger.headers.height, 208) + await self.assertBlockHash(206) + await self.assertBlockHash(207) + await self.assertBlockHash(208) + self.assertEqual(2, REORG_COUNT._samples()[0][2]) + + async def test_reorg_change_claim_height(self): + # sanity check + txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft') + self.assertListEqual(txos, []) + + still_valid = await self.daemon.jsonrpc_stream_create( + 'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!') + ) + await self.ledger.wait(still_valid) + await self.generate(1) + + # create a claim and verify it's returned by claim_search + self.assertEqual(self.ledger.headers.height, 207) + broadcast_tx = await self.daemon.jsonrpc_stream_create( + 'hovercraft', '1.0', file_path=self.create_upload_file(data=b'hi!') + ) + await self.ledger.wait(broadcast_tx) + await self.generate(1) + await self.ledger.wait(broadcast_tx, self.blockchain.block_expected) + self.assertEqual(self.ledger.headers.height, 208) + txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft') + self.assertEqual(1, len(txos)) + txo = txos[0] + self.assertEqual(txo.tx_ref.id, broadcast_tx.id) + self.assertEqual(txo.tx_ref.height, 208) + + # check that our tx is in block 208 as returned by lbrycrdd + invalidated_block_hash = (await self.ledger.headers.hash(208)).decode() + block_207 = await self.blockchain.get_block(invalidated_block_hash) + self.assertIn(txo.tx_ref.id, block_207['tx']) + self.assertEqual(208, txos[0].tx_ref.height) + + # reorg the last block dropping our claim tx + await self.blockchain.invalidate_block(invalidated_block_hash) + await self.blockchain.clear_mempool() + await self.blockchain.generate(2) + + # verify the claim was dropped from block 208 as returned by lbrycrdd + reorg_block_hash = await self.blockchain.get_block_hash(208) + self.assertNotEqual(invalidated_block_hash, reorg_block_hash) + block_207 = await self.blockchain.get_block(reorg_block_hash) + self.assertNotIn(txo.tx_ref.id, block_207['tx']) + + # wait for the client to catch up and verify the reorg + await asyncio.wait_for(self.on_header(209), 3.0) + await self.assertBlockHash(207) + await self.assertBlockHash(208) + await self.assertBlockHash(209) + client_reorg_block_hash = (await self.ledger.headers.hash(208)).decode() + self.assertEqual(client_reorg_block_hash, reorg_block_hash) + + # verify the dropped claim is no longer returned by claim search + txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft') + self.assertListEqual(txos, []) + + # verify the claim published a block earlier wasn't also reverted + txos, _, _, _ = await self.ledger.claim_search([], name='still-valid') + self.assertEqual(1, len(txos)) + self.assertEqual(207, txos[0].tx_ref.height) + + # broadcast the claim in a different block + new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode()) + self.assertEqual(broadcast_tx.id, new_txid) + await self.blockchain.generate(1) + + # wait for the client to catch up + await asyncio.wait_for(self.on_header(210), 1.0) + + # verify the claim is in the new block and that it is returned by claim_search + block_210 = await self.blockchain.get_block((await self.ledger.headers.hash(210)).decode()) + self.assertIn(txo.tx_ref.id, block_210['tx']) + txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft') + self.assertEqual(1, len(txos)) + self.assertEqual(txos[0].tx_ref.id, new_txid) + self.assertEqual(210, txos[0].tx_ref.height) + + # this should still be unchanged + txos, _, _, _ = await self.ledger.claim_search([], name='still-valid') + self.assertEqual(1, len(txos)) + self.assertEqual(207, txos[0].tx_ref.height)