From fcda9500fe2beee8712af0e0acdb59f960452333 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 30 Jan 2019 14:53:39 -0500
Subject: [PATCH] add accumulate_peers to Node

simplifies collecting peers during a download by giving a queue of hashes to
search for and a queue of peers as they are found
---
 lbrynet/dht/node.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index fcd4afe8f..9d890bab6 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -236,3 +236,19 @@ class Node:
         distance = Distance(node_id)
         accumulated.sort(key=lambda peer: distance(peer.node_id))
         return accumulated[:count]
+
+    async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
+                                          result_queue: asyncio.Queue):
+        try:
+            async with self.stream_peer_search_junction(search_queue) as search_junction:
+                async for peers in search_junction:
+                    if peers:
+                        result_queue.put_nowait(peers)
+        except asyncio.CancelledError:
+            return
+
+    def accumulate_peers(self, search_queue: asyncio.Queue,
+                         peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
+            asyncio.Queue, asyncio.Task]:
+        q = peer_queue or asyncio.Queue()
+        return q, asyncio.create_task(self._accumulate_search_junction(search_queue, q))
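
A minimal usage sketch of the new API (not part of the patch): it assumes an already-constructed and joined Node instance; the download_example coroutine and the placeholder blob hash are hypothetical, introduced only for illustration.

    import asyncio

    async def download_example(node):
        # hypothetical 48-byte blob hash; a placeholder for a real hash to find peers for
        blob_hash = b'\x00' * 48

        # queue of hashes to search for; more hashes can be enqueued while the task runs
        search_queue = asyncio.Queue()
        search_queue.put_nowait(blob_hash)

        # accumulate_peers returns the result queue and the task that feeds it
        peer_queue, accumulate_task = node.accumulate_peers(search_queue)
        try:
            # each item put on the queue is a non-empty list of peers found so far
            peers = await peer_queue.get()
            print("found peers:", peers)
        finally:
            # stop searching; _accumulate_search_junction swallows the CancelledError
            accumulate_task.cancel()

Passing an existing queue as peer_queue lets a caller share one result queue across several searches, since accumulate_peers only creates a new queue when none is supplied.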