diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index eb957cd30..f5447bc72 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -35,6 +35,9 @@ class LightClient(Service): await self.client.connect() await super().start() await self.client.start_event_streams() + self.wallets.on_change.listen( + lambda _: self.sync.on_block_event.set() + ) async def stop(self): await super().stop() @@ -82,33 +85,54 @@ class FilterManager: self.cache = {} async def download_and_save_filters(self, needed_filters): + if not needed_filters: + print(' nothing to download') for factor, filter_start, filter_end in needed_filters: - print(f'loop, factor: {factor}, filter start: {filter_start}, filter end: {filter_end}') if factor == 0: + print( + f'=> address_filter(granularity={factor}, ' + f'start_height={filter_start}, end_height={filter_end})' + ) filters = await self.client.first.address_filter( granularity=factor, start_height=filter_start, end_height=filter_end ) - print(f'tx_filters: {len(filters)}') - for tx_filter in filters: - await self.db.insert_tx_filter( - unhexlify(tx_filter["txid"])[::-1], tx_filter["height"], - unhexlify(tx_filter["filter"]) - ) + print( + f'<= address_filter(granularity={factor}, ' + f'start_height={filter_start}, end_height={filter_end})' + ) + print(f' inserting {len(filters)} tx filters...') + await self.db.insert_tx_filters(( + unhexlify(tx_filter["txid"])[::-1], + tx_filter["height"], + unhexlify(tx_filter["filter"]) + ) for tx_filter in filters) + elif factor <= 3: + print( + f'=> address_filter(granularity={factor}, ' + f'start_height={filter_start}, end_height={filter_end})' + ) + filters = await self.client.first.address_filter( + granularity=factor, start_height=filter_start, end_height=filter_end + ) + print( + f'<= address_filter(granularity={factor}, ' + f'start_height={filter_start}, end_height={filter_end})' + ) + await self.db.insert_block_filters( + (block_filter["height"], factor, unhexlify(block_filter["filter"])) + for block_filter in filters + ) else: - if factor > 1: - step = 10**factor - else: - step = 1 - for start in range(filter_start, filter_end+1, step): + for start in range(filter_start, filter_end+1, 10**factor): print(f'=> address_filter(granularity={factor}, start_height={start})') filters = await self.client.first.address_filter( granularity=factor, start_height=start ) print(f'<= address_filter(granularity={factor}, start_height={start})') - for block_filter in filters: - await self.db.insert_block_filter( - block_filter["height"], factor, unhexlify(block_filter["filter"]) - ) + await self.db.insert_block_filters( + (block_filter["height"], factor, unhexlify(block_filter["filter"])) + for block_filter in filters + ) async def download_and_save_txs(self, tx_hashes): if not tx_hashes: @@ -161,18 +185,19 @@ class FilterManager: await self.download_and_save_txs(missing) async def download(self, best_height: int, wallets: WalletManager): - print('download_initial_filters') + print('downloading initial filters...') await self.download_initial_filters(best_height) - print('generate_addresses') + print('generating addresses...') await self.generate_addresses(best_height, wallets) - print('download_sub_filters 3') + print('downloading level 3 filters...') await self.download_sub_filters(3, wallets) - print('download_sub_filters 2') + print('downloading level 2 filters...') await self.download_sub_filters(2, wallets) - print('download_sub_filters 1') + print('downloading level 1 filters...') await self.download_sub_filters(1, wallets) - print('download_transactions') + print('downloading transactions...') await self.download_transactions(wallets) + print(f" = finished sync'ing up-to block {best_height} = ") @staticmethod def get_root_of_merkle_tree(branches, branch_positions, working_branch): @@ -219,7 +244,7 @@ class BlockHeaderManager: async def download(self, best_height): print('downloading blocks...') our_height = await self.db.get_best_block_height() - for start in range(our_height+1, best_height, 10000): + for start in range(our_height+1, best_height, 50000): end = min(start+9999, best_height) print(f'=> block_list(start_height={start}, end_height={end})') blocks = await self.client.first.block_list(start_height=start, end_height=end) @@ -284,10 +309,12 @@ class FastSync(Sync): async def advance(self): height = self.best_height or await self.client.first.block_tip() - await asyncio.wait([ - self.blocks.download(height), - self.filters.download(height, self.service.wallets), - ]) + await self.blocks.download(height) + await self.filters.download(height, self.service.wallets) + # await asyncio.wait([ + # self.blocks.download(height), + # self.filters.download(height, self.service.wallets), + # ]) await self._on_synced_controller.add(height) async def loop(self): diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index b634f56da..241f03b4d 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -7,6 +7,7 @@ from typing import Optional, Dict from lbry.db import Database from lbry.blockchain.dewies import dict_values_to_lbc +from lbry.event import EventController from .wallet import Wallet from .account import SingleKey, HierarchicalDeterministic @@ -19,6 +20,8 @@ class WalletManager: def __init__(self, db: Database): self.db = db self.ledger = db.ledger + self._on_change_controller = EventController() + self.on_change = self._on_change_controller.stream self.wallets: Dict[str, Wallet] = {} if self.ledger.conf.wallet_storage == "file": self.storage = FileWallet(self.db, self.ledger.conf.wallet_dir) @@ -112,7 +115,12 @@ class WalletManager: def add(self, wallet: Wallet) -> Wallet: self.wallets[wallet.id] = wallet - wallet.on_change.listen(lambda _: self.storage.save(wallet)) + + def wallet_change_handler(event): + asyncio.create_task(self.storage.save(wallet)) + asyncio.create_task(self._on_change_controller.add(event)) + wallet.on_change.listen(wallet_change_handler) + return wallet def remove(self, wallet_id: str) -> Wallet: