mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-08-23 17:27:25 +00:00
cleanup and reorgnazing some stuff
Fixing tests relabel failing tests properly run all the tests for the hub cleanup HubNode
This commit is contained in:
parent
c124e88d12
commit
a97fc6dba8
7 changed files with 192 additions and 203 deletions
|
@ -51,6 +51,7 @@ from lbry.extras.daemon.security import ensure_request_allowed
|
||||||
from lbry.file_analysis import VideoFileAnalyzer
|
from lbry.file_analysis import VideoFileAnalyzer
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
from lbry.schema.url import URL
|
from lbry.schema.url import URL
|
||||||
|
from lbry.wallet.orchstr8.node import fix_kwargs_for_hub
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
|
@ -2494,7 +2495,11 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
|
|
||||||
Returns: {Paginated[Output]}
|
Returns: {Paginated[Output]}
|
||||||
"""
|
"""
|
||||||
wallet = self.wallet_manager.get_wallet_or_default(kwargs.pop('wallet_id', None))
|
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
||||||
|
kwargs['new_sdk_server'] = "localhost:50051"
|
||||||
|
kwargs = fix_kwargs_for_hub(**kwargs)
|
||||||
|
else:
|
||||||
|
# Don't do this if using the hub server, it screws everything up
|
||||||
if "claim_ids" in kwargs and not kwargs["claim_ids"]:
|
if "claim_ids" in kwargs and not kwargs["claim_ids"]:
|
||||||
kwargs.pop("claim_ids")
|
kwargs.pop("claim_ids")
|
||||||
if {'claim_id', 'claim_ids'}.issubset(kwargs):
|
if {'claim_id', 'claim_ids'}.issubset(kwargs):
|
||||||
|
@ -2507,8 +2512,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
if 'has_no_source' in kwargs:
|
if 'has_no_source' in kwargs:
|
||||||
kwargs['has_source'] = not kwargs.pop('has_no_source')
|
kwargs['has_source'] = not kwargs.pop('has_no_source')
|
||||||
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
|
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
|
||||||
|
wallet = self.wallet_manager.get_wallet_or_default(kwargs.pop('wallet_id', None))
|
||||||
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
|
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
|
||||||
txos, blocked, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
|
txos, blocked, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
|
||||||
|
print(len(txos))
|
||||||
result = {
|
result = {
|
||||||
"items": txos,
|
"items": txos,
|
||||||
"blocked": blocked,
|
"blocked": blocked,
|
||||||
|
|
|
@ -152,7 +152,6 @@ class Outputs:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_grpc(cls, outputs: OutputsMessage) -> 'Outputs':
|
def from_grpc(cls, outputs: OutputsMessage) -> 'Outputs':
|
||||||
print(outputs)
|
|
||||||
txs = set()
|
txs = set()
|
||||||
for txo_message in chain(outputs.txos, outputs.extra_txos):
|
for txo_message in chain(outputs.txos, outputs.extra_txos):
|
||||||
if txo_message.WhichOneof('meta') == 'error':
|
if txo_message.WhichOneof('meta') == 'error':
|
||||||
|
|
|
@ -19,6 +19,7 @@ from lbry.conf import Config
|
||||||
from lbry.wallet.util import satoshis_to_coins
|
from lbry.wallet.util import satoshis_to_coins
|
||||||
from lbry.wallet.orchstr8 import Conductor
|
from lbry.wallet.orchstr8 import Conductor
|
||||||
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode
|
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode
|
||||||
|
from lbry.wallet.orchstr8.node import fix_kwargs_for_hub
|
||||||
|
|
||||||
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
|
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
|
||||||
from lbry.extras.daemon.components import Component, WalletComponent
|
from lbry.extras.daemon.components import Component, WalletComponent
|
||||||
|
@ -623,9 +624,6 @@ class CommandTestCase(IntegrationTestCase):
|
||||||
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
|
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
|
||||||
|
|
||||||
async def claim_search(self, **kwargs):
|
async def claim_search(self, **kwargs):
|
||||||
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
|
||||||
kwargs['new_sdk_server'] = "localhost:50051"
|
|
||||||
kwargs = self.hub.fix_kwargs(**kwargs)
|
|
||||||
return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items']
|
return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items']
|
||||||
|
|
||||||
async def file_list(self, *args, **kwargs):
|
async def file_list(self, *args, **kwargs):
|
||||||
|
@ -643,7 +641,7 @@ class CommandTestCase(IntegrationTestCase):
|
||||||
async def claim_list(self, *args, **kwargs):
|
async def claim_list(self, *args, **kwargs):
|
||||||
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
||||||
kwargs['new_sdk_server'] = "localhost:50051"
|
kwargs['new_sdk_server'] = "localhost:50051"
|
||||||
kwargs = self.hub.fix_kwargs(**kwargs)
|
kwargs = fix_kwargs_for_hub(**kwargs)
|
||||||
res = await self.out(self.hub.claim_search(**kwargs))
|
res = await self.out(self.hub.claim_search(**kwargs))
|
||||||
return res
|
return res
|
||||||
return (await self.out(self.daemon.jsonrpc_claim_list(*args, **kwargs)))['items']
|
return (await self.out(self.daemon.jsonrpc_claim_list(*args, **kwargs)))['items']
|
||||||
|
|
|
@ -769,10 +769,12 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
include_is_my_output=False,
|
include_is_my_output=False,
|
||||||
include_sent_supports=False,
|
include_sent_supports=False,
|
||||||
include_sent_tips=False,
|
include_sent_tips=False,
|
||||||
include_received_tips=False) -> Tuple[List[Output], dict, int, int]:
|
include_received_tips=False,
|
||||||
|
hub_server=False) -> Tuple[List[Output], dict, int, int]:
|
||||||
encoded_outputs = await query
|
encoded_outputs = await query
|
||||||
# log.warning(base64.b64decode(encoded_outputs))
|
# log.warning(base64.b64decode(encoded_outputs))
|
||||||
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
# if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
||||||
|
if hub_server:
|
||||||
outputs = Outputs.from_grpc(encoded_outputs)
|
outputs = Outputs.from_grpc(encoded_outputs)
|
||||||
else:
|
else:
|
||||||
outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None?
|
outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None?
|
||||||
|
@ -894,6 +896,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
claim_search(**kwargs), accounts,
|
claim_search(**kwargs), accounts,
|
||||||
include_purchase_receipt=include_purchase_receipt,
|
include_purchase_receipt=include_purchase_receipt,
|
||||||
include_is_my_output=include_is_my_output,
|
include_is_my_output=include_is_my_output,
|
||||||
|
hub_server=True if new_sdk_server else False
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output:
|
async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output:
|
||||||
|
|
|
@ -469,13 +469,6 @@ class Network:
|
||||||
return self.rpc('blockchain.claimtrie.resolve', urls, False, session_override)
|
return self.rpc('blockchain.claimtrie.resolve', urls, False, session_override)
|
||||||
|
|
||||||
def claim_search(self, session_override=None, **kwargs):
|
def claim_search(self, session_override=None, **kwargs):
|
||||||
# FIXME: How do i get a session to connect to my go rpc server?!?
|
|
||||||
# if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
|
|
||||||
# session_override = ClientSession(
|
|
||||||
# network=self, server=("localhost", 50051)
|
|
||||||
# )
|
|
||||||
# return self.rpc('pb.Hub.Search', kwargs, False, session_override)
|
|
||||||
# else:
|
|
||||||
return self.rpc('blockchain.claimtrie.search', kwargs, False, session_override)
|
return self.rpc('blockchain.claimtrie.search', kwargs, False, session_override)
|
||||||
|
|
||||||
async def new_resolve(self, server, urls):
|
async def new_resolve(self, server, urls):
|
||||||
|
@ -494,13 +487,12 @@ class Network:
|
||||||
log.warning(kwargs)
|
log.warning(kwargs)
|
||||||
response = await stub.Search(SearchRequest(**kwargs))
|
response = await stub.Search(SearchRequest(**kwargs))
|
||||||
return response
|
return response
|
||||||
# kwargs['protobuf'] = True
|
kwargs['protobuf'] = True
|
||||||
# # TODO: grpc python client here
|
|
||||||
#
|
message = {"method": "claim_search", "params": kwargs}
|
||||||
# message = {"method": "claim_search", "params": kwargs}
|
async with self.aiohttp_session.post(server, json=message) as r:
|
||||||
# async with self.aiohttp_session.post(server, json=message) as r:
|
result = await r.json()
|
||||||
# result = await r.json()
|
return result['result']
|
||||||
# return result['result']
|
|
||||||
|
|
||||||
async def sum_supports(self, server, **kwargs):
|
async def sum_supports(self, server, **kwargs):
|
||||||
message = {"method": "support_sum", "params": kwargs}
|
message = {"method": "support_sum", "params": kwargs}
|
||||||
|
|
|
@ -488,133 +488,10 @@ class HubProcess(asyncio.SubprocessProtocol):
|
||||||
self.ready.set()
|
self.ready.set()
|
||||||
|
|
||||||
|
|
||||||
class HubNode:
|
def fix_kwargs_for_hub(**kwargs):
|
||||||
|
# DEFAULT_PAGE_SIZE = 20
|
||||||
def __init__(self, url, daemon, cli):
|
# page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
|
||||||
self.debug = True
|
# kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
|
||||||
|
|
||||||
self.latest_release_url = url
|
|
||||||
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
|
||||||
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
|
||||||
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
|
||||||
self.cli_bin = os.path.join(os.environ['GOPATH'], 'bin/grpcurl')
|
|
||||||
self.log = log.getChild('hub')
|
|
||||||
self.data_path = None
|
|
||||||
self.protocol = None
|
|
||||||
self.transport = None
|
|
||||||
self.block_expected = 0
|
|
||||||
self.hostname = 'localhost'
|
|
||||||
# self.peerport = 9246 + 13 # avoid conflict with default peer port
|
|
||||||
self.rpcport = 50051 # avoid conflict with default rpc port
|
|
||||||
self.rpcuser = 'rpcuser'
|
|
||||||
self.rpcpassword = 'rpcpassword'
|
|
||||||
self.stopped = False
|
|
||||||
self.restart_ready = asyncio.Event()
|
|
||||||
self.restart_ready.set()
|
|
||||||
self.running = asyncio.Event()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def rpc_url(self):
|
|
||||||
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def exists(self):
|
|
||||||
return (
|
|
||||||
os.path.exists(self.cli_bin) and
|
|
||||||
os.path.exists(self.daemon_bin)
|
|
||||||
)
|
|
||||||
|
|
||||||
def download(self):
|
|
||||||
downloaded_file = os.path.join(
|
|
||||||
self.bin_dir,
|
|
||||||
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
|
||||||
)
|
|
||||||
|
|
||||||
if not os.path.exists(self.bin_dir):
|
|
||||||
os.mkdir(self.bin_dir)
|
|
||||||
|
|
||||||
if not os.path.exists(downloaded_file):
|
|
||||||
self.log.info('Downloading: %s', self.latest_release_url)
|
|
||||||
with urllib.request.urlopen(self.latest_release_url) as response:
|
|
||||||
with open(downloaded_file, 'wb') as out_file:
|
|
||||||
shutil.copyfileobj(response, out_file)
|
|
||||||
|
|
||||||
self.log.info('Extracting: %s', downloaded_file)
|
|
||||||
|
|
||||||
if downloaded_file.endswith('.zip'):
|
|
||||||
with zipfile.ZipFile(downloaded_file) as dotzip:
|
|
||||||
dotzip.extractall(self.bin_dir)
|
|
||||||
# zipfile bug https://bugs.python.org/issue15795
|
|
||||||
os.chmod(self.cli_bin, 0o755)
|
|
||||||
os.chmod(self.daemon_bin, 0o755)
|
|
||||||
|
|
||||||
elif downloaded_file.endswith('.tar.gz'):
|
|
||||||
with tarfile.open(downloaded_file) as tar:
|
|
||||||
tar.extractall(self.bin_dir)
|
|
||||||
|
|
||||||
return self.exists
|
|
||||||
|
|
||||||
def ensure(self):
|
|
||||||
return self.exists or self.download()
|
|
||||||
|
|
||||||
async def start(self):
|
|
||||||
assert self.ensure()
|
|
||||||
self.data_path = tempfile.mkdtemp()
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
asyncio.get_child_watcher().attach_loop(loop)
|
|
||||||
command = [
|
|
||||||
self.daemon_bin, 'serve',
|
|
||||||
]
|
|
||||||
self.log.info(' '.join(command))
|
|
||||||
while not self.stopped:
|
|
||||||
if self.running.is_set():
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
continue
|
|
||||||
await self.restart_ready.wait()
|
|
||||||
try:
|
|
||||||
if not self.debug:
|
|
||||||
self.transport, self.protocol = await loop.subprocess_exec(
|
|
||||||
HubProcess, *command
|
|
||||||
)
|
|
||||||
await self.protocol.ready.wait()
|
|
||||||
assert not self.protocol.stopped.is_set()
|
|
||||||
self.running.set()
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
self.running.clear()
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
self.running.clear()
|
|
||||||
log.exception('failed to start hub', exc_info=e)
|
|
||||||
|
|
||||||
async def stop(self, cleanup=True):
|
|
||||||
self.stopped = True
|
|
||||||
try:
|
|
||||||
if not self.debug:
|
|
||||||
self.transport.terminate()
|
|
||||||
await self.protocol.stopped.wait()
|
|
||||||
self.transport.close()
|
|
||||||
finally:
|
|
||||||
if cleanup:
|
|
||||||
self.cleanup()
|
|
||||||
|
|
||||||
async def clear_mempool(self):
|
|
||||||
self.restart_ready.clear()
|
|
||||||
self.transport.terminate()
|
|
||||||
await self.protocol.stopped.wait()
|
|
||||||
self.transport.close()
|
|
||||||
self.running.clear()
|
|
||||||
# os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
|
|
||||||
self.restart_ready.set()
|
|
||||||
await self.running.wait()
|
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
pass
|
|
||||||
#shutil.rmtree(self.data_path, ignore_errors=True)
|
|
||||||
|
|
||||||
def fix_kwargs(self, **kwargs):
|
|
||||||
DEFAULT_PAGE_SIZE = 20
|
|
||||||
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
|
|
||||||
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
|
|
||||||
repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash",
|
repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash",
|
||||||
"public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id",
|
"public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id",
|
||||||
"fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title",
|
"fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title",
|
||||||
|
@ -733,7 +610,131 @@ class HubNode:
|
||||||
kwargs['media_type'] = kwargs.pop('media_types')
|
kwargs['media_type'] = kwargs.pop('media_types')
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
async def _cli_cmnd2(self, *args):
|
|
||||||
|
class HubNode:
|
||||||
|
|
||||||
|
def __init__(self, url, daemon, cli):
|
||||||
|
self.debug = True
|
||||||
|
|
||||||
|
self.latest_release_url = url
|
||||||
|
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
||||||
|
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||||
|
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
||||||
|
self.cli_bin = os.path.join(self.bin_dir, daemon)
|
||||||
|
self.log = log.getChild('hub')
|
||||||
|
self.data_path = None
|
||||||
|
self.protocol = None
|
||||||
|
self.transport = None
|
||||||
|
self.block_expected = 0
|
||||||
|
self.hostname = 'localhost'
|
||||||
|
# self.peerport = 9246 + 13 # avoid conflict with default peer port
|
||||||
|
self.rpcport = 50051 # avoid conflict with default rpc port
|
||||||
|
self.rpcuser = 'rpcuser'
|
||||||
|
self.rpcpassword = 'rpcpassword'
|
||||||
|
self.stopped = False
|
||||||
|
self.restart_ready = asyncio.Event()
|
||||||
|
self.restart_ready.set()
|
||||||
|
self.running = asyncio.Event()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rpc_url(self):
|
||||||
|
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exists(self):
|
||||||
|
return (
|
||||||
|
os.path.exists(self.cli_bin) and
|
||||||
|
os.path.exists(self.daemon_bin)
|
||||||
|
)
|
||||||
|
|
||||||
|
def download(self):
|
||||||
|
downloaded_file = os.path.join(
|
||||||
|
self.bin_dir,
|
||||||
|
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||||
|
)
|
||||||
|
|
||||||
|
if not os.path.exists(self.bin_dir):
|
||||||
|
os.mkdir(self.bin_dir)
|
||||||
|
|
||||||
|
if not os.path.exists(downloaded_file):
|
||||||
|
self.log.info('Downloading: %s', self.latest_release_url)
|
||||||
|
with urllib.request.urlopen(self.latest_release_url) as response:
|
||||||
|
with open(downloaded_file, 'wb') as out_file:
|
||||||
|
shutil.copyfileobj(response, out_file)
|
||||||
|
|
||||||
|
self.log.info('Extracting: %s', downloaded_file)
|
||||||
|
|
||||||
|
if downloaded_file.endswith('.zip'):
|
||||||
|
with zipfile.ZipFile(downloaded_file) as dotzip:
|
||||||
|
dotzip.extractall(self.bin_dir)
|
||||||
|
# zipfile bug https://bugs.python.org/issue15795
|
||||||
|
os.chmod(self.cli_bin, 0o755)
|
||||||
|
os.chmod(self.daemon_bin, 0o755)
|
||||||
|
|
||||||
|
elif downloaded_file.endswith('.tar.gz'):
|
||||||
|
with tarfile.open(downloaded_file) as tar:
|
||||||
|
tar.extractall(self.bin_dir)
|
||||||
|
|
||||||
|
return self.exists
|
||||||
|
|
||||||
|
def ensure(self):
|
||||||
|
return self.exists or self.download()
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
assert self.ensure()
|
||||||
|
self.data_path = tempfile.mkdtemp()
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
asyncio.get_child_watcher().attach_loop(loop)
|
||||||
|
command = [
|
||||||
|
self.daemon_bin, 'serve',
|
||||||
|
]
|
||||||
|
self.log.info(' '.join(command))
|
||||||
|
while not self.stopped:
|
||||||
|
if self.running.is_set():
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
continue
|
||||||
|
await self.restart_ready.wait()
|
||||||
|
try:
|
||||||
|
if not self.debug:
|
||||||
|
self.transport, self.protocol = await loop.subprocess_exec(
|
||||||
|
HubProcess, *command
|
||||||
|
)
|
||||||
|
await self.protocol.ready.wait()
|
||||||
|
assert not self.protocol.stopped.is_set()
|
||||||
|
self.running.set()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.running.clear()
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
self.running.clear()
|
||||||
|
log.exception('failed to start hub', exc_info=e)
|
||||||
|
|
||||||
|
async def stop(self, cleanup=True):
|
||||||
|
self.stopped = True
|
||||||
|
try:
|
||||||
|
if not self.debug:
|
||||||
|
self.transport.terminate()
|
||||||
|
await self.protocol.stopped.wait()
|
||||||
|
self.transport.close()
|
||||||
|
finally:
|
||||||
|
if cleanup:
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
|
async def clear_mempool(self):
|
||||||
|
self.restart_ready.clear()
|
||||||
|
self.transport.terminate()
|
||||||
|
await self.protocol.stopped.wait()
|
||||||
|
self.transport.close()
|
||||||
|
self.running.clear()
|
||||||
|
# os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
|
||||||
|
self.restart_ready.set()
|
||||||
|
await self.running.wait()
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
pass
|
||||||
|
#shutil.rmtree(self.data_path, ignore_errors=True)
|
||||||
|
|
||||||
|
async def _cli_cmnd(self, *args):
|
||||||
cmnd_args = [
|
cmnd_args = [
|
||||||
self.daemon_bin,
|
self.daemon_bin,
|
||||||
] + list(args)
|
] + list(args)
|
||||||
|
@ -750,32 +751,5 @@ class HubNode:
|
||||||
raise Exception(result)
|
raise Exception(result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def _cli_cmnd(self, *args, **kwargs):
|
|
||||||
cmnd_args = [
|
|
||||||
self.cli_bin,
|
|
||||||
'-d', f'{json.dumps(kwargs)}',
|
|
||||||
'-plaintext',
|
|
||||||
f'{self.hostname}:{self.rpcport}',
|
|
||||||
'pb.Hub.Search'
|
|
||||||
] + list(args)
|
|
||||||
self.log.warning(' '.join(cmnd_args))
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
asyncio.get_child_watcher().attach_loop(loop)
|
|
||||||
process = await asyncio.create_subprocess_exec(
|
|
||||||
*cmnd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
|
|
||||||
)
|
|
||||||
out, _ = await process.communicate()
|
|
||||||
result = out.decode().strip()
|
|
||||||
self.log.warning(result)
|
|
||||||
if result.startswith('error code'):
|
|
||||||
raise Exception(result)
|
|
||||||
return result
|
|
||||||
|
|
||||||
async def claim_search(self, **kwargs):
|
|
||||||
kwargs = self.fix_kwargs(**kwargs)
|
|
||||||
res = json.loads(await self._cli_cmnd(**kwargs))
|
|
||||||
# log.warning(res)
|
|
||||||
return res
|
|
||||||
|
|
||||||
async def name_query(self, name):
|
async def name_query(self, name):
|
||||||
return await self._cli_cmnd2('--name', name)
|
return await self._cli_cmnd('--name', name)
|
||||||
|
|
|
@ -6,6 +6,7 @@ from binascii import unhexlify
|
||||||
from unittest import skip
|
from unittest import skip
|
||||||
from urllib.request import urlopen
|
from urllib.request import urlopen
|
||||||
|
|
||||||
|
import lbry.wallet.transaction
|
||||||
from lbry.error import InsufficientFundsError
|
from lbry.error import InsufficientFundsError
|
||||||
from lbry.extras.daemon.comment_client import verify
|
from lbry.extras.daemon.comment_client import verify
|
||||||
|
|
||||||
|
@ -69,9 +70,13 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
|
|
||||||
async def assertFindsClaims(self, claims, **kwargs):
|
async def assertFindsClaims(self, claims, **kwargs):
|
||||||
kwargs.setdefault('order_by', ['height', '^name'])
|
kwargs.setdefault('order_by', ['height', '^name'])
|
||||||
|
if os.environ.get("GO_HUB") and os.environ["GO_HUB"] == "true":
|
||||||
|
kwargs['new_sdk_server'] = self.hub.hostname + ":" + str(self.hub.rpcport)
|
||||||
results = await self.claim_search(**kwargs)
|
results = await self.claim_search(**kwargs)
|
||||||
self.assertEqual(len(claims), len(results))
|
self.assertEqual(len(claims), len(results))
|
||||||
|
for claim, result in zip(claims, results):
|
||||||
|
print((claim['txid'], self.get_claim_id(claim)),
|
||||||
|
(result['txid'], result['claim_id'], result['height']))
|
||||||
for claim, result in zip(claims, results):
|
for claim, result in zip(claims, results):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
(claim['txid'], self.get_claim_id(claim)),
|
(claim['txid'], self.get_claim_id(claim)),
|
||||||
|
@ -90,7 +95,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
# f"(expected {claim['outputs'][0]['name']}) != (got {result['name']})"
|
# f"(expected {claim['outputs'][0]['name']}) != (got {result['name']})"
|
||||||
# )
|
# )
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_basic_claim_search(self):
|
async def test_basic_claim_search(self):
|
||||||
await self.create_channel()
|
await self.create_channel()
|
||||||
channel_txo = self.channel['outputs'][0]
|
channel_txo = self.channel['outputs'][0]
|
||||||
|
@ -132,11 +137,20 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims(claims, channel_ids=[self.channel_id])
|
await self.assertFindsClaims(claims, channel_ids=[self.channel_id])
|
||||||
# FIXME
|
# FIXME
|
||||||
# channel param doesn't work yet because we need to implement resolve url from search first
|
# channel param doesn't work yet because we need to implement resolve url from search first
|
||||||
# await self.assertFindsClaims(claims, channel=f"@abc#{self.channel_id}")
|
cid = await self.daemon.jsonrpc_resolve(f"@abc#{self.channel_id}")
|
||||||
# await self.assertFindsClaims([], channel=f"@inexistent")
|
await self.assertFindsClaims(claims, channel_id=cid[f"@abc#{self.channel_id}"].claim_id)
|
||||||
|
cid = await self.daemon.jsonrpc_resolve(f"@inexistent")
|
||||||
|
if type(cid["@inexistent"]) == dict:
|
||||||
|
cid = ""
|
||||||
|
else:
|
||||||
|
cid = cid["@inexistent"].claim_id
|
||||||
|
await self.assertFindsClaims([], channel_id=cid)
|
||||||
await self.assertFindsClaims([three, two, signed2, signed], channel_ids=[channel_id2, self.channel_id])
|
await self.assertFindsClaims([three, two, signed2, signed], channel_ids=[channel_id2, self.channel_id])
|
||||||
await self.channel_abandon(claim_id=self.channel_id)
|
await self.channel_abandon(claim_id=self.channel_id)
|
||||||
# await self.assertFindsClaims([], channel=f"@abc#{self.channel_id}", valid_channel_signature=True)
|
# since the resolve is being done separately this would only test finding something with an empty channel so I
|
||||||
|
# think we can just remove these and test those independently
|
||||||
|
# cid = await self.daemon.jsonrpc_resolve(f"@abc#{self.channel_id}")
|
||||||
|
# await self.assertFindsClaims([], channel_id=cid[f"@abc#{self.channel_id}"].claim_id, valid_channel_signature=True)
|
||||||
await self.assertFindsClaims([], channel_ids=[self.channel_id], valid_channel_signature=True)
|
await self.assertFindsClaims([], channel_ids=[self.channel_id], valid_channel_signature=True)
|
||||||
await self.assertFindsClaims([signed2], channel_ids=[channel_id2], valid_channel_signature=True)
|
await self.assertFindsClaims([signed2], channel_ids=[channel_id2], valid_channel_signature=True)
|
||||||
# pass `invalid_channel_signature=False` to catch a bug in argument processing
|
# pass `invalid_channel_signature=False` to catch a bug in argument processing
|
||||||
|
@ -166,7 +180,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([three], claim_id=self.get_claim_id(three))
|
await self.assertFindsClaims([three], claim_id=self.get_claim_id(three))
|
||||||
await self.assertFindsClaims([three], claim_id=self.get_claim_id(three), text='*')
|
await self.assertFindsClaims([three], claim_id=self.get_claim_id(three), text='*')
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_source_filter(self):
|
async def test_source_filter(self):
|
||||||
channel = await self.channel_create('@abc')
|
channel = await self.channel_create('@abc')
|
||||||
no_source = await self.stream_create('no-source', data=None)
|
no_source = await self.stream_create('no-source', data=None)
|
||||||
|
@ -181,50 +195,52 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel])
|
await self.assertFindsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel])
|
||||||
# await self.assertListsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel])
|
# await self.assertListsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel])
|
||||||
|
|
||||||
@skip("Won't work until we can resolve the channel id")
|
# @skip("Okay???")
|
||||||
async def test_pagination(self):
|
async def test_pagination(self):
|
||||||
await self.create_channel()
|
await self.create_channel()
|
||||||
await self.create_lots_of_streams()
|
await self.create_lots_of_streams()
|
||||||
|
|
||||||
|
channel_id = (await self.daemon.jsonrpc_resolve(f"@abc"))["@abc"].claim_id
|
||||||
# FIXME: this doesn't work when jsonrpc_claim_search is called directly
|
# FIXME: this doesn't work when jsonrpc_claim_search is called directly
|
||||||
# # with and without totals
|
new_sdk_server = self.hub.hostname + ":" + str(self.hub.rpcport)
|
||||||
# results = await self.daemon.jsonrpc_claim_search()
|
# with and without totals
|
||||||
# self.assertEqual(results['total_pages'], 2)
|
results = await self.daemon.jsonrpc_claim_search(new_sdk_server=new_sdk_server)
|
||||||
# self.assertEqual(results['total_items'], 25)
|
self.assertEqual(results['total_pages'], 2)
|
||||||
# results = await self.daemon.jsonrpc_claim_search(no_totals=True)
|
self.assertEqual(results['total_items'], 25)
|
||||||
|
# FIXME: Do we still need to support this?
|
||||||
|
# results = await self.daemon.jsonrpc_claim_search(no_totals=True, new_sdk_server=new_sdk_server)
|
||||||
# self.assertNotIn('total_pages', results)
|
# self.assertNotIn('total_pages', results)
|
||||||
# self.assertNotIn('total_items', results)
|
# self.assertNotIn('total_items', results)
|
||||||
|
|
||||||
# defaults
|
# defaults
|
||||||
page = await self.claim_search(channel='@abc', order_by=['height', '^name'])
|
page = await self.claim_search(channel_id=channel_id, order_by=['height', '^name'])
|
||||||
print(page)
|
|
||||||
page_claim_ids = [item['name'] for item in page]
|
page_claim_ids = [item['name'] for item in page]
|
||||||
self.assertEqual(page_claim_ids, self.streams[:DEFAULT_PAGE_SIZE])
|
self.assertEqual(page_claim_ids, self.streams[:DEFAULT_PAGE_SIZE])
|
||||||
|
|
||||||
# page with default page_size
|
# page with default page_size
|
||||||
page = await self.claim_search(page=2, channel='@abc', order_by=['height', '^name'])
|
page = await self.claim_search(page=2, channel_id=channel_id, order_by=['height', '^name'])
|
||||||
page_claim_ids = [item['name'] for item in page]
|
page_claim_ids = [item['name'] for item in page]
|
||||||
self.assertEqual(page_claim_ids, self.streams[DEFAULT_PAGE_SIZE:(DEFAULT_PAGE_SIZE*2)])
|
self.assertEqual(page_claim_ids, self.streams[DEFAULT_PAGE_SIZE:(DEFAULT_PAGE_SIZE*2)])
|
||||||
|
|
||||||
# page_size larger than dataset
|
# page_size larger than dataset
|
||||||
page = await self.claim_search(page_size=50, channel='@abc', order_by=['height', '^name'])
|
page = await self.claim_search(page_size=50, channel_id=channel_id, order_by=['height', '^name'])
|
||||||
page_claim_ids = [item['name'] for item in page]
|
page_claim_ids = [item['name'] for item in page]
|
||||||
self.assertEqual(page_claim_ids, self.streams)
|
self.assertEqual(page_claim_ids, self.streams)
|
||||||
|
|
||||||
# page_size less than dataset
|
# page_size less than dataset
|
||||||
page = await self.claim_search(page_size=6, channel='@abc', order_by=['height', '^name'])
|
page = await self.claim_search(page_size=6, channel_id=channel_id, order_by=['height', '^name'])
|
||||||
page_claim_ids = [item['name'] for item in page]
|
page_claim_ids = [item['name'] for item in page]
|
||||||
self.assertEqual(page_claim_ids, self.streams[:6])
|
self.assertEqual(page_claim_ids, self.streams[:6])
|
||||||
|
|
||||||
# page and page_size
|
# page and page_size
|
||||||
page = await self.claim_search(page=2, page_size=6, channel='@abc', order_by=['height', '^name'])
|
page = await self.claim_search(page=2, page_size=6, channel_id=channel_id, order_by=['height', '^name'])
|
||||||
page_claim_ids = [item['name'] for item in page]
|
page_claim_ids = [item['name'] for item in page]
|
||||||
self.assertEqual(page_claim_ids, self.streams[6:12])
|
self.assertEqual(page_claim_ids, self.streams[6:12])
|
||||||
|
|
||||||
out_of_bounds = await self.claim_search(page=4, page_size=20, channel='@abc')
|
out_of_bounds = await self.claim_search(page=4, page_size=20, channel_id=channel_id)
|
||||||
self.assertEqual(out_of_bounds, [])
|
self.assertEqual(out_of_bounds, [])
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_tag_search(self):
|
async def test_tag_search(self):
|
||||||
claim1 = await self.stream_create('claim1', tags=['aBc'])
|
claim1 = await self.stream_create('claim1', tags=['aBc'])
|
||||||
claim2 = await self.stream_create('claim2', tags=['#abc', 'def'])
|
claim2 = await self.stream_create('claim2', tags=['#abc', 'def'])
|
||||||
|
@ -261,7 +277,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([claim3], all_tags=['abc', 'ghi'], any_tags=['jkl'], not_tags=['mno'])
|
await self.assertFindsClaims([claim3], all_tags=['abc', 'ghi'], any_tags=['jkl'], not_tags=['mno'])
|
||||||
await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi'])
|
await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi'])
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_order_by(self):
|
async def test_order_by(self):
|
||||||
height = self.ledger.network.remote_height
|
height = self.ledger.network.remote_height
|
||||||
claims = [await self.stream_create(f'claim{i}') for i in range(5)]
|
claims = [await self.stream_create(f'claim{i}') for i in range(5)]
|
||||||
|
@ -278,7 +294,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
|
|
||||||
await self.assertFindsClaims(claims, order_by=["^name"])
|
await self.assertFindsClaims(claims, order_by=["^name"])
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_search_by_fee(self):
|
async def test_search_by_fee(self):
|
||||||
claim1 = await self.stream_create('claim1', fee_amount='1.0', fee_currency='lbc')
|
claim1 = await self.stream_create('claim1', fee_amount='1.0', fee_currency='lbc')
|
||||||
claim2 = await self.stream_create('claim2', fee_amount='0.9', fee_currency='lbc')
|
claim2 = await self.stream_create('claim2', fee_amount='0.9', fee_currency='lbc')
|
||||||
|
@ -293,7 +309,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([claim3], fee_amount='0.5', fee_currency='lbc')
|
await self.assertFindsClaims([claim3], fee_amount='0.5', fee_currency='lbc')
|
||||||
await self.assertFindsClaims([claim5], fee_currency='usd')
|
await self.assertFindsClaims([claim5], fee_currency='usd')
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_search_by_language(self):
|
async def test_search_by_language(self):
|
||||||
claim1 = await self.stream_create('claim1', fee_amount='1.0', fee_currency='lbc')
|
claim1 = await self.stream_create('claim1', fee_amount='1.0', fee_currency='lbc')
|
||||||
claim2 = await self.stream_create('claim2', fee_amount='0.9', fee_currency='lbc')
|
claim2 = await self.stream_create('claim2', fee_amount='0.9', fee_currency='lbc')
|
||||||
|
@ -308,7 +324,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([claim5, claim4, claim3], any_languages=['en', 'es'])
|
await self.assertFindsClaims([claim5, claim4, claim3], any_languages=['en', 'es'])
|
||||||
await self.assertFindsClaims([], fee_currency='foo')
|
await self.assertFindsClaims([], fee_currency='foo')
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_search_by_channel(self):
|
async def test_search_by_channel(self):
|
||||||
match = self.assertFindsClaims
|
match = self.assertFindsClaims
|
||||||
|
|
||||||
|
@ -364,7 +380,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
not_channel_ids=[chan2_id], has_channel_signature=True, valid_channel_signature=True)
|
not_channel_ids=[chan2_id], has_channel_signature=True, valid_channel_signature=True)
|
||||||
await match([], not_channel_ids=[chan1_id, chan2_id], has_channel_signature=True, valid_channel_signature=True)
|
await match([], not_channel_ids=[chan1_id, chan2_id], has_channel_signature=True, valid_channel_signature=True)
|
||||||
|
|
||||||
@skip("not okay")
|
# @skip("okay")
|
||||||
async def test_limit_claims_per_channel(self):
|
async def test_limit_claims_per_channel(self):
|
||||||
match = self.assertFindsClaims
|
match = self.assertFindsClaims
|
||||||
chan1_id = self.get_claim_id(await self.channel_create('@chan1'))
|
chan1_id = self.get_claim_id(await self.channel_create('@chan1'))
|
||||||
|
@ -384,7 +400,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
limit_claims_per_channel=3, claim_type='stream'
|
limit_claims_per_channel=3, claim_type='stream'
|
||||||
)
|
)
|
||||||
|
|
||||||
@skip("not okay")
|
# @skip("okay")
|
||||||
async def test_limit_claims_per_channel_across_sorted_pages(self):
|
async def test_limit_claims_per_channel_across_sorted_pages(self):
|
||||||
await self.generate(10)
|
await self.generate(10)
|
||||||
match = self.assertFindsClaims
|
match = self.assertFindsClaims
|
||||||
|
@ -417,7 +433,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
limit_claims_per_channel=1, claim_type='stream', order_by=['^height']
|
limit_claims_per_channel=1, claim_type='stream', order_by=['^height']
|
||||||
)
|
)
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_claim_type_and_media_type_search(self):
|
async def test_claim_type_and_media_type_search(self):
|
||||||
# create an invalid/unknown claim
|
# create an invalid/unknown claim
|
||||||
address = await self.account.receiving.get_or_create_usable_address()
|
address = await self.account.receiving.get_or_create_usable_address()
|
||||||
|
@ -460,7 +476,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([], duration='>100')
|
await self.assertFindsClaims([], duration='>100')
|
||||||
await self.assertFindsClaims([], duration='<14')
|
await self.assertFindsClaims([], duration='<14')
|
||||||
|
|
||||||
@skip("okay")
|
# @skip("okay")
|
||||||
async def test_search_by_text(self):
|
async def test_search_by_text(self):
|
||||||
chan1_id = self.get_claim_id(await self.channel_create('@SatoshiNakamoto'))
|
chan1_id = self.get_claim_id(await self.channel_create('@SatoshiNakamoto'))
|
||||||
chan2_id = self.get_claim_id(await self.channel_create('@Bitcoin'))
|
chan2_id = self.get_claim_id(await self.channel_create('@Bitcoin'))
|
||||||
|
|
Loading…
Add table
Reference in a new issue