From 965969b856f3acebaddbc4b3ebfa396f2e2031df Mon Sep 17 00:00:00 2001 From: Antonio Quartulli Date: Wed, 25 Oct 2017 08:55:24 +0800 Subject: [PATCH 1/6] Don't filter out local node ID when returning peer list If a node is returning a peer list for a given blob hash (whether this was requested via CLI or via DHT) and it is part of the resulting peer list, it will filter itself out before returning the list. This makes the results across the DHT inconsistent as different nodes won't include themselves when responding to a findValue/findNode query. Remove such filtering so that the local node ID is always included when needed. Signed-off-by: Antonio Quartulli --- lbrynet/dht/node.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c77dc27c8..c2245e749 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -216,14 +216,13 @@ class Node(object): if result: if blob_hash in result: for peer in result[blob_hash]: - if self.node_id != peer[6:]: - host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1" and "from_peer" in result \ - and result["from_peer"] != "self": - host = result["from_peer"] - port, = struct.unpack('>H', peer[4:6]) - if (host, port) not in expanded_peers: - expanded_peers.append((host, port)) + host = ".".join([str(ord(d)) for d in peer[:4]]) + if host == "127.0.0.1" and "from_peer" in result \ + and result["from_peer"] != "self": + host = result["from_peer"] + port, = struct.unpack('>H', peer[4:6]) + if (host, port) not in expanded_peers: + expanded_peers.append((host, port)) defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return): From cf1b9b2aa8b8d99599fc3ae8998eed4fda27b7c1 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 27 Oct 2017 13:32:12 -0400 Subject: [PATCH 2/6] add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53f585185..687daad0e 100644 --- 
a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ at anytime. * Fixed failing ConnectionManager unit test for parallel connections * Fixed race condition between `publish` and `channel_new` * Fixed incorrect response on attempting to delete blob twice + * Fixed local node ID reporting in peer list + * ### Deprecated * From 7e8f3254b1c89d5edad488b15b4265521a4e12d3 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 27 Oct 2017 14:12:52 -0400 Subject: [PATCH 3/6] Add to DHT Node class initialization argument peerPort where it serves blobs, instead of specifying it in announceHaveBlob --- lbrynet/core/Session.py | 3 ++- lbrynet/core/server/DHTHashAnnouncer.py | 2 +- lbrynet/dht/node.py | 9 ++++++--- lbrynet/tests/mocks.py | 2 +- lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py | 2 +- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index edf4e425f..d85b7aca2 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -274,7 +274,8 @@ class Session(object): self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, node_id=self.node_id, - externalIP=self.external_ip + externalIP=self.external_ip, + peerPort=self.peer_port ) self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager) if self.hash_announcer is None: diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py index aaa16bc74..ba7bede7d 100644 --- a/lbrynet/core/server/DHTHashAnnouncer.py +++ b/lbrynet/core/server/DHTHashAnnouncer.py @@ -76,7 +76,7 @@ class DHTHashAnnouncer(object): if len(self.hash_queue): h, announce_deferred = self.hash_queue.popleft() log.debug('Announcing blob %s to dht', h) - d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port) + d = self.dht_node.announceHaveBlob(binascii.unhexlify(h)) d.chainDeferred(announce_deferred) d.addBoth(lambda _: utils.call_later(0, announce)) else: diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py 
index c2245e749..40e5ef7ad 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -51,7 +51,7 @@ class Node(object): def __init__(self, node_id=None, udpPort=4000, dataStore=None, routingTableClass=None, networkProtocol=None, - externalIP=None): + externalIP=None, peerPort=None): """ @param dataStore: The data store to use. This must be class inheriting from the C{DataStore} interface (or providing the @@ -73,6 +73,8 @@ class Node(object): change the format of the physical RPC messages being transmitted. @type networkProtocol: entangled.kademlia.protocol.KademliaProtocol + @param externalIP: the IP at which this node can be contacted + @param peerPort: the port at which this node announces it has a blob for """ self.node_id = node_id or self._generateID() self.port = udpPort @@ -112,6 +114,7 @@ class Node(object): contactTriple[0], contactTriple[1], contactTriple[2], self._protocol) self._routingTable.addContact(contact) self.externalIP = externalIP + self.peerPort = peerPort self.hash_watcher = HashWatcher() def __del__(self): @@ -206,8 +209,8 @@ class Node(object): return 0 return num_in_data_store * self.getApproximateTotalDHTNodes() / 8 - def announceHaveBlob(self, key, port): - return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id}) + def announceHaveBlob(self, key): + return self.iterativeAnnounceHaveBlob(key, {'port': self.peerPort, 'lbryid': self.node_id}) @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index 4359aadb5..74c516931 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -132,7 +132,7 @@ class PeerFinder(object): self.num_peers = num_peers self.count = 0 - def find_peers_for_blob(self, *args): + def find_peers_for_blob(self, h, filter_self=False): peer_port = self.start_port + self.count self.count += 1 if self.count >= self.num_peers: diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py 
b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py index 927dff723..60021ffc9 100644 --- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -8,7 +8,7 @@ class MocDHTNode(object): def __init__(self): self.blobs_announced = 0 - def announceHaveBlob(self, blob, port): + def announceHaveBlob(self, blob): self.blobs_announced += 1 return defer.succeed(True) From 71ebf79939db49dd9bf6f5980395eb55a4e5167f Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 27 Oct 2017 13:31:42 -0400 Subject: [PATCH 4/6] add argument in DHTPeerFinder.find_peers_for_blob that filters itself from peer list. Use this argument to remove itself from peer list when downloading blobs do not filter self on peer list --- lbrynet/core/client/BlobRequester.py | 2 +- lbrynet/core/client/DHTPeerFinder.py | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 37185bbf0..ac8da9844 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -135,7 +135,7 @@ class BlobRequester(object): return defer.succeed(r) def _find_peers_for_hash(self, h): - d = self.peer_finder.find_peers_for_blob(h) + d = self.peer_finder.find_peers_for_blob(h, filter_self=True) def choose_best_peers(peers): bad_peers = self._get_bad_peers() diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index 488dce1ed..1682006cb 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -15,6 +15,10 @@ class DHTPeerFinder(object): implements(IPeerFinder) def __init__(self, dht_node, peer_manager): + """ + dht_node - an instance of dht.Node class + peer_manager - an instance of PeerManager class + """ self.dht_node = dht_node self.peer_manager = peer_manager self.peers = [] @@ -34,7 +38,17 @@ class DHTPeerFinder(object): pass 
@defer.inlineCallbacks - def find_peers_for_blob(self, blob_hash, timeout=None): + def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): + """ + Find peers for blob in the DHT + blob_hash (str): blob hash to look for + timeout (int): seconds to timeout after + filter_self (bool): if True, and if a peer for a blob is itself, filter it + from the result + + Returns: + list of peers for the blob + """ def _trigger_timeout(): if not finished_deferred.called: log.debug("Peer search for %s timed out", short_hash(blob_hash)) @@ -54,6 +68,8 @@ class DHTPeerFinder(object): peers = set(peer_list) good_peers = [] for host, port in peers: + if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort): + continue peer = self.peer_manager.get_peer(host, port) if peer.is_available() is True: good_peers.append(peer) From 9776655c3cd848ed5f4937e9aa75180ebfb994be Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Mon, 30 Oct 2017 11:45:45 -0400 Subject: [PATCH 5/6] remove unneeded if statement in getPeersForBlob --- lbrynet/dht/node.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 40e5ef7ad..76d9056b3 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -220,9 +220,6 @@ class Node(object): if blob_hash in result: for peer in result[blob_hash]: host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1" and "from_peer" in result \ - and result["from_peer"] != "self": - host = result["from_peer"] port, = struct.unpack('>H', peer[4:6]) if (host, port) not in expanded_peers: expanded_peers.append((host, port)) From b380f5d34483b625c53abe199c2f90706f525c3a Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 3 Nov 2017 12:13:52 -0400 Subject: [PATCH 6/6] completely remove unused from_peer field --- lbrynet/dht/node.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 76d9056b3..0d094e4e7 100644 --- 
a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -365,7 +365,7 @@ class Node(object): # Ok, we have the value locally, so use that peers = self._dataStore.getPeersForBlob(key) # Send this value to the closest node without it - outerDf.callback({key: peers, "from_peer": 'self'}) + outerDf.callback({key: peers}) else: # Ok, value does not exist in DHT at all outerDf.callback(result) @@ -705,7 +705,6 @@ class _IterativeFindHelper(object): if self.find_value is True and self.key in result and not 'contacts' in result: # We have found the value self.find_value_result[self.key] = result[self.key] - self.find_value_result['from_peer'] = aContact.address else: if self.find_value is True: self._setClosestNodeValue(responseMsg, aContact)