From 4767bb9dee881f10c16d1ff4de46e66c74475dab Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Tue, 12 Apr 2022 12:49:32 -0400 Subject: [PATCH] Wrap "async for" over IterativeXXXFinder in try/finally ensuring aclose(). --- lbry/dht/node.py | 71 ++++++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 864edc077..3f3c73776 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -217,9 +217,13 @@ class Node: shortlist: typing.Optional[typing.List['KademliaPeer']] = None ) -> typing.List['KademliaPeer']: peers = [] - async for iteration_peers in self.get_iterative_node_finder( - node_id, shortlist=shortlist, max_results=max_results): - peers.extend(iteration_peers) + node_finder = self.get_iterative_node_finder( + node_id, shortlist=shortlist, max_results=max_results) + try: + async for iteration_peers in node_finder: + peers.extend(iteration_peers) + finally: + await node_finder.aclose() distance = Distance(node_id) peers.sort(key=lambda peer: distance(peer.node_id)) return peers[:count] @@ -245,36 +249,39 @@ class Node: # prioritize peers who reply to a dht ping first # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers - - async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)): - to_put = [] - for peer in results: - if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port: - continue - is_good = self.protocol.peer_manager.peer_is_good(peer) - if is_good: - # the peer has replied recently over UDP, it can probably be reached on the TCP port - to_put.append(peer) - elif is_good is None: - if not peer.udp_port: - # TODO: use the same port for TCP and UDP - # the udp port must be guessed - # default to the ports being the same. if the TCP port appears to be <=0.48.0 default, - # including on a network with several nodes, then assume the udp port is proportionately - # based on a starting port of 4444 - udp_port_to_try = peer.tcp_port - if 3400 > peer.tcp_port > 3332: - udp_port_to_try = (peer.tcp_port - 3333) + 4444 - self.loop.create_task(put_into_result_queue_after_pong( - make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) - )) + value_finder = self.get_iterative_value_finder(bytes.fromhex(blob_hash)) + try: + async for results in value_finder: + to_put = [] + for peer in results: + if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port: + continue + is_good = self.protocol.peer_manager.peer_is_good(peer) + if is_good: + # the peer has replied recently over UDP, it can probably be reached on the TCP port + to_put.append(peer) + elif is_good is None: + if not peer.udp_port: + # TODO: use the same port for TCP and UDP + # the udp port must be guessed + # default to the ports being the same. if the TCP port appears to be <=0.48.0 default, + # including on a network with several nodes, then assume the udp port is proportionately + # based on a starting port of 4444 + udp_port_to_try = peer.tcp_port + if 3400 > peer.tcp_port > 3332: + udp_port_to_try = (peer.tcp_port - 3333) + 4444 + self.loop.create_task(put_into_result_queue_after_pong( + make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) + )) + else: + self.loop.create_task(put_into_result_queue_after_pong(peer)) else: - self.loop.create_task(put_into_result_queue_after_pong(peer)) - else: - # the peer is known to be bad/unreachable, skip trying to connect to it over TCP - log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) - if to_put: - result_queue.put_nowait(to_put) + # the peer is known to be bad/unreachable, skip trying to connect to it over TCP + log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) + if to_put: + result_queue.put_nowait(to_put) + finally: + await value_finder.aclose() def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None