diff --git a/CHANGELOG.md b/CHANGELOG.md index 53f585185..687daad0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ at anytime. * Fixed failing ConnectionManager unit test for parallel connections * Fixed race condition between `publish` and `channel_new` * Fixed incorrect response on attempting to delete blob twice + * Fixed local node ID reporting in peer list + * ### Deprecated * diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index edf4e425f..d85b7aca2 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -274,7 +274,8 @@ class Session(object): self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, node_id=self.node_id, - externalIP=self.external_ip + externalIP=self.external_ip, + peerPort=self.peer_port ) self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager) if self.hash_announcer is None: diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 37185bbf0..ac8da9844 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -135,7 +135,7 @@ class BlobRequester(object): return defer.succeed(r) def _find_peers_for_hash(self, h): - d = self.peer_finder.find_peers_for_blob(h) + d = self.peer_finder.find_peers_for_blob(h, filter_self=True) def choose_best_peers(peers): bad_peers = self._get_bad_peers() diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index 488dce1ed..1682006cb 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -15,6 +15,10 @@ class DHTPeerFinder(object): implements(IPeerFinder) def __init__(self, dht_node, peer_manager): + """ + dht_node - an instance of dht.Node class + peer_manager - an instance of PeerManager class + """ self.dht_node = dht_node self.peer_manager = peer_manager self.peers = [] @@ -34,7 +38,17 @@ class DHTPeerFinder(object): pass @defer.inlineCallbacks - def find_peers_for_blob(self, blob_hash, timeout=None): + def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): + """ + Find peers for blob in the DHT + blob_hash (str): blob hash to look for + timeout (int): seconds to timeout after + filter_self (bool): if True, and if a peer for a blob is itself, filter it + from the result + + Returns: + list of peers for the blob + """ def _trigger_timeout(): if not finished_deferred.called: log.debug("Peer search for %s timed out", short_hash(blob_hash)) @@ -54,6 +68,8 @@ class DHTPeerFinder(object): peers = set(peer_list) good_peers = [] for host, port in peers: + if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort): + continue peer = self.peer_manager.get_peer(host, port) if peer.is_available() is True: good_peers.append(peer) diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py index aaa16bc74..ba7bede7d 100644 --- a/lbrynet/core/server/DHTHashAnnouncer.py +++ b/lbrynet/core/server/DHTHashAnnouncer.py @@ -76,7 +76,7 @@ class DHTHashAnnouncer(object): if len(self.hash_queue): h, announce_deferred = self.hash_queue.popleft() log.debug('Announcing blob %s to dht', h) - d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port) + d = self.dht_node.announceHaveBlob(binascii.unhexlify(h)) d.chainDeferred(announce_deferred) d.addBoth(lambda _: utils.call_later(0, announce)) else: diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c77dc27c8..0d094e4e7 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -51,7 +51,7 @@ class Node(object): def __init__(self, node_id=None, udpPort=4000, dataStore=None, routingTableClass=None, networkProtocol=None, - externalIP=None): + externalIP=None, peerPort=None): """ @param dataStore: The data store to use. This must be class inheriting from the C{DataStore} interface (or providing the @@ -73,6 +73,8 @@ class Node(object): change the format of the physical RPC messages being transmitted. @type networkProtocol: entangled.kademlia.protocol.KademliaProtocol + @param externalIP: the IP at which this node can be contacted + @param peerPort: the port at which this node announces it has a blob for """ self.node_id = node_id or self._generateID() self.port = udpPort @@ -112,6 +114,7 @@ class Node(object): contactTriple[0], contactTriple[1], contactTriple[2], self._protocol) self._routingTable.addContact(contact) self.externalIP = externalIP + self.peerPort = peerPort self.hash_watcher = HashWatcher() def __del__(self): @@ -206,8 +209,8 @@ class Node(object): return 0 return num_in_data_store * self.getApproximateTotalDHTNodes() / 8 - def announceHaveBlob(self, key, port): - return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id}) + def announceHaveBlob(self, key): + return self.iterativeAnnounceHaveBlob(key, {'port': self.peerPort, 'lbryid': self.node_id}) @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): @@ -216,14 +219,10 @@ class Node(object): if result: if blob_hash in result: for peer in result[blob_hash]: - if self.node_id != peer[6:]: - host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1" and "from_peer" in result \ - and result["from_peer"] != "self": - host = result["from_peer"] - port, = struct.unpack('>H', peer[4:6]) - if (host, port) not in expanded_peers: - expanded_peers.append((host, port)) + host = ".".join([str(ord(d)) for d in peer[:4]]) + port, = struct.unpack('>H', peer[4:6]) + if (host, port) not in expanded_peers: + expanded_peers.append((host, port)) defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return): @@ -366,7 +365,7 @@ class Node(object): # Ok, we have the value locally, so use that peers = self._dataStore.getPeersForBlob(key) # Send this value to the closest node without it - outerDf.callback({key: peers, "from_peer": 'self'}) + outerDf.callback({key: peers}) else: # Ok, value does not exist in DHT at all outerDf.callback(result) @@ -706,7 +705,6 @@ class _IterativeFindHelper(object): if self.find_value is True and self.key in result and not 'contacts' in result: # We have found the value self.find_value_result[self.key] = result[self.key] - self.find_value_result['from_peer'] = aContact.address else: if self.find_value is True: self._setClosestNodeValue(responseMsg, aContact) diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index 4359aadb5..74c516931 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -132,7 +132,7 @@ class PeerFinder(object): self.num_peers = num_peers self.count = 0 - def find_peers_for_blob(self, *args): + def find_peers_for_blob(self, h, filter_self=False): peer_port = self.start_port + self.count self.count += 1 if self.count >= self.num_peers: diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py index 927dff723..60021ffc9 100644 --- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -8,7 +8,7 @@ class MocDHTNode(object): def __init__(self): self.blobs_announced = 0 - def announceHaveBlob(self, blob, port): + def announceHaveBlob(self, blob): self.blobs_announced += 1 return defer.succeed(True)