From d74fa05a8b866a7cfed6aad2caa30667c16cc00c Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 18 Dec 2020 10:44:58 -0500 Subject: [PATCH] client wallet sync --- lbry/db/database.py | 6 +++ lbry/db/queries/base.py | 10 ++++ lbry/db/query_context.py | 2 +- lbry/service/api.py | 4 ++ lbry/service/base.py | 4 +- lbry/service/light_client.py | 93 +++++++++++++++++++++++------------- lbry/testcase.py | 6 +-- lbry/wallet/manager.py | 13 +++++ 8 files changed, 99 insertions(+), 39 deletions(-) diff --git a/lbry/db/database.py b/lbry/db/database.py index c65668aab..532323b21 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -232,6 +232,9 @@ class Database: async def get_best_block_height(self) -> int: return await self.run(q.get_best_block_height) + async def get_best_block_filter(self) -> int: + return await self.run(q.get_best_block_filter) + async def process_all_things_after_sync(self): return await self.run(sync.process_all_things_after_sync) @@ -244,6 +247,9 @@ class Database: async def insert_block(self, block): return await self.run(q.insert_block, block) + async def insert_block_filter(self, height: int, address_filter: bytes): + return await self.run(q.insert_block_filter, height, address_filter) + async def insert_transaction(self, block_hash, tx): return await self.run(q.insert_transaction, block_hash, tx) diff --git a/lbry/db/queries/base.py b/lbry/db/queries/base.py index 0aa3365e4..ee7022242 100644 --- a/lbry/db/queries/base.py +++ b/lbry/db/queries/base.py @@ -40,6 +40,10 @@ def get_best_block_height(): return context().fetchmax(Block.c.height, -1) +def get_best_block_filter(): + return context().fetchmax(BlockFilter.c.height, -1) + + def insert_block(block): context().get_bulk_loader().add_block(block).flush(return_row_count_for_table=None) @@ -56,6 +60,12 @@ def get_block_headers(first, last=None): return context().fetchall(query) +def insert_block_filter(height: int, address_filter: bytes): + loader = context().get_bulk_loader() + loader.add_block_filter(height, address_filter) + loader.flush(return_row_count_for_table=None) + + def get_filters(start_height, end_height=None, granularity=0): assert granularity >= 0, "filter granularity must be 0 or positive number" if granularity == 0: diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 00e69f3f1..755712f3b 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -72,7 +72,7 @@ class QueryContext: if self._variable_limit is not None: return self._variable_limit if self.is_sqlite: - for result in self.fetchall('PRAGMA COMPILE_OPTIONS;'): + for result in self.fetchall(text('PRAGMA COMPILE_OPTIONS;')): for _, value in result.items(): if value.startswith('MAX_VARIABLE_NUMBER'): self._variable_limit = int(value.split('=')[1]) diff --git a/lbry/service/api.py b/lbry/service/api.py index c25ef879e..5b712daf4 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -3458,6 +3458,10 @@ class Client(API): raise ValueError(f'Unknown message received: {d}') await controller.close() del self.requests[d['id']] + elif 'event' in d: + controller = self.subscriptions.get(d['event']) + if controller is not None: + await controller.add(d['payload']) elif 'method' in d and d['method'].startswith('event'): print(d) else: diff --git a/lbry/service/base.py b/lbry/service/base.py index 3a5bd5da5..0af9228b8 100644 --- a/lbry/service/base.py +++ b/lbry/service/base.py @@ -76,12 +76,12 @@ class Service: async def start(self): await self.db.open() - await self.wallets.storage.prepare() - await self.wallets.initialize() + await self.wallets.open() await self.sync.start() async def stop(self): await self.sync.stop() + await self.wallets.close() await self.db.close() async def get_status(self): diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 67888c2ac..8622aa41b 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -6,6 +6,7 @@ from binascii import unhexlify from lbry.blockchain.block import Block, get_address_filter from lbry.event import BroadcastSubscription +from lbry.crypto.hash import hash160 from lbry.wallet.account import AddressManager from lbry.blockchain import Ledger, Transaction from lbry.db import Database @@ -110,33 +111,15 @@ class FilterManager: self.client = client self.cache = {} - async def download(self): - filters = await self.client.first.address_filter(start_height=0, end_height=500, granularity=0) - #address = None - #address_array = [bytearray(self.db.ledger.address_to_hash160(address))] - #for address_filter in filters: - # print(address_filter) - # address_filter = get_address_filter(unhexlify(address_filter['filter'])) - # print(address_filter.MatchAny(address_array)) - - -# address_array = [ -# bytearray(a['address'].encode()) -# for a in await self.service.db.get_all_addresses() -# ] -# block_filters = await self.service.get_block_address_filters() -# for block_hash, block_filter in block_filters.items(): -# bf = get_address_filter(block_filter) -# if bf.MatchAny(address_array): -# print(f'match: {block_hash} - {block_filter}') -# tx_filters = await self.service.get_transaction_address_filters(block_hash=block_hash) -# for txid, tx_filter in tx_filters.items(): -# tf = get_address_filter(tx_filter) -# if tf.MatchAny(address_array): -# print(f' match: {txid} - {tx_filter}') -# txs = await self.service.search_transactions([txid]) -# tx = Transaction(unhexlify(txs[txid])) -# await self.service.db.insert_transaction(tx) + async def download(self, best_height): + our_height = await self.db.get_best_block_filter() + new_block_filters = await self.client.address_filter( + start_height=our_height+1, end_height=best_height, granularity=1 + ) + for block_filter in await new_block_filters.first: + await self.db.insert_block_filter( + block_filter["height"], unhexlify(block_filter["filter"]) + ) async def get_filters(self, start_height, end_height, granularity): return await self.client.address_filter( @@ -156,9 +139,8 @@ class BlockHeaderManager: self.client = client self.cache = {} - async def download(self): + async def download(self, best_height): our_height = await self.db.get_best_block_height() - best_height = await self.client.first.block_tip() for block in await self.client.first.block_list(start_height=our_height+1, end_height=best_height): await self.db.insert_block(Block( height=block["height"], @@ -194,10 +176,10 @@ class FastSync(Sync): self.filters = FilterManager(self.db, self.client) async def get_block_headers(self, start_height: int, end_height: int = None): - return await self.client.block_list(start_height, end_height) + return await self.client.first.block_list(start_height, end_height) async def get_best_block_height(self) -> int: - return await self.client.block_tip() + return await self.client.first.block_tip() async def start(self): self.advance_loop_task = asyncio.create_task(self.advance()) @@ -213,11 +195,56 @@ class FastSync(Sync): task.cancel() async def advance(self): + best_height = await self.client.first.block_tip() await asyncio.wait([ - self.blocks.download(), - self.filters.download() + self.blocks.download(best_height), + self.filters.download(best_height), ]) + block_filters = {} + for block_filter in await self.db.get_filters(0, best_height, 1): + block_filters[block_filter['height']] = \ + get_address_filter(unhexlify(block_filter['filter'])) + + for wallet in self.service.wallets: + for account in wallet.accounts: + for address_manager in account.address_managers.values(): + i = gap = 0 + while gap < 20: + key, i = address_manager.public_key.child(i), i+1 + address = bytearray(hash160(key.pubkey_bytes)) + for block, matcher in block_filters.items(): + if matcher.Match(address): + gap = 0 + continue + gap += 1 + + # address = None + # address_array = [bytearray(self.db.ledger.address_to_hash160(address))] + # for address_filter in filters: + # print(address_filter) + # address_filter = get_address_filter(unhexlify(address_filter['filter'])) + # print(address_filter.MatchAny(address_array)) + + +# address_array = [ +# bytearray(a['address'].encode()) +# for a in await self.service.db.get_all_addresses() +# ] +# block_filters = await self.service.get_block_address_filters() +# for block_hash, block_filter in block_filters.items(): +# bf = get_address_filter(block_filter) +# if bf.MatchAny(address_array): +# print(f'match: {block_hash} - {block_filter}') +# tx_filters = await self.service.get_transaction_address_filters(block_hash=block_hash) +# for txid, tx_filter in tx_filters.items(): +# tf = get_address_filter(tx_filter) +# if tf.MatchAny(address_array): +# print(f' match: {txid} - {tx_filter}') +# txs = await self.service.search_transactions([txid]) +# tx = Transaction(unhexlify(txs[txid])) +# await self.service.db.insert_transaction(tx) + async def loop(self): while True: try: diff --git a/lbry/testcase.py b/lbry/testcase.py index 95e825c80..cbdb24616 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -465,8 +465,8 @@ class IntegrationTestCase(AsyncioTestCase): service = FullNode(ledger) console = Console(service) daemon = Daemon(service, console) - self.addCleanup(daemon.stop) if start: + self.addCleanup(daemon.stop) await daemon.start() return daemon @@ -485,8 +485,8 @@ class IntegrationTestCase(AsyncioTestCase): service = FullEndpoint(ledger) console = Console(service) daemon = Daemon(service, console) - self.addCleanup(daemon.stop) if start: + self.addCleanup(daemon.stop) await daemon.start() return daemon @@ -504,8 +504,8 @@ class IntegrationTestCase(AsyncioTestCase): service = LightClient(ledger) console = Console(service) daemon = Daemon(service, console) - self.addCleanup(daemon.stop) if start: + self.addCleanup(daemon.stop) await daemon.start() return daemon diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 3292be05c..b634f56da 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -27,6 +27,12 @@ class WalletManager: else: raise Exception(f"Unknown wallet storage format: {self.ledger.conf.wallet_storage}") + def __len__(self): + return self.wallets.__len__() + + def __iter__(self): + return self.wallets.values().__iter__() + def __getitem__(self, wallet_id: str) -> Wallet: try: return self.wallets[wallet_id] @@ -56,6 +62,13 @@ class WalletManager: raise ValueError("Cannot spend funds with locked wallet, unlock first.") return wallet + async def open(self): + await self.storage.prepare() + await self.initialize() + + async def close(self): + pass + async def initialize(self): for wallet_id in self.ledger.conf.wallets: if wallet_id in self.wallets: