diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 9d0f8d061..9304d9c99 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -10,6 +10,7 @@ import typing import random import hashlib import tracemalloc +from decimal import Decimal from urllib.parse import urlencode, quote from typing import Callable, Optional, List from binascii import hexlify, unhexlify @@ -51,7 +52,8 @@ from lbry.extras.daemon.security import ensure_request_allowed from lbry.file_analysis import VideoFileAnalyzer from lbry.schema.claim import Claim from lbry.schema.url import URL -from lbry.wallet.orchstr8.node import fix_kwargs_for_hub +from lbry.wallet.server.db.elasticsearch.constants import RANGE_FIELDS +MY_RANGE_FIELDS = RANGE_FIELDS - {"limit_claims_per_channel"} if typing.TYPE_CHECKING: from lbry.blob.blob_manager import BlobManager @@ -169,6 +171,118 @@ def paginate_list(items: List, page: Optional[int], page_size: Optional[int]): } +def fix_kwargs_for_hub(**kwargs): + repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash", + "public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id", + "fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title", + "canonical_url", "short_url", "claim_id"} + value_fields = {"offset", "limit", "has_channel_signature", "has_source", "has_no_source", + "limit_claims_per_channel", "tx_nout", "remove_duplicates", + "signature_valid", "is_controlling", "amount_order", "no_totals"} + ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} + for key in list(kwargs.keys()): + value = kwargs[key] + + if key == "txid": + kwargs["tx_id"] = kwargs.pop("txid") + key = "tx_id" + if key == "nout": + kwargs["tx_nout"] = kwargs.pop("nout") + key = "tx_nout" + if key == "valid_channel_signature": + kwargs["signature_valid"] = kwargs.pop("valid_channel_signature") + if key == "invalid_channel_signature": + kwargs["signature_valid"] = not kwargs.pop("invalid_channel_signature") + if key in {"valid_channel_signature", "invalid_channel_signature"}: + key = "signature_valid" + value = kwargs[key] + if key == "has_no_source": + kwargs["has_source"] = not kwargs.pop("has_no_source") + key = "has_source" + value = kwargs[key] + + if key in value_fields: + kwargs[key] = {"value": value} if not isinstance(value, dict) else value + + if key in repeated_fields and isinstance(value, str): + kwargs[key] = [value] + + if key == "claim_id": + kwargs["claim_id"] = { + "invert": False, + "value": kwargs["claim_id"] + } + if key == "not_claim_id": + kwargs["claim_id"] = { + "invert": True, + "value": kwargs.pop("not_claim_id") + } + if key == "claim_ids": + kwargs["claim_id"] = { + "invert": False, + "value": kwargs.pop("claim_ids") + } + if key == "not_claim_ids": + kwargs["claim_id"] = { + "invert": True, + "value": kwargs["not_claim_ids"] + } + del kwargs["not_claim_ids"] + if key == "channel_id": + kwargs["channel_id"] = { + "invert": False, + "value": kwargs["channel_id"] + } + if key == "channel": + kwargs["channel_id"] = { + "invert": False, + "value": kwargs.pop("channel") + } + if key == "not_channel_id": + kwargs["channel_id"] = { + "invert": True, + "value": kwargs.pop("not_channel_id") + } + if key == "channel_ids": + kwargs["channel_ids"] = { + "invert": False, + "value": kwargs["channel_ids"] + } + if key == "not_channel_ids": + kwargs["channel_ids"] = { + "invert": True, + "value": kwargs.pop("not_channel_ids") + } + + if key in MY_RANGE_FIELDS and isinstance(value, str) and value[0] in ops: + operator_length = 2 if value[:2] in ops else 1 + operator, value = value[:operator_length], value[operator_length:] + + op = 0 + if operator == '=': + op = 0 + if operator in ('<=', 'lte'): + op = 1 + if operator in ('>=', 'gte'): + op = 2 + if operator in ('<', 'lt'): + op = 3 + if operator in ('>', 'gt'): + op = 4 + kwargs[key] = {"op": op, "value": [str(value)]} + elif key in MY_RANGE_FIELDS: + kwargs[key] = {"op": 0, "value": [str(value)]} + + if key == 'fee_amount': + value = kwargs['fee_amount'] + value.update({"value": [str(Decimal(value['value'][0]) * 1000)]}) + kwargs['fee_amount'] = value + if key == 'stream_types': + kwargs['stream_type'] = kwargs.pop('stream_types') + if key == 'media_types': + kwargs['media_type'] = kwargs.pop('media_types') + return kwargs + DHT_HAS_CONTACTS = "dht_has_contacts" @@ -2496,7 +2610,18 @@ class Daemon(metaclass=JSONRPCServerType): Returns: {Paginated[Output]} """ if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true": - kwargs['new_sdk_server'] = "localhost:50051" + log.warning("### Using go hub! ###") + host = os.environ.get("HUB_HOST", "localhost") + port = os.environ.get("HUB_PORT", "50051") + kwargs['new_sdk_server'] = f"{host}:{port}" + if kwargs.get("channel"): + channel = kwargs.pop("channel") + channel_obj = (await self.jsonrpc_resolve(channel))[channel] + if isinstance(channel_obj, dict): + # This happens when the channel doesn't exist + kwargs["channel_id"] = "" + else: + kwargs["channel_id"] = channel_obj.claim_id kwargs = fix_kwargs_for_hub(**kwargs) else: # Don't do this if using the hub server, it screws everything up diff --git a/lbry/testcase.py b/lbry/testcase.py index 1549d4809..8e022399b 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -19,7 +19,6 @@ from lbry.conf import Config from lbry.wallet.util import satoshis_to_coins from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode -from lbry.wallet.orchstr8.node import fix_kwargs_for_hub from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty from lbry.extras.daemon.components import Component, WalletComponent diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 7fb7069eb..211e3ef7a 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -891,7 +891,7 @@ class Ledger(metaclass=LedgerRegistry): claim_search(**kwargs), accounts, include_purchase_receipt=include_purchase_receipt, include_is_my_output=include_is_my_output, - hub_server=True if new_sdk_server else False + hub_server=new_sdk_server is not None ) async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 65873b824..b9b391eeb 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -1,7 +1,6 @@ import logging import asyncio import json -import os import socket import random from time import perf_counter @@ -479,21 +478,14 @@ class Network: return result['result'] async def new_claim_search(self, server, **kwargs): - if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true": - if "offset" in kwargs and type(kwargs["offset"]) == int: - kwargs["offset"] = {"value": kwargs["offset"]} - if "limit" in kwargs and type(kwargs["limit"]) == int: - kwargs["limit"] = {"value": kwargs["limit"]} - async with grpc.aio.insecure_channel(server) as channel: - stub = hub_pb2_grpc.HubStub(channel) - response = await stub.Search(SearchRequest(**kwargs)) - return response - kwargs['protobuf'] = True - - message = {"method": "claim_search", "params": kwargs} - async with self.aiohttp_session.post(server, json=message) as r: - result = await r.json() - return result['result'] + if "offset" in kwargs and isinstance(kwargs["offset"], int): + kwargs["offset"] = {"value": kwargs["offset"]} + if "limit" in kwargs and isinstance(kwargs["limit"], int): + kwargs["limit"] = {"value": kwargs["limit"]} + async with grpc.aio.insecure_channel(server) as channel: + stub = hub_pb2_grpc.HubStub(channel) + response = await stub.Search(SearchRequest(**kwargs)) + return response async def sum_supports(self, server, **kwargs): message = {"method": "support_sum", "params": kwargs} diff --git a/lbry/wallet/orchstr8/__init__.py b/lbry/wallet/orchstr8/__init__.py index eea5d88be..2d047f9a5 100644 --- a/lbry/wallet/orchstr8/__init__.py +++ b/lbry/wallet/orchstr8/__init__.py @@ -1,2 +1,5 @@ +__hub_url__ = ( + "https://github.com/lbryio/hub/releases/download/v0.2021.06.14-beta/hub" +) from .node import Conductor from .service import ConductorService diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 36eb15689..8f7060111 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -18,11 +18,8 @@ from lbry.wallet.server.server import Server from lbry.wallet.server.env import Env from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.conf import KnownHubsList, Config +from lbry.wallet.orchstr8 import __hub_url__ -from decimal import Decimal -from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \ - RANGE_FIELDS -MY_RANGE_FIELDS = RANGE_FIELDS - {"limit_claims_per_channel"} log = logging.getLogger(__name__) @@ -51,7 +48,7 @@ class Conductor: self.wallet_node = WalletNode( self.manager_module, RegTestLedger, default_seed=seed ) - self.hub_node = HubNode("asdf", "hub", "hub") + self.hub_node = HubNode(__hub_url__, "hub", "hub") self.blockchain_started = False self.spv_started = False @@ -482,139 +479,17 @@ class HubProcess(asyncio.SubprocessProtocol): raise SystemError(data.decode()) if b'listening on' in data: self.ready.set() + print(data.decode("utf-8")) def process_exited(self): self.stopped.set() self.ready.set() -def fix_kwargs_for_hub(**kwargs): - # DEFAULT_PAGE_SIZE = 20 - # page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50) - # kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size}) - repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash", - "public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id", - "fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title", - "canonical_url", "short_url", "claim_id"} - value_fields = {"offset", "limit", "has_channel_signature", "has_source", "has_no_source", - "limit_claims_per_channel", "tx_nout", "remove_duplicates", - "signature_valid", "is_controlling", "amount_order"} - ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} - for key in list(kwargs.keys()): - value = kwargs[key] - - if "txid" == key: - kwargs["tx_id"] = kwargs.pop("txid") - key = "tx_id" - if "nout" == key: - kwargs["tx_nout"] = kwargs.pop("nout") - key = "tx_nout" - if "valid_channel_signature" == key: - kwargs["signature_valid"] = kwargs.pop("valid_channel_signature") - if "invalid_channel_signature" == key: - kwargs["signature_valid"] = not kwargs.pop("invalid_channel_signature") - if key in {"valid_channel_signature", "invalid_channel_signature"}: - key = "signature_valid" - value = kwargs[key] - if "has_no_source" == key: - kwargs["has_source"] = not kwargs.pop("has_no_source") - key = "has_source" - value = kwargs[key] - - if key in value_fields: - kwargs[key] = {"value": value} if type(value) != dict else value - - if key in repeated_fields: - kwargs[key] = [value] - - - if "claim_id" == key: - kwargs["claim_id"] = { - "invert": False, - "value": kwargs["claim_id"] - } - if "not_claim_id" == key: - kwargs["claim_id"] = { - "invert": True, - "value": kwargs["not_claim_id"] - } - del kwargs["not_claim_id"] - if "claim_ids" == key: - kwargs["claim_id"] = { - "invert": False, - "value": kwargs["claim_ids"] - } - del kwargs["claim_ids"] - if "not_claim_ids" == key: - kwargs["claim_id"] = { - "invert": True, - "value": kwargs["not_claim_ids"] - } - del kwargs["not_claim_ids"] - if "channel_id" == key: - kwargs["channel_id"] = { - "invert": False, - "value": kwargs["channel_id"] - } - if "channel" == key: - kwargs["channel_id"] = { - "invert": False, - "value": kwargs["channel"] - } - del kwargs["channel"] - if "not_channel_id" == key: - kwargs["channel_id"] = { - "invert": True, - "value": kwargs["not_channel_id"] - } - del kwargs["not_channel_id"] - if "channel_ids" == key: - kwargs["channel_ids"] = { - "invert": False, - "value": kwargs["channel_ids"] - } - if "not_channel_ids" == key: - kwargs["channel_ids"] = { - "invert": True, - "value": kwargs["not_channel_ids"] - } - del kwargs["not_channel_ids"] - - - if key in MY_RANGE_FIELDS and isinstance(value, str) and value[0] in ops: - operator_length = 2 if value[:2] in ops else 1 - operator, value = value[:operator_length], value[operator_length:] - - op = 0 - if operator == '=': - op = 0 - if operator == '<=' or operator == 'lte': - op = 1 - if operator == '>=' or operator == 'gte': - op = 2 - if operator == '<' or operator == 'lt': - op = 3 - if operator == '>' or operator == 'gt': - op = 4 - kwargs[key] = {"op": op, "value": [str(value)]} - elif key in MY_RANGE_FIELDS: - kwargs[key] = {"op": 0, "value": [str(value)]} - - if 'fee_amount' == key: - value = kwargs['fee_amount'] - value.update({"value": [str(Decimal(value['value'][0]) * 1000)]}) - kwargs['fee_amount'] = value - if 'stream_types' == key: - kwargs['stream_type'] = kwargs.pop('stream_types') - if 'media_types' == key: - kwargs['media_type'] = kwargs.pop('media_types') - return kwargs - - class HubNode: def __init__(self, url, daemon, cli): - self.debug = True + self.debug = False self.latest_release_url = url self.project_dir = os.path.dirname(os.path.dirname(__file__)) @@ -622,24 +497,15 @@ class HubNode: self.daemon_bin = os.path.join(self.bin_dir, daemon) self.cli_bin = os.path.join(self.bin_dir, daemon) self.log = log.getChild('hub') - self.data_path = None - self.protocol = None self.transport = None - self.block_expected = 0 + self.protocol = None self.hostname = 'localhost' - # self.peerport = 9246 + 13 # avoid conflict with default peer port - self.rpcport = 50051 # avoid conflict with default rpc port - self.rpcuser = 'rpcuser' - self.rpcpassword = 'rpcpassword' + self.rpcport = 50051 # avoid conflict with default rpc port self.stopped = False self.restart_ready = asyncio.Event() self.restart_ready.set() self.running = asyncio.Event() - @property - def rpc_url(self): - return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/' - @property def exists(self): return ( @@ -675,6 +541,8 @@ class HubNode: with tarfile.open(downloaded_file) as tar: tar.extractall(self.bin_dir) + os.chmod(self.daemon_bin, 0o755) + return self.exists def ensure(self): @@ -682,11 +550,10 @@ class HubNode: async def start(self): assert self.ensure() - self.data_path = tempfile.mkdtemp() loop = asyncio.get_event_loop() asyncio.get_child_watcher().attach_loop(loop) command = [ - self.daemon_bin, 'serve', + self.daemon_bin, 'serve', '--dev' ] self.log.info(' '.join(command)) while not self.stopped: @@ -720,19 +587,8 @@ class HubNode: if cleanup: self.cleanup() - async def clear_mempool(self): - self.restart_ready.clear() - self.transport.terminate() - await self.protocol.stopped.wait() - self.transport.close() - self.running.clear() - # os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat')) - self.restart_ready.set() - await self.running.wait() - def cleanup(self): pass - #shutil.rmtree(self.data_path, ignore_errors=True) async def _cli_cmnd(self, *args): cmnd_args = [ diff --git a/tests/integration/hub/test_hub_commands.py b/tests/integration/hub/test_hub_commands.py index dddcc71f6..fdf21cc0f 100644 --- a/tests/integration/hub/test_hub_commands.py +++ b/tests/integration/hub/test_hub_commands.py @@ -135,8 +135,6 @@ class ClaimSearchCommand(ClaimTestCase): # three streams in channel, zero streams in abandoned channel claims = [three, two, signed] await self.assertFindsClaims(claims, channel_ids=[self.channel_id]) - # FIXME - # channel param doesn't work yet because we need to implement resolve url from search first cid = await self.daemon.jsonrpc_resolve(f"@abc#{self.channel_id}") await self.assertFindsClaims(claims, channel_id=cid[f"@abc#{self.channel_id}"].claim_id) cid = await self.daemon.jsonrpc_resolve(f"@inexistent") @@ -168,9 +166,10 @@ class ClaimSearchCommand(ClaimTestCase): # FIXME # print(valid_claims) # Something happens in inflation I think and this gets switch from valid to not - # self.assertTrue(all([c['is_channel_signature_valid'] for c in valid_claims])) - # And signing channel only has id? 'signing_channel': {'channel_id': '6f4513e9bbd63d7b7f13dbf4fd2ef28c560ac89b'} - # self.assertEqual('@abc', valid_claims[0]['signing_channel']['name']) + self.assertTrue(all([c['is_channel_signature_valid'] for c in valid_claims])) + # import json + # print(json.dumps(valid_claims, indent=4, sort_keys=True)) + self.assertEqual('@abc', valid_claims[0]['signing_channel']['name']) # abandoned stream won't show up for streams in channel search await self.stream_abandon(txid=signed2['txid'], nout=0) @@ -397,6 +396,7 @@ class ClaimSearchCommand(ClaimTestCase): limit_claims_per_channel=3, claim_type='stream' ) + # @skip("okay") async def test_no_duplicates(self): await self.generate(10) match = self.assertFindsClaims @@ -499,7 +499,7 @@ class ClaimSearchCommand(ClaimTestCase): await self.assertFindsClaims([], duration='>100') await self.assertFindsClaims([], duration='<14') - # @skip("okay") + # # @skip("okay") async def test_search_by_text(self): chan1_id = self.get_claim_id(await self.channel_create('@SatoshiNakamoto')) chan2_id = self.get_claim_id(await self.channel_create('@Bitcoin')) diff --git a/tests/unit/schema/test_models.py b/tests/unit/schema/test_models.py index a2587d2f3..13c868ed0 100644 --- a/tests/unit/schema/test_models.py +++ b/tests/unit/schema/test_models.py @@ -88,13 +88,13 @@ class TestLanguages(TestCase): def test_language_error_parsing(self): stream = Stream() - with self.assertRaisesRegex(ValueError, 'Language has no value defined for name zz'): + with self.assertRaisesRegex(ValueError, "Enum Language has no value defined for name 'zz'"): stream.languages.append('zz') - with self.assertRaisesRegex(ValueError, 'Script has no value defined for name Zabc'): + with self.assertRaisesRegex(ValueError, "Enum Script has no value defined for name 'Zabc'"): stream.languages.append('en-Zabc') - with self.assertRaisesRegex(ValueError, 'Country has no value defined for name ZZ'): + with self.assertRaisesRegex(ValueError, "Enum Country has no value defined for name 'ZZ'"): stream.languages.append('en-Zzzz-ZZ') - with self.assertRaisesRegex(AssertionError, 'Failed to parse language tag: en-Zzz-US'): + with self.assertRaisesRegex(AssertionError, "Failed to parse language tag: en-Zzz-US"): stream.languages.append('en-Zzz-US') diff --git a/tox.ini b/tox.ini index 0b8b7ff90..8f789478a 100644 --- a/tox.ini +++ b/tox.ini @@ -8,6 +8,7 @@ changedir = {toxinidir}/tests setenv = HOME=/tmp ELASTIC_HOST={env:ELASTIC_HOST:localhost} + GO_HUB=true commands = orchstr8 download blockchain: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.blockchain {posargs}