From 0d343ecb2f37291b6b98ba9565e62e1cf8544ab6 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 11 Jul 2022 02:00:49 -0300
Subject: [PATCH 1/9] simplify iterative find constructor

---
 lbry/dht/node.py                    | 10 +++----
 lbry/dht/protocol/iterative_find.py | 45 +++++++----------------------
 2 files changed, 14 insertions(+), 41 deletions(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 2bb6edfcd..2a9ebc4e1 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -203,15 +203,13 @@ class Node:
     def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
                                   max_results: int = constants.K) -> IterativeNodeFinder:
-
-        return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                   key, max_results, None, shortlist)
+        shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
+        return IterativeNodeFinder(self.loop, self.protocol, key, max_results, shortlist)
 
     def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
                                    max_results: int = -1) -> IterativeValueFinder:
-
-        return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                    key, max_results, None, shortlist)
+        shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
+        return IterativeValueFinder(self.loop, self.protocol, key, max_results, shortlist)
 
     async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
                           shortlist: typing.Optional[typing.List['KademliaPeer']] = None
diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 9a557ef51..d41b3b2a0 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -12,7 +12,6 @@ from lbry.dht.peer import make_kademlia_peer
 from lbry.dht.serialization.datagram import PAGE_KEY
 
 if TYPE_CHECKING:
-    from lbry.dht.protocol.routing_table import TreeRoutingTable
     from lbry.dht.protocol.protocol import KademliaProtocol
     from lbry.dht.peer import PeerManager, KademliaPeer
 
@@ -57,37 +56,19 @@ class FindValueResponse(FindResponse):
         return [(node_id, address.decode(), port) for node_id, address, port in self.close_triples]
 
 
-def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
-                  shortlist: typing.Optional[typing.List['KademliaPeer']]) -> typing.List['KademliaPeer']:
-    """
-    If not provided, initialize the shortlist of peers to probe to the (up to) k closest peers in the routing table
-
-    :param routing_table: a TreeRoutingTable
-    :param key: a 48 byte hash
-    :param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no
-                      peers in the routing table. During bootstrap the shortlist is set to be the seed nodes.
-    """
-    if len(key) != constants.HASH_LENGTH:
-        raise ValueError("invalid key length: %i" % len(key))
-    return shortlist or routing_table.find_close_peers(key)
-
-
 class IterativeFinder(AsyncIterator):
-    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
-                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
+    def __init__(self, loop: asyncio.AbstractEventLoop,
+                 protocol: 'KademliaProtocol', key: bytes,
                  max_results: typing.Optional[int] = constants.K,
-                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
         if len(key) != constants.HASH_LENGTH:
             raise ValueError("invalid key length: %i" % len(key))
         self.loop = loop
-        self.peer_manager = peer_manager
-        self.routing_table = routing_table
+        self.peer_manager = protocol.peer_manager
         self.protocol = protocol
         self.key = key
         self.max_results = max(constants.K, max_results)
-        self.exclude = exclude or []
 
         self.active: typing.Dict['KademliaPeer', int] = OrderedDict()  # peer: distance, sorted
         self.contacted: typing.Set['KademliaPeer'] = set()
@@ -99,7 +80,7 @@ class IterativeFinder(AsyncIterator):
         self.iteration_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        for peer in get_shortlist(routing_table, key, shortlist):
+        for peer in shortlist:
             if peer.node_id:
                 self._add_active(peer, force=True)
             else:
@@ -198,8 +179,6 @@ class IterativeFinder(AsyncIterator):
             if index > (constants.K + len(self.running_probes)):
                 break
             origin_address = (peer.address, peer.udp_port)
-            if origin_address in self.exclude:
-                continue
             if peer.node_id == self.protocol.node_id:
                 continue
             if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
@@ -277,13 +256,11 @@ class IterativeFinder(AsyncIterator):
                   type(self).__name__, id(self), self.key.hex()[:8])
 
 
 class IterativeNodeFinder(IterativeFinder):
-    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
-                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
+    def __init__(self, loop: asyncio.AbstractEventLoop,
+                 protocol: 'KademliaProtocol', key: bytes,
                  max_results: typing.Optional[int] = constants.K,
-                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
-                         shortlist)
+        super().__init__(loop, protocol, key, max_results, shortlist)
        self.yielded_peers: typing.Set['KademliaPeer'] = set()
 
     async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
@@ -319,13 +296,11 @@ class IterativeNodeFinder(IterativeFinder):
 
 
 class IterativeValueFinder(IterativeFinder):
-    def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
-                 routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
+    def __init__(self, loop: asyncio.AbstractEventLoop,
+                 protocol: 'KademliaProtocol', key: bytes,
                  max_results: typing.Optional[int] = constants.K,
-                 exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
-                         shortlist)
+        super().__init__(loop, protocol, key, max_results, shortlist)
         self.blob_peers: typing.Set['KademliaPeer'] = set()
         # this tracks the index of the most recent page we requested from each peer
         self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
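
Reviewer note (not part of the series): after this patch the finder constructors no longer take `peer_manager`, `routing_table` or `exclude`; the caller supplies the shortlist and everything else is read off the protocol object, with key-length validation kept in `IterativeFinder.__init__`. A minimal sketch of the new call path, assuming a running `Node` instance named `node` and a 48-byte `key` (`make_node_finder` is a hypothetical helper, not code from the series):

    from lbry.dht import constants
    from lbry.dht.protocol.iterative_find import IterativeNodeFinder

    def make_node_finder(node, key: bytes) -> IterativeNodeFinder:
        # the shortlist default now lives in Node.get_iterative_node_finder()
        shortlist = node.protocol.routing_table.find_close_peers(key)
        # peer_manager is read off protocol; key length is validated inside __init__
        return IterativeNodeFinder(node.loop, node.protocol, key, constants.K, shortlist)
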
From 972db8024646df9c1a436407c1e06b4dc9870ffd Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 11 Jul 2022 02:38:55 -0300
Subject: [PATCH 2/9] move add peer logic to routing table

---
 lbry/dht/protocol/protocol.py      | 71 ++-----------------------
 lbry/dht/protocol/routing_table.py | 85 ++++++++++++++++++++++++------
 2 files changed, 74 insertions(+), 82 deletions(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 6741fa828..c34c129c3 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -356,72 +356,10 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}
 
     async def _add_peer(self, peer: 'KademliaPeer'):
-        if not peer.node_id:
-            log.warning("Tried adding a peer with no node id!")
-            return False
-        for my_peer in self.routing_table.get_peers():
-            if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
-                self.routing_table.remove_peer(my_peer)
-                self.routing_table.join_buckets()
-        bucket_index = self.routing_table.kbucket_index(peer.node_id)
-        if self.routing_table.buckets[bucket_index].add_peer(peer):
-            return True
-
-        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
-        if self.routing_table.should_split(bucket_index, peer.node_id):
-            self.routing_table.split_bucket(bucket_index)
-            # Retry the insertion attempt
-            result = await self._add_peer(peer)
-            self.routing_table.join_buckets()
-            return result
-        else:
-            # We can't split the k-bucket
-            #
-            # The 13 page kademlia paper specifies that the least recently contacted node in the bucket
-            # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
-            # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
-            #
-            # A reasonable extension to this is BEP 0005, which extends the above:
-            #
-            #    Not all nodes that we learn about are equal. Some are "good" and some are not.
-            #    Many nodes using the DHT are able to send queries and receive responses,
-            #    but are not able to respond to queries from other nodes. It is important that
-            #    each node's routing table must contain only known good nodes. A good node is
-            #    a node has responded to one of our queries within the last 15 minutes. A node
-            #    is also good if it has ever responded to one of our queries and has sent us a
-            #    query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
-            #    questionable. Nodes become bad when they fail to respond to multiple queries
-            #    in a row. Nodes that we know are good are given priority over nodes with unknown status.
-            #
-            # When there are bad or questionable nodes in the bucket, the least recent is selected for
-            # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
-            # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
-            # is ignored if the pinged node replies.
-
-            not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
-            not_recently_replied = []
-            for my_peer in not_good_contacts:
-                last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
-                if not last_replied or last_replied + 60 < self.loop.time():
-                    not_recently_replied.append(my_peer)
-            if not_recently_replied:
-                to_replace = not_recently_replied[0]
-            else:
-                to_replace = self.routing_table.buckets[bucket_index].peers[0]
-                last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
-                if last_replied and last_replied + 60 > self.loop.time():
-                    return False
-            log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
-            try:
-                to_replace_rpc = self.get_rpc_peer(to_replace)
-                await to_replace_rpc.ping()
-                return False
-            except asyncio.TimeoutError:
-                log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
-                          to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
-                if to_replace in self.routing_table.buckets[bucket_index]:
-                    self.routing_table.buckets[bucket_index].remove_peer(to_replace)
-                return await self._add_peer(peer)
+        async def probe(some_peer: 'KademliaPeer'):
+            rpc_peer = self.get_rpc_peer(some_peer)
+            await rpc_peer.ping()
+        return await self.routing_table.add_peer(peer, probe)
 
     def add_peer(self, peer: 'KademliaPeer'):
         if peer.node_id == self.node_id:
@@ -439,7 +377,6 @@ class KademliaProtocol(DatagramProtocol):
             async with self._split_lock:
                 peer = self._to_remove.pop()
                 self.routing_table.remove_peer(peer)
-                self.routing_table.join_buckets()
             while self._to_add:
                 async with self._split_lock:
                     await self._add_peer(self._to_add.pop())
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 42c82e2bc..676b488d8 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -248,15 +248,10 @@ class TreeRoutingTable:
         bucket_index = self.kbucket_index(peer.node_id)
         try:
             self.buckets[bucket_index].remove_peer(peer)
+            self._join_buckets()
         except ValueError:
             return
 
-    def touch_kbucket(self, key: bytes) -> None:
-        self.touch_kbucket_by_index(self.kbucket_index(key))
-
-    def touch_kbucket_by_index(self, bucket_index: int):
-        self.buckets[bucket_index].last_accessed = int(self._loop.time())
-
     def kbucket_index(self, key: bytes) -> int:
         i = 0
         for bucket in self.buckets:
@@ -303,7 +298,7 @@ class TreeRoutingTable:
             old_bucket.remove_peer(contact)
         self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
 
-    def join_buckets(self):
+    def _join_buckets(self):
         if len(self.buckets) == 1:
             return
         to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
@@ -326,14 +321,7 @@ class TreeRoutingTable:
             self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
         self.buckets.remove(bucket)
         self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
-        return self.join_buckets()
-
-    def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
-        for bucket in self.buckets:
-            for contact in bucket.get_peers(sort_distance_to=False):
-                if address_tuple[0] == contact.address and address_tuple[1] == contact.udp_port:
-                    return True
-        return False
+        return self._join_buckets()
 
     def buckets_with_contacts(self) -> int:
         count = 0
@@ -341,3 +329,70 @@ class TreeRoutingTable:
             if len(bucket) > 0:
                 count += 1
         return count
+
+    async def add_peer(self, peer: 'KademliaPeer', probe: typing.Callable[['KademliaPeer'], typing.Awaitable]):
+        if not peer.node_id:
+            log.warning("Tried adding a peer with no node id!")
+            return False
+        for my_peer in self.get_peers():
+            if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
+                self.remove_peer(my_peer)
+                self._join_buckets()
+        bucket_index = self.kbucket_index(peer.node_id)
+        if self.buckets[bucket_index].add_peer(peer):
+            return True
+
+        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
+        if self.should_split(bucket_index, peer.node_id):
+            self.split_bucket(bucket_index)
+            # Retry the insertion attempt
+            result = await self.add_peer(peer, probe)
+            self._join_buckets()
+            return result
+        else:
+            # We can't split the k-bucket
+            #
+            # The 13 page kademlia paper specifies that the least recently contacted node in the bucket
+            # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
+            # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
+            #
+            # A reasonable extension to this is BEP 0005, which extends the above:
+            #
+            #    Not all nodes that we learn about are equal. Some are "good" and some are not.
+            #    Many nodes using the DHT are able to send queries and receive responses,
+            #    but are not able to respond to queries from other nodes. It is important that
+            #    each node's routing table must contain only known good nodes. A good node is
+            #    a node has responded to one of our queries within the last 15 minutes. A node
+            #    is also good if it has ever responded to one of our queries and has sent us a
+            #    query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
+            #    questionable. Nodes become bad when they fail to respond to multiple queries
+            #    in a row. Nodes that we know are good are given priority over nodes with unknown status.
+            #
+            # When there are bad or questionable nodes in the bucket, the least recent is selected for
+            # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
+            # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
+            # is ignored if the pinged node replies.
+
+            not_good_contacts = self.buckets[bucket_index].get_bad_or_unknown_peers()
+            not_recently_replied = []
+            for my_peer in not_good_contacts:
+                last_replied = self._peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
+                if not last_replied or last_replied + 60 < self._loop.time():
+                    not_recently_replied.append(my_peer)
+            if not_recently_replied:
+                to_replace = not_recently_replied[0]
+            else:
+                to_replace = self.buckets[bucket_index].peers[0]
+                last_replied = self._peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
+                if last_replied and last_replied + 60 > self._loop.time():
+                    return False
+            log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
+            try:
+                await probe(to_replace)
+                return False
+            except asyncio.TimeoutError:
+                log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
+                          to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
+                if to_replace in self.buckets[bucket_index]:
+                    self.buckets[bucket_index].remove_peer(to_replace)
+                return await self.add_peer(peer, probe)
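
Reviewer note (not part of the series): the eviction logic is now owned by `TreeRoutingTable.add_peer`, which stays transport-agnostic by taking a `probe` coroutine; the protocol passes a closure that pings through its RPC layer, and an `asyncio.TimeoutError` from the probe is what marks the old contact as dead. A condensed sketch of that contract, with `routing_table`, `protocol` and `peer` assumed to be the objects named in the diff above:

    async def add_with_probe(routing_table, protocol, peer) -> bool:
        async def probe(some_peer):
            # ping over the wire; raises asyncio.TimeoutError for a dead contact
            await protocol.get_rpc_peer(some_peer).ping()
        # add_peer() decides split vs. evict vs. reject, calling probe() as needed
        return await routing_table.add_peer(peer, probe)
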
From d7b65c15d2838fb3a2d14c448c8c02acecd52465 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 11 Jul 2022 03:04:44 -0300
Subject: [PATCH 3/9] return none instead of raising

---
 lbry/dht/protocol/protocol.py      | 7 +++----
 lbry/dht/protocol/routing_table.py | 7 +------
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index c34c129c3..813fc5813 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -229,7 +229,7 @@ class PingQueue:
         async def ping_task():
             try:
                 if self._protocol.peer_manager.peer_is_good(peer):
-                    if peer not in self._protocol.routing_table.get_peers():
+                    if not self._protocol.routing_table.get_peer(peer.node_id):
                         self._protocol.add_peer(peer)
                     return
                 await self._protocol.get_rpc_peer(peer).ping()
@@ -419,9 +419,8 @@ class KademliaProtocol(DatagramProtocol):
         # This is an RPC method request
         self.received_request_metric.labels(method=request_datagram.method).inc()
         self.peer_manager.report_last_requested(address[0], address[1])
-        try:
-            peer = self.routing_table.get_peer(request_datagram.node_id)
-        except IndexError:
+        peer = self.routing_table.get_peer(request_datagram.node_id)
+        if not peer:
             try:
                 peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
             except ValueError as err:
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 676b488d8..4db26cc22 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -82,7 +82,6 @@ class KBucket:
         for peer in self.peers:
             if peer.node_id == node_id:
                 return peer
-        raise IndexError(node_id)
 
     def get_peers(self, count=-1, exclude_contact=None, sort_distance_to=None) -> typing.List['KademliaPeer']:
         """ Returns a list containing up to the first count number of contacts
@@ -225,11 +224,7 @@ class TreeRoutingTable:
         return []
 
     def get_peer(self, contact_id: bytes) -> 'KademliaPeer':
-        """
-        @raise IndexError: No contact with the specified contact ID is known
-        by this node
-        """
-        return self.buckets[self.kbucket_index(contact_id)].get_peer(contact_id)
+        return self.buckets[self._kbucket_index(contact_id)].get_peer(contact_id)
 
     def get_refresh_list(self, start_index: int = 0, force: bool = False) -> typing.List[bytes]:
         bucket_index = start_index
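
Reviewer note (not part of the series): with this patch, `KBucket.get_peer` and `TreeRoutingTable.get_peer` fall through to an implicit `None` for unknown ids, so callers replace `try/except IndexError` with a plain truthiness check, as the datagram handler above now does. The caller-side pattern, condensed from the diff:

    # names as in the diff above; runs inside KademliaProtocol.handle_request_datagram
    peer = routing_table.get_peer(request_datagram.node_id)
    if not peer:
        peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
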
From d8c1aaebc200565d5166535e0c65b09c4c2c9964 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 11 Jul 2022 03:05:34 -0300
Subject: [PATCH 4/9] routing table: mark private methods

---
 lbry/dht/node.py                   |  7 -------
 lbry/dht/protocol/routing_table.py | 27 +++++++++++++++++----------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 2a9ebc4e1..47cbff39e 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -70,13 +70,6 @@ class Node:
 
             # get ids falling in the midpoint of each bucket that hasn't been recently updated
             node_ids = self.protocol.routing_table.get_refresh_list(0, True)
-            # if we have 3 or fewer populated buckets get two random ids in the range of each to try and
-            # populate/split the buckets further
-            buckets_with_contacts = self.protocol.routing_table.buckets_with_contacts()
-            if buckets_with_contacts <= 3:
-                for i in range(buckets_with_contacts):
-                    node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
-                    node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
 
             if self.protocol.routing_table.get_peers():
                 # if we have node ids to look up, perform the iterative search until we have k results
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 4db26cc22..73c18ac47 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -199,7 +199,7 @@ class TreeRoutingTable:
     def get_peers(self) -> typing.List['KademliaPeer']:
         return list(itertools.chain.from_iterable(map(lambda bucket: bucket.peers, self.buckets)))
 
-    def should_split(self, bucket_index: int, to_add: bytes) -> bool:
+    def _should_split(self, bucket_index: int, to_add: bytes) -> bool:
         #  https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
         if bucket_index < self._split_buckets_under_index:
             return True
@@ -232,22 +232,29 @@ class TreeRoutingTable:
         now = int(self._loop.time())
         for bucket in self.buckets[start_index:]:
             if force or now - bucket.last_accessed >= constants.REFRESH_INTERVAL:
-                to_search = self.midpoint_id_in_bucket_range(bucket_index)
+                to_search = self._midpoint_id_in_bucket_range(bucket_index)
                 refresh_ids.append(to_search)
             bucket_index += 1
+        # if we have 3 or fewer populated buckets get two random ids in the range of each to try and
+        # populate/split the buckets further
+        buckets_with_contacts = self.buckets_with_contacts()
+        if buckets_with_contacts <= 3:
+            for i in range(buckets_with_contacts):
+                refresh_ids.append(self._random_id_in_bucket_range(i))
+                refresh_ids.append(self._random_id_in_bucket_range(i))
         return refresh_ids
 
     def remove_peer(self, peer: 'KademliaPeer') -> None:
         if not peer.node_id:
             return
-        bucket_index = self.kbucket_index(peer.node_id)
+        bucket_index = self._kbucket_index(peer.node_id)
         try:
             self.buckets[bucket_index].remove_peer(peer)
             self._join_buckets()
         except ValueError:
             return
 
-    def kbucket_index(self, key: bytes) -> int:
+    def _kbucket_index(self, key: bytes) -> int:
         i = 0
         for bucket in self.buckets:
             if bucket.key_in_range(key):
@@ -256,19 +263,19 @@ class TreeRoutingTable:
             i += 1
         return i
 
-    def random_id_in_bucket_range(self, bucket_index: int) -> bytes:
+    def _random_id_in_bucket_range(self, bucket_index: int) -> bytes:
         random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max))
         return Distance(
             self._parent_node_id
         )(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big')
 
-    def midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
+    def _midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
         half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2)
         return Distance(self._parent_node_id)(
             int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big')
         ).to_bytes(constants.HASH_LENGTH, 'big')
 
-    def split_bucket(self, old_bucket_index: int) -> None:
+    def _split_bucket(self, old_bucket_index: int) -> None:
         """ Splits the specified k-bucket into two new buckets which together
         cover the same range in the key/ID space
 
@@ -333,13 +340,13 @@ class TreeRoutingTable:
             if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
                 self.remove_peer(my_peer)
                 self._join_buckets()
-        bucket_index = self.kbucket_index(peer.node_id)
+        bucket_index = self._kbucket_index(peer.node_id)
         if self.buckets[bucket_index].add_peer(peer):
             return True
 
         # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
-        if self.should_split(bucket_index, peer.node_id):
-            self.split_bucket(bucket_index)
+        if self._should_split(bucket_index, peer.node_id):
+            self._split_bucket(bucket_index)
             # Retry the insertion attempt
             result = await self.add_peer(peer, probe)
             self._join_buckets()
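
Reviewer note (not part of the series): besides the underscore renames, this patch folds the "few populated buckets" special case out of `Node.refresh_node` and into `get_refresh_list`. The midpoint id a refresh searches for is just the center of a bucket's keyspace range; a standalone illustration (`midpoint_id` is a hypothetical helper — the real `_midpoint_id_in_bucket_range` additionally maps the result through `Distance` relative to the parent node id):

    HASH_LENGTH = 48  # bytes, as in lbry.dht.constants

    def midpoint_id(range_min: int, range_max: int) -> bytes:
        # center of the bucket's [range_min, range_max) keyspace, as a 384-bit id
        half = (range_max - range_min) // 2
        return (range_min + half).to_bytes(HASH_LENGTH, 'big')
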
From 318728aebda018f1273dfe7bc82f57217751ae2c Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 11 Jul 2022 04:11:01 -0300
Subject: [PATCH 5/9] add bootstrap flag to routing table

---
 lbry/dht/node.py                   |  4 ++--
 lbry/dht/protocol/protocol.py      |  9 +++++++--
 lbry/dht/protocol/routing_table.py | 19 ++++++++++++++-----
 scripts/dht_node.py                |  2 +-
 tests/unit/dht/test_node.py        | 28 ++++++++++++++++++++++++++++
 5 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 47cbff39e..74270c404 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -30,12 +30,12 @@ class Node:
     )
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
                  internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
-                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
+                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False,
                  storage: typing.Optional['SQLiteStorage'] = None):
         self.loop = loop
         self.internal_udp_port = internal_udp_port
         self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
-                                         split_buckets_under_index)
+                                         split_buckets_under_index, is_bootstrap_node)
         self.listening_port: asyncio.DatagramTransport = None
         self.joined = asyncio.Event(loop=self.loop)
         self._join_task: asyncio.Task = None
diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 813fc5813..89563c89b 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -218,6 +218,10 @@ class PingQueue:
     def running(self):
         return self._running
 
+    @property
+    def busy(self):
+        return self._running and (any(self._running_pings) or any(self._pending_contacts))
+
     def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
         delay = delay if delay is not None else self._default_delay
         now = self._loop.time()
@@ -294,7 +298,7 @@ class KademliaProtocol(DatagramProtocol):
 
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
                  udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
-                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
+                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_boostrap_node: bool = False):
         self.peer_manager = peer_manager
         self.loop = loop
         self.node_id = node_id
@@ -309,7 +313,8 @@ class KademliaProtocol(DatagramProtocol):
         self.transport: DatagramTransport = None
         self.old_token_secret = constants.generate_id()
         self.token_secret = constants.generate_id()
-        self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
+        self.routing_table = TreeRoutingTable(
+            self.loop, self.peer_manager, self.node_id, split_buckets_under_index, is_bootstrap_node=is_boostrap_node)
         self.data_store = DictDataStore(self.loop, self.peer_manager)
         self.ping_queue = PingQueue(self.loop, self)
         self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 73c18ac47..b2e927b57 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -28,7 +28,8 @@ class KBucket:
         namespace="dht_node", labelnames=("amount",)
     )
 
-    def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
+    def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int,
+                 node_id: bytes, capacity: int = constants.K):
         """
         @param range_min: The lower boundary for the range in the n-bit ID
                           space covered by this k-bucket
@@ -42,6 +43,7 @@ class KBucket:
         self.peers: typing.List['KademliaPeer'] = []
         self._node_id = node_id
         self._distance_to_self = Distance(node_id)
+        self.capacity = capacity
 
     def add_peer(self, peer: 'KademliaPeer') -> bool:
         """ Add contact to _contact list in the right order. This will move the
@@ -68,7 +70,7 @@ class KBucket:
                 self.peers.remove(local_peer)
                 self.peers.append(peer)
                 return True
-        if len(self.peers) < constants.K:
+        if len(self.peers) < self.capacity:
             self.peers.append(peer)
             self.peer_in_routing_table_metric.labels("global").inc()
             bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
@@ -76,7 +78,6 @@ class KBucket:
             return True
         else:
             return False
-            # raise BucketFull("No space in bucket to insert contact")
 
     def get_peer(self, node_id: bytes) -> 'KademliaPeer':
         for peer in self.peers:
@@ -178,6 +179,13 @@ class TreeRoutingTable:
     version of the Kademlia paper, in section 2.4. It does, however, use the
     ping RPC-based k-bucket eviction algorithm described in section 2.2 of
     that paper.
+
+    BOOTSTRAP MODE: if set to True, we always add all peers. This is so a
+    bootstrap node does not get a bias towards its own node id and replies are
+    the best it can provide (joining peer knows its neighbors immediately).
+    Over time, this will need to be optimized so we use the disk as holding
+    everything in memory won't be feasible anymore.
+    See: https://github.com/bittorrent/bootstrap-dht
     """
     bucket_in_routing_table_metric = Gauge(
         "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
@@ -185,14 +193,15 @@ class TreeRoutingTable:
     )
 
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
-                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
+                 split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False):
         self._loop = loop
         self._peer_manager = peer_manager
         self._parent_node_id = parent_node_id
         self._split_buckets_under_index = split_buckets_under_index
         self.buckets: typing.List[KBucket] = [
             KBucket(
-                self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id
+                self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id,
+                capacity=1 << 32 if is_bootstrap_node else constants.K
             )
         ]
diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 3dd6dca82..8e03dad93 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -83,7 +83,7 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
     await storage.open()
     node = Node(
         loop, PeerManager(loop), node_id, port, port, 3333, None,
-        storage=storage
+        storage=storage, is_bootstrap_node=True
     )
     if prometheus_port > 0:
         metrics = SimpleMetrics(prometheus_port, node if export else None)
diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index 5ecad5181..051a2e6db 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -11,6 +11,34 @@ from lbry.dht.peer import PeerManager, make_kademlia_peer
 from lbry.extras.daemon.storage import SQLiteStorage
 
 
+class TestBootstrapNode(AsyncioTestCase):
+    TIMEOUT = 10.0  # do not increase. Hitting a timeout is a real failure
+    async def test_it_adds_all(self):
+        loop = asyncio.get_event_loop()
+        loop.set_debug(False)
+
+        with dht_mocks.mock_network_loop(loop):
+            advance = dht_mocks.get_time_accelerator(loop)
+            self.bootstrap_node = Node(self.loop, PeerManager(loop), constants.generate_id(),
+                                       4444, 4444, 3333, '1.2.3.4', is_bootstrap_node=True)
+            self.bootstrap_node.start('1.2.3.4', [])
+            self.bootstrap_node.protocol.ping_queue._default_delay = 0
+            self.addCleanup(self.bootstrap_node.stop)
+
+            # start the nodes
+            nodes = {}
+            futs = []
+            for i in range(100):
+                nodes[i] = Node(loop, PeerManager(loop), constants.generate_id(i), 4444, 4444, 3333, f'1.3.3.{i}')
+                nodes[i].start(f'1.3.3.{i}', [('1.2.3.4', 4444)])
+                self.addCleanup(nodes[i].stop)
+                futs.append(nodes[i].joined.wait())
+            await asyncio.gather(*futs)
+            while self.bootstrap_node.protocol.ping_queue.busy:
+                await advance(1)
+            self.assertEqual(100, len(self.bootstrap_node.protocol.routing_table.get_peers()))
+
+
 class TestNodePingQueueDiscover(AsyncioTestCase):
     async def test_ping_queue_discover(self):
         loop = asyncio.get_event_loop()
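
Reviewer note (not part of the series): the flag reaches the root `KBucket` as an effectively unbounded capacity (`1 << 32`), so a bootstrap node never hits the full-bucket branch and therefore never splits or evicts. A minimal sketch of standing one up, following `scripts/dht_node.py` above (ports and IP are placeholders; storage omitted):

    import asyncio
    from lbry.dht import constants
    from lbry.dht.node import Node
    from lbry.dht.peer import PeerManager

    async def run_bootstrap():
        loop = asyncio.get_event_loop()
        node = Node(loop, PeerManager(loop), constants.generate_id(), 4444, 4444,
                    3333, '1.2.3.4', is_bootstrap_node=True)
        node.start('1.2.3.4', [])  # empty seed list: this node is the seed
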
From c3e4f0b9882f761286ab6ef4186535b2e3868049 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 12 Jul 2022 23:26:36 -0300
Subject: [PATCH 6/9] add 'is_bootstrap_node' conf

---
 lbry/conf.py                     | 4 ++++
 lbry/extras/daemon/components.py | 1 +
 2 files changed, 5 insertions(+)

diff --git a/lbry/conf.py b/lbry/conf.py
index 6da4952d1..d2dc8bb3a 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -624,6 +624,10 @@ class Config(CLIConfig):
         "will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
         "use.", 2
     )
+    is_bootstrap_node = Toggle(
+        "When running as a bootstrap node, disable all logic related to balancing the routing table, so we can "
+        "add as many peers as possible and better help first-runs.", False
+    )
 
     # protocol timeouts
     download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index e061c4363..637ede42e 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -297,6 +297,7 @@ class DHTComponent(Component):
             peer_port=self.external_peer_port,
             rpc_timeout=self.conf.node_rpc_timeout,
             split_buckets_under_index=self.conf.split_buckets_under_index,
+            is_bootstrap_node=self.conf.is_bootstrap_node,
             storage=storage
         )
         self.dht_node.start(self.conf.network_interface, self.conf.known_dht_nodes)

From e887453aa5f3ea33cddd79c1ae8f54029393dc78 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 11 Aug 2022 20:39:51 -0300
Subject: [PATCH 7/9] remove unused last_accessed

---
 lbry/dht/protocol/routing_table.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index b2e927b57..516b72818 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -37,7 +37,6 @@ class KBucket:
                           covered by this k-bucket
         """
         self._peer_manager = peer_manager
-        self.last_accessed = 0
         self.range_min = range_min
         self.range_max = range_max
         self.peers: typing.List['KademliaPeer'] = []
@@ -240,9 +239,8 @@ class TreeRoutingTable:
         refresh_ids = []
         now = int(self._loop.time())
         for bucket in self.buckets[start_index:]:
-            if force or now - bucket.last_accessed >= constants.REFRESH_INTERVAL:
-                to_search = self._midpoint_id_in_bucket_range(bucket_index)
-                refresh_ids.append(to_search)
+            to_search = self._midpoint_id_in_bucket_range(bucket_index)
+            refresh_ids.append(to_search)
             bucket_index += 1
         # if we have 3 or fewer populated buckets get two random ids in the range of each to try and
         # populate/split the buckets further

From d61accea1a704ebb001bafade22e73f917af89ab Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 11 Aug 2022 21:14:56 -0300
Subject: [PATCH 8/9] simplify bucket refresh loop

---
 lbry/dht/protocol/routing_table.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 516b72818..11b64b8f7 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -235,13 +235,9 @@ class TreeRoutingTable:
         return self.buckets[self._kbucket_index(contact_id)].get_peer(contact_id)
 
     def get_refresh_list(self, start_index: int = 0, force: bool = False) -> typing.List[bytes]:
-        bucket_index = start_index
         refresh_ids = []
-        now = int(self._loop.time())
-        for bucket in self.buckets[start_index:]:
-            to_search = self._midpoint_id_in_bucket_range(bucket_index)
-            refresh_ids.append(to_search)
-            bucket_index += 1
+        for offset, _ in enumerate(self.buckets[start_index:]):
+            refresh_ids.append(self._midpoint_id_in_bucket_range(start_index + offset))
         # if we have 3 or fewer populated buckets get two random ids in the range of each to try and
         # populate/split the buckets further
         buckets_with_contacts = self.buckets_with_contacts()
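
Reviewer note (not part of the series): PATCH 7 removes the only reader of `last_accessed`, which also drops the `REFRESH_INTERVAL` gate, so every bucket midpoint is returned on each pass; PATCH 8 then collapses the manual index bookkeeping (note `force` is kept in the signature but is no longer consulted by the body). The resulting loop is equivalent to this sketch (`midpoint_id_fn` standing in for `_midpoint_id_in_bucket_range`; the few-populated-buckets extra ids from PATCH 4 are elided):

    def refresh_targets(buckets, midpoint_id_fn, start_index=0):
        # one midpoint id per bucket from start_index onward
        return [midpoint_id_fn(start_index + offset)
                for offset, _ in enumerate(buckets[start_index:])]
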
From e3ee3892b2dfdbc765714cbd9bf7f20ee5601313 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 22 Aug 2022 18:45:10 -0300
Subject: [PATCH 9/9] better test name

---
 tests/unit/dht/test_node.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index 051a2e6db..9d4e530c8 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -13,7 +13,8 @@ from lbry.extras.daemon.storage import SQLiteStorage
 
 class TestBootstrapNode(AsyncioTestCase):
     TIMEOUT = 10.0  # do not increase. Hitting a timeout is a real failure
-    async def test_it_adds_all(self):
+
+    async def test_bootstrap_node_adds_all_peers(self):
         loop = asyncio.get_event_loop()
         loop.set_debug(False)
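
Reviewer note (not part of the series): end of the stack. Taken together, the add-peer path now runs `KademliaProtocol._add_peer -> TreeRoutingTable.add_peer(peer, probe)`, and the renamed bootstrap test drives it under an accelerated clock until the ping queue drains. The waiting pattern it relies on, condensed from the test body in PATCH 5/9 (runs inside the test coroutine; `advance` comes from `dht_mocks.get_time_accelerator(loop)`):

    while bootstrap_node.protocol.ping_queue.busy:
        await advance(1)  # advance mocked time until no pings are pending or running
    assert len(bootstrap_node.protocol.routing_table.get_peers()) == 100
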