From 2903ccaeb499a18cfe052b40c475892248e6b8a9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 29 Mar 2018 10:46:29 -0400 Subject: [PATCH 1/2] add peer_ping --- CHANGELOG.md | 1 + lbrynet/daemon/Daemon.py | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6281c5236..37acab451 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ at anytime. * virtual kademlia network and mock udp transport for dht integration tests * integration tests for bootstrapping the dht * configurable `concurrent_announcers` setting + * `peer_ping` command ### Removed * `announce_all` argument from `blob_announce` diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 44a87200c..61147a9e5 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -48,6 +48,7 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError +from lbrynet.dht.error import TimeoutError from lbrynet.core.Peer import Peer from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader @@ -3040,6 +3041,32 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + @defer.inlineCallbacks + def jsonrpc_peer_ping(self, node_id): + """ + Find and ping a peer by node id + + Usage: + peer_ping ( | --node_id=) + + Returns: + (str) pong, or {'error': } if an error is encountered + """ + + contact = None + try: + contact = yield self.session.dht_node.findContact(node_id.decode('hex')) + except TimeoutError: + result = {'error': 'timeout finding peer'} + defer.returnValue(result) + if not contact: + defer.returnValue({'error': 'peer not found'}) + try: + result = yield contact.ping() + except TimeoutError: + result = {'error': 'ping timeout'} + defer.returnValue(result) + def jsonrpc_routing_table_get(self): """ Get DHT routing information From 5cea031f38e134c02b67ae58795b37df7515bca3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 3 Apr 2018 13:06:16 -0400 Subject: [PATCH 2/2] return list of dictionaries from peer_list, include peer node ids --- CHANGELOG.md | 1 + lbrynet/daemon/Daemon.py | 30 ++++++++++++++++++++++++------ lbrynet/dht/node.py | 11 ++++++++--- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37acab451..bd95f777a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ at anytime. * if the `use_authentication` setting is configured, use authentication for all api methods instead of only those with the `auth_required` decorator * regenerate api keys on startup if the using authentication * support both positional and keyword args for api calls + * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results ### Added * virtual kademlia network and mock udp transport for dht integration tests diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 61147a9e5..4b9b8643a 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2850,6 +2850,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response("Deleted %s" % blob_hash) defer.returnValue(response) + @defer.inlineCallbacks def jsonrpc_peer_list(self, blob_hash, timeout=None): """ Get peers for blob hash @@ -2862,15 +2863,32 @@ class Daemon(AuthJSONRPCServer): --timeout= : (int) peer search timeout in seconds Returns: - (list) List of contacts + (list) List of contact dictionaries {'host': , 'port': , 'node_id': } """ - timeout = timeout or conf.settings['peer_search_timeout'] + if not utils.is_valid_blobhash(blob_hash): + raise Exception("invalid blob hash") - d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout) - d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) - d.addCallback(lambda r: self._render_response(r)) - return d + finished_deferred = self.session.dht_node.getPeersForBlob(binascii.unhexlify(blob_hash), True) + + def _trigger_timeout(): + if not finished_deferred.called: + log.debug("Peer search for %s timed out", blob_hash) + finished_deferred.cancel() + + timeout = timeout or conf.settings['peer_search_timeout'] + self.session.dht_node.reactor_callLater(timeout, _trigger_timeout) + + peers = yield finished_deferred + results = [ + { + "host": host, + "port": port, + "node_id": node_id + } + for host, port, node_id in peers + ] + defer.returnValue(results) @defer.inlineCallbacks def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0e5c980ab..9ad105c1e 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -249,7 +249,7 @@ class Node(object): ) @defer.inlineCallbacks - def getPeersForBlob(self, blob_hash): + def getPeersForBlob(self, blob_hash, include_node_ids=False): result = yield self.iterativeFindValue(blob_hash) expanded_peers = [] if result: @@ -257,8 +257,13 @@ class Node(object): for peer in result[blob_hash]: host = ".".join([str(ord(d)) for d in peer[:4]]) port, = struct.unpack('>H', peer[4:6]) - if (host, port) not in expanded_peers: - expanded_peers.append((host, port)) + if not include_node_ids: + if (host, port) not in expanded_peers: + expanded_peers.append((host, port)) + else: + peer_node_id = peer[6:].encode('hex') + if (host, port, peer_node_id) not in expanded_peers: + expanded_peers.append((host, port, peer_node_id)) defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return):