diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py index 60a07f799..abb7ca66e 100644 --- a/scripts/dht_monitor.py +++ b/scripts/dht_monitor.py @@ -1,7 +1,7 @@ import curses import time -from jsonrpc.proxy import JSONRPCProxy import logging +from lbrynet.daemon import get_client log = logging.getLogger(__name__) log.addHandler(logging.FileHandler("dht contacts.log")) @@ -9,7 +9,7 @@ log.addHandler(logging.FileHandler("dht contacts.log")) log.setLevel(logging.INFO) stdscr = curses.initscr() -api = JSONRPCProxy.from_url("http://localhost:5279") +api = get_client() def init_curses(): @@ -53,7 +53,7 @@ def refresh(last_contacts, last_blobs): stdscr.addstr(y, 0, "bucket %s" % i) y += 1 for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')): - stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'], + stdscr.addstr(y, 0, '%s (%s:%i) - %i blobs' % (h['node_id'], h['address'], h['port'], len(h['blobs']))) y += 1 y += 1 diff --git a/scripts/dht_seed_monitor.py b/scripts/dht_seed_monitor.py new file mode 100644 index 000000000..f075fb741 --- /dev/null +++ b/scripts/dht_seed_monitor.py @@ -0,0 +1,85 @@ +import curses +import time +import datetime +from jsonrpc.proxy import JSONRPCProxy + +stdscr = curses.initscr() + +api = JSONRPCProxy.from_url("http://localhost:5280") + + +def init_curses(): + curses.noecho() + curses.cbreak() + stdscr.nodelay(1) + stdscr.keypad(1) + + +def teardown_curses(): + curses.nocbreak() + stdscr.keypad(0) + curses.echo() + curses.endwin() + + +def refresh(node_index): + height, width = stdscr.getmaxyx() + node_ids = api.get_node_ids() + node_id = node_ids[node_index] + node_statuses = api.node_status() + running = node_statuses[node_id] + buckets = api.node_routing_table(node_id=node_id) + + for y in range(height): + stdscr.addstr(y, 0, " " * (width - 1)) + + stdscr.addstr(0, 0, "node id: %s, running: %s (%i/%i running)" % (node_id, running, sum(node_statuses.values()), len(node_ids))) + stdscr.addstr(1, 0, "%i buckets, %i contacts" % + (len(buckets), sum([len(buckets[b]['contacts']) for b in buckets]))) + + y = 3 + for i in sorted(buckets.keys()): + stdscr.addstr(y, 0, "bucket %s" % i) + y += 1 + for h in sorted(buckets[i]['contacts'], key=lambda x: x['node_id'].decode('hex')): + stdscr.addstr(y, 0, '%s (%s:%i) failures: %i, last replied to us: %s, last requested from us: %s' % + (h['node_id'], h['address'], h['port'], h['failedRPCs'], + datetime.datetime.fromtimestamp(float(h['lastReplied'] or 0)), + datetime.datetime.fromtimestamp(float(h['lastRequested'] or 0)))) + y += 1 + y += 1 + + stdscr.addstr(y + 1, 0, str(time.time())) + stdscr.refresh() + return len(node_ids) + + +def do_main(): + c = None + nodes = 1 + node_index = 0 + while c not in [ord('q'), ord('Q')]: + try: + nodes = refresh(node_index) + except: + pass + c = stdscr.getch() + if c == curses.KEY_LEFT: + node_index -= 1 + node_index = max(node_index, 0) + elif c == curses.KEY_RIGHT: + node_index += 1 + node_index = min(node_index, nodes - 1) + time.sleep(0.1) + + +def main(): + try: + init_curses() + do_main() + finally: + teardown_curses() + + +if __name__ == "__main__": + main() diff --git a/scripts/seed_node.py b/scripts/seed_node.py new file mode 100644 index 000000000..18e349dbe --- /dev/null +++ b/scripts/seed_node.py @@ -0,0 +1,218 @@ +import struct +import json +import logging +import argparse +import hashlib +from copy import deepcopy +from urllib import urlopen +from twisted.internet import reactor, defer +from twisted.web import resource +from twisted.web.server import Site +from lbrynet import conf +from lbrynet.dht import constants +from lbrynet.dht.node import Node +from lbrynet.dht.error import TransportNotConnected +from lbrynet.core.log_support import configure_console, configure_twisted +from lbrynet.daemon.auth.server import AuthJSONRPCServer + +# configure_twisted() +conf.initialize_settings() +configure_console() +lbrynet_handler = logging.getLogger("lbrynet").handlers[0] +log = logging.getLogger("dht router") +log.addHandler(lbrynet_handler) +log.setLevel(logging.INFO) + + +def node_id_supplier(seed="jack.lbry.tech"): # simple deterministic node id generator + h = hashlib.sha384() + h.update(seed) + while True: + next_id = h.digest() + yield next_id + h = hashlib.sha384() + h.update(seed) + h.update(next_id) + + +def get_external_ip(): + response = json.loads(urlopen("https://api.lbry.io/ip").read()) + if not response['success']: + raise ValueError("failed to get external ip") + return response['data']['ip'] + + +def format_contact(contact): + return { + "node_id": contact.id.encode('hex'), + "address": contact.address, + "port": contact.port, + "lastReplied": contact.lastReplied, + "lastRequested": contact.lastRequested, + "failedRPCs": contact.failedRPCs + } + + +class MultiSeedRPCServer(AuthJSONRPCServer): + def __init__(self, starting_node_port=4455, nodes=50, rpc_port=5280): + AuthJSONRPCServer.__init__(self, False) + self.port = None + self.rpc_port = rpc_port + self.external_ip = get_external_ip() + node_id_gen = node_id_supplier() + self._nodes = [Node(node_id=next(node_id_gen), udpPort=starting_node_port+i, externalIP=self.external_ip) + for i in range(nodes)] + self._own_addresses = [(self.external_ip, starting_node_port+i) for i in range(nodes)] + reactor.addSystemEventTrigger('after', 'startup', self.start) + + @defer.inlineCallbacks + def start(self): + self.announced_startup = True + root = resource.Resource() + root.putChild('', self) + self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost') + log.info("starting %i nodes on %s, rpc available on localhost:%i", len(self._nodes), self.external_ip, self.rpc_port) + + for node in self._nodes: + node.start_listening() + yield node._protocol._listening + + for node1 in self._nodes: + for node2 in self._nodes: + if node1 is node2: + continue + try: + yield node1.addContact(node1.contact_manager.make_contact(node2.node_id, node2.externalIP, + node2.port, node1._protocol)) + except TransportNotConnected: + pass + node1.safe_start_looping_call(node1._change_token_lc, constants.tokenSecretChangeInterval) + node1.safe_start_looping_call(node1._refresh_node_lc, constants.checkRefreshInterval) + node1._join_deferred = defer.succeed(True) + reactor.addSystemEventTrigger('before', 'shutdown', self.stop) + log.info("finished bootstrapping the network, running %i nodes", len(self._nodes)) + + @defer.inlineCallbacks + def stop(self): + yield self.port.stopListening() + yield defer.DeferredList([node.stop() for node in self._nodes]) + + def jsonrpc_get_node_ids(self): + return defer.succeed([node.node_id.encode('hex') for node in self._nodes]) + + def jsonrpc_node_datastore(self, node_id): + def format_datastore(node): + datastore = deepcopy(node._dataStore._dict) + result = {} + for key, values in datastore.iteritems(): + contacts = [] + for (value, last_published, originally_published, original_publisher_id) in values: + host = ".".join([str(ord(d)) for d in value[:4]]) + port, = struct.unpack('>H', value[4:6]) + peer_node_id = value[6:] + contact_dict = format_contact(node.contact_manager.make_contact(peer_node_id, host, port)) + contact_dict['lastPublished'] = last_published + contact_dict['originallyPublished'] = originally_published + contact_dict['originalPublisherID'] = original_publisher_id + contacts.append(contact_dict) + result[key.encode('hex')] = contacts + return result + + for node in self._nodes: + if node.node_id == node_id.decode('hex'): + return defer.succeed(format_datastore(node)) + + def jsonrpc_node_routing_table(self, node_id): + def format_bucket(bucket): + return { + "contacts": [format_contact(contact) for contact in bucket._contacts], + "lastAccessed": bucket.lastAccessed + } + + def format_routing(node): + return { + i: format_bucket(bucket) for i, bucket in enumerate(node._routingTable._buckets) + } + + for node in self._nodes: + if node.node_id == node_id.decode('hex'): + return defer.succeed(format_routing(node)) + + def jsonrpc_restart_node(self, node_id): + for node in self._nodes: + if node.node_id == node_id.decode('hex'): + d = node.stop() + d.addCallback(lambda _: node.start(self._own_addresses)) + return d + + @defer.inlineCallbacks + def jsonrpc_local_node_rpc(self, from_node, query, args=()): + def format_result(response): + if isinstance(response, list): + return [[node_id.encode('hex'), address, port] for (node_id, address, port) in response] + if isinstance(response, dict): + return {'token': response['token'].encode('hex'), 'contacts': format_result(response['contacts'])} + return response + + for node in self._nodes: + if node.node_id == from_node.decode('hex'): + fn = getattr(node, query) + self_contact = node.contact_manager.make_contact(node.node_id, node.externalIP, node.port, node._protocol) + if args: + args = (str(arg) if isinstance(arg, (str, unicode)) else int(arg) for arg in args) + result = yield fn(self_contact, *args) + else: + result = yield fn() + # print "result: %s" % result + defer.returnValue(format_result(result)) + + @defer.inlineCallbacks + def jsonrpc_node_rpc(self, from_node, to_node, query, args=()): + def format_result(response): + if isinstance(response, list): + return [[node_id.encode('hex'), address, port] for (node_id, address, port) in response] + if isinstance(response, dict): + return {'token': response['token'].encode('hex'), 'contacts': format_result(response['contacts'])} + return response + + for node in self._nodes: + if node.node_id == from_node.decode('hex'): + remote = node._routingTable.getContact(to_node.decode('hex')) + fn = getattr(remote, query) + if args: + args = (str(arg).decode('hex') for arg in args) + result = yield fn(*args) + else: + result = yield fn() + defer.returnValue(format_result(result)) + + @defer.inlineCallbacks + def jsonrpc_get_nodes_who_know(self, ip_address): + nodes = [] + for node_id in [n.node_id.encode('hex') for n in self._nodes]: + routing_info = yield self.jsonrpc_node_routing_table(node_id=node_id) + for index, bucket in routing_info.iteritems(): + if ip_address in map(lambda c: c['address'], bucket['contacts']): + nodes.append(node_id) + break + defer.returnValue(nodes) + + def jsonrpc_node_status(self): + return defer.succeed({ + node.node_id.encode('hex'): node._join_deferred is not None and node._join_deferred.called + for node in self._nodes + }) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--rpc_port', default=5280) + parser.add_argument('--starting_port', default=4455) + parser.add_argument('--nodes', default=50) + args = parser.parse_args() + MultiSeedRPCServer(int(args.starting_port), int(args.nodes), int(args.rpc_port)) + reactor.run() + + +if __name__ == "__main__": + main()