mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-08-29 16:31:25 +00:00
Merge pull request #975 from lbryio/ordex-dht-list-fix
Don't filter out local node ID when returning peer list (continued)
This commit is contained in:
commit
7898f6fdb2
8 changed files with 36 additions and 19 deletions
|
@ -18,6 +18,8 @@ at anytime.
|
||||||
* Fixed failing ConnectionManager unit test for parallel connections
|
* Fixed failing ConnectionManager unit test for parallel connections
|
||||||
* Fixed race condition between `publish` and `channel_new`
|
* Fixed race condition between `publish` and `channel_new`
|
||||||
* Fixed incorrect response on attempting to delete blob twice
|
* Fixed incorrect response on attempting to delete blob twice
|
||||||
|
* Fixed local node ID reporting in peer list
|
||||||
|
*
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
*
|
*
|
||||||
|
|
|
@ -274,7 +274,8 @@ class Session(object):
|
||||||
self.dht_node = self.dht_node_class(
|
self.dht_node = self.dht_node_class(
|
||||||
udpPort=self.dht_node_port,
|
udpPort=self.dht_node_port,
|
||||||
node_id=self.node_id,
|
node_id=self.node_id,
|
||||||
externalIP=self.external_ip
|
externalIP=self.external_ip,
|
||||||
|
peerPort=self.peer_port
|
||||||
)
|
)
|
||||||
self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager)
|
self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager)
|
||||||
if self.hash_announcer is None:
|
if self.hash_announcer is None:
|
||||||
|
|
|
@ -135,7 +135,7 @@ class BlobRequester(object):
|
||||||
return defer.succeed(r)
|
return defer.succeed(r)
|
||||||
|
|
||||||
def _find_peers_for_hash(self, h):
|
def _find_peers_for_hash(self, h):
|
||||||
d = self.peer_finder.find_peers_for_blob(h)
|
d = self.peer_finder.find_peers_for_blob(h, filter_self=True)
|
||||||
|
|
||||||
def choose_best_peers(peers):
|
def choose_best_peers(peers):
|
||||||
bad_peers = self._get_bad_peers()
|
bad_peers = self._get_bad_peers()
|
||||||
|
|
|
@ -15,6 +15,10 @@ class DHTPeerFinder(object):
|
||||||
implements(IPeerFinder)
|
implements(IPeerFinder)
|
||||||
|
|
||||||
def __init__(self, dht_node, peer_manager):
|
def __init__(self, dht_node, peer_manager):
|
||||||
|
"""
|
||||||
|
dht_node - an instance of dht.Node class
|
||||||
|
peer_manager - an instance of PeerManager class
|
||||||
|
"""
|
||||||
self.dht_node = dht_node
|
self.dht_node = dht_node
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = peer_manager
|
||||||
self.peers = []
|
self.peers = []
|
||||||
|
@ -34,7 +38,17 @@ class DHTPeerFinder(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def find_peers_for_blob(self, blob_hash, timeout=None):
|
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
|
||||||
|
"""
|
||||||
|
Find peers for blob in the DHT
|
||||||
|
blob_hash (str): blob hash to look for
|
||||||
|
timeout (int): seconds to timeout after
|
||||||
|
filter_self (bool): if True, and if a peer for a blob is itself, filter it
|
||||||
|
from the result
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of peers for the blob
|
||||||
|
"""
|
||||||
def _trigger_timeout():
|
def _trigger_timeout():
|
||||||
if not finished_deferred.called:
|
if not finished_deferred.called:
|
||||||
log.debug("Peer search for %s timed out", short_hash(blob_hash))
|
log.debug("Peer search for %s timed out", short_hash(blob_hash))
|
||||||
|
@ -54,6 +68,8 @@ class DHTPeerFinder(object):
|
||||||
peers = set(peer_list)
|
peers = set(peer_list)
|
||||||
good_peers = []
|
good_peers = []
|
||||||
for host, port in peers:
|
for host, port in peers:
|
||||||
|
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
||||||
|
continue
|
||||||
peer = self.peer_manager.get_peer(host, port)
|
peer = self.peer_manager.get_peer(host, port)
|
||||||
if peer.is_available() is True:
|
if peer.is_available() is True:
|
||||||
good_peers.append(peer)
|
good_peers.append(peer)
|
||||||
|
|
|
@ -76,7 +76,7 @@ class DHTHashAnnouncer(object):
|
||||||
if len(self.hash_queue):
|
if len(self.hash_queue):
|
||||||
h, announce_deferred = self.hash_queue.popleft()
|
h, announce_deferred = self.hash_queue.popleft()
|
||||||
log.debug('Announcing blob %s to dht', h)
|
log.debug('Announcing blob %s to dht', h)
|
||||||
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
|
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h))
|
||||||
d.chainDeferred(announce_deferred)
|
d.chainDeferred(announce_deferred)
|
||||||
d.addBoth(lambda _: utils.call_later(0, announce))
|
d.addBoth(lambda _: utils.call_later(0, announce))
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -51,7 +51,7 @@ class Node(object):
|
||||||
|
|
||||||
def __init__(self, node_id=None, udpPort=4000, dataStore=None,
|
def __init__(self, node_id=None, udpPort=4000, dataStore=None,
|
||||||
routingTableClass=None, networkProtocol=None,
|
routingTableClass=None, networkProtocol=None,
|
||||||
externalIP=None):
|
externalIP=None, peerPort=None):
|
||||||
"""
|
"""
|
||||||
@param dataStore: The data store to use. This must be class inheriting
|
@param dataStore: The data store to use. This must be class inheriting
|
||||||
from the C{DataStore} interface (or providing the
|
from the C{DataStore} interface (or providing the
|
||||||
|
@ -73,6 +73,8 @@ class Node(object):
|
||||||
change the format of the physical RPC messages
|
change the format of the physical RPC messages
|
||||||
being transmitted.
|
being transmitted.
|
||||||
@type networkProtocol: entangled.kademlia.protocol.KademliaProtocol
|
@type networkProtocol: entangled.kademlia.protocol.KademliaProtocol
|
||||||
|
@param externalIP: the IP at which this node can be contacted
|
||||||
|
@param peerPort: the port at which this node announces it has a blob for
|
||||||
"""
|
"""
|
||||||
self.node_id = node_id or self._generateID()
|
self.node_id = node_id or self._generateID()
|
||||||
self.port = udpPort
|
self.port = udpPort
|
||||||
|
@ -112,6 +114,7 @@ class Node(object):
|
||||||
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
|
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
|
||||||
self._routingTable.addContact(contact)
|
self._routingTable.addContact(contact)
|
||||||
self.externalIP = externalIP
|
self.externalIP = externalIP
|
||||||
|
self.peerPort = peerPort
|
||||||
self.hash_watcher = HashWatcher()
|
self.hash_watcher = HashWatcher()
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
|
@ -206,8 +209,8 @@ class Node(object):
|
||||||
return 0
|
return 0
|
||||||
return num_in_data_store * self.getApproximateTotalDHTNodes() / 8
|
return num_in_data_store * self.getApproximateTotalDHTNodes() / 8
|
||||||
|
|
||||||
def announceHaveBlob(self, key, port):
|
def announceHaveBlob(self, key):
|
||||||
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id})
|
return self.iterativeAnnounceHaveBlob(key, {'port': self.peerPort, 'lbryid': self.node_id})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def getPeersForBlob(self, blob_hash):
|
def getPeersForBlob(self, blob_hash):
|
||||||
|
@ -216,14 +219,10 @@ class Node(object):
|
||||||
if result:
|
if result:
|
||||||
if blob_hash in result:
|
if blob_hash in result:
|
||||||
for peer in result[blob_hash]:
|
for peer in result[blob_hash]:
|
||||||
if self.node_id != peer[6:]:
|
host = ".".join([str(ord(d)) for d in peer[:4]])
|
||||||
host = ".".join([str(ord(d)) for d in peer[:4]])
|
port, = struct.unpack('>H', peer[4:6])
|
||||||
if host == "127.0.0.1" and "from_peer" in result \
|
if (host, port) not in expanded_peers:
|
||||||
and result["from_peer"] != "self":
|
expanded_peers.append((host, port))
|
||||||
host = result["from_peer"]
|
|
||||||
port, = struct.unpack('>H', peer[4:6])
|
|
||||||
if (host, port) not in expanded_peers:
|
|
||||||
expanded_peers.append((host, port))
|
|
||||||
defer.returnValue(expanded_peers)
|
defer.returnValue(expanded_peers)
|
||||||
|
|
||||||
def get_most_popular_hashes(self, num_to_return):
|
def get_most_popular_hashes(self, num_to_return):
|
||||||
|
@ -366,7 +365,7 @@ class Node(object):
|
||||||
# Ok, we have the value locally, so use that
|
# Ok, we have the value locally, so use that
|
||||||
peers = self._dataStore.getPeersForBlob(key)
|
peers = self._dataStore.getPeersForBlob(key)
|
||||||
# Send this value to the closest node without it
|
# Send this value to the closest node without it
|
||||||
outerDf.callback({key: peers, "from_peer": 'self'})
|
outerDf.callback({key: peers})
|
||||||
else:
|
else:
|
||||||
# Ok, value does not exist in DHT at all
|
# Ok, value does not exist in DHT at all
|
||||||
outerDf.callback(result)
|
outerDf.callback(result)
|
||||||
|
@ -706,7 +705,6 @@ class _IterativeFindHelper(object):
|
||||||
if self.find_value is True and self.key in result and not 'contacts' in result:
|
if self.find_value is True and self.key in result and not 'contacts' in result:
|
||||||
# We have found the value
|
# We have found the value
|
||||||
self.find_value_result[self.key] = result[self.key]
|
self.find_value_result[self.key] = result[self.key]
|
||||||
self.find_value_result['from_peer'] = aContact.address
|
|
||||||
else:
|
else:
|
||||||
if self.find_value is True:
|
if self.find_value is True:
|
||||||
self._setClosestNodeValue(responseMsg, aContact)
|
self._setClosestNodeValue(responseMsg, aContact)
|
||||||
|
|
|
@ -132,7 +132,7 @@ class PeerFinder(object):
|
||||||
self.num_peers = num_peers
|
self.num_peers = num_peers
|
||||||
self.count = 0
|
self.count = 0
|
||||||
|
|
||||||
def find_peers_for_blob(self, *args):
|
def find_peers_for_blob(self, h, filter_self=False):
|
||||||
peer_port = self.start_port + self.count
|
peer_port = self.start_port + self.count
|
||||||
self.count += 1
|
self.count += 1
|
||||||
if self.count >= self.num_peers:
|
if self.count >= self.num_peers:
|
||||||
|
|
|
@ -8,7 +8,7 @@ class MocDHTNode(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.blobs_announced = 0
|
self.blobs_announced = 0
|
||||||
|
|
||||||
def announceHaveBlob(self, blob, port):
|
def announceHaveBlob(self, blob):
|
||||||
self.blobs_announced += 1
|
self.blobs_announced += 1
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue