From 0d6125de0ba2f052e9b845d323502acb77123143 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 16 Jul 2022 00:43:45 -0300
Subject: [PATCH] add sd_hash prober

---
 lbry/dht/peer.py                    |   7 +-
 lbry/dht/protocol/iterative_find.py |  24 +++---
 lbry/extras/daemon/client.py        |   2 +-
 scripts/dht_crawler.py              | 110 ++++++++++++++++++++++++----
 4 files changed, 115 insertions(+), 28 deletions(-)

diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py
index 8fef68181..db4635447 100644
--- a/lbry/dht/peer.py
+++ b/lbry/dht/peer.py
@@ -153,9 +153,10 @@ class PeerManager:
     def peer_is_good(self, peer: 'KademliaPeer'):
         return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
 
-    def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer':  # pylint: disable=no-self-use
-        node_id, address, tcp_port = decode_compact_address(compact_address)
-        return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
+
+def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer':  # pylint: disable=no-self-use
+    node_id, address, tcp_port = decode_compact_address(compact_address)
+    return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
 
 
 @dataclass(unsafe_hash=True)
diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index d41b3b2a0..b9678ea1e 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
 from lbry.dht import constants
 from lbry.dht.error import RemoteException, TransportNotConnected
 from lbry.dht.protocol.distance import Distance
-from lbry.dht.peer import make_kademlia_peer
+from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address
 from lbry.dht.serialization.datagram import PAGE_KEY
 
 if TYPE_CHECKING:
@@ -26,6 +26,15 @@ class FindResponse:
     def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
         raise NotImplementedError()
 
+    def get_close_kademlia_peers(self, peer_info) -> typing.Generator[typing.Iterator['KademliaPeer'], None, None]:
+        for contact_triple in self.get_close_triples():
+            node_id, address, udp_port = contact_triple
+            try:
+                yield make_kademlia_peer(node_id, address, udp_port)
+            except ValueError:
+                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address,
+                            peer_info.udp_port, address, udp_port)
+
 
 class FindNodeResponse(FindResponse):
     def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
@@ -125,13 +134,8 @@ class IterativeFinder(AsyncIterator):
 
     async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
         self._add_active(peer)
-        for contact_triple in response.get_close_triples():
-            node_id, address, udp_port = contact_triple
-            try:
-                self._add_active(make_kademlia_peer(node_id, address, udp_port))
-            except ValueError:
-                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
-                            peer.udp_port, address, udp_port)
+        for new_peer in response.get_close_kademlia_peers(peer):
+            self._add_active(new_peer)
         self.check_result_ready(response)
         self._log_state(reason="check result")
 
@@ -319,7 +323,7 @@ class IterativeValueFinder(IterativeFinder):
             decoded_peers = set()
             for compact_addr in parsed.found_compact_addresses:
                 try:
-                    decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
+                    decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr))
                 except ValueError:
                     log.warning("misbehaving peer %s:%i returned invalid peer for blob", peer.address, peer.udp_port)
@@ -341,7 +345,7 @@ class IterativeValueFinder(IterativeFinder):
 
     def check_result_ready(self, response: FindValueResponse):
         if response.found:
-            blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
+            blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
                           for compact_addr in response.found_compact_addresses]
             to_yield = []
             for blob_peer in blob_peers:
diff --git a/lbry/extras/daemon/client.py b/lbry/extras/daemon/client.py
index 7f0997320..9e9b1694a 100644
--- a/lbry/extras/daemon/client.py
+++ b/lbry/extras/daemon/client.py
@@ -1,5 +1,5 @@
-from lbry.conf import Config
 from lbry.extras.cli import execute_command
+from lbry.conf import Config
 
 
 def daemon_rpc(conf: Config, method: str, **kwargs):
diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index 7a76e1639..3aa872bcc 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -1,18 +1,21 @@
 import datetime
 import logging
 import asyncio
+import os.path
+import random
 import time
 import typing
 from dataclasses import dataclass, astuple, replace
 
 from aiohttp import web
-from prometheus_client import Gauge, generate_latest as prom_generate_latest
+from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter
 
 import lbry.dht.error
 from lbry.dht.constants import generate_id
 from lbry.dht.node import Node
-from lbry.dht.peer import make_kademlia_peer, PeerManager
+from lbry.dht.peer import make_kademlia_peer, PeerManager, decode_tcp_peer_from_compact_address
 from lbry.dht.protocol.distance import Distance
+from lbry.dht.protocol.iterative_find import FindValueResponse, FindNodeResponse, FindResponse
 from lbry.extras.daemon.storage import SQLiteMixin
 from lbry.conf import Config
 from lbry.utils import resolve_host
@@ -22,6 +25,19 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(na
 log = logging.getLogger(__name__)
 
 
+class SDHashSamples:
+    def __init__(self, samples_file_path):
+        with open(samples_file_path, "rb") as sample_file:
+            self._samples = sample_file.read()
+        assert len(self._samples) % 48 == 0
+        self.size = len(self._samples) // 48
+
+    def read_samples(self, count=1):
+        for _ in range(count):
+            offset = 48 * random.randrange(0, self.size)
+            yield self._samples[offset:offset + 48]
+
+
 class PeerStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
     PRAGMA JOURNAL_MODE=WAL;
@@ -139,11 +155,25 @@ class Crawler:
         "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
         labelnames=("host", "port")
     )
+    probed_streams_metric = Counter(
+        "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
+    announced_streams_metric = Counter(
+        "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
+    working_streams_metric = Counter(
+        "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
 
-    def __init__(self, db_path: str):
+    def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
         self.node = new_node()
         self.db = PeerStorage(db_path)
+        self.sd_hashes = sd_hash_samples
         self._memory_peers = {}
+        self._reachable_by_node_id = {}
         self._connections = {}
 
     async def open(self):
@@ -151,6 +181,46 @@ class Crawler:
         self._memory_peers = {
             (peer.address, peer.udp_port): peer for peer in await self.db.all_peers()
         }
+        self.refresh_reachable_set()
+
+    def refresh_reachable_set(self):
+        self._reachable_by_node_id = {
+            bytes.fromhex(peer.node_id): peer for peer in self._memory_peers.values() if (peer.latency or 0) > 0
+        }
+
+    async def probe_files(self):
+        if not self.sd_hashes:
+            return
+        while True:
+            for sd_hash in self.sd_hashes.read_samples(10_000):
+                self.refresh_reachable_set()
+                print(sd_hash.hex())
+                distance = Distance(sd_hash)
+                node_ids = list(self._reachable_by_node_id.keys())
+                node_ids.sort(key=lambda node_id: distance(node_id))
+                k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]]
+                for response in asyncio.as_completed(
+                        [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]):
+                    response = await response
+                    self.probed_streams_metric.labels(sd_hash).inc()
+                    if response and response.found:
+                        self.announced_streams_metric.labels(sd_hash).inc()
+                        blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
+                                      for compact_addr in response.found_compact_addresses]
+                        print('FOUND', blob_peers, response.pages)
+                        for blob_peer in blob_peers:
+                            response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash)
+                            if response:
+                                self.working_streams_metric.labels(sd_hash).inc()
+                                print('ALIVE', blob_peer.address)
+                                if response.found:
+                                    blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
+                                                  for compact_addr in response.found_compact_addresses]
+                                    print('REPLIED+FOUND', blob_peers, response.pages)
+                            else:
+                                print('DEAD', blob_peer.address, blob_peer.tcp_port)
+                    else:
+                        print('NOT FOUND', response)
 
     @property
     def refresh_limit(self):
@@ -206,7 +276,10 @@ class Crawler:
     def set_latency(self, peer, latency=None):
         if latency:
             self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
-        db_peer = replace(self.get_from_peer(peer), latency=latency)
+        db_peer = self.get_from_peer(peer)
+        if not db_peer:
+            return
+        db_peer = replace(db_peer, latency=latency)
         if not db_peer.node_id and peer.node_id:
             db_peer = replace(db_peer, node_id=peer.node_id.hex())
         if db_peer.first_online and latency is None:
@@ -224,16 +297,22 @@ class Crawler:
             self._connections[self.get_from_peer(peer).peer_id] = [
                 self.get_from_peer(other_peer).peer_id for other_peer in other_peers]
 
-    async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
+    async def request_peers(self, host, port, node_id, key=None) -> typing.Optional[FindResponse]:
+        key = key or node_id
         peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port)
         for attempt in range(3):
             try:
                 req_start = time.perf_counter_ns()
-                response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                if key == node_id:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                    response = FindNodeResponse(key, response)
+                else:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_value(key)
+                    response = FindValueResponse(key, response)
                 await asyncio.sleep(0.05)
                 latency = time.perf_counter_ns() - req_start
                 self.set_latency(peer, latency)
-                return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+                return response
             except asyncio.TimeoutError:
                 self.set_latency(peer, None)
                 continue
@@ -243,11 +322,10 @@ class Crawler:
                 self.inc_errors(peer)
                 self.set_latency(peer, None)
                 continue
-        return []
 
     async def crawl_routing_table(self, host, port, node_id=None):
         start = time.time()
-        log.info("querying %s:%d", host, port)
+        log.debug("querying %s:%d", host, port)
         address = await resolve_host(host, port, 'udp')
         key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
         peer = make_kademlia_peer(key, address, port)
@@ -278,7 +356,8 @@ class Crawler:
         peers = set()
         factor = 2048
         for i in range(200):
-            new_peers = await self.request_peers(address, port, key)
+            response = await self.request_peers(address, port, key)
+            new_peers = list(response.get_close_kademlia_peers(peer)) if response else None
             if not new_peers:
                 break
             new_peers.sort(key=lambda peer: distance(peer.node_id))
@@ -298,8 +377,9 @@ class Crawler:
             else:
                 key = far_key
                 factor = 2048
-        log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
-                 host, port, (time.time() - start), len(peers), i)
+        if peers:
+            log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
+                     host, port, (time.time() - start), len(peers), i)
         self.add_peers(*peers)
         if peers:
             self.connections_found_metric.labels(host=host, port=port).set(len(peers))
@@ -384,14 +464,16 @@ async def test():
     asyncio.get_event_loop().set_debug(True)
     metrics = SimpleMetrics('8080')
     await metrics.start()
-    crawler = Crawler("/tmp/a.db")
+    conf = Config()
+    hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None
+    crawler = Crawler("/tmp/a.db", hosting_samples)
     await crawler.open()
     await crawler.flush_to_db()
     await crawler.node.start_listening()
-    conf = Config()
     if crawler.active_peers_count < 100:
         for (host, port) in conf.known_dht_nodes:
            await crawler.crawl_routing_table(host, port)
+    probe_task = asyncio.ensure_future(crawler.probe_files())
     await crawler.process()
 
 if __name__ == '__main__':
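
Note (not part of the patch): the prober reads its sample set from a flat binary file of concatenated 48-byte sd_hashes; SDHashSamples asserts the file length is a multiple of 48, and test() loads it from "test.sample" only if that file exists in the working directory. A minimal sketch for producing such a file, assuming a hypothetical sd_hashes.txt input with one hex-encoded sd_hash per line:

    # Build test.sample for the prober.
    # sd_hashes.txt is a hypothetical input file with one 96-character hex sd_hash per line.
    with open("sd_hashes.txt") as hex_file, open("test.sample", "wb") as sample_file:
        for line in hex_file:
            sd_hash = bytes.fromhex(line.strip())
            assert len(sd_hash) == 48  # SDHashSamples expects fixed 48-byte records
            sample_file.write(sd_hash)

With that file present, running scripts/dht_crawler.py schedules probe_files() alongside the regular crawl, and the probed_streams/announced_streams/working_streams counters are exposed through the existing metrics endpoint.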