From 1e4613fd8a809588d8c81a796c250f9be8738008 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 11 Nov 2020 10:57:51 -0500 Subject: [PATCH] more commands --- lbry/service/api.py | 44 +++-- lbry/service/light_client.py | 182 +++++++++++++-------- lbry/testcase.py | 6 + lbry/wallet/account.py | 17 -- lbry/wallet/wallet.py | 4 +- tests/integration/commands/__init__.py | 0 tests/integration/commands/test_account.py | 68 +++----- 7 files changed, 175 insertions(+), 146 deletions(-) create mode 100644 tests/integration/commands/__init__.py diff --git a/lbry/service/api.py b/lbry/service/api.py index 09a9a3729..24ec08575 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -12,7 +12,7 @@ import base58 from aiohttp import ClientSession from lbry.conf import Setting, NOT_SET -from lbry.db import TXO_TYPES +from lbry.db import TXO_TYPES, CLAIM_TYPE_NAMES from lbry.db.utils import constrain_single_or_list from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc @@ -1501,11 +1501,16 @@ class API: {kwargs} """ - kwargs = claim_filter_and_and_signed_filter_and_stream_filter_and_channel_filter_and_pagination_kwargs + kwargs = claim_filter_and_stream_filter_and_pagination_kwargs kwargs['type'] = claim_type or CLAIM_TYPE_NAMES if 'is_spent' not in kwargs: kwargs['is_not_spent'] = True - return await self.txo_list(**kwargs) + return await self.txo_list( + account_id=account_id, wallet_id=wallet_id, + is_spent=is_spent, resolve=resolve, + include_received_tips=include_received_tips, + **kwargs + ) async def claim_search( self, @@ -1631,9 +1636,11 @@ class API: amount = self.ledger.get_dewies_or_error('bid', bid, positive_value=True) holding_account = wallet.accounts.get_or_default(channel_dict.pop('account_id')) funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) + change_account = wallet.accounts.get_or_default(tx_dict.pop('change_account_id')) await wallet.verify_duplicate(name, allow_duplicate_name) tx = await wallet.channels.create( - name=name, amount=amount, holding_account=holding_account, funding_accounts=funding_accounts, + name=name, amount=amount, holding_account=holding_account, + funding_accounts=funding_accounts, change_account=change_account, save_key=not tx_dict['preview'], **remove_nulls(channel_dict) ) await self.service.maybe_broadcast_or_release(tx, **tx_dict) @@ -2113,15 +2120,19 @@ class API: List my stream claims. Usage: - stream list [ | --account_id=] [--wallet_id=] + stream list [--account_id=] [--wallet_id=] [--is_spent] [--resolve] {kwargs} """ - kwargs['type'] = 'stream' - if 'is_spent' not in kwargs: - kwargs['is_not_spent'] = True - return await self.txo_list(*args, **kwargs) + claim_filter_and_pagination_kwargs['type'] = 'stream' + if 'is_spent' not in claim_filter_and_pagination_kwargs: + claim_filter_and_pagination_kwargs['is_not_spent'] = True + return await self.txo_list( + account_id=account_id, wallet_id=wallet_id, + is_spent=is_spent, resolve=resolve, + **claim_filter_and_pagination_kwargs + ) async def stream_cost_estimate( self, @@ -2410,6 +2421,7 @@ class API: {kwargs} """ + kwargs = pagination_kwargs kwargs['type'] = 'support' if 'is_spent' not in kwargs: kwargs['is_not_spent'] = True @@ -2425,7 +2437,9 @@ class API: elif staked: kwargs['is_my_input'] = True kwargs['is_my_output'] = True - return await self.txo_list(*args, **kwargs) + return await self.txo_list( + account_id=account_id, wallet_id=wallet_id, is_spent=is_spent, **kwargs + ) async def support_search( self, @@ -2722,7 +2736,7 @@ class API: else: raise ValueError(f"'{order_by}' is not a valid --order_by value.") self._constrain_txo_from_kwargs(constraints, **txo_dict) - return await paginate_rows( + return await Paginated.from_getter( self.service.get_txos, wallet=wallet, accounts=accounts, **pagination, **constraints @@ -2820,13 +2834,13 @@ class API: List unspent transaction outputs Usage: - utxo_list + utxo list {kwargs} """ - kwargs['type'] = ['other', 'purchase'] - kwargs['is_not_spent'] = True - return await self.txo_list(*args, **kwargs) + txo_filter_and_pagination_kwargs['type'] = ['other', 'purchase'] + txo_filter_and_pagination_kwargs['is_not_spent'] = True + return await self.txo_list(**txo_filter_and_pagination_kwargs) async def utxo_release( self, diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 64b30c945..5f382e6d6 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -11,11 +11,12 @@ from typing import List, Optional, DefaultDict, NamedTuple #from lbry.crypto.hash import double_sha256, sha256 from lbry.tasks import TaskGroup -from lbry.blockchain.transaction import Transaction -from lbry.blockchain.block import get_address_filter +from lbry.blockchain import Transaction +from lbry.blockchain.block import Block, get_address_filter from lbry.event import BroadcastSubscription, EventController from lbry.wallet.account import AddressManager from lbry.blockchain import Ledger, Transaction +from lbry.db import Database from .base import Service, Sync from .api import Client as APIClient @@ -36,14 +37,21 @@ class LightClient(Service): f"http://{ledger.conf.full_nodes[0][0]}:{ledger.conf.full_nodes[0][1]}/ws" ) self.sync = FastSync(self, self.client) - self.blocks = BlockHeaderManager(self.db, self.client) - self.filters = FilterManager(self.db, self.client) + + async def start(self): + await self.client.connect() + await super().start() + await self.client.start_event_streams() + + async def stop(self): + await super().stop() + await self.client.disconnect() async def search_transactions(self, txids): return await self.client.transaction_search(txids=txids) async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0): - return await self.filters.get_filters( + return await self.sync.filters.get_filters( start_height=start_height, end_height=end_height, granularity=granularity ) @@ -108,74 +116,16 @@ class FilterManager: self.client = client self.cache = {} - async def get_filters(self, start_height, end_height, granularity): - return await self.client.address_filter( - start_height=start_height, end_height=end_height, granularity=granularity - ) - - -class BlockHeaderManager: - """ - Efficient on-demand block header access. - Stores and retrieves from local db what it previously downloaded and - downloads on-demand what it doesn't have from full node. - """ - - def __init__(self, db, client): - self.db = db - self.client = client - self.cache = {} - - async def get_header(self, height): - blocks = await self.client.block_list(height) - if blocks: - return blocks[0] - - async def add(self, header): - pass - async def download(self): - pass + filters_response = await self.client.get_address_filters(0, 500) + filters = await filters_response.first + address_array = [bytearray(self.client.ledger.address_to_hash160(address))] + for filter in filters: + print(filter) + filter = get_address_filter(unhexlify(filter['filter'])) + print(filter.MatchAny(address_array)) -class FastSync(Sync): - - def __init__(self, service: Service, client: APIClient): - super().__init__(service.ledger, service.db) - self.service = service - self.client = client - self.advance_loop_task: Optional[asyncio.Task] = None - self.on_block = client.get_event_stream('blockchain.block') - self.on_block_event = asyncio.Event() - self.on_block_subscription: Optional[BroadcastSubscription] = None - self.on_mempool = client.get_event_stream('blockchain.mempool') - self.on_mempool_event = asyncio.Event() - self.on_mempool_subscription: Optional[BroadcastSubscription] = None - - async def wait_for_client_ready(self): - await self.client.connect() - - async def start(self): - return - self.db.stop_event.clear() - await self.wait_for_client_ready() - self.advance_loop_task = asyncio.create_task(self.advance()) - await self.advance_loop_task - await self.client.subscribe() - self.advance_loop_task = asyncio.create_task(self.advance_loop()) - self.on_block_subscription = self.on_block.listen( - lambda e: self.on_block_event.set() - ) - self.on_mempool_subscription = self.on_mempool.listen( - lambda e: self.on_mempool_event.set() - ) - await self.download_filters() - await self.download_headers() - - async def stop(self): - await self.client.disconnect() - - async def advance(self): address_array = [ bytearray(a['address'].encode()) for a in await self.service.db.get_all_addresses() @@ -194,6 +144,98 @@ class FastSync(Sync): tx = Transaction(unhexlify(txs[txid])) await self.service.db.insert_transaction(tx) + async def get_filters(self, start_height, end_height, granularity): + return await self.client.address_filter( + start_height=start_height, end_height=end_height, granularity=granularity + ) + + +class BlockHeaderManager: + """ + Efficient on-demand block header access. + Stores and retrieves from local db what it previously downloaded and + downloads on-demand what it doesn't have from full node. + """ + + def __init__(self, db: Database, client: APIClient): + self.db = db + self.client = client + self.cache = {} + + async def download(self): + our_height = await self.db.get_best_block_height() + best_height = await self.client.block_tip() + for block in await self.client.block_list(our_height+1, best_height): + await self.db.insert_block(Block( + height=block["height"], + version=0, + file_number=0, + block_hash=block["block_hash"], + prev_block_hash=block["previous_hash"], + merkle_root=block["merkle_root"], + claim_trie_root=block["claim_trie_root"], + timestamp=block["timestamp"], + bits=block["bits"], + nonce=block["nonce"], + txs=[] + )) + + async def get_header(self, height): + blocks = await self.client.first.block_list(height=height) + if blocks: + return blocks[0] + + +class FastSync(Sync): + + def __init__(self, service: Service, client: APIClient): + super().__init__(service.ledger, service.db) + self.service = service + self.client = client + self.advance_loop_task: Optional[asyncio.Task] = None + self.on_block = client.get_event_stream('blockchain.block') + self.on_block_event = asyncio.Event() + self.on_block_subscription: Optional[BroadcastSubscription] = None + self.blocks = BlockHeaderManager(self.db, self.client) + self.filters = FilterManager(self.db, self.client) + + async def get_block_headers(self, start_height: int, end_height: int = None): + return await self.client.block_list(start_height, end_height) + + async def get_best_block_height(self) -> int: + return await self.client.block_tip() + + async def start(self): + self.advance_loop_task = asyncio.create_task(self.advance()) + await self.advance_loop_task + self.advance_loop_task = asyncio.create_task(self.loop()) + self.on_block_subscription = self.on_block.listen( + lambda e: self.on_block_event.set() + ) + + async def stop(self): + for task in (self.on_block_subscription, self.advance_loop_task): + if task is not None: + task.cancel() + + async def advance(self): + await asyncio.wait([ + self.blocks.download(), + self.filters.download() + ]) + + async def loop(self): + while True: + try: + await self.on_block_event.wait() + self.on_block_event.clear() + await self.advance() + except asyncio.CancelledError: + return + except Exception as e: + log.exception(e) + await self.stop() + # async def get_local_status_and_history(self, address, history=None): # if not history: # address_details = await self.db.get_address(address=address) diff --git a/lbry/testcase.py b/lbry/testcase.py index 0e3b181cb..03f0ea45f 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -810,6 +810,9 @@ class CommandTestCase(IntegrationTestCase): async def file_list(self, *args, **kwargs): return (await self.out(self.api.file_list(*args, **kwargs)))['items'] + async def utxo_list(self, *args, **kwargs): + return (await self.out(self.api.utxo_list(*args, **kwargs)))['items'] + async def txo_list(self, *args, **kwargs): return (await self.out(self.api.txo_list(*args, **kwargs)))['items'] @@ -834,6 +837,9 @@ class CommandTestCase(IntegrationTestCase): async def collection_resolve(self, *args, **kwargs): return (await self.out(self.api.collection_resolve(*args, **kwargs)))['items'] + async def support_list(self, *args, **kwargs): + return (await self.out(self.api.support_list(*args, **kwargs)))['items'] + async def transaction_list(self, *args, **kwargs): return (await self.out(self.api.transaction_list(*args, **kwargs)))['items'] diff --git a/lbry/wallet/account.py b/lbry/wallet/account.py index ce6508958..d91faec48 100644 --- a/lbry/wallet/account.py +++ b/lbry/wallet/account.py @@ -458,23 +458,6 @@ class Account: self._channel_keys_deserialized[channel_pubkey_hash] = private_key return private_key - async def maybe_migrate_certificates(self): - def to_der(private_key_pem): - return ecdsa.SigningKey.from_pem(private_key_pem, hashfunc=sha256).get_verifying_key().to_der() - - if not self.channel_keys: - return - channel_keys = {} - for private_key_pem in self.channel_keys.values(): - if not isinstance(private_key_pem, str): - continue - if "-----BEGIN EC PRIVATE KEY-----" not in private_key_pem: - continue - public_key_der = await asyncio.get_running_loop().run_in_executor(None, to_der, private_key_pem) - channel_keys[self.ledger.public_key_to_address(public_key_der)] = private_key_pem - if self.channel_keys != channel_keys: - self.channel_keys = channel_keys - async def save_max_gap(self): gap_changed = False if issubclass(self.address_generator, HierarchicalDeterministic): diff --git a/lbry/wallet/wallet.py b/lbry/wallet/wallet.py index 268d405a6..bb30bef47 100644 --- a/lbry/wallet/wallet.py +++ b/lbry/wallet/wallet.py @@ -572,7 +572,7 @@ class ChannelListManager(ClaimListManager): async def create( self, name: str, amount: int, holding_account: Account, - funding_accounts: List[Account], save_key=True, **kwargs + funding_accounts: List[Account], change_account: Account, save_key=True, **kwargs ) -> Transaction: holding_address = await holding_account.receiving.get_or_create_usable_address() @@ -586,7 +586,7 @@ class ChannelListManager(ClaimListManager): await txo.generate_channel_private_key() tx = await self.wallet.create_transaction( - [], [txo], funding_accounts, funding_accounts[0] + [], [txo], funding_accounts, change_account ) await self.wallet.sign(tx) diff --git a/tests/integration/commands/__init__.py b/tests/integration/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/commands/test_account.py b/tests/integration/commands/test_account.py index 5617e8254..d7f3633e3 100644 --- a/tests/integration/commands/test_account.py +++ b/tests/integration/commands/test_account.py @@ -65,79 +65,63 @@ class AccountManagement(CommandTestCase): self.assertEqual(len(accounts), 1) self.assertEqual(accounts[0]['name'], 'recreated account') - async def test_wallet_migration(self): - # null certificates should get deleted - await self.channel_create('@foo1') - await self.channel_create('@foo2') - await self.channel_create('@foo3') - keys = list(self.account.channel_keys.keys()) - self.account.channel_keys[keys[0]] = None - self.account.channel_keys[keys[1]] = "some invalid junk" - await self.account.maybe_migrate_certificates() - self.assertEqual(list(self.account.channel_keys.keys()), [keys[2]]) - async def assertFindsClaims(self, claim_names, awaitable): - self.assertEqual(claim_names, [txo.claim_name for txo in (await awaitable)['items']]) + self.assertEqual(claim_names, [txo["name"] for txo in await awaitable]) async def assertOutputAmount(self, amounts, awaitable): - self.assertEqual(amounts, [dewies_to_lbc(txo.amount) for txo in (await awaitable)['items']]) + self.assertEqual(amounts, [txo["amount"] for txo in await awaitable]) async def test_commands_across_accounts(self): - channel_list = self.daemon.jsonrpc_channel_list - stream_list = self.daemon.jsonrpc_stream_list - support_list = self.daemon.jsonrpc_support_list - utxo_list = self.daemon.jsonrpc_utxo_list - default_account = self.wallet.default_account - second_account = await self.daemon.jsonrpc_account_create('second account') + account1 = self.wallet.accounts.default.id + account2 = (await self.account_create('second account'))["id"] - tx = await self.daemon.jsonrpc_account_send( - '0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id) - ) - await self.confirm_tx(tx.id) - await self.assertOutputAmount(['0.05', '9.949876'], utxo_list()) - await self.assertOutputAmount(['0.05'], utxo_list(account_id=second_account.id)) - await self.assertOutputAmount(['9.949876'], utxo_list(account_id=default_account.id)) + address2 = await self.address_unused(account2) + await self.wallet_send('0.05', address2, fund_account_id=self.account.id) + await self.generate(1) + await self.assertOutputAmount(['0.05', '9.949876'], self.utxo_list()) + await self.assertOutputAmount(['9.949876'], self.utxo_list(account_id=account1)) + await self.assertOutputAmount(['0.05'], self.utxo_list(account_id=account2)) channel1 = await self.channel_create('@channel-in-account1', '0.01') channel2 = await self.channel_create( - '@channel-in-account2', '0.01', account_id=second_account.id, funding_account_ids=[default_account.id] + '@channel-in-account2', '0.01', account_id=account2, fund_account_id=[account1] ) - await self.assertFindsClaims(['@channel-in-account2', '@channel-in-account1'], channel_list()) - await self.assertFindsClaims(['@channel-in-account1'], channel_list(account_id=default_account.id)) - await self.assertFindsClaims(['@channel-in-account2'], channel_list(account_id=second_account.id)) + await self.assertFindsClaims(['@channel-in-account2', '@channel-in-account1'], self.channel_list()) + await self.assertFindsClaims(['@channel-in-account1'], self.channel_list(account_id=account1)) + await self.assertFindsClaims(['@channel-in-account2'], self.channel_list(account_id=account2)) stream1 = await self.stream_create('stream-in-account1', '0.01', channel_id=self.get_claim_id(channel1)) stream2 = await self.stream_create( 'stream-in-account2', '0.01', channel_id=self.get_claim_id(channel2), - account_id=second_account.id, funding_account_ids=[default_account.id] + account_id=account2, fund_account_id=[account1] ) - await self.assertFindsClaims(['stream-in-account2', 'stream-in-account1'], stream_list()) - await self.assertFindsClaims(['stream-in-account1'], stream_list(account_id=default_account.id)) - await self.assertFindsClaims(['stream-in-account2'], stream_list(account_id=second_account.id)) + await self.assertFindsClaims(['stream-in-account2', 'stream-in-account1'], self.stream_list()) + await self.assertFindsClaims(['stream-in-account1'], self.stream_list(account_id=account1)) + await self.assertFindsClaims(['stream-in-account2'], self.stream_list(account_id=account2)) await self.assertFindsClaims( ['stream-in-account2', 'stream-in-account1', '@channel-in-account2', '@channel-in-account1'], - self.daemon.jsonrpc_claim_list() + self.claim_list() ) await self.assertFindsClaims( ['stream-in-account1', '@channel-in-account1'], - self.daemon.jsonrpc_claim_list(account_id=default_account.id) + self.claim_list(account_id=account1) ) await self.assertFindsClaims( ['stream-in-account2', '@channel-in-account2'], - self.daemon.jsonrpc_claim_list(account_id=second_account.id) + self.claim_list(account_id=account2) ) support1 = await self.support_create(self.get_claim_id(stream1), '0.01') support2 = await self.support_create( - self.get_claim_id(stream2), '0.01', account_id=second_account.id, funding_account_ids=[default_account.id] + self.get_claim_id(stream2), '0.01', account_id=account2, fund_account_id=[account1] ) - self.assertEqual([support2['txid'], support1['txid']], [txo.tx_ref.id for txo in (await support_list())['items']]) - self.assertEqual([support1['txid']], [txo.tx_ref.id for txo in (await support_list(account_id=default_account.id))['items']]) - self.assertEqual([support2['txid']], [txo.tx_ref.id for txo in (await support_list(account_id=second_account.id))['items']]) + self.assertEqual([support2['txid'], support1['txid']], [txo['txid'] for txo in await self.support_list()]) + self.assertEqual([support1['txid']], [txo['txid'] for txo in await self.support_list(account_id=account1)]) + self.assertEqual([support2['txid']], [txo['txid'] for txo in await self.support_list(account_id=account2)]) - history = await self.daemon.jsonrpc_transaction_list() + history = await self.transaction_list() self.assertItemCount(history, 8) history = history['items'] self.assertEqual(extract(history[0]['support_info'][0], ['claim_name', 'is_tip', 'amount', 'balance_delta']), {