From b036961954e4f311d61bd60d7757a62784776f08 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Mon, 11 Apr 2022 18:17:16 -0400
Subject: [PATCH 1/9] Tighten up IterativeFinder logic to respect max_results
 better, and wait after task cancel().

Also make IterativeFinder a proper AsyncGenerator. This gives it an
officially recognized aclose() method and could help with clean
finalization.
---
 lbry/dht/protocol/iterative_find.py | 99 ++++++++++++++++++++++-------
 1 file changed, 76 insertions(+), 23 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index ab89edddc..2b1e70e7a 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -1,6 +1,7 @@
 import asyncio
 from itertools import chain
 from collections import defaultdict, OrderedDict
+from collections.abc import AsyncGenerator
 import typing
 import logging
 from typing import TYPE_CHECKING
@@ -71,7 +72,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
     return shortlist or routing_table.find_close_peers(key)
 
 
-class IterativeFinder:
+class IterativeFinder(AsyncGenerator):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
                  max_results: typing.Optional[int] = constants.K,
@@ -98,6 +99,8 @@ class IterativeFinder:
         self.iteration_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
+        self.generator = None
+
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
@@ -163,12 +166,16 @@
         except asyncio.TimeoutError:
             self._reset_closest(peer)
             return
+        except asyncio.CancelledError:
+            log.debug("%s[%x] cancelled probe",
+                      type(self).__name__, id(self))
+            return
         except ValueError as err:
             log.warning(str(err))
             self._reset_closest(peer)
             return
         except TransportNotConnected:
-            return self.aclose()
+            return self._aclose()
         except RemoteException:
             self._reset_closest(peer)
             return
@@ -182,13 +189,17 @@
         added = 0
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
-                log.debug("closest to probe: %s", peer.node_id.hex()[:8])
+                log.debug("%s[%x] closest to probe: %s",
+                          type(self).__name__, id(self),
+                          peer.node_id.hex()[:8])
             if peer in self.contacted:
                 continue
             if len(self.running_probes) >= constants.ALPHA:
                 break
             if index > (constants.K + len(self.running_probes)):
                 break
+            if self.iteration_count + self.iteration_queue.qsize() >= self.max_results:
+                break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:
                 continue
@@ -198,9 +209,13 @@
                 continue
             self._schedule_probe(peer)
             added += 1
-        log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])
+        log.debug("%s[%x] running %d probes for key %s",
+                  type(self).__name__, id(self),
+                  len(self.running_probes), self.key.hex()[:8])
         if not added and not self.running_probes:
-            log.debug("search for %s exhausted", self.key.hex()[:8])
+            log.debug("%s[%x] search for %s exhausted",
+                      type(self).__name__, id(self),
+                      self.key.hex()[:8])
             self.search_exhausted()
 
     def _schedule_probe(self, peer: 'KademliaPeer'):
@@ -217,38 +232,76 @@
         t.add_done_callback(callback)
         self.running_probes[peer] = t
 
     def _log_state(self):
-        log.debug("[%s] check result: %i active nodes %i contacted",
-                  self.key.hex()[:8], len(self.active), len(self.contacted))
+        log.debug("%s[%x] [%s] check result: %i active nodes %i contacted %i produced %i queued",
+                  type(self).__name__, id(self), self.key.hex()[:8],
+                  len(self.active), len(self.contacted),
+                  self.iteration_count, self.iteration_queue.qsize())
+
+    async def _generator_func(self):
+        try:
+            while self.iteration_count < self.max_results:
+                if self.iteration_count == 0:
+                    result = self.get_initial_result() or await self.iteration_queue.get()
+                else:
+                    result = await self.iteration_queue.get()
+                if not result:
+                    # no more results
+                    await self._aclose(reason="no more results")
+                    self.generator = None
+                    return
+                self.iteration_count += 1
+                yield result
+            # reached max_results limit
+            await self._aclose(reason="max_results reached")
+            self.generator = None
+            return
+        except asyncio.CancelledError:
+            await self._aclose(reason="cancelled")
+            self.generator = None
+            raise
+        except GeneratorExit:
+            await self._aclose(reason="generator exit")
+            self.generator = None
+            raise
 
     def __aiter__(self):
         if self.running:
             raise Exception("already running")
         self.running = True
+        self.generator = self._generator_func()
         self.loop.call_soon(self._search_round)
-        return self
+        return super().__aiter__()
 
     async def __anext__(self) -> typing.List['KademliaPeer']:
-        try:
-            if self.iteration_count == 0:
-                result = self.get_initial_result() or await self.iteration_queue.get()
-            else:
-                result = await self.iteration_queue.get()
-            if not result:
-                raise StopAsyncIteration
-            self.iteration_count += 1
-            return result
-        except (asyncio.CancelledError, StopAsyncIteration):
-            self.loop.call_soon(self.aclose)
-            raise
+        return await super().__anext__()
 
-    def aclose(self):
+    async def asend(self, val):
+        return await self.generator.asend(val)
+
+    async def athrow(self, typ, val=None, tb=None):
+        return await self.generator.athrow(typ, val, tb)
+
+    async def _aclose(self, reason="?"):
         self.running = False
-        self.iteration_queue.put_nowait(None)
-        for task in chain(self.tasks, self.running_probes.values()):
+        running_tasks = list(chain(self.tasks, self.running_probes.values()))
+        for task in running_tasks:
             task.cancel()
+        if len(running_tasks):
+            await asyncio.wait(running_tasks, loop=self.loop)
+        log.debug("%s[%x] [%s] async close because %s: %i active nodes %i contacted %i produced %i queued",
+                  type(self).__name__, id(self), self.key.hex()[:8],
+                  reason, len(self.active), len(self.contacted),
+                  self.iteration_count, self.iteration_queue.qsize())
         self.tasks.clear()
         self.running_probes.clear()
 
+    async def aclose(self):
+        if self.generator:
+            await super().aclose()
+            self.generator = None
+        log.debug("%s[%x] [%s] async close completed",
+                  type(self).__name__, id(self), self.key.hex()[:8])
+
 
 class IterativeNodeFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
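
Note on the AsyncGenerator change above: collections.abc.AsyncGenerator declares only asend() and athrow() as abstract methods and supplies __aiter__(), __anext__() and aclose() as mixins built on top of them, which is why the patch only needs to delegate asend()/athrow() to an inner generator object. A minimal, self-contained sketch of that contract (toy class, not LBRY code):

    import asyncio
    from collections.abc import AsyncGenerator

    class Countdown(AsyncGenerator):
        """Toy async generator built from the ABC, mirroring how the patched
        IterativeFinder delegates to an inner generator object."""

        def __init__(self, start: int):
            self.generator = self._generator_func(start)

        async def _generator_func(self, start: int):
            for i in range(start, 0, -1):
                yield i

        # Only asend()/athrow() are abstract; __aiter__(), __anext__() and
        # aclose() are mixins the ABC implements on top of them.
        async def asend(self, value):
            return await self.generator.asend(value)

        async def athrow(self, typ, val=None, tb=None):
            return await self.generator.athrow(typ, val, tb)

    async def main():
        finder = Countdown(3)
        async for value in finder:
            print(value)       # 3, 2, 1
        await finder.aclose()  # the "officially recognized" close method

    asyncio.run(main())
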
From 82d7f81f417998dab788483c60a8ba26cd3616de Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Tue, 12 Apr 2022 12:32:16 -0400
Subject: [PATCH 2/9] Correct call to _aclose() in response to
 TransportNotConnected.

---
 lbry/dht/protocol/iterative_find.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 2b1e70e7a..6fa179de1 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -175,7 +175,8 @@ class IterativeFinder(AsyncGenerator):
             self._reset_closest(peer)
             return
         except TransportNotConnected:
-            return self._aclose()
+            await self._aclose(reason="not connected")
+            return
         except RemoteException:
             self._reset_closest(peer)
             return

From 4767bb9dee881f10c16d1ff4de46e66c74475dab Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Tue, 12 Apr 2022 12:49:32 -0400
Subject: [PATCH 3/9] Wrap "async for" over IterativeXXXFinder in try/finally
 ensuring aclose().

---
 lbry/dht/node.py | 71 ++++++++++++++++++++++----------------------
 1 file changed, 39 insertions(+), 32 deletions(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 864edc077..3f3c73776 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -217,9 +217,13 @@ class Node:
                           shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
-        async for iteration_peers in self.get_iterative_node_finder(
-                node_id, shortlist=shortlist, max_results=max_results):
-            peers.extend(iteration_peers)
+        node_finder = self.get_iterative_node_finder(
+            node_id, shortlist=shortlist, max_results=max_results)
+        try:
+            async for iteration_peers in node_finder:
+                peers.extend(iteration_peers)
+        finally:
+            await node_finder.aclose()
         distance = Distance(node_id)
         peers.sort(key=lambda peer: distance(peer.node_id))
         return peers[:count]
@@ -245,36 +249,39 @@ class Node:
 
         # prioritize peers who reply to a dht ping first
         # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
-
-        async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)):
-            to_put = []
-            for peer in results:
-                if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
-                    continue
-                is_good = self.protocol.peer_manager.peer_is_good(peer)
-                if is_good:
-                    # the peer has replied recently over UDP, it can probably be reached on the TCP port
-                    to_put.append(peer)
-                elif is_good is None:
-                    if not peer.udp_port:
-                        # TODO: use the same port for TCP and UDP
-                        # the udp port must be guessed
-                        # default to the ports being the same. if the TCP port appears to be <=0.48.0 default,
-                        # including on a network with several nodes, then assume the udp port is proportionately
-                        # based on a starting port of 4444
-                        udp_port_to_try = peer.tcp_port
-                        if 3400 > peer.tcp_port > 3332:
-                            udp_port_to_try = (peer.tcp_port - 3333) + 4444
-                        self.loop.create_task(put_into_result_queue_after_pong(
-                            make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
-                        ))
+        value_finder = self.get_iterative_value_finder(bytes.fromhex(blob_hash))
+        try:
+            async for results in value_finder:
+                to_put = []
+                for peer in results:
+                    if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
+                        continue
+                    is_good = self.protocol.peer_manager.peer_is_good(peer)
+                    if is_good:
+                        # the peer has replied recently over UDP, it can probably be reached on the TCP port
+                        to_put.append(peer)
+                    elif is_good is None:
+                        if not peer.udp_port:
+                            # TODO: use the same port for TCP and UDP
+                            # the udp port must be guessed
+                            # default to the ports being the same. if the TCP port appears to be <=0.48.0 default,
+                            # including on a network with several nodes, then assume the udp port is proportionately
+                            # based on a starting port of 4444
+                            udp_port_to_try = peer.tcp_port
+                            if 3400 > peer.tcp_port > 3332:
+                                udp_port_to_try = (peer.tcp_port - 3333) + 4444
+                            self.loop.create_task(put_into_result_queue_after_pong(
+                                make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
+                            ))
+                        else:
+                            self.loop.create_task(put_into_result_queue_after_pong(peer))
                     else:
-                        self.loop.create_task(put_into_result_queue_after_pong(peer))
-                else:
-                    # the peer is known to be bad/unreachable, skip trying to connect to it over TCP
-                    log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
-            if to_put:
-                result_queue.put_nowait(to_put)
+                        # the peer is known to be bad/unreachable, skip trying to connect to it over TCP
+                        log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
+                if to_put:
+                    result_queue.put_nowait(to_put)
+        finally:
+            await value_finder.aclose()
 
     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None
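
The try/finally pattern above is the standard way to guarantee that an async generator abandoned mid-iteration is finalized deterministically; without it, PEP 525 leaves cleanup to garbage collection and the event loop's async-generator hooks. A minimal sketch of the same pattern outside the DHT code:

    import asyncio

    async def producer():
        try:
            for i in range(100):
                yield i
        finally:
            # runs when the consumer breaks out and aclose() is awaited
            print("producer finalized")

    async def consumer():
        gen = producer()
        try:
            async for item in gen:
                if item >= 3:        # stop early, e.g. enough peers were found
                    break
        finally:
            await gen.aclose()       # deterministic cleanup, matching the patch

    asyncio.run(consumer())
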
From 5852fcd287dd1d5365036665aecdee12a2751a09 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Mon, 18 Apr 2022 13:51:29 -0400
Subject: [PATCH 4/9] Don't wait on running_tasks after cancel().

Sometimes a CancelledError exception is received, which is unhelpful,
and complicates shutting down the generator.
---
 lbry/dht/protocol/iterative_find.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 6fa179de1..7c7741902 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -287,8 +287,6 @@ class IterativeFinder(AsyncGenerator):
         running_tasks = list(chain(self.tasks, self.running_probes.values()))
         for task in running_tasks:
             task.cancel()
-        if len(running_tasks):
-            await asyncio.wait(running_tasks, loop=self.loop)
         log.debug("%s[%x] [%s] async close because %s: %i active nodes %i contacted %i produced %i queued",
                   type(self).__name__, id(self), self.key.hex()[:8],
                   reason, len(self.active), len(self.contacted),

From 91a6eae83178738a0e89830bc69cc5ab836549ef Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Tue, 26 Apr 2022 14:05:58 -0400
Subject: [PATCH 5/9] Fix lint issue in iterative_find.py.

---
 lbry/dht/protocol/iterative_find.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 7c7741902..ab4a66e3d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -276,8 +276,8 @@ class IterativeFinder(AsyncGenerator):
     async def __anext__(self) -> typing.List['KademliaPeer']:
         return await super().__anext__()
 
-    async def asend(self, val):
-        return await self.generator.asend(val)
+    async def asend(self, value):
+        return await self.generator.asend(value)
 
     async def athrow(self, typ, val=None, tb=None):
         return await self.generator.athrow(typ, val, tb)
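
Background for the PATCH 4 change: task.cancel() only requests cancellation, and the CancelledError is delivered the next time each task runs, so nothing forces the closer itself to wait. Awaiting the just-cancelled probe tasks from inside _aclose() is one place the stray CancelledError mentioned in the commit message can surface. A small sketch (illustrative only, not the patch's code) contrasting cancel-and-continue with an alternative that waits while collecting the cancellations:

    import asyncio

    async def probe(n):
        # stand-in for a long-running probe task; just sleeps
        await asyncio.sleep(10)

    async def cancel_and_continue(tasks):
        # what the code does after this commit: request cancellation and return;
        # the event loop delivers CancelledError into each task later
        for t in tasks:
            t.cancel()

    async def cancel_then_wait(tasks):
        # an alternative if the closer must wait: gather with return_exceptions
        # so the CancelledError results are collected rather than raised
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    async def main():
        tasks = [asyncio.create_task(probe(i)) for i in range(3)]
        await asyncio.sleep(0)   # let the probes start
        await cancel_then_wait(tasks)

    asyncio.run(main())
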
From fe07aac79c730d0fd94d20ca4338a6e72aa67991 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Fri, 29 Apr 2022 08:36:14 -0400
Subject: [PATCH 6/9] Define and use lbry.utils.aclosing() in lieu of official
 contextlib.aclosing().

---
 lbry/dht/node.py | 14 ++++----------
 lbry/utils.py    |  6 ++++++
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 3f3c73776..2bb6edfcd 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -5,7 +5,7 @@ import socket
 
 from prometheus_client import Gauge
 
-from lbry.utils import resolve_host
+from lbry.utils import aclosing, resolve_host
 from lbry.dht import constants
 from lbry.dht.peer import make_kademlia_peer
 from lbry.dht.protocol.distance import Distance
@@ -217,13 +217,10 @@ class Node:
                           shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
-        node_finder = self.get_iterative_node_finder(
-            node_id, shortlist=shortlist, max_results=max_results)
-        try:
+        async with aclosing(self.get_iterative_node_finder(
+                node_id, shortlist=shortlist, max_results=max_results)) as node_finder:
             async for iteration_peers in node_finder:
                 peers.extend(iteration_peers)
-        finally:
-            await node_finder.aclose()
         distance = Distance(node_id)
         peers.sort(key=lambda peer: distance(peer.node_id))
         return peers[:count]
@@ -245,8 +242,7 @@ class Node:
 
         # prioritize peers who reply to a dht ping first
         # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
-        value_finder = self.get_iterative_value_finder(bytes.fromhex(blob_hash))
-        try:
+        async with aclosing(self.get_iterative_value_finder(bytes.fromhex(blob_hash))) as value_finder:
             async for results in value_finder:
                 to_put = []
                 for peer in results:
@@ -280,8 +276,6 @@ class Node:
                         log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
                 if to_put:
                     result_queue.put_nowait(to_put)
-        finally:
-            await value_finder.aclose()
 
     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None
diff --git a/lbry/utils.py b/lbry/utils.py
index 7a92ccc6a..cebba675e 100644
--- a/lbry/utils.py
+++ b/lbry/utils.py
@@ -130,6 +130,12 @@ def get_sd_hash(stream_info):
 def json_dumps_pretty(obj, **kwargs):
     return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
 
+@contextlib.asynccontextmanager
+async def aclosing(thing):
+    try:
+        yield thing
+    finally:
+        await thing.aclose()
 
 def async_timed_cache(duration: int):
     def wrapper(func):
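
The helper above mirrors contextlib.aclosing() from Python 3.10: an async context manager that awaits aclose() on exit no matter how the block is left. A short usage sketch with an arbitrary async generator (illustrative, not DHT code):

    import asyncio
    from lbry.utils import aclosing   # the helper defined in this commit

    async def numbers():
        for i in range(10):
            yield i

    async def main():
        async with aclosing(numbers()) as gen:
            async for n in gen:
                if n == 2:
                    break            # leaving the block still awaits gen.aclose()

        # equivalent to the try/finally form used before this commit:
        gen = numbers()
        try:
            async for n in gen:
                if n == 2:
                    break
        finally:
            await gen.aclose()

    asyncio.run(main())
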
From fad84c771cfde67cf29f54642f2634968276ff4b Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Fri, 29 Apr 2022 08:39:31 -0400
Subject: [PATCH 7/9] Support official contextlib.aclosing() when it's
 available.

---
 lbry/utils.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/lbry/utils.py b/lbry/utils.py
index cebba675e..75f6c7a58 100644
--- a/lbry/utils.py
+++ b/lbry/utils.py
@@ -130,12 +130,16 @@ def get_sd_hash(stream_info):
 def json_dumps_pretty(obj, **kwargs):
     return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
 
-@contextlib.asynccontextmanager
-async def aclosing(thing):
-    try:
-        yield thing
-    finally:
-        await thing.aclose()
+try:
+    # the standard contextlib.aclosing() is available in 3.10+
+    from contextlib import aclosing
+except ImportError:
+    @contextlib.asynccontextmanager
+    async def aclosing(thing):
+        try:
+            yield thing
+        finally:
+            await thing.aclose()
 
 def async_timed_cache(duration: int):
     def wrapper(func):

From 530f9c72ea79ee50d1204ba2eda61109e9e1bbc7 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Fri, 29 Apr 2022 15:39:43 -0400
Subject: [PATCH 8/9] Fix lint error in lbry/utils.py.

---
 lbry/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/utils.py b/lbry/utils.py
index 75f6c7a58..08b445e1f 100644
--- a/lbry/utils.py
+++ b/lbry/utils.py
@@ -132,7 +132,7 @@ def json_dumps_pretty(obj, **kwargs):
 
 try:
     # the standard contextlib.aclosing() is available in 3.10+
-    from contextlib import aclosing
+    from contextlib import aclosing  # pylint: disable=unused-import
 except ImportError:
     @contextlib.asynccontextmanager
     async def aclosing(thing):
From e5e9873f79d078ff46fb07718e9e2c7c82392fd4 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Tue, 3 May 2022 16:41:32 -0400
Subject: [PATCH 9/9] Simplify by eliminating AsyncGenerator base and
 generator function.

Remove any new places enforcing max_results.
---
 lbry/dht/protocol/iterative_find.py | 86 ++++++++++++------------------
 1 file changed, 30 insertions(+), 56 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index ab4a66e3d..9a557ef51 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -1,7 +1,7 @@
 import asyncio
 from itertools import chain
 from collections import defaultdict, OrderedDict
-from collections.abc import AsyncGenerator
+from collections.abc import AsyncIterator
 import typing
 import logging
 from typing import TYPE_CHECKING
@@ -72,7 +72,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
     return shortlist or routing_table.find_close_peers(key)
 
 
-class IterativeFinder(AsyncGenerator):
+class IterativeFinder(AsyncIterator):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
                  max_results: typing.Optional[int] = constants.K,
@@ -99,8 +99,6 @@ class IterativeFinder(AsyncGenerator):
         self.iteration_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        self.generator = None
-
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
@@ -154,7 +152,7 @@ class IterativeFinder(AsyncGenerator):
                 log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
                             peer.udp_port, address, udp_port)
         self.check_result_ready(response)
-        self._log_state()
+        self._log_state(reason="check result")
 
     def _reset_closest(self, peer):
         if peer in self.active:
@@ -169,7 +167,7 @@ class IterativeFinder(AsyncGenerator):
         except asyncio.CancelledError:
             log.debug("%s[%x] cancelled probe",
                       type(self).__name__, id(self))
-            return
+            raise
         except ValueError as err:
             log.warning(str(err))
             self._reset_closest(peer)
@@ -199,8 +197,6 @@ class IterativeFinder(AsyncGenerator):
                 break
             if index > (constants.K + len(self.running_probes)):
                 break
-            if self.iteration_count + self.iteration_queue.qsize() >= self.max_results:
-                break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:
                 continue
@@ -232,76 +228,54 @@ class IterativeFinder(AsyncGenerator):
         t.add_done_callback(callback)
         self.running_probes[peer] = t
 
-    def _log_state(self):
-        log.debug("%s[%x] [%s] check result: %i active nodes %i contacted %i produced %i queued",
+    def _log_state(self, reason="?"):
+        log.debug("%s[%x] [%s] %s: %i active nodes %i contacted %i produced %i queued",
                   type(self).__name__, id(self), self.key.hex()[:8],
-                  len(self.active), len(self.contacted),
+                  reason, len(self.active), len(self.contacted),
                   self.iteration_count, self.iteration_queue.qsize())
 
-    async def _generator_func(self):
-        try:
-            while self.iteration_count < self.max_results:
-                if self.iteration_count == 0:
-                    result = self.get_initial_result() or await self.iteration_queue.get()
-                else:
-                    result = await self.iteration_queue.get()
-                if not result:
-                    # no more results
-                    await self._aclose(reason="no more results")
-                    self.generator = None
-                    return
-                self.iteration_count += 1
-                yield result
-            # reached max_results limit
-            await self._aclose(reason="max_results reached")
-            self.generator = None
-            return
-        except asyncio.CancelledError:
-            await self._aclose(reason="cancelled")
-            self.generator = None
-            raise
-        except GeneratorExit:
-            await self._aclose(reason="generator exit")
-            self.generator = None
-            raise
-
     def __aiter__(self):
         if self.running:
             raise Exception("already running")
         self.running = True
-        self.generator = self._generator_func()
         self.loop.call_soon(self._search_round)
-        return super().__aiter__()
+        return self
 
     async def __anext__(self) -> typing.List['KademliaPeer']:
-        return await super().__anext__()
-
-    async def asend(self, value):
-        return await self.generator.asend(value)
-
-    async def athrow(self, typ, val=None, tb=None):
-        return await self.generator.athrow(typ, val, tb)
+        try:
+            if self.iteration_count == 0:
+                result = self.get_initial_result() or await self.iteration_queue.get()
+            else:
+                result = await self.iteration_queue.get()
+            if not result:
+                raise StopAsyncIteration
+            self.iteration_count += 1
+            return result
+        except asyncio.CancelledError:
+            await self._aclose(reason="cancelled")
+            raise
+        except StopAsyncIteration:
+            await self._aclose(reason="no more results")
+            raise
 
     async def _aclose(self, reason="?"):
-        self.running = False
-        running_tasks = list(chain(self.tasks, self.running_probes.values()))
-        for task in running_tasks:
-            task.cancel()
-        log.debug("%s[%x] [%s] async close because %s: %i active nodes %i contacted %i produced %i queued",
+        log.debug("%s[%x] [%s] shutdown because %s: %i active nodes %i contacted %i produced %i queued",
                   type(self).__name__, id(self), self.key.hex()[:8],
                   reason, len(self.active), len(self.contacted),
                   self.iteration_count, self.iteration_queue.qsize())
+        self.running = False
+        self.iteration_queue.put_nowait(None)
+        for task in chain(self.tasks, self.running_probes.values()):
+            task.cancel()
         self.tasks.clear()
         self.running_probes.clear()
 
     async def aclose(self):
-        if self.generator:
-            await super().aclose()
-            self.generator = None
+        if self.running:
+            await self._aclose(reason="aclose")
         log.debug("%s[%x] [%s] async close completed",
                   type(self).__name__, id(self), self.key.hex()[:8])
-
 
 class IterativeNodeFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
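
With this final commit, IterativeFinder is a plain AsyncIterator again: __anext__() pulls result batches from iteration_queue, _aclose() cancels outstanding probes without awaiting them, and aclose() is guarded by self.running so it stays safe to call after shutdown. Callers consume it through the aclosing() wrapper introduced earlier in the series; a condensed sketch of that consumption pattern (hypothetical helper function, method names taken from the diffs):

    from lbry.utils import aclosing

    async def find_peers(node, node_id: bytes, max_results: int = 8):
        """Drain an IterativeNodeFinder and always shut it down, even if the
        caller stops early or is cancelled (illustrative helper, not in the PR)."""
        peers = []
        async with aclosing(node.get_iterative_node_finder(
                node_id, max_results=max_results)) as finder:
            async for batch in finder:         # each batch is a list of KademliaPeer
                peers.extend(batch)
                if len(peers) >= max_results:  # the consumer may also stop early
                    break
        return peers
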