From a0a7d5f5698607dabc6073d430ef5d74ec398664 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 10 Oct 2017 15:04:48 -0400
Subject: [PATCH] add routing_table_get

---
 lbrynet/daemon/Daemon.py | 74 +++++++++++++++++++++++++++++++++++++++-
 scripts/dht_monitor.py   | 19 ++++++-----
 2 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index e2e9df32e..65e0207a9 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -9,7 +9,7 @@ import json
 import textwrap
 import random
 import signal
-
+from copy import deepcopy
 from twisted.web import server
 from twisted.internet import defer, threads, error, reactor
 from twisted.internet.task import LoopingCall
@@ -2658,6 +2658,78 @@ class Daemon(AuthJSONRPCServer):
         d.addCallback(lambda r: self._render_response(r))
         return d
 
+    def jsonrpc_routing_table_get(self):
+        """
+        Get DHT routing information
+
+        Usage:
+            routing_table_get
+
+        Returns:
+            (dict) dictionary containing routing and contact information
+            {
+                "buckets": {
+                    <bucket index>: [
+                        {
+                            "address": (str) peer address,
+                            "node_id": (str) peer node id,
+                            "blobs": (list) blob hashes announced by peer
+                        }
+                    ]
+                },
+                "contacts": (list) contact node ids,
+                "blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets,
+                "node_id": (str) the local dht node id
+            }
+        """
+
+        result = {}
+        data_store = deepcopy(self.session.dht_node._dataStore._dict)
+        datastore_len = len(data_store)
+        hosts = {}
+
+        if datastore_len:
+            for k, v in data_store.iteritems():
+                for value, lastPublished, originallyPublished, originalPublisherID in v:
+                    try:
+                        contact = self.session.dht_node._routingTable.getContact(
+                            originalPublisherID)
+                    except ValueError:
+                        continue
+                    if contact in hosts:
+                        blobs = hosts[contact]
+                    else:
+                        blobs = []
+                    blobs.append(k.encode('hex'))
+                    hosts[contact] = blobs
+
+        contact_set = []
+        blob_hashes = []
+        result['buckets'] = {}
+
+        for i in range(len(self.session.dht_node._routingTable._buckets)):
+            for contact in self.session.dht_node._routingTable._buckets[i]._contacts:
+                contacts = result['buckets'].get(i, [])
+                if contact in hosts:
+                    blobs = hosts[contact]
+                    del hosts[contact]
+                else:
+                    blobs = []
+                host = {
+                    "address": contact.address,
+                    "node_id": contact.id.encode("hex"),
+                    "blobs": blobs,
+                }
+                for blob_hash in blobs:
+                    if blob_hash not in blob_hashes:
+                        blob_hashes.append(blob_hash)
+                contacts.append(host)
+                result['buckets'][i] = contacts
+                if contact.id.encode('hex') not in contact_set:
+                    contact_set.append(contact.id.encode("hex"))
+
+        result['contacts'] = contact_set
+        result['blob_hashes'] = blob_hashes
+        result['node_id'] = self.session.dht_node.node_id.encode('hex')
+        return self._render_response(result)
+
     @defer.inlineCallbacks
     def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
         """
diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py
index 70a93fea7..60a07f799 100644
--- a/scripts/dht_monitor.py
+++ b/scripts/dht_monitor.py
@@ -9,7 +9,7 @@ log.addHandler(logging.FileHandler("dht contacts.log"))
 log.setLevel(logging.INFO)
 stdscr = curses.initscr()
 
-api = JSONRPCProxy.from_url("http://localhost:5280")
+api = JSONRPCProxy.from_url("http://localhost:5279")
 
 
 def init_curses():
@@ -31,13 +31,13 @@ def refresh(last_contacts, last_blobs):
 
     try:
         routing_table_info = api.routing_table_get()
-        node_id = routing_table_info['node id']
+        node_id = routing_table_info['node_id']
     except:
         node_id = "UNKNOWN"
         routing_table_info = {
             'buckets': {},
             'contacts': [],
-            'blob hashes': []
+            'blob_hashes': []
         }
     for y in range(height):
         stdscr.addstr(y, 0, " " * (width - 1))
@@ -46,14 +46,15 @@ def refresh(last_contacts, last_blobs):
     stdscr.addstr(0, 0, "node id: %s" % node_id)
     stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" %
                   (len(buckets), len(routing_table_info['contacts']),
-                   len(routing_table_info['blob hashes'])))
+                   len(routing_table_info['blob_hashes'])))
 
     y = 3
     for i in sorted(buckets.keys()):
         stdscr.addstr(y, 0, "bucket %s" % i)
         y += 1
-        for h in sorted(buckets[i], key=lambda x: x['id'].decode('hex')):
-            stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['id'], h['address'], len(h['blobs'])))
+        for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')):
+            stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'],
+                                                        len(h['blobs'])))
         y += 1
     y += 1
@@ -67,8 +68,8 @@ def refresh(last_contacts, last_blobs):
     for c in lost_contacts:
         log.info("lost contact %s", c)
 
-    new_blobs = set(routing_table_info['blob hashes']) - last_blobs
-    lost_blobs = last_blobs - set(routing_table_info['blob hashes'])
+    new_blobs = set(routing_table_info['blob_hashes']) - last_blobs
+    lost_blobs = last_blobs - set(routing_table_info['blob_hashes'])
 
     if new_blobs:
         for c in new_blobs:
@@ -79,7 +80,7 @@ def refresh(last_contacts, last_blobs):
     stdscr.addstr(y + 1, 0, str(time.time()))
     stdscr.refresh()
 
-    return set(routing_table_info['contacts']), set(routing_table_info['blob hashes'])
+    return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes'])
 
 
 def do_main():