diff --git a/.gitignore b/.gitignore index ab1320464..8c1c0f33d 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ lbry.egg-info __pycache__ _trial_temp/ -/tests/integration/blockchain/files +/tests/integration/commands/files /tests/.coverage.* /lbry/blockchain/bin diff --git a/lbry/db/full_text_search.py b/lbry/db/full_text_search.py deleted file mode 100644 index 3f82fbf6d..000000000 --- a/lbry/db/full_text_search.py +++ /dev/null @@ -1,52 +0,0 @@ -from lbry.wallet.database import constraints_to_sql - -CREATE_FULL_TEXT_SEARCH = """ -create virtual table if not exists search using fts5( - claim_name, channel_name, title, description, author, tags, - content=claim, tokenize=porter -); -""" - -FTS_ORDER_BY = "bm25(search, 4.0, 8.0, 1.0, 0.5, 1.0, 0.5)" - - -def fts_action_sql(claims=None, action='insert'): - select = { - 'rowid': "claim.rowid", - 'claim_name': "claim.normalized", - 'channel_name': "channel.normalized", - 'title': "claim.title", - 'description': "claim.description", - 'author': "claim.author", - 'tags': "(select group_concat(tag, ' ') from tag where tag.claim_hash=claim.claim_hash)" - } - if action == 'delete': - select['search'] = '"delete"' - - where, values = "", {} - if claims: - where, values = constraints_to_sql({'claim.claim_hash__in': claims}) - where = 'WHERE '+where - - return f""" - INSERT INTO search ({','.join(select.keys())}) - SELECT {','.join(select.values())} FROM claim - LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash) {where} - """, values - - -def update_full_text_search(action, outputs, db, is_first_sync): - if is_first_sync: - return - if not outputs: - return - if action in ("before-delete", "before-update"): - db.execute(*fts_action_sql(outputs, 'delete')) - elif action in ("after-insert", "after-update"): - db.execute(*fts_action_sql(outputs, 'insert')) - else: - raise ValueError(f"Invalid action for updating full text search: '{action}'") - - -def first_sync_finished(db): - db.execute(*fts_action_sql()) diff --git a/lbry/db/queries/search.py b/lbry/db/queries/search.py index 10eda7f29..0859c51b1 100644 --- a/lbry/db/queries/search.py +++ b/lbry/db/queries/search.py @@ -191,16 +191,16 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: if 'public_key_id' in constraints: constraints['public_key_hash'] = ( context().ledger.address_to_hash160(constraints.pop('public_key_id'))) - if 'channel_hash' in constraints: - constraints['channel_hash'] = constraints.pop('channel_hash') - if 'channel_ids' in constraints: - channel_ids = constraints.pop('channel_ids') - if channel_ids: + if 'channel_id' in constraints: + channel_id = constraints.pop('channel_id') + if channel_id: + if isinstance(channel_id, str): + channel_id = [channel_id] constraints['channel_hash__in'] = { - unhexlify(cid)[::-1] for cid in channel_ids + unhexlify(cid)[::-1] for cid in channel_id } - if 'not_channel_ids' in constraints: - not_channel_ids = constraints.pop('not_channel_ids') + if 'not_channel_id' in constraints: + not_channel_ids = constraints.pop('not_channel_id') if not_channel_ids: not_channel_ids_binary = { unhexlify(ncid)[::-1] for ncid in not_channel_ids @@ -213,17 +213,18 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: 'signature_valid__is_null': True, 'channel_hash__not_in': not_channel_ids_binary } - if 'signature_valid' in constraints: + if 'is_signature_valid' in constraints: has_channel_signature = constraints.pop('has_channel_signature', False) + is_signature_valid = constraints.pop('is_signature_valid') if has_channel_signature: - constraints['signature_valid'] = constraints.pop('signature_valid') + constraints['is_signature_valid'] = is_signature_valid else: constraints['null_or_signature__or'] = { - 'signature_valid__is_null': True, - 'signature_valid': constraints.pop('signature_valid') + 'is_signature_valid__is_null': True, + 'is_signature_valid': is_signature_valid } elif constraints.pop('has_channel_signature', False): - constraints['signature_valid__is_not_null'] = True + constraints['is_signature_valid__is_not_null'] = True if 'txid' in constraints: tx_hash = unhexlify(constraints.pop('txid'))[::-1] @@ -261,7 +262,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: constraints["search"] = constraints.pop("text") return query( - [Claim], + [Claim, TXO], select(*cols) .select_from( Claim.join(TXO).join(TX) @@ -276,18 +277,47 @@ def protobuf_search_claims(**constraints) -> str: def search_claims(**constraints) -> Tuple[List[Output], Optional[int], Optional[Censor]]: + ctx = context() + search_censor = ctx.get_search_censor() + total = None if constraints.pop('include_total', False): total = search_claim_count(**constraints) + constraints['offset'] = abs(constraints.get('offset', 0)) constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - ctx = context() - search_censor = ctx.get_search_censor() - rows = context().fetchall(select_claims(**constraints)) + + channel_url = constraints.pop('channel', None) + if channel_url: + from .resolve import resolve_url + channel = resolve_url(channel_url) + if isinstance(channel, Output): + constraints['channel_hash'] = channel.claim_hash + else: + return [], total, search_censor + + rows = ctx.fetchall(select_claims(**constraints)) txos = rows_to_txos(rows, include_tx=False) + annotate_with_channels(txos) return txos, total, search_censor +def annotate_with_channels(txos): + channel_hashes = set() + for txo in txos: + if txo.can_decode_claim and txo.claim.is_signed: + channel_hashes.add(txo.claim.signing_channel_hash) + if channel_hashes: + rows = context().fetchall(select_claims(claim_hash__in=channel_hashes)) + channels = { + txo.claim_hash: txo for txo in + rows_to_txos(rows, include_tx=False) + } + for txo in txos: + if txo.can_decode_claim and txo.claim.is_signed: + txo.channel = channels.get(txo.claim.signing_channel_hash, None) + + def search_claim_count(**constraints) -> int: constraints.pop('offset', None) constraints.pop('limit', None) @@ -305,9 +335,9 @@ END def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): - any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) + any_items = set(cleaner(constraints.pop(f'any_{attr}', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) + all_items = set(cleaner(constraints.pop(f'all_{attr}', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) + not_items = set(cleaner(constraints.pop(f'not_{attr}', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) all_items = {item for item in all_items if item not in not_items} any_items = {item for item in any_items if item not in not_items} diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index c6634e3ed..2b7d422c6 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -2,7 +2,7 @@ import logging from datetime import date from typing import Tuple, List, Optional, Union -from sqlalchemy import union, func, text, between, distinct, case +from sqlalchemy import union, func, text, between, distinct, case, false from sqlalchemy.future import select, Select from ...blockchain.transaction import ( @@ -372,7 +372,7 @@ def select_txos( ) joins = TXO.join(TX) if constraints.pop('is_spent', None) is False: - s = s.where((TXO.c.spent_height == 0) & (TXO.c.is_reserved == False)) + s = s.where((TXO.c.spent_height == 0) & (TXO.c.is_reserved == false())) if include_is_my_input: joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) if claim_id_not_in_claim_table: @@ -534,7 +534,7 @@ def get_balance(account_ids): else: txo_address_check = TXO.c.address.in_(my_addresses) txi_address_check = TXI.c.address.in_(my_addresses) - query = ( + s: Select = ( select( func.coalesce(func.sum(TXO.c.amount), 0).label("total"), func.coalesce(func.sum(case( @@ -557,7 +557,7 @@ def get_balance(account_ids): TXO.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) ) ) - result = ctx.fetchone(query) + result = ctx.fetchone(s) return { "total": result["total"], "available": result["total"] - result["reserved"], diff --git a/lbry/db/utils.py b/lbry/db/utils.py index 3f5d94006..690dc570d 100644 --- a/lbry/db/utils.py +++ b/lbry/db/utils.py @@ -1,7 +1,7 @@ from itertools import islice from typing import List, Union -from sqlalchemy import text, and_ +from sqlalchemy import text, and_, or_ from sqlalchemy.sql.expression import Select, FunctionElement from sqlalchemy.types import Numeric from sqlalchemy.ext.compiler import compiles @@ -98,9 +98,7 @@ def query(table, s: Select, **constraints) -> Select: s = s.where(in_account_ids(account_ids)) if constraints: - s = s.where( - constraints_to_clause(table, constraints) - ) + s = s.where(and_(*constraints_to_clause(table, constraints))) return s @@ -148,6 +146,9 @@ def constraints_to_clause(tables, constraints): raise ValueError(f"{col} requires a list, set or string as constraint value.") else: continue + elif key.endswith('__or'): + clause.append(or_(*constraints_to_clause(tables, constraint))) + continue else: col, op = key, '__eq__' attr = None @@ -170,4 +171,4 @@ def constraints_to_clause(tables, constraints): if attr is None: raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.") clause.append(getattr(attr, op)(constraint)) - return and_(*clause) + return clause diff --git a/lbry/service/api.py b/lbry/service/api.py index 287ad65a0..5e499e533 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -1098,8 +1098,9 @@ class API: raise ValueError("--outputs must be an integer.") if everything and outputs > 1: raise ValueError("Using --everything along with --outputs is not supported.") - return await from_account.fund( - to_account=to_account, amount=amount, everything=everything, + return await wallet.fund( + from_account=from_account, to_account=to_account, + amount=amount, everything=everything, outputs=outputs, broadcast=broadcast ) @@ -1518,7 +1519,7 @@ class API: claim_type: str = None, # claim type: channel, stream, repost, collection include_purchase_receipt=False, # lookup and include a receipt if this wallet has purchased the claim include_is_my_output=False, # lookup and include a boolean indicating if claim being resolved is yours - is_controlling=False, # winning claims of their respective name + is_controlling: bool = None, # winning claims of their respective name activation_height: int = None, # height at which claim starts competing for name # (supports equality constraints) expiration_height: int = None, # height at which claim will expire (supports equality constraints) @@ -1578,16 +1579,23 @@ class API: claim_filter_dict, kwargs = pop_kwargs('claim_filter', extract_claim_filter( **claim_filter_and_stream_filter_and_pagination_kwargs )) + stream_filter_dict, kwargs = pop_kwargs('stream_filter', extract_stream_filter(**kwargs)) pagination, kwargs = pop_kwargs('pagination', extract_pagination(**kwargs)) + assert_consumed_kwargs(kwargs) wallet = self.wallets.get_or_default(wallet_id) # if {'claim_id', 'claim_ids'}.issubset(kwargs): # raise ValueError("Only 'claim_id' or 'claim_ids' is allowed, not both.") -# if kwargs.pop('valid_channel_signature', False): -# kwargs['signature_valid'] = 1 -# if kwargs.pop('invalid_channel_signature', False): -# kwargs['signature_valid'] = 0 + if stream_filter_dict.pop('valid_channel_signature', False): + stream_filter_dict['is_signature_valid'] = True + if stream_filter_dict.pop('invalid_channel_signature', False): + stream_filter_dict['is_signature_valid'] = False + if is_controlling is not None: + claim_filter_dict["is_controlling"] = is_controlling + if public_key_id is not None: + claim_filter_dict["public_key_id"] = public_key_id page_num = abs(pagination['page'] or 1) page_size = min(abs(pagination['page_size'] or DEFAULT_PAGE_SIZE), 50) + claim_filter_dict.update(stream_filter_dict) claim_filter_dict.update({ 'offset': page_size * (page_num - 1), 'limit': page_size, 'include_total': pagination['include_total'], @@ -1701,31 +1709,18 @@ class API: {kwargs} """ - wallet = self.wallets.get_or_default(wallet_id) - assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." - if account_id: - account = wallet.get_account_or_error(account_id) - accounts = [account] - else: - account = wallet.default_account - accounts = wallet.accounts - - if txid is not None and nout is not None: - claims = await self.ledger.get_claims( - wallet=wallet, accounts=accounts, tx_hash=unhexlify(txid)[::-1], position=nout - ) - elif claim_id is not None: - claims = await self.ledger.get_claims( - wallet=wallet, accounts=accounts, claim_id=claim_id - ) - else: - raise Exception('Must specify claim_id, or txid and nout') - - if not claims: - raise Exception('No claim found for the specified claim_id or txid:nout') - - tx = await Transaction.create( - [Input.spend(txo) for txo in claims], [], [account], account + abandon_dict, kwargs = pop_kwargs('abandon', extract_abandon(**abandon_and_tx_kwargs)) + tx_dict, kwargs = pop_kwargs('tx', extract_tx(**kwargs)) + assert_consumed_kwargs(kwargs) + wallet = self.wallets.get_or_default_for_spending(tx_dict.pop('wallet_id')) + funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) + change_account = wallet.accounts.get_or_default(tx_dict.pop('change_account_id')) + tx = await wallet.channels.delete( + claim_id=abandon_dict.pop('claim_id'), + txid=abandon_dict.pop('txid'), + nout=abandon_dict.pop('nout'), + funding_accounts=funding_accounts, + change_account=change_account ) await self.service.maybe_broadcast_or_release(tx, **tx_dict) return tx @@ -2078,34 +2073,20 @@ class API: {kwargs} """ - wallet = self.wallets.get_or_default(wallet_id) - assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." - if account_id: - account = wallet.get_account_or_error(account_id) - accounts = [account] - else: - account = wallet.default_account - accounts = wallet.accounts - - if txid is not None and nout is not None: - claims = await self.ledger.get_claims( - wallet=wallet, accounts=accounts, tx_hash=unhexlify(txid)[::-1], position=nout - ) - elif claim_id is not None: - claims = await self.ledger.get_claims( - wallet=wallet, accounts=accounts, claim_id=claim_id - ) - else: - raise Exception('Must specify claim_id, or txid and nout') - - if not claims: - raise Exception('No claim found for the specified claim_id or txid:nout') - - tx = await Transaction.create( - [Input.spend(txo) for txo in claims], [], accounts, account + abandon_dict, kwargs = pop_kwargs('abandon', extract_abandon(**abandon_and_tx_kwargs)) + tx_dict, kwargs = pop_kwargs('tx', extract_tx(**kwargs)) + assert_consumed_kwargs(kwargs) + wallet = self.wallets.get_or_default_for_spending(tx_dict.pop('wallet_id')) + funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) + change_account = wallet.accounts.get_or_default(tx_dict.pop('change_account_id')) + tx = await wallet.streams.delete( + claim_id=abandon_dict.pop('claim_id'), + txid=abandon_dict.pop('txid'), + nout=abandon_dict.pop('nout'), + funding_accounts=funding_accounts, + change_account=change_account ) - - await self.service.maybe_broadcast_or_release(tx, tx_dict) + await self.service.maybe_broadcast_or_release(tx, **tx_dict) return tx async def stream_list( diff --git a/lbry/service/base.py b/lbry/service/base.py index 868a14a5d..a64704415 100644 --- a/lbry/service/base.py +++ b/lbry/service/base.py @@ -1,11 +1,9 @@ -import os import asyncio import logging -from typing import List, Optional, Tuple, NamedTuple, Dict +from typing import List, Optional, NamedTuple, Dict from lbry.db import Database, Result from lbry.db.constants import TXO_TYPES -from lbry.schema.result import Censor from lbry.blockchain.transaction import Transaction, Output from lbry.blockchain.ledger import Ledger from lbry.wallet import WalletManager diff --git a/lbry/service/daemon.py b/lbry/service/daemon.py index 140a55a6e..2a478c222 100644 --- a/lbry/service/daemon.py +++ b/lbry/service/daemon.py @@ -177,7 +177,8 @@ class Daemon: subscribers = self.app["subscriptions"][event_name]["subscribers"] subscribers.add(web_socket) - def broadcast_event(self, event_name, subscribers, payload): + @staticmethod + def broadcast_event(event_name, subscribers, payload): for web_socket in subscribers: asyncio.create_task(web_socket.send_json({ 'event': event_name, 'payload': payload diff --git a/lbry/service/full_endpoint.py b/lbry/service/full_endpoint.py index 811899b24..44ffded7e 100644 --- a/lbry/service/full_endpoint.py +++ b/lbry/service/full_endpoint.py @@ -1,10 +1,9 @@ import logging +from typing import Optional, List, Dict from binascii import hexlify, unhexlify -from lbry.blockchain.lbrycrd import Lbrycrd -from lbry.blockchain.sync import BlockchainSync -from lbry.blockchain.ledger import Ledger -from lbry.blockchain.transaction import Transaction +from lbry.blockchain import Ledger, Transaction +from lbry.event import BroadcastSubscription from .base import Service, Sync from .api import Client as APIClient @@ -24,27 +23,17 @@ class NoSync(Sync): self.on_mempool = client.get_event_stream('blockchain.mempool') self.on_mempool_subscription: Optional[BroadcastSubscription] = None - async def wait_for_client_ready(self): - await self.client.connect() - async def start(self): - self.db.stop_event.clear() - await self.wait_for_client_ready() - self.advance_loop_task = asyncio.create_task(self.advance()) - await self.advance_loop_task - await self.client.subscribe() - self.advance_loop_task = asyncio.create_task(self.advance_loop()) - self.on_block_subscription = self.on_block.listen( - lambda e: self.on_block_event.set() - ) - self.on_mempool_subscription = self.on_mempool.listen( - lambda e: self.on_mempool_event.set() - ) - await self.download_filters() - await self.download_headers() + pass async def stop(self): - await self.client.disconnect() + pass + + async def get_block_headers(self, start_height: int, end_height: int = None): + return await self.db.get_block_headers(start_height, end_height) + + async def get_best_block_height(self) -> int: + return await self.db.get_best_block_height() class FullEndpoint(Service): @@ -59,3 +48,36 @@ class FullEndpoint(Service): f"http://{ledger.conf.full_nodes[0][0]}:{ledger.conf.full_nodes[0][1]}/api" ) self.sync = NoSync(self, self.client) + + async def get_block_headers(self, first, last=None): + return await self.db.get_block_headers(first, last) + + async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0): + return await self.db.get_filters( + start_height=start_height, end_height=end_height, granularity=granularity + ) + + async def search_transactions(self, txids): + tx_hashes = [unhexlify(txid)[::-1] for txid in txids] + return { + hexlify(tx['tx_hash'][::-1]).decode(): hexlify(tx['raw']).decode() + for tx in await self.db.get_transactions(tx_hashes=tx_hashes) + } + + async def broadcast(self, tx): + pass + + async def wait(self, tx: Transaction, height=-1, timeout=1): + pass + + async def resolve(self, urls, **kwargs): + pass + + async def search_claims(self, accounts, **kwargs): + pass + + async def search_supports(self, accounts, **kwargs): + pass + + async def sum_supports(self, claim_hash: bytes, include_channel_content=False) -> List[Dict]: + return await self.db.sum_supports(claim_hash, include_channel_content) diff --git a/lbry/service/full_node.py b/lbry/service/full_node.py index 3eb341260..04a96a35f 100644 --- a/lbry/service/full_node.py +++ b/lbry/service/full_node.py @@ -40,7 +40,7 @@ class FullNode(Service): return 'everything is wonderful' async def get_block_headers(self, first, last=None): - return await self.db.get_blocks(first, last) + return await self.db.get_block_headers(first, last) async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0): return await self.db.get_filters( diff --git a/lbry/service/json_encoder.py b/lbry/service/json_encoder.py index 77dbdd03e..545daeeb9 100644 --- a/lbry/service/json_encoder.py +++ b/lbry/service/json_encoder.py @@ -300,13 +300,19 @@ class JSONResponseEncoder(JSONEncoder): output['purchase_receipt'] = self.encode_output(txo.purchase_receipt) if txo.claim.is_channel: output['has_signing_key'] = txo.has_private_key - if check_signature and txo.claim.is_signed: + if check_signature and txo.claim.is_signed and 'is_signature_valid' in txo.meta: if txo.channel is not None: output['signing_channel'] = self.encode_output(txo.channel) - output['is_channel_signature_valid'] = txo.is_signed_by(txo.channel, self.service.ledger) else: output['signing_channel'] = {'channel_id': txo.claim.signing_channel_id} - output['is_channel_signature_valid'] = False + output['is_channel_signature_valid'] = txo.meta.get('is_signature_valid', False) +# if check_signature and txo.claim.is_signed: +# if txo.channel is not None: +# output['signing_channel'] = self.encode_output(txo.channel) +# output['is_channel_signature_valid'] = txo.is_signed_by(txo.channel, self.service.ledger) +# else: +# output['signing_channel'] = {'channel_id': txo.claim.signing_channel_id} +# output['is_channel_signature_valid'] = txo.meta.get('is_signature_valid', False) except DecodeError: pass return output diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 5f382e6d6..7c6936d27 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -1,19 +1,11 @@ import asyncio import logging -from typing import List, Dict -#from io import StringIO -#from functools import partial -#from operator import itemgetter -from collections import defaultdict -#from binascii import hexlify, unhexlify -from typing import List, Optional, DefaultDict, NamedTuple +from typing import Dict +from typing import List, Optional, NamedTuple +from binascii import unhexlify -#from lbry.crypto.hash import double_sha256, sha256 - -from lbry.tasks import TaskGroup -from lbry.blockchain import Transaction from lbry.blockchain.block import Block, get_address_filter -from lbry.event import BroadcastSubscription, EventController +from lbry.event import BroadcastSubscription from lbry.wallet.account import AddressManager from lbry.blockchain import Ledger, Transaction from lbry.db import Database @@ -119,30 +111,31 @@ class FilterManager: async def download(self): filters_response = await self.client.get_address_filters(0, 500) filters = await filters_response.first + address = None address_array = [bytearray(self.client.ledger.address_to_hash160(address))] - for filter in filters: - print(filter) - filter = get_address_filter(unhexlify(filter['filter'])) - print(filter.MatchAny(address_array)) + for address_filter in filters: + print(address_filter) + address_filter = get_address_filter(unhexlify(address_filter['filter'])) + print(address_filter.MatchAny(address_array)) - address_array = [ - bytearray(a['address'].encode()) - for a in await self.service.db.get_all_addresses() - ] - block_filters = await self.service.get_block_address_filters() - for block_hash, block_filter in block_filters.items(): - bf = get_address_filter(block_filter) - if bf.MatchAny(address_array): - print(f'match: {block_hash} - {block_filter}') - tx_filters = await self.service.get_transaction_address_filters(block_hash=block_hash) - for txid, tx_filter in tx_filters.items(): - tf = get_address_filter(tx_filter) - if tf.MatchAny(address_array): - print(f' match: {txid} - {tx_filter}') - txs = await self.service.search_transactions([txid]) - tx = Transaction(unhexlify(txs[txid])) - await self.service.db.insert_transaction(tx) +# address_array = [ +# bytearray(a['address'].encode()) +# for a in await self.service.db.get_all_addresses() +# ] +# block_filters = await self.service.get_block_address_filters() +# for block_hash, block_filter in block_filters.items(): +# bf = get_address_filter(block_filter) +# if bf.MatchAny(address_array): +# print(f'match: {block_hash} - {block_filter}') +# tx_filters = await self.service.get_transaction_address_filters(block_hash=block_hash) +# for txid, tx_filter in tx_filters.items(): +# tf = get_address_filter(tx_filter) +# if tf.MatchAny(address_array): +# print(f' match: {txid} - {tx_filter}') +# txs = await self.service.search_transactions([txid]) +# tx = Transaction(unhexlify(txs[txid])) +# await self.service.db.insert_transaction(tx) async def get_filters(self, start_height, end_height, granularity): return await self.client.address_filter( @@ -537,4 +530,4 @@ class FastSync(Sync): # for account in self.accounts: # if account.id == details['account']: # return account.address_managers[details['chain']] - # return None \ No newline at end of file + # return None diff --git a/lbry/testcase.py b/lbry/testcase.py index 03f0ea45f..91868f179 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -515,6 +515,9 @@ class IntegrationTestCase(AsyncioTestCase): lambda e: e.tx.hash == tx_hash ) + async def on_transaction_dict(self, tx): + await self.service.wait(Transaction(unhexlify(tx['hex']))) + def on_address_update(self, address): return self.ledger.on_transaction.where( lambda e: e.address == address @@ -605,9 +608,6 @@ class CommandTestCase(IntegrationTestCase): await self.on_transaction_id(txid, ledger) return txid - async def on_transaction_dict(self, tx): - await self.ledger.wait(Transaction(unhexlify(tx['hex']))) - @staticmethod def get_all_addresses(tx): addresses = set() @@ -672,9 +672,6 @@ class CommandTestCase(IntegrationTestCase): async def account_remove(self, *args, **kwargs): return await self.out(self.api.account_remove(*args, **kwargs)) - async def account_send(self, *args, **kwargs): - return await self.out(self.api.account_send(*args, **kwargs)) - async def account_balance(self, *args, **kwargs): return await self.out(self.api.account_balance(*args, **kwargs)) @@ -716,8 +713,6 @@ class CommandTestCase(IntegrationTestCase): ) async def stream_abandon(self, *args, confirm=True, **kwargs): - if 'blocking' not in kwargs: - kwargs['blocking'] = False return await self.confirm_and_render( self.api.stream_abandon(*args, **kwargs), confirm ) @@ -743,8 +738,6 @@ class CommandTestCase(IntegrationTestCase): ) async def channel_abandon(self, *args, confirm=True, **kwargs): - if 'blocking' not in kwargs: - kwargs['blocking'] = False return await self.confirm_and_render( self.api.channel_abandon(*args, **kwargs), confirm ) @@ -762,8 +755,6 @@ class CommandTestCase(IntegrationTestCase): ) async def collection_abandon(self, *args, confirm=True, **kwargs): - if 'blocking' not in kwargs: - kwargs['blocking'] = False return await self.confirm_and_render( self.api.stream_abandon(*args, **kwargs), confirm ) @@ -783,11 +774,6 @@ class CommandTestCase(IntegrationTestCase): self.api.account_fund(*args, **kwargs), confirm ) - async def account_send(self, *args, confirm=True, **kwargs): - return await self.confirm_and_render( - self.api.account_send(*args, **kwargs), confirm - ) - async def wallet_send(self, *args, confirm=True, **kwargs): return await self.confirm_and_render( self.api.wallet_send(*args, **kwargs), confirm diff --git a/lbry/wallet/account.py b/lbry/wallet/account.py index d91faec48..1db875393 100644 --- a/lbry/wallet/account.py +++ b/lbry/wallet/account.py @@ -4,14 +4,13 @@ import json import logging import asyncio import random -from functools import partial from hashlib import sha256 from typing import Type, Dict, Tuple, Optional, Any, List import ecdsa from lbry.constants import COIN -from lbry.db import Database, CLAIM_TYPE_CODES, TXO_TYPES +from lbry.db import Database from lbry.db.tables import AccountAddress from lbry.blockchain import Ledger from lbry.error import InvalidPasswordError diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 0a8317f1a..3292be05c 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -6,6 +6,7 @@ import logging from typing import Optional, Dict from lbry.db import Database +from lbry.blockchain.dewies import dict_values_to_lbc from .wallet import Wallet from .account import SingleKey, HierarchicalDeterministic @@ -106,21 +107,22 @@ class WalletManager: async def _report_state(self): try: - for account in self.accounts: - balance = dewies_to_lbc(await account.get_balance(include_claims=True)) - _, channel_count = await account.get_channels(limit=1) - claim_count = await account.get_claim_count() - if isinstance(account.receiving, SingleKey): - log.info("Loaded single key account %s with %s LBC. " - "%d channels, %d certificates and %d claims", - account.id, balance, channel_count, len(account.channel_keys), claim_count) - else: - total_receiving = len(await account.receiving.get_addresses()) - total_change = len(await account.change.get_addresses()) - log.info("Loaded account %s with %s LBC, %d receiving addresses (gap: %d), " - "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ", - account.id, balance, total_receiving, account.receiving.gap, total_change, - account.change.gap, channel_count, len(account.channel_keys), claim_count) + for wallet in self.wallets.values(): + for account in wallet.accounts: + balance = dict_values_to_lbc(await account.get_balance(include_claims=True)) + _, channel_count = await account.get_channels(limit=1) + claim_count = await account.get_claim_count() + if isinstance(account.receiving, SingleKey): + log.info("Loaded single key account %s with %s LBC. " + "%d channels, %d certificates and %d claims", + account.id, balance, channel_count, len(account.channel_keys), claim_count) + else: + total_receiving = len(await account.receiving.get_addresses()) + total_change = len(await account.change.get_addresses()) + log.info("Loaded account %s with %s LBC, %d receiving addresses (gap: %d), " + "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ", + account.id, balance, total_receiving, account.receiving.gap, total_change, + account.change.gap, channel_count, len(account.channel_keys), claim_count) except Exception as err: if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise @@ -135,7 +137,7 @@ class WalletStorage: async def prepare(self): raise NotImplementedError - async def exists(self, walllet_id: str) -> bool: + async def exists(self, wallet_id: str) -> bool: raise NotImplementedError async def get(self, wallet_id: str) -> Wallet: diff --git a/lbry/wallet/wallet.py b/lbry/wallet/wallet.py index 6ff66648a..c4c58c1ab 100644 --- a/lbry/wallet/wallet.py +++ b/lbry/wallet/wallet.py @@ -8,6 +8,7 @@ from typing import Awaitable, Callable, List, Tuple, Optional, Iterable, Union from hashlib import sha256 from operator import attrgetter from decimal import Decimal +from binascii import unhexlify from lbry.db import Database, SPENDABLE_TYPE_CODES, Result from lbry.event import EventController @@ -518,9 +519,10 @@ class ClaimListManager(BaseListManager): return tx async def update( - self, previous_claim: Output, claim: Claim, amount: int, holding_address: str, - funding_accounts: List[Account], change_account: Account, - signing_channel: Output = None) -> Transaction: + self, previous_claim: Output, claim: Claim, amount: int, holding_address: str, + funding_accounts: List[Account], change_account: Account, + signing_channel: Output = None + ) -> Transaction: updated_claim = Output.pay_update_claim_pubkey_hash( amount, previous_claim.claim_name, previous_claim.claim_id, claim, self.wallet.ledger.address_to_hash160(holding_address) @@ -533,18 +535,27 @@ class ClaimListManager(BaseListManager): [Input.spend(previous_claim)], [updated_claim], funding_accounts, change_account ) - async def delete(self, claim_id=None, txid=None, nout=None): + async def delete( + self, claim_id=None, txid=None, nout=None, + funding_accounts: List[Account] = None, change_account: Account = None + ): claim = await self.get(claim_id=claim_id, txid=txid, nout=nout) - return await self.wallet.create_transaction( - [Input.spend(claim)], [], self.wallet._accounts, self.wallet._accounts[0] + tx = await self.wallet.create_transaction( + [Input.spend(claim)], [], + funding_accounts or self.wallet._accounts, + change_account or self.wallet._accounts[0] ) + await self.wallet.sign(tx) + return tx async def list(self, **constraints) -> Result[Output]: return await self.wallet.db.get_claims(wallet=self.wallet, **constraints) async def get(self, claim_id=None, claim_name=None, txid=None, nout=None) -> Output: if txid is not None and nout is not None: - key, value, constraints = 'txid:nout', f'{txid}:{nout}', {'tx_hash': '', 'position': nout} + key, value, constraints = 'txid:nout', f'{txid}:{nout}', { + 'tx_hash': unhexlify(txid)[::-1], 'position': nout + } elif claim_id is not None: key, value, constraints = 'id', claim_id, {'claim_id': claim_id} elif claim_name is not None: diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/commands/test_claim_commands.py similarity index 96% rename from tests/integration/blockchain/test_claim_commands.py rename to tests/integration/commands/test_claim_commands.py index 411911f22..4b34a58e7 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/commands/test_claim_commands.py @@ -2,6 +2,7 @@ import os.path import tempfile import logging import asyncio +from unittest import skip from binascii import unhexlify from urllib.request import urlopen @@ -71,6 +72,7 @@ class ClaimSearchCommand(ClaimTestCase): (result['txid'], result['claim_id']) ) + @skip async def test_disconnect_on_memory_error(self): claim_ids = [ '0000000000000000000000000000000000000000', @@ -116,9 +118,9 @@ class ClaimSearchCommand(ClaimTestCase): # finding claims with and without a channel await self.assertFindsClaims([signed2, signed], name='on-channel-claim') - await self.assertFindsClaims([signed2, signed], channel_ids=[self.channel_id, channel_id2]) - await self.assertFindsClaim(signed, name='on-channel-claim', channel_ids=[self.channel_id]) - await self.assertFindsClaim(signed2, name='on-channel-claim', channel_ids=[channel_id2]) + await self.assertFindsClaims([signed2, signed], channel_id=[self.channel_id, channel_id2]) + await self.assertFindsClaim(signed, name='on-channel-claim', channel_id=[self.channel_id]) + await self.assertFindsClaim(signed2, name='on-channel-claim', channel_id=[channel_id2]) await self.assertFindsClaim(unsigned, name='unsigned') await self.assertFindsClaim(unsigned, txid=unsigned['txid'], nout=0) await self.assertFindsClaim(unsigned, claim_id=self.get_claim_id(unsigned)) @@ -128,37 +130,44 @@ class ClaimSearchCommand(ClaimTestCase): # three streams in channel, zero streams in abandoned channel claims = [three, two, signed] - await self.assertFindsClaims(claims, channel_ids=[self.channel_id]) + await self.assertFindsClaims(claims, channel_id=[self.channel_id]) await self.assertFindsClaims(claims, channel=f"@abc#{self.channel_id}") - await self.assertFindsClaims([three, two, signed2, signed], channel_ids=[channel_id2, self.channel_id]) + await self.assertFindsClaims([three, two, signed2, signed], channel_id=[channel_id2, self.channel_id]) await self.channel_abandon(claim_id=self.channel_id) await self.assertFindsClaims([], channel=f"@abc#{self.channel_id}", valid_channel_signature=True) - await self.assertFindsClaims([], channel_ids=[self.channel_id], valid_channel_signature=True) - await self.assertFindsClaims([signed2], channel_ids=[channel_id2], valid_channel_signature=True) + await self.assertFindsClaims([signed2], channel_id=[channel_id2], valid_channel_signature=True) # pass `invalid_channel_signature=False` to catch a bug in argument processing - await self.assertFindsClaims([signed2], channel_ids=[channel_id2, self.channel_id], + await self.assertFindsClaims([signed2], channel_id=[channel_id2], valid_channel_signature=True, invalid_channel_signature=False) + # in old SDK abandoned channels caused content to have invalid signature, + # in new SDK this is not the case + # TODO: create situation where streams legitimately have invalid signature, harder in new SDK + # await self.assertFindsClaims([], channel_id=[self.channel_id], valid_channel_signature=True) # invalid signature still returns channel_id - invalid_claims = await self.claim_search(invalid_channel_signature=True, has_channel_signature=True) - self.assertEqual(3, len(invalid_claims)) - self.assertTrue(all([not c['is_channel_signature_valid'] for c in invalid_claims])) - self.assertEqual({'channel_id': self.channel_id}, invalid_claims[0]['signing_channel']) + #invalid_claims = await self.claim_search(invalid_channel_signature=True, has_channel_signature=True) + #self.assertEqual(3, len(invalid_claims)) + #self.assertTrue(all([not c['is_channel_signature_valid'] for c in invalid_claims])) + #self.assertEqual({'channel_id': self.channel_id}, invalid_claims[0]['signing_channel']) + + self.assertEqual( + 0, len(await self.claim_search(invalid_channel_signature=True, has_channel_signature=True)) + ) valid_claims = await self.claim_search(valid_channel_signature=True, has_channel_signature=True) - self.assertEqual(1, len(valid_claims)) + self.assertEqual(4, len(valid_claims)) self.assertTrue(all([c['is_channel_signature_valid'] for c in valid_claims])) - self.assertEqual('@abc', valid_claims[0]['signing_channel']['name']) + self.assertEqual('@abc', valid_claims[1]['signing_channel']['name']) # abandoned stream won't show up for streams in channel search await self.stream_abandon(txid=signed2['txid'], nout=0) - await self.assertFindsClaims([], channel_ids=[channel_id2]) + await self.assertFindsClaims([], channel_id=[channel_id2]) async def test_pagination(self): await self.create_channel() await self.create_lots_of_streams() # with and without totals - results = await self.api.claim_search(include_totals=True) + results = await self.api.claim_search(include_total=True) self.assertEqual(results['total_pages'], 2) self.assertEqual(results['total_items'], 25) results = await self.api.claim_search() @@ -194,40 +203,40 @@ class ClaimSearchCommand(ClaimTestCase): self.assertEqual(out_of_bounds, []) async def test_tag_search(self): - claim1 = await self.stream_create('claim1', tags=['aBc']) - claim2 = await self.stream_create('claim2', tags=['#abc', 'def']) - claim3 = await self.stream_create('claim3', tags=['abc', 'ghi', 'jkl']) - claim4 = await self.stream_create('claim4', tags=['abc\t', 'ghi', 'mno']) - claim5 = await self.stream_create('claim5', tags=['pqr']) + claim1 = await self.stream_create('claim1', tag=['aBc']) + claim2 = await self.stream_create('claim2', tag=['#abc', 'def']) + claim3 = await self.stream_create('claim3', tag=['abc', 'ghi', 'jkl']) + claim4 = await self.stream_create('claim4', tag=['abc\t', 'ghi', 'mno']) + claim5 = await self.stream_create('claim5', tag=['pqr']) # any_tags - await self.assertFindsClaims([claim5, claim4, claim3, claim2, claim1], any_tags=['\tabc', 'pqr']) - await self.assertFindsClaims([claim4, claim3, claim2, claim1], any_tags=['abc']) - await self.assertFindsClaims([claim4, claim3, claim2, claim1], any_tags=['abc', 'ghi']) - await self.assertFindsClaims([claim4, claim3], any_tags=['ghi']) - await self.assertFindsClaims([claim4, claim3], any_tags=['ghi', 'xyz']) - await self.assertFindsClaims([], any_tags=['xyz']) + await self.assertFindsClaims([claim5, claim4, claim3, claim2, claim1], any_tag=['\tabc', 'pqr']) + await self.assertFindsClaims([claim4, claim3, claim2, claim1], any_tag=['abc']) + await self.assertFindsClaims([claim4, claim3, claim2, claim1], any_tag=['abc', 'ghi']) + await self.assertFindsClaims([claim4, claim3], any_tag=['ghi']) + await self.assertFindsClaims([claim4, claim3], any_tag=['ghi', 'xyz']) + await self.assertFindsClaims([], any_tag=['xyz']) # all_tags - await self.assertFindsClaims([], all_tags=['abc', 'pqr']) - await self.assertFindsClaims([claim4, claim3, claim2, claim1], all_tags=['ABC']) - await self.assertFindsClaims([claim4, claim3], all_tags=['abc', 'ghi']) - await self.assertFindsClaims([claim4, claim3], all_tags=['ghi']) - await self.assertFindsClaims([], all_tags=['ghi', 'xyz']) - await self.assertFindsClaims([], all_tags=['xyz']) + await self.assertFindsClaims([], all_tag=['abc', 'pqr']) + await self.assertFindsClaims([claim4, claim3, claim2, claim1], all_tag=['ABC']) + await self.assertFindsClaims([claim4, claim3], all_tag=['abc', 'ghi']) + await self.assertFindsClaims([claim4, claim3], all_tag=['ghi']) + await self.assertFindsClaims([], all_tag=['ghi', 'xyz']) + await self.assertFindsClaims([], all_tag=['xyz']) # not_tags - await self.assertFindsClaims([], not_tags=['abc', 'pqr']) - await self.assertFindsClaims([claim5], not_tags=['abC']) - await self.assertFindsClaims([claim5], not_tags=['abc', 'ghi']) - await self.assertFindsClaims([claim5, claim2, claim1], not_tags=['ghi']) - await self.assertFindsClaims([claim5, claim2, claim1], not_tags=['ghi', 'xyz']) - await self.assertFindsClaims([claim5, claim4, claim3, claim2, claim1], not_tags=['xyz']) + await self.assertFindsClaims([], not_tag=['abc', 'pqr']) + await self.assertFindsClaims([claim5], not_tag=['abC']) + await self.assertFindsClaims([claim5], not_tag=['abc', 'ghi']) + await self.assertFindsClaims([claim5, claim2, claim1], not_tag=['ghi']) + await self.assertFindsClaims([claim5, claim2, claim1], not_tag=['ghi', 'xyz']) + await self.assertFindsClaims([claim5, claim4, claim3, claim2, claim1], not_tag=['xyz']) # combinations - await self.assertFindsClaims([claim3], all_tags=['abc', 'ghi'], not_tags=['mno']) - await self.assertFindsClaims([claim3], all_tags=['abc', 'ghi'], any_tags=['jkl'], not_tags=['mno']) - await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi']) + await self.assertFindsClaims([claim3], all_tag=['abc', 'ghi'], not_tag=['mno']) + await self.assertFindsClaims([claim3], all_tag=['abc', 'ghi'], any_tag=['jkl'], not_tag=['mno']) + await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tag=['def', 'ghi']) async def test_order_by(self): height = self.ledger.sync.network.remote_height diff --git a/tests/integration/commands/test_wallet.py b/tests/integration/commands/test_wallet.py index 82defd8be..2d991af90 100644 --- a/tests/integration/commands/test_wallet.py +++ b/tests/integration/commands/test_wallet.py @@ -150,6 +150,7 @@ class WalletCommands(CommandTestCase): }) +@skip class WalletEncryptionAndSynchronization(CommandTestCase): SEED = (