From 19c0a81c427acd7cf544fde5f91b35e6a31df704 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 21 Mar 2020 04:32:03 -0300 Subject: [PATCH] fix bad usages of hash and some tests --- lbry/extras/daemon/components.py | 2 +- lbry/testcase.py | 1 + lbry/wallet/header.py | 10 +++++++--- lbry/wallet/ledger.py | 13 +++++++------ lbry/wallet/manager.py | 4 ++-- lbry/wallet/orchstr8/node.py | 1 + .../blockchain/test_blockchain_reorganization.py | 6 +++--- tests/integration/blockchain/test_network.py | 12 +++++------- .../integration/blockchain/test_resolve_command.py | 2 +- tests/integration/blockchain/test_sync.py | 1 + tests/unit/lbrynet_daemon/test_Daemon.py | 2 +- 11 files changed, 30 insertions(+), 24 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 7de4d9fe2..5271c1558 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -143,7 +143,7 @@ class WalletComponent(Component): progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100) else: progress = 100 - best_hash = self.wallet_manager.get_best_blockhash() + best_hash = await self.wallet_manager.get_best_blockhash() result.update({ 'headers_synchronization_progress': progress, 'blocks': max(local_height, 0), diff --git a/lbry/testcase.py b/lbry/testcase.py index 56c9cb4c6..667fab5e1 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -370,6 +370,7 @@ class CommandTestCase(IntegrationTestCase): ) self.extra_wallet_node_port += 1 await wallet_node.start(self.conductor.spv_node, seed=seed) + await wallet_node.ledger.on_ready.first self.extra_wallet_nodes.append(wallet_node) upload_dir = os.path.join(wallet_node.data_path, 'uploads') diff --git a/lbry/wallet/header.py b/lbry/wallet/header.py index 97956d54d..1375d5442 100644 --- a/lbry/wallet/header.py +++ b/lbry/wallet/header.py @@ -49,6 +49,8 @@ class Headers: self.executor = ThreadPoolExecutor(1) async def open(self): + if not self.executor: + self.executor = ThreadPoolExecutor(1) if self.path != ':memory:': if not os.path.exists(self.path): self.io = open(self.path, 'w+b') @@ -57,7 +59,9 @@ class Headers: self._size = self.io.seek(0, os.SEEK_END) // self.header_size async def close(self): - self.executor.shutdown() + if self.executor: + self.executor.shutdown() + self.executor = None self.io.close() @staticmethod @@ -142,7 +146,7 @@ class Headers: async def ensure_chunk_at(self, height): if await self.has_header(height): - log.info("has header %s", height) + log.debug("has header %s", height) return log.info("on-demand fetching height %s", height) start = (height // 1000) * 1000 @@ -208,7 +212,7 @@ class Headers: # .seek()/.write()/.truncate() might also .flush() when needed # the goal here is mainly to ensure we're definitely flush()'ing self.io.flush() - self._size = self.io.tell() // self.header_size + self._size = max(self._size or 0, self.io.tell() // self.header_size) return written async def validate_chunk(self, height, chunk): diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 0d1284dc2..ad5801e61 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -316,6 +316,8 @@ class Ledger(metaclass=LedgerRegistry): first_connection = self.network.on_connected.first asyncio.ensure_future(self.network.start()) await first_connection + async with self._header_processing_lock: + await self._update_tasks.add(self.initial_headers_sync()) await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts)) await asyncio.gather(*(a.save_max_gap() for a in self.accounts)) if len(self.accounts) > 10: @@ -326,10 +328,8 @@ class Ledger(metaclass=LedgerRegistry): async def join_network(self, *_): log.info("Subscribing and updating accounts.") - self._update_tasks.add(self.initial_headers_sync()) async with self._header_processing_lock: - await self.headers.ensure_tip() - await self.update_headers() + await self._update_tasks.add(self.initial_headers_sync()) await self.subscribe_accounts() await self._update_tasks.done.wait() self._on_ready_controller.add(True) @@ -353,8 +353,9 @@ class Ledger(metaclass=LedgerRegistry): async def doit(): for height in reversed(range(0, target, 1000)): await self.headers.ensure_chunk_at(height) - asyncio.ensure_future(doit()) - return + await self.headers.ensure_tip() + self._update_tasks.add(doit()) + await self.update_headers() async def update_headers(self, height=None, headers=None, subscription_update=False): rewound = 0 @@ -894,7 +895,7 @@ class Ledger(metaclass=LedgerRegistry): headers = self.headers history = [] for tx in txs: # pylint: disable=too-many-nested-blocks - ts = headers.estimated_timestamp(tx.height)['timestamp'] + ts = headers.estimated_timestamp(tx.height) item = { 'txid': tx.id, 'timestamp': ts, diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 6e8cc5db1..37b2ec992 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -248,10 +248,10 @@ class WalletManager: log.warning("Failed to migrate %s receiving addresses!", len(set(receiving_addresses).difference(set(migrated_receiving)))) - def get_best_blockhash(self): + async def get_best_blockhash(self): if len(self.ledger.headers) <= 0: return self.ledger.genesis_hash - return self.ledger.headers.hash(self.ledger.headers.height).decode() + return (await self.ledger.headers.hash(self.ledger.headers.height)).decode() def get_unused_address(self): return self.default_account.receiving.get_or_create_usable_address() diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index a94a7bc0a..411b57599 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -77,6 +77,7 @@ class Conductor: async def start_wallet(self): if not self.wallet_started: await self.wallet_node.start(self.spv_node) + await self.wallet_node.ledger.on_ready.first self.wallet_started = True async def stop_wallet(self): diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 6aabb09fd..b271966df 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -8,7 +8,7 @@ class BlockchainReorganizationTests(IntegrationTestCase): async def assertBlockHash(self, height): self.assertEqual( - self.ledger.headers.hash(height).decode(), + (await self.ledger.headers.hash(height)).decode(), await self.blockchain.get_block_hash(height) ) @@ -16,7 +16,7 @@ class BlockchainReorganizationTests(IntegrationTestCase): # invalidate current block, move forward 2 self.assertEqual(self.ledger.headers.height, 200) await self.assertBlockHash(200) - await self.blockchain.invalidate_block(self.ledger.headers.hash(200).decode()) + await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode()) await self.blockchain.generate(2) await self.ledger.on_header.where(lambda e: e.height == 201) self.assertEqual(self.ledger.headers.height, 201) @@ -24,7 +24,7 @@ class BlockchainReorganizationTests(IntegrationTestCase): await self.assertBlockHash(201) # invalidate current block, move forward 3 - await self.blockchain.invalidate_block(self.ledger.headers.hash(200).decode()) + await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode()) await self.blockchain.generate(3) await self.ledger.on_header.where(lambda e: e.height == 202) self.assertEqual(self.ledger.headers.height, 202) diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 95400e321..f2222d560 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -90,13 +90,11 @@ class ReconnectTests(IntegrationTestCase): while self.conductor.spv_node.server.session_mgr.notified_height < initial_height + 99: # off by 1 await asyncio.sleep(0.1) self.assertEqual(initial_height, self.ledger.local_height_including_downloaded_height) - # locks header processing so we make sure we are the only ones modifying it - async with self.ledger._header_processing_lock: - await self.ledger.headers.open() - await self.ledger.network.start() - await self.ledger.network.on_connected.first - await self.ledger.initial_headers_sync() - self.assertEqual(initial_height + 100, self.ledger.local_height_including_downloaded_height) + await self.ledger.headers.open() + await self.ledger.network.start() + await self.ledger.network.on_connected.first + await self.ledger.initial_headers_sync() + self.assertEqual(initial_height + 100, self.ledger.local_height_including_downloaded_height) async def test_connection_drop_still_receives_events_after_reconnected(self): address1 = await self.account.receiving.get_or_create_usable_address() diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index ea6874cfd..b6477199a 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -337,7 +337,7 @@ class ResolveAfterReorg(BaseResolveTestCase): blocks = self.ledger.headers.height - start self.blockchain.block_expected = start - 1 # go back to start - await self.blockchain.invalidate_block(self.ledger.headers.hash(start).decode()) + await self.blockchain.invalidate_block((await self.ledger.headers.hash(start)).decode()) # go to previous + 1 await self.generate(blocks + 2) diff --git a/tests/integration/blockchain/test_sync.py b/tests/integration/blockchain/test_sync.py index 7af2bd1aa..7a3ffd3a7 100644 --- a/tests/integration/blockchain/test_sync.py +++ b/tests/integration/blockchain/test_sync.py @@ -27,6 +27,7 @@ class SyncTests(IntegrationTestCase): wallet_node = WalletNode(WalletManager, RegTestLedger, port=self.api_port) await wallet_node.start(self.conductor.spv_node, seed) self.started_nodes.append(wallet_node) + await wallet_node.ledger.on_ready.first return wallet_node async def test_nodes_with_same_account_stay_in_sync(self): diff --git a/tests/unit/lbrynet_daemon/test_Daemon.py b/tests/unit/lbrynet_daemon/test_Daemon.py index 71f5325ed..6cbd57b04 100644 --- a/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/tests/unit/lbrynet_daemon/test_Daemon.py @@ -92,7 +92,7 @@ class TestCostEst(unittest.TestCase): @unittest.SkipTest class TestJsonRpc(unittest.TestCase): def setUp(self): - def noop(): + async def noop(): return None test_utils.reset_time(self)