diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 6741fa828..c34c129c3 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -356,72 +356,10 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}
 
     async def _add_peer(self, peer: 'KademliaPeer'):
-        if not peer.node_id:
-            log.warning("Tried adding a peer with no node id!")
-            return False
-        for my_peer in self.routing_table.get_peers():
-            if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
-                self.routing_table.remove_peer(my_peer)
-                self.routing_table.join_buckets()
-        bucket_index = self.routing_table.kbucket_index(peer.node_id)
-        if self.routing_table.buckets[bucket_index].add_peer(peer):
-            return True
-
-        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
-        if self.routing_table.should_split(bucket_index, peer.node_id):
-            self.routing_table.split_bucket(bucket_index)
-            # Retry the insertion attempt
-            result = await self._add_peer(peer)
-            self.routing_table.join_buckets()
-            return result
-        else:
-            # We can't split the k-bucket
-            #
-            # The 13 page kademlia paper specifies that the least recently contacted node in the bucket
-            # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
-            # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
-            #
-            # A reasonable extension to this is BEP 0005, which extends the above:
-            #
-            # Not all nodes that we learn about are equal. Some are "good" and some are not.
-            # Many nodes using the DHT are able to send queries and receive responses,
-            # but are not able to respond to queries from other nodes. It is important that
-            # each node's routing table must contain only known good nodes. A good node is
-            # a node has responded to one of our queries within the last 15 minutes. A node
-            # is also good if it has ever responded to one of our queries and has sent us a
-            # query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
-            # questionable. Nodes become bad when they fail to respond to multiple queries
-            # in a row. Nodes that we know are good are given priority over nodes with unknown status.
-            #
-            # When there are bad or questionable nodes in the bucket, the least recent is selected for
-            # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
-            # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
-            # is ignored if the pinged node replies.
-
-            not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
-            not_recently_replied = []
-            for my_peer in not_good_contacts:
-                last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
-                if not last_replied or last_replied + 60 < self.loop.time():
-                    not_recently_replied.append(my_peer)
-            if not_recently_replied:
-                to_replace = not_recently_replied[0]
-            else:
-                to_replace = self.routing_table.buckets[bucket_index].peers[0]
-                last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
-                if last_replied and last_replied + 60 > self.loop.time():
-                    return False
-            log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
-            try:
-                to_replace_rpc = self.get_rpc_peer(to_replace)
-                await to_replace_rpc.ping()
-                return False
-            except asyncio.TimeoutError:
-                log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
-                          to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
-                if to_replace in self.routing_table.buckets[bucket_index]:
-                    self.routing_table.buckets[bucket_index].remove_peer(to_replace)
-                return await self._add_peer(peer)
+        async def probe(some_peer: 'KademliaPeer'):
+            rpc_peer = self.get_rpc_peer(some_peer)
+            await rpc_peer.ping()
+        return await self.routing_table.add_peer(peer, probe)
 
     def add_peer(self, peer: 'KademliaPeer'):
         if peer.node_id == self.node_id:
@@ -439,7 +377,6 @@ class KademliaProtocol(DatagramProtocol):
                 async with self._split_lock:
                     peer = self._to_remove.pop()
                     self.routing_table.remove_peer(peer)
-                    self.routing_table.join_buckets()
             while self._to_add:
                 async with self._split_lock:
                     await self._add_peer(self._to_add.pop())
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 42c82e2bc..676b488d8 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -248,15 +248,10 @@ class TreeRoutingTable:
         bucket_index = self.kbucket_index(peer.node_id)
         try:
             self.buckets[bucket_index].remove_peer(peer)
+            self._join_buckets()
         except ValueError:
             return
 
-    def touch_kbucket(self, key: bytes) -> None:
-        self.touch_kbucket_by_index(self.kbucket_index(key))
-
-    def touch_kbucket_by_index(self, bucket_index: int):
-        self.buckets[bucket_index].last_accessed = int(self._loop.time())
-
     def kbucket_index(self, key: bytes) -> int:
         i = 0
         for bucket in self.buckets:
@@ -303,7 +298,7 @@
                     old_bucket.remove_peer(contact)
         self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
 
-    def join_buckets(self):
+    def _join_buckets(self):
         if len(self.buckets) == 1:
             return
         to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
@@ -326,14 +321,7 @@
             self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
         self.buckets.remove(bucket)
         self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
-        return self.join_buckets()
-
-    def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
-        for bucket in self.buckets:
-            for contact in bucket.get_peers(sort_distance_to=False):
-                if address_tuple[0] == contact.address and address_tuple[1] == contact.udp_port:
-                    return True
-        return False
+        return self._join_buckets()
 
     def buckets_with_contacts(self) -> int:
         count = 0
@@ -341,3 +329,70 @@
             if len(bucket) > 0:
                 count += 1
         return count
+
+    async def add_peer(self, peer: 'KademliaPeer', probe: typing.Callable[['KademliaPeer'], typing.Awaitable]):
+        if not peer.node_id:
log.warning("Tried adding a peer with no node id!") + return False + for my_peer in self.get_peers(): + if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id: + self.remove_peer(my_peer) + self._join_buckets() + bucket_index = self.kbucket_index(peer.node_id) + if self.buckets[bucket_index].add_peer(peer): + return True + + # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id) + if self.should_split(bucket_index, peer.node_id): + self.split_bucket(bucket_index) + # Retry the insertion attempt + result = await self.add_peer(peer, probe) + self._join_buckets() + return result + else: + # We can't split the k-bucket + # + # The 13 page kademlia paper specifies that the least recently contacted node in the bucket + # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful + # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4). + # + # A reasonable extension to this is BEP 0005, which extends the above: + # + # Not all nodes that we learn about are equal. Some are "good" and some are not. + # Many nodes using the DHT are able to send queries and receive responses, + # but are not able to respond to queries from other nodes. It is important that + # each node's routing table must contain only known good nodes. A good node is + # a node has responded to one of our queries within the last 15 minutes. A node + # is also good if it has ever responded to one of our queries and has sent us a + # query within the last 15 minutes. After 15 minutes of inactivity, a node becomes + # questionable. Nodes become bad when they fail to respond to multiple queries + # in a row. Nodes that we know are good are given priority over nodes with unknown status. + # + # When there are bad or questionable nodes in the bucket, the least recent is selected for + # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent) + # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact + # is ignored if the pinged node replies. + + not_good_contacts = self.buckets[bucket_index].get_bad_or_unknown_peers() + not_recently_replied = [] + for my_peer in not_good_contacts: + last_replied = self._peer_manager.get_last_replied(my_peer.address, my_peer.udp_port) + if not last_replied or last_replied + 60 < self._loop.time(): + not_recently_replied.append(my_peer) + if not_recently_replied: + to_replace = not_recently_replied[0] + else: + to_replace = self.buckets[bucket_index].peers[0] + last_replied = self._peer_manager.get_last_replied(to_replace.address, to_replace.udp_port) + if last_replied and last_replied + 60 > self._loop.time(): + return False + log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port) + try: + await probe(to_replace) + return False + except asyncio.TimeoutError: + log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index, + to_replace.address, to_replace.udp_port, peer.address, peer.udp_port) + if to_replace in self.buckets[bucket_index]: + self.buckets[bucket_index].remove_peer(to_replace) + return await self.add_peer(peer, probe)