From c01728a6a7e25299a562a7c760b2c02fd349c515 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 22 Jan 2019 17:45:13 -0500 Subject: [PATCH] update and add scripts --- scripts/dht_monitor.py | 76 ++++++------------ scripts/generate_json_api.py | 1 - scripts/standalone_blob_server.py | 33 ++++++++ scripts/time_to_first_byte.py | 126 ++++++++++++++++++++++++------ 4 files changed, 158 insertions(+), 78 deletions(-) create mode 100644 scripts/standalone_blob_server.py diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py index abb7ca66e..1e6e839b5 100644 --- a/scripts/dht_monitor.py +++ b/scripts/dht_monitor.py @@ -1,7 +1,9 @@ import curses import time import logging -from lbrynet.daemon import get_client +import asyncio +from lbrynet import conf +from lbrynet.extras.daemon.client import LBRYAPIClient log = logging.getLogger(__name__) log.addHandler(logging.FileHandler("dht contacts.log")) @@ -9,8 +11,6 @@ log.addHandler(logging.FileHandler("dht contacts.log")) log.setLevel(logging.INFO) stdscr = curses.initscr() -api = get_client() - def init_curses(): curses.noecho() @@ -26,79 +26,47 @@ def teardown_curses(): curses.endwin() -def refresh(last_contacts, last_blobs): +def refresh(routing_table_info): height, width = stdscr.getmaxyx() - try: - routing_table_info = api.routing_table_get() - node_id = routing_table_info['node_id'] - except: - node_id = "UNKNOWN" - routing_table_info = { - 'buckets': {}, - 'contacts': [], - 'blob_hashes': [] - } + node_id = routing_table_info['node_id'] + for y in range(height): stdscr.addstr(y, 0, " " * (width - 1)) buckets = routing_table_info['buckets'] - stdscr.addstr(0, 0, "node id: %s" % node_id) - stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" % - (len(buckets), len(routing_table_info['contacts']), - len(routing_table_info['blob_hashes']))) + stdscr.addstr(0, 0, f"node id: {node_id}") + stdscr.addstr(1, 0, f"{len(buckets)} buckets") y = 3 - for i in sorted(buckets.keys()): + for i in range(len(buckets)): stdscr.addstr(y, 0, "bucket %s" % i) y += 1 - for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')): - stdscr.addstr(y, 0, '%s (%s:%i) - %i blobs' % (h['node_id'], h['address'], h['port'], - len(h['blobs']))) + for peer in buckets[str(i)]: + stdscr.addstr(y, 0, f"{peer['node_id'][:8]} ({peer['address']}:{peer['udp_port']})") y += 1 y += 1 - new_contacts = set(routing_table_info['contacts']) - last_contacts - lost_contacts = last_contacts - set(routing_table_info['contacts']) - - if new_contacts: - for c in new_contacts: - log.debug("added contact %s", c) - if lost_contacts: - for c in lost_contacts: - log.info("lost contact %s", c) - - new_blobs = set(routing_table_info['blob_hashes']) - last_blobs - lost_blobs = last_blobs - set(routing_table_info['blob_hashes']) - - if new_blobs: - for c in new_blobs: - log.debug("added blob %s", c) - if lost_blobs: - for c in lost_blobs: - log.info("lost blob %s", c) - stdscr.addstr(y + 1, 0, str(time.time())) stdscr.refresh() - return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes']) -def do_main(): - c = None - last_contacts, last_blobs = set(), set() - while c not in [ord('q'), ord('Q')]: - last_contacts, last_blobs = refresh(last_contacts, last_blobs) - c = stdscr.getch() - time.sleep(0.1) +async def main(): + conf.initialize_settings() + api = await LBRYAPIClient.get_client() - -def main(): try: init_curses() - do_main() + c = None + while c not in [ord('q'), ord('Q')]: + routing_info = await api.routing_table_get() + refresh(routing_info) + c = stdscr.getch() + time.sleep(0.1) finally: + await api.session.close() teardown_curses() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/scripts/generate_json_api.py b/scripts/generate_json_api.py index 6e104bc31..6da79eaab 100644 --- a/scripts/generate_json_api.py +++ b/scripts/generate_json_api.py @@ -2,7 +2,6 @@ import os import re import json import inspect -from textwrap import dedent from lbrynet.extras.daemon.Daemon import Daemon diff --git a/scripts/standalone_blob_server.py b/scripts/standalone_blob_server.py new file mode 100644 index 000000000..455aaf3c8 --- /dev/null +++ b/scripts/standalone_blob_server.py @@ -0,0 +1,33 @@ +import sys +import os +import asyncio +from lbrynet.blob.blob_manager import BlobFileManager +from lbrynet.blob_exchange.server import BlobServer +from lbrynet.schema.address import decode_address +from lbrynet.extras.daemon.storage import SQLiteStorage + + +async def main(address: str): + try: + decode_address(address) + except: + print(f"'{address}' is not a valid lbrycrd address") + return 1 + loop = asyncio.get_running_loop() + + storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite")) + await storage.open() + blob_manager = BlobFileManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage) + await blob_manager.setup() + + server = await loop.create_server( + lambda: BlobServer(loop, blob_manager, address), + '0.0.0.0', 4444) + try: + async with server: + await server.serve_forever() + finally: + await storage.close() + +if __name__ == "__main__": + asyncio.run(main(sys.argv[1])) diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py index 2efbba9e1..e62f9fc0e 100644 --- a/scripts/time_to_first_byte.py +++ b/scripts/time_to_first_byte.py @@ -1,12 +1,15 @@ +import os +import json import argparse import asyncio import aiohttp import time from aiohttp import ClientConnectorError -from lbrynet import conf +from lbrynet import conf, __version__ from lbrynet.schema.uri import parse_lbry_uri -from lbrynet.extras.daemon.DaemonConsole import LBRYAPIClient +from lbrynet.extras.daemon.client import LBRYAPIClient +from lbrynet.extras import system_info, cli def extract_uris(response): @@ -33,21 +36,80 @@ async def get_frontpage_uris(): await session.close() -async def main(): - uris = await get_frontpage_uris() - print("got %i uris" % len(uris)) - api = await LBRYAPIClient.get_client() +async def report_to_slack(output, webhook): + payload = { + "text": f"lbrynet {__version__} ({system_info.get_platform()['platform']}) time to first byte:\n{output}" + } + async with aiohttp.request('post', webhook, data=json.dumps(payload)): + pass + +def confidence(times, z): + mean = sum(times) / len(times) + standard_dev = (sum(((t - sum(times) / len(times)) ** 2.0 for t in times)) / len(times)) ** 0.5 + err = (z * standard_dev) / (len(times) ** 0.5) + return f"{round(mean, 3) + round(err, 3)}s" + + +def variance(times): + mean = sum(times) / len(times) + return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3) + + +async def wait_for_done(api, uri): + name = uri.split("#")[0] + last_complete = 0 + hang_count = 0 + while True: + files = await api.file_list(claim_name=name) + file = files[0] + if file['status'] in ['finished', 'stopped']: + return True, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed']) + if last_complete < int(file['blobs_completed']): + print(f"{file['blobs_completed']}/{file['blobs_in_stream']}...") + hang_count = 0 + last_complete = int(file['blobs_completed']) + else: + hang_count += 1 + await asyncio.sleep(1.0) + if hang_count > 30: + return False, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed']) + + +async def main(start_daemon=True, uris=None): + if not uris: + uris = await get_frontpage_uris() + api = await LBRYAPIClient.get_client() + daemon = None try: await api.status() except (ClientConnectorError, ConnectionError): await api.session.close() - print("Could not connect to daemon. Are you sure it's running?") - return + if start_daemon: + print("Could not connect to running daemon, starting...") + daemon = await cli.start_daemon(console_output=False) + await daemon.component_manager.started.wait() + print("Started daemon") + return await main(start_daemon=False, uris=uris) + print("Could not connect to daemon") + return 1 + print(f"Checking {len(uris)} uris from the front page") + print("**********************************************") + + resolvable = [] + for name in uris: + resolved = await api.resolve(uri=name) + if 'error' not in resolved.get(name, {}): + resolvable.append(name) + + print(f"{len(resolvable)}/{len(uris)} are resolvable") first_byte_times = [] + downloaded_times = [] + failures = [] + download_failures = [] - for uri in uris: + for uri in resolvable: await api.call( "file_delete", { "delete_from_download_dir": True, @@ -56,37 +118,55 @@ async def main(): } ) - for i, uri in enumerate(uris): + for i, uri in enumerate(resolvable): start = time.time() try: await api.call("get", {"uri": uri}) first_byte = time.time() first_byte_times.append(first_byte - start) - print(f"{i + 1}/{len(uris)} - {first_byte - start} {uri}") + print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}") + # downloaded, msg, blobs_in_stream = await wait_for_done(api, uri) + # if downloaded: + # downloaded_times.append((time.time()-start) / downloaded) + # print(f"{i + 1}/{len(uris)} - downloaded @ {(time.time()-start) / blobs_in_stream}, {msg} {uri}") + # else: + # print(f"failed to downlload {uri}, got {msg}") + # download_failures.append(uri) except: - print(f"{i + 1}/{len(uris)} - timed out in {time.time() - start} {uri}") + print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}") + failures.append(uri) await api.call( "file_delete", { "delete_from_download_dir": True, "claim_name": parse_lbry_uri(uri).name } ) + await asyncio.sleep(0.1) - avg = sum(first_byte_times) / len(first_byte_times) - print() - print(f"Average time to first byte: {avg} ({len(first_byte_times)} streams)") - print(f"Started {len(first_byte_times)} Timed out {len(uris) - len(first_byte_times)}") - + print("**********************************************") + result = f"Tried to start downloading {len(resolvable)} streams from the front page\n" \ + f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}\n" \ + f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}\n" \ + f"Variance: {variance(first_byte_times)}\n" \ + f"Started {len(first_byte_times)}/{len(resolvable)} streams" + if failures: + nt = '\n\t' + result += f"\nFailures:\n\t{nt.join([f for f in failures])}" + print(result) await api.session.close() + if daemon: + await daemon.shutdown() + webhook = os.environ.get('TTFB_SLACK_TOKEN', None) + if webhook: + await report_to_slack(result, webhook) if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--data_dir", default=None) - parser.add_argument("--wallet_dir", default=None) - parser.add_argument("--download_directory", default=None) + parser.add_argument("--data_dir") + parser.add_argument("--wallet_dir") + parser.add_argument("--download_directory") args = parser.parse_args() - conf.initialize_settings( - data_dir=args.data_dir, wallet_dir=args.wallet_dir, download_dir=args.download_directory - ) + + conf.initialize_settings() asyncio.run(main())