From be0637843766b4c6118da9af2bb5c49019418a45 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 10 Jun 2022 17:15:13 -0300
Subject: [PATCH 01/45] add method for getting the node_id from a known peer
 on peer manager

---
 lbry/dht/peer.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py
index c5a9c9e84..8fef68181 100644
--- a/lbry/dht/peer.py
+++ b/lbry/dht/peer.py
@@ -100,6 +100,9 @@ class PeerManager:
         self._node_id_reverse_mapping[node_id] = (address, udp_port)
         self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
 
+    def get_node_id_for_endpoint(self, address, port):
+        return self._node_id_mapping.get((address, port))
+
     def prune(self):  # TODO: periodically call this
         now = self._loop.time()
         to_pop = []

From 2361e345412e537fbf2b4fda89f9bbac84e7616f Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 10 Jun 2022 17:16:20 -0300
Subject: [PATCH 02/45] dht crawler, initial version

---
 scripts/dht_crawler.py | 130 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 130 insertions(+)
 create mode 100644 scripts/dht_crawler.py

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
new file mode 100644
index 000000000..8ab620385
--- /dev/null
+++ b/scripts/dht_crawler.py
@@ -0,0 +1,130 @@
+import logging
+import asyncio
+import time
+import typing
+
+import lbry.dht.error
+from lbry.dht.constants import generate_id
+from lbry.dht.node import Node
+from lbry.dht.peer import make_kademlia_peer, PeerManager
+from lbry.dht.protocol.distance import Distance
+from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.conf import Config
+from lbry.utils import resolve_host
+
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
+log = logging.getLogger(__name__)
+
+
+def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
+    node_id = node_id or generate_id()
+    loop = asyncio.get_event_loop()
+    return Node(loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address)
+
+
+class Crawler:
+    def __init__(self):
+        self.node = new_node()
+        self.crawled = set()
+        self.known_peers = set()
+        self.unreachable = set()
+        self.error = set()
+        self.semaphore = asyncio.Semaphore(10)
+
+    async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
+        async with self.semaphore:
+            peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
+            for attempt in range(3):
+                try:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                    return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+                except asyncio.TimeoutError:
+                    log.info('Previously responding peer timed out: %s:%d attempt #%d', host, port, (attempt + 1))
+                    continue
+                except lbry.dht.error.RemoteException as e:
+                    log.info('Previously responding peer errored: %s:%d attempt #%d - %s',
+                             host, port, (attempt + 1), str(e))
+                    self.error.add((host, port))
+                    continue
+            return []
+
+    async def crawl_routing_table(self, host, port):
+        start = time.time()
+        log.info("querying %s:%d", host, port)
+        self.known_peers.add((host, port))
+        self.crawled.add((host, port))
+        address = await resolve_host(host, port, 'udp')
+        key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+        if not key:
+            for _ in range(3):
+                try:
+                    async with self.semaphore:
+                        await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
+                        key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+                except asyncio.TimeoutError:
+                    pass
+                except lbry.dht.error.RemoteException:
+                    self.error.add((host, port))
+            if not key:
+                self.unreachable.add((host, port))
+                return set()
+        node_id = key
+        distance = Distance(key)
+        max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
+        peers = set()
+        factor = 2048
+        for i in range(200):
+            #print(i, len(peers), key.hex(), host)
+            new_peers = await self.request_peers(address, port, key)
+            if not new_peers:
+                break
+            new_peers.sort(key=lambda peer: distance(peer.node_id))
+            peers.update(new_peers)
+            far_key = new_peers[-1].node_id
+            if distance(far_key) <= distance(key):
+                current_distance = distance(key)
+                next_jump = current_distance + int(max_distance // factor)  # jump closer
+                factor /= 2
+                if factor > 8 and next_jump < max_distance:
+                    key = int.from_bytes(node_id, 'big') ^ next_jump
+                    if key.bit_length() > 384:
+                        break
+                    key = key.to_bytes(48, 'big')
+                else:
+                    break
+            else:
+                key = far_key
+                factor = 2048
+        log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
+                 host, port, (time.time() - start), len(peers), i)
+        self.crawled.update(peers)
+        return peers
+
+    async def process(self):
+        to_process = {}
+
+        def submit(_peer):
+            f = asyncio.ensure_future(self.crawl_routing_table(_peer.address, peer.udp_port))
+            to_process[_peer] = f
+            f.add_done_callback(lambda _: to_process.pop(_peer))
+
+        while to_process or len(self.known_peers) < len(self.crawled):
+            log.info("%d known, %d unreachable, %d error.. %d processing",
+                     len(self.known_peers), len(self.unreachable), len(self.error), len(to_process))
+            for peer in self.crawled.difference(self.known_peers):
+                self.known_peers.add(peer)
+                submit(peer)
+            await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
+
+
+async def test():
+    crawler = Crawler()
+    await crawler.node.start_listening()
+    conf = Config()
+    for (host, port) in conf.known_dht_nodes:
+        await crawler.crawl_routing_table(host, port)
+    await crawler.process()
+
+if __name__ == '__main__':
+    asyncio.run(test())

From 7ea88e7b31ecfa505d7908a5a0b5afe13138cdd1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 11 Jun 2022 01:05:30 -0300
Subject: [PATCH 03/45] dht_crawler: store data

---
 scripts/dht_crawler.py | 189 ++++++++++++++++++++++++++++++++++------
 1 file changed, 156 insertions(+), 33 deletions(-)

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index 8ab620385..1b55d486e 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import asyncio
 import time
@@ -8,13 +9,50 @@ from lbry.dht.constants import generate_id
 from lbry.dht.node import Node
 from lbry.dht.peer import make_kademlia_peer, PeerManager
 from lbry.dht.protocol.distance import Distance
-from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.extras.daemon.storage import SQLiteMixin
 from lbry.conf import Config
 from lbry.utils import resolve_host
 
+from sqlalchemy.orm import declarative_base, relationship
+import sqlalchemy as sqla
+
+
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
 log = logging.getLogger(__name__)
+Base = declarative_base()
+
+
+class DHTPeer(Base):
+    __tablename__ = "peer"
+    peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True)
+    node_id = sqla.Column(sqla.String(96))
+    address = sqla.Column(sqla.String())
+    udp_port = sqla.Column(sqla.Integer())
+    tcp_port = sqla.Column(sqla.Integer())
+    first_online = sqla.Column(sqla.DateTime())
+    errors = sqla.Column(sqla.Integer(), default=0)
last_churn = sqla.Column(sqla.Integer()) + added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow) + last_check = sqla.Column(sqla.DateTime()) + last_seen = sqla.Column(sqla.DateTime()) + latency = sqla.Column(sqla.Integer()) + endpoint_unique = sqla.UniqueConstraint("node_id", "udp_port") + + @classmethod + def from_kad_peer(cls, peer): + return DHTPeer(node_id=peer.node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) + + def to_kad_peer(self): + return make_kademlia_peer(self.node_id, self.address, self.udp_port, self.tcp_port) + + +class DHTConnection(Base): + __tablename__ = "connection" + from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) + connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id)) + to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) + connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id)) def new_node(address="0.0.0.0", udp_port=4444, node_id=None): @@ -24,13 +62,85 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None): class Crawler: - def __init__(self): + def __init__(self, db_path: str): self.node = new_node() - self.crawled = set() - self.known_peers = set() - self.unreachable = set() - self.error = set() - self.semaphore = asyncio.Semaphore(10) + self.semaphore = asyncio.Semaphore(20) + engine = sqla.create_engine(f"sqlite:///{db_path}") + Base.metadata.create_all(engine) + session = sqla.orm.sessionmaker(engine) + self.db = session() + + @property + def recent_peers_query(self): + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return self.db.query(DHTPeer).filter(DHTPeer.last_seen > half_hour_ago) + + @property + def all_peers(self): + return set([peer.to_kad_peer() for peer in self.recent_peers_query.all()]) + + @property + def unreachable_peers_count(self): + return self.recent_peers_query.filter(DHTPeer.latency == None).count() + + @property + def peers_with_errors_count(self): + return self.recent_peers_query.filter(DHTPeer.errors > 0).count() + + def get_peers_needing_check(self): + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter( + sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()]) + + def add_peers(self, *peers): + for peer in peers: + db_peer = self.get_from_peer(peer) + if db_peer and db_peer.node_id is None and peer.node_id: + db_peer.node_id = peer.node_id + self.db.add(db_peer) + elif not db_peer: + self.db.add(DHTPeer.from_kad_peer(peer)) + self.db.commit() + + def get_from_peer(self, peer): + return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first() + + def set_latency(self, peer, latency=None): + db_peer = self.get_from_peer(peer) + db_peer.latency = latency + if not db_peer.node_id: + db_peer.node_id = peer.node_id + if db_peer.first_online and latency is None: + db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds + elif latency is not None and db_peer.first_online is None: + db_peer.first_online = datetime.datetime.utcnow() + db_peer.last_check = datetime.datetime.utcnow() + self.db.add(db_peer) + self.db.commit() + + def inc_errors(self, peer): + db_peer = self.get_from_peer(peer) + db_peer.errors += 1 + self.db.add(db_peer) + self.db.commit() + + def 
count_peers(self): + return self.db.query(DHTPeer).count() + + def associate_peers(self, target_peer, peers): + db_peer = self.get_from_peer(target_peer) + connections = [ + DHTConnection( + from_peer_id=db_peer.peer_id, + to_peer_id=self.get_from_peer(peer).peer_id) + for peer in peers + ] + for peer in peers: + self.db.query(DHTPeer).filter(DHTPeer.address == peer.address, DHTPeer.udp_port == peer.udp_port).update( + {DHTPeer.last_seen: datetime.datetime.utcnow()}) + self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete() + self.db.add_all(connections) + self.db.commit() async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: async with self.semaphore: @@ -45,30 +155,32 @@ class Crawler: except lbry.dht.error.RemoteException as e: log.info('Previously responding peer errored: %s:%d attempt #%d - %s', host, port, (attempt + 1), str(e)) - self.error.add((host, port)) + self.inc_errors(peer) continue return [] async def crawl_routing_table(self, host, port): start = time.time() log.info("querying %s:%d", host, port) - self.known_peers.add((host, port)) - self.crawled.add((host, port)) address = await resolve_host(host, port, 'udp') + self.add_peers(make_kademlia_peer(None, address, port)) key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - if not key: - for _ in range(3): - try: - async with self.semaphore: - await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() - key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - except asyncio.TimeoutError: - pass - except lbry.dht.error.RemoteException: - self.error.add((host, port)) - if not key: - self.unreachable.add((host, port)) - return set() + latency = None + for _ in range(3): + try: + async with self.semaphore: + ping_start = time.perf_counter_ns() + await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() + key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + latency = time.perf_counter_ns() - ping_start + except asyncio.TimeoutError: + pass + except lbry.dht.error.RemoteException: + self.inc_errors(make_kademlia_peer(None, address, port)) + pass + self.set_latency(make_kademlia_peer(key, address, port), latency) + if not latency: + return set() node_id = key distance = Distance(key) max_distance = int.from_bytes(bytes([0xff] * 48), 'big') @@ -98,7 +210,8 @@ class Crawler: factor = 2048 log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) - self.crawled.update(peers) + self.add_peers(*peers) + self.associate_peers(make_kademlia_peer(key, address, port), peers) return peers async def process(self): @@ -109,21 +222,31 @@ class Crawler: to_process[_peer] = f f.add_done_callback(lambda _: to_process.pop(_peer)) - while to_process or len(self.known_peers) < len(self.crawled): - log.info("%d known, %d unreachable, %d error.. 
%d processing", - len(self.known_peers), len(self.unreachable), len(self.error), len(to_process)) - for peer in self.crawled.difference(self.known_peers): - self.known_peers.add(peer) - submit(peer) + to_check = self.get_peers_needing_check() + while True: + for peer in to_check: + if peer not in to_process: + submit(peer) + if len(to_process) > 20: + break + await asyncio.sleep(0) + log.info("%d known, %d unreachable, %d error, %d processing, %d on queue", + self.recent_peers_query.count(), self.unreachable_peers_count, self.peers_with_errors_count, + len(to_process), len(to_check)) await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) + to_check = self.get_peers_needing_check() + while not to_check and not to_process: + log.info("Idle, sleeping a minute.") + await asyncio.sleep(60.0) + to_check = self.get_peers_needing_check() async def test(): - crawler = Crawler() + crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() - for (host, port) in conf.known_dht_nodes: - await crawler.crawl_routing_table(host, port) + #for (host, port) in conf.known_dht_nodes: + # await crawler.crawl_routing_table(host, port) await crawler.process() if __name__ == '__main__': From fb7a93096e0d2ad5d7e983858b504e2e9dc24724 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 01:28:57 -0300 Subject: [PATCH 04/45] only count checked unreachable --- scripts/dht_crawler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 1b55d486e..b2d2fe182 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -81,7 +81,8 @@ class Crawler: @property def unreachable_peers_count(self): - return self.recent_peers_query.filter(DHTPeer.latency == None).count() + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return self.recent_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > half_hour_ago).count() @property def peers_with_errors_count(self): From 6c350e57dd1d527a95020e519f13051b94fc0acd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 01:35:30 -0300 Subject: [PATCH 05/45] dht_crawler: query recently checked as stats --- scripts/dht_crawler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index b2d2fe182..1b7ca0c78 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -79,6 +79,11 @@ class Crawler: def all_peers(self): return set([peer.to_kad_peer() for peer in self.recent_peers_query.all()]) + @property + def checked_peers_count(self): + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return self.recent_peers_query.filter(DHTPeer.last_check > half_hour_ago).count() + @property def unreachable_peers_count(self): half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) @@ -231,9 +236,9 @@ class Crawler: if len(to_process) > 20: break await asyncio.sleep(0) - log.info("%d known, %d unreachable, %d error, %d processing, %d on queue", - self.recent_peers_query.count(), self.unreachable_peers_count, self.peers_with_errors_count, - len(to_process), len(to_check)) + log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", + self.recent_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count, + self.peers_with_errors_count, len(to_process), len(to_check)) await asyncio.wait(to_process.values(), 
return_when=asyncio.FIRST_COMPLETED) to_check = self.get_peers_needing_check() while not to_check and not to_process: From abf4d888af3852df0706d08aaa789c512d5068e4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 03:04:45 -0300 Subject: [PATCH 06/45] dht_crawler: warn if we cannot get node id --- scripts/dht_crawler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 1b7ca0c78..c087d06fc 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -184,8 +184,10 @@ class Crawler: except lbry.dht.error.RemoteException: self.inc_errors(make_kademlia_peer(None, address, port)) pass - self.set_latency(make_kademlia_peer(key, address, port), latency) - if not latency: + self.set_latency(make_kademlia_peer(key, address, port), latency if key else None) + if not latency or not key: + if not key: + log.warning("No node id from %s:%d", host, port) return set() node_id = key distance = Distance(key) From 137d8ca4ac08fa2f915c70284e7ef1ff859eae84 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 03:04:56 -0300 Subject: [PATCH 07/45] dht_crawler: enable WAL --- scripts/dht_crawler.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index c087d06fc..0f6436253 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -18,6 +18,13 @@ from sqlalchemy.orm import declarative_base, relationship import sqlalchemy as sqla +@sqla.event.listens_for(sqla.engine.Engine, "connect") +def set_sqlite_pragma(dbapi_connection, _): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.close() + + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) Base = declarative_base() From adc79ec4041af934cab33f13c3736ba327ee857b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 03:10:30 -0300 Subject: [PATCH 08/45] dht_crawler: only warn for missing key if it replied --- scripts/dht_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 0f6436253..1e6a7db20 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -193,7 +193,7 @@ class Crawler: pass self.set_latency(make_kademlia_peer(key, address, port), latency if key else None) if not latency or not key: - if not key: + if latency and not key: log.warning("No node id from %s:%d", host, port) return set() node_id = key From 90c2a584700caae91c92e6af7c51f102ca4ea37e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 15:57:20 -0300 Subject: [PATCH 09/45] dht_crawler: dont gather empty, fix crash --- scripts/dht_crawler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 1e6a7db20..4b1f11e4f 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -248,7 +248,8 @@ class Crawler: log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", self.recent_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) - await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) + if to_process: + await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) to_check = self.get_peers_needing_check() while not to_check and not to_process: 
log.info("Idle, sleeping a minute.") From 443a1c32fa8f978e60535692618289df7999d0ea Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 17:40:42 -0300 Subject: [PATCH 10/45] dht_crawler: save a set of connections to avoid dupes, enable initial crawl --- scripts/dht_crawler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 4b1f11e4f..093092a72 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -142,12 +142,12 @@ class Crawler: def associate_peers(self, target_peer, peers): db_peer = self.get_from_peer(target_peer) - connections = [ + connections = { DHTConnection( from_peer_id=db_peer.peer_id, to_peer_id=self.get_from_peer(peer).peer_id) for peer in peers - ] + } for peer in peers: self.db.query(DHTPeer).filter(DHTPeer.address == peer.address, DHTPeer.udp_port == peer.udp_port).update( {DHTPeer.last_seen: datetime.datetime.utcnow()}) @@ -261,8 +261,8 @@ async def test(): crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() - #for (host, port) in conf.known_dht_nodes: - # await crawler.crawl_routing_table(host, port) + for (host, port) in conf.known_dht_nodes: + await crawler.crawl_routing_table(host, port) await crawler.process() if __name__ == '__main__': From be4c62cf32f84be08d15bbdc3cd0a492e23e05e0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 18:38:08 -0300 Subject: [PATCH 11/45] check membership instead of one update per peer --- scripts/dht_crawler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 093092a72..5c3935888 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -148,9 +148,9 @@ class Crawler: to_peer_id=self.get_from_peer(peer).peer_id) for peer in peers } - for peer in peers: - self.db.query(DHTPeer).filter(DHTPeer.address == peer.address, DHTPeer.udp_port == peer.udp_port).update( - {DHTPeer.last_seen: datetime.datetime.utcnow()}) + all_peer_ids = {peer.node_id for peer in peers if peer.node_id} + print(self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update( + {DHTPeer.last_seen: datetime.datetime.utcnow()})) self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete() self.db.add_all(connections) self.db.commit() From c6c27925b738a3a4a3c2ca2eb576e25a79801d8c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 19:38:00 -0300 Subject: [PATCH 12/45] dht_crawler: flush/commit only when finished --- scripts/dht_crawler.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 5c3935888..57914ac38 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -74,7 +74,7 @@ class Crawler: self.semaphore = asyncio.Semaphore(20) engine = sqla.create_engine(f"sqlite:///{db_path}") Base.metadata.create_all(engine) - session = sqla.orm.sessionmaker(engine) + session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False) self.db = session() @property @@ -113,7 +113,6 @@ class Crawler: self.db.add(db_peer) elif not db_peer: self.db.add(DHTPeer.from_kad_peer(peer)) - self.db.commit() def get_from_peer(self, peer): return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first() @@ -129,13 +128,11 @@ class Crawler: db_peer.first_online = datetime.datetime.utcnow() db_peer.last_check = datetime.datetime.utcnow() 
self.db.add(db_peer) - self.db.commit() def inc_errors(self, peer): db_peer = self.get_from_peer(peer) db_peer.errors += 1 self.db.add(db_peer) - self.db.commit() def count_peers(self): return self.db.query(DHTPeer).count() @@ -149,11 +146,10 @@ class Crawler: for peer in peers } all_peer_ids = {peer.node_id for peer in peers if peer.node_id} - print(self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update( - {DHTPeer.last_seen: datetime.datetime.utcnow()})) + self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update( + {DHTPeer.last_seen: datetime.datetime.utcnow()}) self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete() self.db.add_all(connections) - self.db.commit() async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: async with self.semaphore: @@ -227,6 +223,8 @@ class Crawler: host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) self.associate_peers(make_kademlia_peer(key, address, port), peers) + self.db.flush() + self.db.commit() return peers async def process(self): From 61f7fbe23036bcdb4a8cba6df2986b30aa3b84b1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 19:58:26 -0300 Subject: [PATCH 13/45] dht_crawler: avoid reads --- scripts/dht_crawler.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 57914ac38..44b50466e 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -106,13 +106,17 @@ class Crawler: sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()]) def add_peers(self, *peers): + db_peers = [] for peer in peers: db_peer = self.get_from_peer(peer) if db_peer and db_peer.node_id is None and peer.node_id: db_peer.node_id = peer.node_id - self.db.add(db_peer) elif not db_peer: - self.db.add(DHTPeer.from_kad_peer(peer)) + db_peer = DHTPeer.from_kad_peer(peer) + self.db.add(db_peer) + db_peers.append(db_peer) + self.db.flush() + return [dbp.peer_id for dbp in db_peers] def get_from_peer(self, peer): return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first() @@ -137,18 +141,16 @@ class Crawler: def count_peers(self): return self.db.query(DHTPeer).count() - def associate_peers(self, target_peer, peers): - db_peer = self.get_from_peer(target_peer) + def associate_peers(self, target_peer_id, db_peer_ids): connections = { DHTConnection( - from_peer_id=db_peer.peer_id, - to_peer_id=self.get_from_peer(peer).peer_id) - for peer in peers + from_peer_id=target_peer_id, + to_peer_id=peer_id) + for peer_id in db_peer_ids } - all_peer_ids = {peer.node_id for peer in peers if peer.node_id} - self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update( + self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(set(db_peer_ids))).update( {DHTPeer.last_seen: datetime.datetime.utcnow()}) - self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete() + self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete() self.db.add_all(connections) async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: @@ -172,7 +174,7 @@ class Crawler: start = time.time() log.info("querying %s:%d", host, port) address = await resolve_host(host, port, 'udp') - self.add_peers(make_kademlia_peer(None, address, port)) + this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port)) key = 
self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) latency = None for _ in range(3): @@ -221,9 +223,8 @@ class Crawler: factor = 2048 log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) - self.add_peers(*peers) - self.associate_peers(make_kademlia_peer(key, address, port), peers) - self.db.flush() + db_peer_ids = self.add_peers(*peers) + self.associate_peers(this_peer_id, db_peer_ids) self.db.commit() return peers From baf422fc033d30dff7daf0d93117154478ab8d3d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 20:20:20 -0300 Subject: [PATCH 14/45] dht_crawler: extract refresh_limit, bump to 1h --- scripts/dht_crawler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 44b50466e..048eb34a4 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -77,10 +77,13 @@ class Crawler: session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False) self.db = session() + @property + def refresh_limit(self): + return datetime.datetime.utcnow() - datetime.timedelta(hours=1) + @property def recent_peers_query(self): - half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) - return self.db.query(DHTPeer).filter(DHTPeer.last_seen > half_hour_ago) + return self.db.query(DHTPeer).filter(DHTPeer.last_seen > self.refresh_limit) @property def all_peers(self): @@ -88,22 +91,19 @@ class Crawler: @property def checked_peers_count(self): - half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) - return self.recent_peers_query.filter(DHTPeer.last_check > half_hour_ago).count() + return self.recent_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count() @property def unreachable_peers_count(self): - half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) - return self.recent_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > half_hour_ago).count() + return self.recent_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count() @property def peers_with_errors_count(self): return self.recent_peers_query.filter(DHTPeer.errors > 0).count() def get_peers_needing_check(self): - half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter( - sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()]) + sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < self.refresh_limit)).all()]) def add_peers(self, *peers): db_peers = [] From 174439f517873e9984a6b8ffd1ceefa96c588e32 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 20:53:25 -0300 Subject: [PATCH 15/45] dht_crawler: cleanup, try not to reset key --- scripts/dht_crawler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 048eb34a4..65e043e16 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -179,10 +179,10 @@ class Crawler: latency = None for _ in range(3): try: + ping_start = time.perf_counter_ns() async with self.semaphore: - ping_start = time.perf_counter_ns() await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() - key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + key = key or 
self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) latency = time.perf_counter_ns() - ping_start except asyncio.TimeoutError: pass @@ -200,7 +200,6 @@ class Crawler: peers = set() factor = 2048 for i in range(200): - #print(i, len(peers), key.hex(), host) new_peers = await self.request_peers(address, port, key) if not new_peers: break From 965389b759abf2b0590b3ff00cef4dbb7e73f7dc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 21:06:27 -0300 Subject: [PATCH 16/45] dht_crawler: process older first, avoid discarding --- scripts/dht_crawler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 65e043e16..85a35fa4d 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -103,7 +103,8 @@ class Crawler: def get_peers_needing_check(self): return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter( - sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < self.refresh_limit)).all()]) + sqla.or_(DHTPeer.last_check == None, + DHTPeer.last_check < self.refresh_limit)).order_by(DHTPeer.last_seen.desc()).all()]) def add_peers(self, *peers): db_peers = [] From 29c2d5715dafa6a56b8a14d8eef89bc34571bdaa Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 Jun 2022 21:11:37 -0300 Subject: [PATCH 17/45] dht_crawler: fix last_seen update --- scripts/dht_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 85a35fa4d..b41097909 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -149,7 +149,7 @@ class Crawler: to_peer_id=peer_id) for peer_id in db_peer_ids } - self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(set(db_peer_ids))).update( + self.db.query(DHTPeer).filter(DHTPeer.peer_id.in_(set(db_peer_ids))).update( {DHTPeer.last_seen: datetime.datetime.utcnow()}) self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete() self.db.add_all(connections) From 2706b66a9239cacae154139013d109dd002d446d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 Jun 2022 16:29:28 -0300 Subject: [PATCH 18/45] dht_crawler: dont re-bootstrap. 
try known reachable even when they expire --- scripts/dht_crawler.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index b41097909..b7eb96374 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -82,27 +82,27 @@ class Crawler: return datetime.datetime.utcnow() - datetime.timedelta(hours=1) @property - def recent_peers_query(self): - return self.db.query(DHTPeer).filter(DHTPeer.last_seen > self.refresh_limit) + def active_peers_query(self): + return self.db.query(DHTPeer).filter(sqla.or_(DHTPeer.last_seen > self.refresh_limit, DHTPeer.latency > 0)) @property def all_peers(self): - return set([peer.to_kad_peer() for peer in self.recent_peers_query.all()]) + return set([peer.to_kad_peer() for peer in self.active_peers_query.all()]) @property def checked_peers_count(self): - return self.recent_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count() + return self.active_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count() @property def unreachable_peers_count(self): - return self.recent_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count() + return self.active_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count() @property def peers_with_errors_count(self): - return self.recent_peers_query.filter(DHTPeer.errors > 0).count() + return self.active_peers_query.filter(DHTPeer.errors > 0).count() def get_peers_needing_check(self): - return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter( + return set([peer.to_kad_peer() for peer in self.active_peers_query.filter( sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < self.refresh_limit)).order_by(DHTPeer.last_seen.desc()).all()]) @@ -245,7 +245,7 @@ class Crawler: break await asyncio.sleep(0) log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", - self.recent_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count, + self.active_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) if to_process: await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) @@ -260,8 +260,9 @@ async def test(): crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() - for (host, port) in conf.known_dht_nodes: - await crawler.crawl_routing_table(host, port) + if crawler.active_peers_query.count() < 100: + for (host, port) in conf.known_dht_nodes: + await crawler.crawl_routing_table(host, port) await crawler.process() if __name__ == '__main__': From cd42f0d726cdeb571f0e8dd5f114900eb61a8bd3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 15 Jun 2022 10:39:15 -0300 Subject: [PATCH 19/45] dht_crawler: fix node id store --- scripts/dht_crawler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index b7eb96374..e0ffa071b 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -48,10 +48,12 @@ class DHTPeer(Base): @classmethod def from_kad_peer(cls, peer): - return DHTPeer(node_id=peer.node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) + node_id = peer.node_id.hex() if peer.node_id else None + return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) def to_kad_peer(self): - return 
make_kademlia_peer(self.node_id, self.address, self.udp_port, self.tcp_port) + node_id = bytes.fromhex(self.node_id.hex()) if self.node_id else None + return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port) class DHTConnection(Base): @@ -111,7 +113,7 @@ class Crawler: for peer in peers: db_peer = self.get_from_peer(peer) if db_peer and db_peer.node_id is None and peer.node_id: - db_peer.node_id = peer.node_id + db_peer.node_id = peer.node_id.hex() elif not db_peer: db_peer = DHTPeer.from_kad_peer(peer) self.db.add(db_peer) @@ -126,7 +128,7 @@ class Crawler: db_peer = self.get_from_peer(peer) db_peer.latency = latency if not db_peer.node_id: - db_peer.node_id = peer.node_id + db_peer.node_id = peer.node_id.hex() if db_peer.first_online and latency is None: db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds elif latency is not None and db_peer.first_online is None: From 508bdb8e94f1b22860066ff71d2c035a2803565b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 15 Jun 2022 11:28:17 -0300 Subject: [PATCH 20/45] dht_crawler: keep working set in memory, flush to db on intervals --- scripts/dht_crawler.py | 77 +++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index e0ffa071b..39a07441f 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -52,7 +52,7 @@ class DHTPeer(Base): return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) def to_kad_peer(self): - node_id = bytes.fromhex(self.node_id.hex()) if self.node_id else None + node_id = bytes.fromhex(self.node_id) if self.node_id else None return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port) @@ -73,40 +73,46 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None): class Crawler: def __init__(self, db_path: str): self.node = new_node() - self.semaphore = asyncio.Semaphore(20) + self.semaphore = asyncio.Semaphore(200) engine = sqla.create_engine(f"sqlite:///{db_path}") Base.metadata.create_all(engine) session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False) self.db = session() + self._memory_peers = { + (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all() + } @property def refresh_limit(self): return datetime.datetime.utcnow() - datetime.timedelta(hours=1) @property - def active_peers_query(self): - return self.db.query(DHTPeer).filter(sqla.or_(DHTPeer.last_seen > self.refresh_limit, DHTPeer.latency > 0)) + def all_peers(self): + return [ + peer for peer in self._memory_peers.values() + if (peer.last_seen and peer.last_seen > self.refresh_limit) or (peer.latency or 0) > 0 + ] @property - def all_peers(self): - return set([peer.to_kad_peer() for peer in self.active_peers_query.all()]) + def active_peers_count(self): + return len(self.all_peers) @property def checked_peers_count(self): - return self.active_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count() + return len([peer for peer in self.all_peers if peer.last_check and peer.last_check > self.refresh_limit]) @property def unreachable_peers_count(self): - return self.active_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count() + return len([peer for peer in self.all_peers + if peer.last_check and peer.last_check > self.refresh_limit and not peer.latency]) @property def peers_with_errors_count(self): - return 
self.active_peers_query.filter(DHTPeer.errors > 0).count() + return len([peer for peer in self.all_peers if (peer.errors or 0) > 0]) def get_peers_needing_check(self): - return set([peer.to_kad_peer() for peer in self.active_peers_query.filter( - sqla.or_(DHTPeer.last_check == None, - DHTPeer.last_check < self.refresh_limit)).order_by(DHTPeer.last_seen.desc()).all()]) + to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit] + return to_check def add_peers(self, *peers): db_peers = [] @@ -116,13 +122,16 @@ class Crawler: db_peer.node_id = peer.node_id.hex() elif not db_peer: db_peer = DHTPeer.from_kad_peer(peer) - self.db.add(db_peer) + self._memory_peers[(peer.address, peer.udp_port)] = db_peer + db_peer.last_seen = datetime.datetime.utcnow() db_peers.append(db_peer) - self.db.flush() - return [dbp.peer_id for dbp in db_peers] + + def flush_to_db(self): + self.db.add_all(self._memory_peers.values()) + self.db.commit() def get_from_peer(self, peer): - return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first() + return self._memory_peers.get((peer.address, peer.udp_port), None) def set_latency(self, peer, latency=None): db_peer = self.get_from_peer(peer) @@ -134,27 +143,13 @@ class Crawler: elif latency is not None and db_peer.first_online is None: db_peer.first_online = datetime.datetime.utcnow() db_peer.last_check = datetime.datetime.utcnow() - self.db.add(db_peer) def inc_errors(self, peer): db_peer = self.get_from_peer(peer) - db_peer.errors += 1 - self.db.add(db_peer) - - def count_peers(self): - return self.db.query(DHTPeer).count() + db_peer.errors = (db_peer.errors or 0) + 1 def associate_peers(self, target_peer_id, db_peer_ids): - connections = { - DHTConnection( - from_peer_id=target_peer_id, - to_peer_id=peer_id) - for peer_id in db_peer_ids - } - self.db.query(DHTPeer).filter(DHTPeer.peer_id.in_(set(db_peer_ids))).update( - {DHTPeer.last_seen: datetime.datetime.utcnow()}) - self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete() - self.db.add_all(connections) + return # todo async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: async with self.semaphore: @@ -177,7 +172,7 @@ class Crawler: start = time.time() log.info("querying %s:%d", host, port) address = await resolve_host(host, port, 'udp') - this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port)) + self.add_peers(make_kademlia_peer(None, address, port)) key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) latency = None for _ in range(3): @@ -225,8 +220,8 @@ class Crawler: factor = 2048 log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) - db_peer_ids = self.add_peers(*peers) - self.associate_peers(this_peer_id, db_peer_ids) + self.add_peers(*peers) + #self.associate_peers(this_peer_id, db_peer_ids) self.db.commit() return peers @@ -239,19 +234,25 @@ class Crawler: f.add_done_callback(lambda _: to_process.pop(_peer)) to_check = self.get_peers_needing_check() + last_flush = datetime.datetime.utcnow() while True: for peer in to_check: if peer not in to_process: submit(peer) - if len(to_process) > 20: + await asyncio.sleep(.1) + if len(to_process) > 100: break await asyncio.sleep(0) log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", - self.active_peers_query.count(), self.checked_peers_count, 
self.unreachable_peers_count, + self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) if to_process: await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) to_check = self.get_peers_needing_check() + if (datetime.datetime.utcnow() - last_flush).seconds > 60: + log.info("flushing to db") + self.flush_to_db() + last_flush = datetime.datetime.utcnow() while not to_check and not to_process: log.info("Idle, sleeping a minute.") await asyncio.sleep(60.0) @@ -262,7 +263,7 @@ async def test(): crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() - if crawler.active_peers_query.count() < 100: + if crawler.active_peers_count < 100: for (host, port) in conf.known_dht_nodes: await crawler.crawl_routing_table(host, port) await crawler.process() From 0497698c5b5dd3e0c0c0a3404ce15f9a20a2c362 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 17 Jun 2022 00:21:43 -0300 Subject: [PATCH 21/45] dht_crawler: skip ping if known node_id --- scripts/dht_crawler.py | 55 ++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 39a07441f..c4ea6c75f 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -136,7 +136,7 @@ class Crawler: def set_latency(self, peer, latency=None): db_peer = self.get_from_peer(peer) db_peer.latency = latency - if not db_peer.node_id: + if not db_peer.node_id and peer.node_id: db_peer.node_id = peer.node_id.hex() if db_peer.first_online and latency is None: db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds @@ -156,42 +156,48 @@ class Crawler: peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port) for attempt in range(3): try: + req_start = time.perf_counter_ns() response = await self.node.protocol.get_rpc_peer(peer).find_node(key) + latency = time.perf_counter_ns() - req_start + self.set_latency(make_kademlia_peer(key, host, port), latency) return [make_kademlia_peer(*peer_tuple) for peer_tuple in response] except asyncio.TimeoutError: - log.info('Previously responding peer timed out: %s:%d attempt #%d', host, port, (attempt + 1)) + self.set_latency(make_kademlia_peer(key, host, port), None) continue except lbry.dht.error.RemoteException as e: - log.info('Previously responding peer errored: %s:%d attempt #%d - %s', + log.info('Peer errored: %s:%d attempt #%d - %s', host, port, (attempt + 1), str(e)) self.inc_errors(peer) + self.set_latency(make_kademlia_peer(key, host, port), None) continue return [] - async def crawl_routing_table(self, host, port): + async def crawl_routing_table(self, host, port, node_id=None): start = time.time() log.info("querying %s:%d", host, port) address = await resolve_host(host, port, 'udp') self.add_peers(make_kademlia_peer(None, address, port)) - key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - latency = None - for _ in range(3): - try: - ping_start = time.perf_counter_ns() - async with self.semaphore: - await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() - key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - latency = time.perf_counter_ns() - ping_start - except asyncio.TimeoutError: - pass - except lbry.dht.error.RemoteException: - self.inc_errors(make_kademlia_peer(None, address, port)) - pass - self.set_latency(make_kademlia_peer(key, address, 
port), latency if key else None) - if not latency or not key: - if latency and not key: - log.warning("No node id from %s:%d", host, port) - return set() + key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + if not key: + latency = None + for _ in range(3): + try: + ping_start = time.perf_counter_ns() + async with self.semaphore: + await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() + key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + latency = time.perf_counter_ns() - ping_start + break + except asyncio.TimeoutError: + pass + except lbry.dht.error.RemoteException: + self.inc_errors(make_kademlia_peer(None, address, port)) + pass + self.set_latency(make_kademlia_peer(key, address, port), latency if key else None) + if not latency or not key: + if latency and not key: + log.warning("No node id from %s:%d", host, port) + return set() node_id = key distance = Distance(key) max_distance = int.from_bytes(bytes([0xff] * 48), 'big') @@ -229,7 +235,8 @@ class Crawler: to_process = {} def submit(_peer): - f = asyncio.ensure_future(self.crawl_routing_table(_peer.address, peer.udp_port)) + f = asyncio.ensure_future( + self.crawl_routing_table(_peer.address, peer.udp_port, bytes.fromhex(peer.node_id))) to_process[_peer] = f f.add_done_callback(lambda _: to_process.pop(_peer)) From cfe5c8de8aa90f026f69cf238a8e7adcd2f67a90 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 17 Jun 2022 06:09:57 -0300 Subject: [PATCH 22/45] dht_crawler: serve prometheus metrics at 7070 --- scripts/dht_crawler.py | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index c4ea6c75f..8acd6c6e6 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -4,6 +4,9 @@ import asyncio import time import typing +from aiohttp import web +from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest + import lbry.dht.error from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -71,6 +74,34 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None): class Crawler: + unique_total_hosts_metric = Gauge( + "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node", + labelnames=("scope",) + ) + reachable_hosts_metric = Gauge( + "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node", + labelnames=("scope",) + ) + total_historic_hosts_metric = Gauge( + "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + pending_check_hosts_metric = Gauge( + "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + hosts_with_errors_metric = Gauge( + "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + connections_found_metric = Gauge( + "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", + labelnames=("host", "port") + ) + host_latency_metric = Gauge( + "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node", + labelnames=("host", "port") + ) def __init__(self, db_path: str): self.node = new_node() self.semaphore = asyncio.Semaphore(200) @@ -134,6 +165,8 @@ class Crawler: return 
self._memory_peers.get((peer.address, peer.udp_port), None) def set_latency(self, peer, latency=None): + if latency: + self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency) db_peer = self.get_from_peer(peer) db_peer.latency = latency if not db_peer.node_id and peer.node_id: @@ -227,6 +260,7 @@ class Crawler: log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) + self.connections_found_metric.labels(host=host, port=port).set(len(peers)) #self.associate_peers(this_peer_id, db_peer_ids) self.db.commit() return peers @@ -250,6 +284,11 @@ class Crawler: if len(to_process) > 100: break await asyncio.sleep(0) + self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count) + self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count) + self.total_historic_hosts_metric.labels("global").set(len(self._memory_peers)) + self.pending_check_hosts_metric.labels("global").set(len(to_check)) + self.hosts_with_errors_metric.labels("global").set(self.peers_with_errors_count) log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) @@ -266,7 +305,32 @@ class Crawler: to_check = self.get_peers_needing_check() +class SimpleMetrics: + def __init__(self, port): + self.prometheus_port = port + + async def handle_metrics_get_request(self, _): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + log.exception('could not generate prometheus data') + raise + + async def start(self): + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + metrics_runner = web.AppRunner(prom_app) + await metrics_runner.setup() + prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port) + await prom_site.start() + + async def test(): + metrics = SimpleMetrics('7070') + await metrics.start() crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() From 62eb9d5c75f2e70fee85edbd15a54d59a1e2ac89 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 20 Jun 2022 06:07:05 -0300 Subject: [PATCH 23/45] dht_crawler: only count non zero connections --- scripts/dht_crawler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 8acd6c6e6..75479053d 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -260,7 +260,8 @@ class Crawler: log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) - self.connections_found_metric.labels(host=host, port=port).set(len(peers)) + if peers: + self.connections_found_metric.labels(host=host, port=port).set(len(peers)) #self.associate_peers(this_peer_id, db_peer_ids) self.db.commit() return peers From 85ff487af517a733e79e8fbce0c372ad92b3f2d8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 21 Jun 2022 05:58:19 -0300 Subject: [PATCH 24/45] dht_crawler: randomize port when idle --- scripts/dht_crawler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 75479053d..523f96a61 100644 --- a/scripts/dht_crawler.py +++ 
b/scripts/dht_crawler.py @@ -67,7 +67,7 @@ class DHTConnection(Base): connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id)) -def new_node(address="0.0.0.0", udp_port=4444, node_id=None): +def new_node(address="0.0.0.0", udp_port=0, node_id=None): node_id = node_id or generate_id() loop = asyncio.get_event_loop() return Node(loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address) @@ -301,7 +301,10 @@ class Crawler: self.flush_to_db() last_flush = datetime.datetime.utcnow() while not to_check and not to_process: - log.info("Idle, sleeping a minute.") + port = self.node.listening_port.get_extra_info('socket').getsockname()[1] + self.node.stop() + await self.node.start_listening() + log.info("Idle, sleeping a minute. Port changed to %d", port) await asyncio.sleep(60.0) to_check = self.get_peers_needing_check() From 4a3a7e318d52222ffff6bb431bbe6ab47086f0cd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 27 Jun 2022 04:12:30 -0300 Subject: [PATCH 25/45] update pip and setuptools on dht dockerfile --- docker/Dockerfile.dht_node | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node index d44370fc5..9399911d9 100644 --- a/docker/Dockerfile.dht_node +++ b/docker/Dockerfile.dht_node @@ -31,6 +31,7 @@ RUN chown -R $user:$user $projects_dir USER $user WORKDIR $projects_dir +RUN python3 -m pip install -U setuptools pip RUN make install RUN python3 docker/set_build.py RUN rm ~/.cache -rf From 47a5d37d7c1ee7d4086befd646fa8ff901920ef4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 27 Jun 2022 04:28:26 -0300 Subject: [PATCH 26/45] change default metric port, add sqlalchemy to dockerfile --- docker/Dockerfile.dht_node | 2 +- scripts/dht_crawler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node index 9399911d9..0b7932a84 100644 --- a/docker/Dockerfile.dht_node +++ b/docker/Dockerfile.dht_node @@ -31,7 +31,7 @@ RUN chown -R $user:$user $projects_dir USER $user WORKDIR $projects_dir -RUN python3 -m pip install -U setuptools pip +RUN python3 -m pip install -U setuptools pip sqlalchemy RUN make install RUN python3 docker/set_build.py RUN rm ~/.cache -rf diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 523f96a61..bc8cee37e 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -333,7 +333,7 @@ class SimpleMetrics: async def test(): - metrics = SimpleMetrics('7070') + metrics = SimpleMetrics('8080') await metrics.start() crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() From 13af7800c2c5c20633744b165699cb1eb0b20015 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Jul 2022 01:35:44 -0300 Subject: [PATCH 27/45] refactor script, remove dep --- docker/Dockerfile.dht_node | 2 +- scripts/dht_crawler.py | 255 ++++++++++++++++++++++--------------- 2 files changed, 154 insertions(+), 103 deletions(-) diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node index 0b7932a84..9399911d9 100644 --- a/docker/Dockerfile.dht_node +++ b/docker/Dockerfile.dht_node @@ -31,7 +31,7 @@ RUN chown -R $user:$user $projects_dir USER $user WORKDIR $projects_dir -RUN python3 -m pip install -U setuptools pip sqlalchemy +RUN python3 -m pip install -U setuptools pip RUN make install RUN python3 docker/set_build.py RUN rm ~/.cache -rf diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index bc8cee37e..7a76e1639 100644 --- a/scripts/dht_crawler.py +++ 
b/scripts/dht_crawler.py @@ -3,9 +3,10 @@ import logging import asyncio import time import typing +from dataclasses import dataclass, astuple, replace from aiohttp import web -from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest +from prometheus_client import Gauge, generate_latest as prom_generate_latest import lbry.dht.error from lbry.dht.constants import generate_id @@ -17,56 +18,92 @@ from lbry.conf import Config from lbry.utils import resolve_host -from sqlalchemy.orm import declarative_base, relationship -import sqlalchemy as sqla - - -@sqla.event.listens_for(sqla.engine.Engine, "connect") -def set_sqlite_pragma(dbapi_connection, _): - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA journal_mode=WAL") - cursor.close() - - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) -Base = declarative_base() -class DHTPeer(Base): - __tablename__ = "peer" - peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True) - node_id = sqla.Column(sqla.String(96)) - address = sqla.Column(sqla.String()) - udp_port = sqla.Column(sqla.Integer()) - tcp_port = sqla.Column(sqla.Integer()) - first_online = sqla.Column(sqla.DateTime()) - errors = sqla.Column(sqla.Integer(), default=0) - last_churn = sqla.Column(sqla.Integer()) - added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow) - last_check = sqla.Column(sqla.DateTime()) - last_seen = sqla.Column(sqla.DateTime()) - latency = sqla.Column(sqla.Integer()) - endpoint_unique = sqla.UniqueConstraint("node_id", "udp_port") +class PeerStorage(SQLiteMixin): + CREATE_TABLES_QUERY = """ + PRAGMA JOURNAL_MODE=WAL; + CREATE TABLE IF NOT EXISTS peer ( + peer_id INTEGER NOT NULL, + node_id VARCHAR(96), + address VARCHAR, + udp_port INTEGER, + tcp_port INTEGER, + first_online DATETIME, + errors INTEGER, + last_churn INTEGER, + added_on DATETIME NOT NULL, + last_check DATETIME, + last_seen DATETIME, + latency INTEGER, + PRIMARY KEY (peer_id) + ); + CREATE TABLE IF NOT EXISTS connection ( + from_peer_id INTEGER NOT NULL, + to_peer_id INTEGER NOT NULL, + PRIMARY KEY (from_peer_id, to_peer_id), + FOREIGN KEY(from_peer_id) REFERENCES peer (peer_id), + FOREIGN KEY(to_peer_id) REFERENCES peer (peer_id) + ); +""" + + async def open(self): + await super().open() + self.db.writer_connection.row_factory = dict_row_factory + + async def all_peers(self): + return [DHTPeer(**peer) for peer in await self.db.execute_fetchall("select * from peer")] + + async def save_peers(self, *peers): + log.info("Saving graph nodes (peers) to DB") + await self.db.executemany( + "INSERT OR REPLACE INTO peer(" + "node_id, address, udp_port, tcp_port, first_online, errors, last_churn," + "added_on, last_check, last_seen, latency, peer_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", + [astuple(peer) for peer in peers] + ) + log.info("Finished saving graph nodes (peers) to DB") + + async def save_connections(self, connections_map): + log.info("Saving graph edges (connections) to DB") + await self.db.executemany( + "DELETE FROM connection WHERE from_peer_id = ?", [(key,) for key in connections_map]) + for from_peer_id in connections_map: + await self.db.executemany( + "INSERT INTO connection(from_peer_id, to_peer_id) VALUES(?,?)", + [(from_peer_id, to_peer_id) for to_peer_id in connections_map[from_peer_id]]) + log.info("Finished saving graph edges (connections) to DB") + + +@dataclass(frozen=True) +class DHTPeer: + node_id: str + 
address: str + udp_port: int + tcp_port: int = None + first_online: datetime.datetime = None + errors: int = None + last_churn: int = None + added_on: datetime.datetime = None + last_check: datetime.datetime = None + last_seen: datetime.datetime = None + latency: int = None + peer_id: int = None @classmethod - def from_kad_peer(cls, peer): + def from_kad_peer(cls, peer, peer_id): node_id = peer.node_id.hex() if peer.node_id else None - return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) + return DHTPeer( + node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port, + peer_id=peer_id, added_on=datetime.datetime.utcnow()) def to_kad_peer(self): node_id = bytes.fromhex(self.node_id) if self.node_id else None return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port) -class DHTConnection(Base): - __tablename__ = "connection" - from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) - connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id)) - to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) - connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id)) - - def new_node(address="0.0.0.0", udp_port=0, node_id=None): node_id = node_id or generate_id() loop = asyncio.get_event_loop() @@ -102,15 +139,17 @@ class Crawler: "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node", labelnames=("host", "port") ) + def __init__(self, db_path: str): self.node = new_node() - self.semaphore = asyncio.Semaphore(200) - engine = sqla.create_engine(f"sqlite:///{db_path}") - Base.metadata.create_all(engine) - session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False) - self.db = session() + self.db = PeerStorage(db_path) + self._memory_peers = {} + self._connections = {} + + async def open(self): + await self.db.open() self._memory_peers = { - (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all() + (peer.address, peer.udp_port): peer for peer in await self.db.all_peers() } @property @@ -146,20 +185,20 @@ class Crawler: return to_check def add_peers(self, *peers): - db_peers = [] for peer in peers: db_peer = self.get_from_peer(peer) - if db_peer and db_peer.node_id is None and peer.node_id: - db_peer.node_id = peer.node_id.hex() + if db_peer and db_peer.node_id is None and peer.node_id is not None: + db_peer = replace(db_peer, node_id=peer.node_id.hex()) elif not db_peer: - db_peer = DHTPeer.from_kad_peer(peer) - self._memory_peers[(peer.address, peer.udp_port)] = db_peer - db_peer.last_seen = datetime.datetime.utcnow() - db_peers.append(db_peer) + db_peer = DHTPeer.from_kad_peer(peer, len(self._memory_peers) + 1) + db_peer = replace(db_peer, last_seen=datetime.datetime.utcnow()) + self._memory_peers[(peer.address, peer.udp_port)] = db_peer - def flush_to_db(self): - self.db.add_all(self._memory_peers.values()) - self.db.commit() + async def flush_to_db(self): + await self.db.save_peers(*self._memory_peers.values()) + connections_to_save = self._connections + self._connections = {} + await self.db.save_connections(connections_to_save) def get_from_peer(self, peer): return self._memory_peers.get((peer.address, peer.udp_port), None) @@ -167,71 +206,73 @@ class Crawler: def set_latency(self, peer, latency=None): if latency: 
self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency) - db_peer = self.get_from_peer(peer) - db_peer.latency = latency + db_peer = replace(self.get_from_peer(peer), latency=latency) if not db_peer.node_id and peer.node_id: - db_peer.node_id = peer.node_id.hex() + db_peer = replace(db_peer, node_id=peer.node_id.hex()) if db_peer.first_online and latency is None: - db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds + db_peer = replace(db_peer, last_churn=(datetime.datetime.utcnow() - db_peer.first_online).seconds) elif latency is not None and db_peer.first_online is None: - db_peer.first_online = datetime.datetime.utcnow() - db_peer.last_check = datetime.datetime.utcnow() + db_peer = replace(db_peer, first_online=datetime.datetime.utcnow()) + db_peer = replace(db_peer, last_check=datetime.datetime.utcnow()) + self._memory_peers[(db_peer.address, db_peer.udp_port)] = db_peer def inc_errors(self, peer): db_peer = self.get_from_peer(peer) - db_peer.errors = (db_peer.errors or 0) + 1 + self._memory_peers[(peer.address, peer.node_id)] = replace(db_peer, errors=(db_peer.errors or 0) + 1) - def associate_peers(self, target_peer_id, db_peer_ids): - return # todo + def associate_peers(self, peer, other_peers): + self._connections[self.get_from_peer(peer).peer_id] = [ + self.get_from_peer(other_peer).peer_id for other_peer in other_peers] async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: - async with self.semaphore: - peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port) - for attempt in range(3): - try: - req_start = time.perf_counter_ns() - response = await self.node.protocol.get_rpc_peer(peer).find_node(key) - latency = time.perf_counter_ns() - req_start - self.set_latency(make_kademlia_peer(key, host, port), latency) - return [make_kademlia_peer(*peer_tuple) for peer_tuple in response] - except asyncio.TimeoutError: - self.set_latency(make_kademlia_peer(key, host, port), None) - continue - except lbry.dht.error.RemoteException as e: - log.info('Peer errored: %s:%d attempt #%d - %s', - host, port, (attempt + 1), str(e)) - self.inc_errors(peer) - self.set_latency(make_kademlia_peer(key, host, port), None) - continue + peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port) + for attempt in range(3): + try: + req_start = time.perf_counter_ns() + response = await self.node.protocol.get_rpc_peer(peer).find_node(key) + await asyncio.sleep(0.05) + latency = time.perf_counter_ns() - req_start + self.set_latency(peer, latency) + return [make_kademlia_peer(*peer_tuple) for peer_tuple in response] + except asyncio.TimeoutError: + self.set_latency(peer, None) + continue + except lbry.dht.error.RemoteException as e: + log.info('Peer errored: %s:%d attempt #%d - %s', + host, port, (attempt + 1), str(e)) + self.inc_errors(peer) + self.set_latency(peer, None) + continue return [] async def crawl_routing_table(self, host, port, node_id=None): start = time.time() log.info("querying %s:%d", host, port) address = await resolve_host(host, port, 'udp') - self.add_peers(make_kademlia_peer(None, address, port)) key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + peer = make_kademlia_peer(key, address, port) + self.add_peers(peer) if not key: latency = None for _ in range(3): try: ping_start = time.perf_counter_ns() - async with self.semaphore: - await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() - key = key or 
self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + await self.node.protocol.get_rpc_peer(peer).ping() + await asyncio.sleep(0.05) + key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + peer = make_kademlia_peer(key, address, port) latency = time.perf_counter_ns() - ping_start break except asyncio.TimeoutError: pass except lbry.dht.error.RemoteException: - self.inc_errors(make_kademlia_peer(None, address, port)) + self.inc_errors(peer) pass - self.set_latency(make_kademlia_peer(key, address, port), latency if key else None) - if not latency or not key: - if latency and not key: + self.set_latency(peer, latency if peer.node_id else None) + if not latency or not peer.node_id: + if latency and not peer.node_id: log.warning("No node id from %s:%d", host, port) return set() - node_id = key distance = Distance(key) max_distance = int.from_bytes(bytes([0xff] * 48), 'big') peers = set() @@ -248,7 +289,7 @@ class Crawler: next_jump = current_distance + int(max_distance // factor) # jump closer factor /= 2 if factor > 8 and next_jump < max_distance: - key = int.from_bytes(node_id, 'big') ^ next_jump + key = int.from_bytes(peer.node_id, 'big') ^ next_jump if key.bit_length() > 384: break key = key.to_bytes(48, 'big') @@ -262,8 +303,7 @@ class Crawler: self.add_peers(*peers) if peers: self.connections_found_metric.labels(host=host, port=port).set(len(peers)) - #self.associate_peers(this_peer_id, db_peer_ids) - self.db.commit() + self.associate_peers(peer, peers) return peers async def process(self): @@ -271,19 +311,17 @@ class Crawler: def submit(_peer): f = asyncio.ensure_future( - self.crawl_routing_table(_peer.address, peer.udp_port, bytes.fromhex(peer.node_id))) - to_process[_peer] = f - f.add_done_callback(lambda _: to_process.pop(_peer)) + self.crawl_routing_table(_peer.address, _peer.udp_port, bytes.fromhex(_peer.node_id))) + to_process[_peer.peer_id] = f + f.add_done_callback(lambda _: to_process.pop(_peer.peer_id)) to_check = self.get_peers_needing_check() last_flush = datetime.datetime.utcnow() while True: - for peer in to_check: - if peer not in to_process: + for peer in to_check[:200]: + if peer.peer_id not in to_process: submit(peer) - await asyncio.sleep(.1) - if len(to_process) > 100: - break + await asyncio.sleep(.05) await asyncio.sleep(0) self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count) self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count) @@ -298,7 +336,7 @@ class Crawler: to_check = self.get_peers_needing_check() if (datetime.datetime.utcnow() - last_flush).seconds > 60: log.info("flushing to db") - self.flush_to_db() + await self.flush_to_db() last_flush = datetime.datetime.utcnow() while not to_check and not to_process: port = self.node.listening_port.get_extra_info('socket').getsockname()[1] @@ -332,10 +370,23 @@ class SimpleMetrics: await prom_site.start() +def dict_row_factory(cursor, row): + d = {} + for idx, col in enumerate(cursor.description): + if col[0] in ('added_on', 'first_online', 'last_seen', 'last_check'): + d[col[0]] = datetime.datetime.fromisoformat(row[idx]) if row[idx] else None + else: + d[col[0]] = row[idx] + return d + + async def test(): + asyncio.get_event_loop().set_debug(True) metrics = SimpleMetrics('8080') await metrics.start() crawler = Crawler("/tmp/a.db") + await crawler.open() + await crawler.flush_to_db() await crawler.node.start_listening() conf = Config() if crawler.active_peers_count < 100: From 
0d6125de0ba2f052e9b845d323502acb77123143 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 16 Jul 2022 00:43:45 -0300 Subject: [PATCH 28/45] add sd_hash prober --- lbry/dht/peer.py | 7 +- lbry/dht/protocol/iterative_find.py | 24 +++--- lbry/extras/daemon/client.py | 2 +- scripts/dht_crawler.py | 110 ++++++++++++++++++++++++---- 4 files changed, 115 insertions(+), 28 deletions(-) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index 8fef68181..db4635447 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -153,9 +153,10 @@ class PeerManager: def peer_is_good(self, peer: 'KademliaPeer'): return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port) - def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use - node_id, address, tcp_port = decode_compact_address(compact_address) - return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) + +def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use + node_id, address, tcp_port = decode_compact_address(compact_address) + return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) @dataclass(unsafe_hash=True) diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index d41b3b2a0..b9678ea1e 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING from lbry.dht import constants from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.distance import Distance -from lbry.dht.peer import make_kademlia_peer +from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address from lbry.dht.serialization.datagram import PAGE_KEY if TYPE_CHECKING: @@ -26,6 +26,15 @@ class FindResponse: def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]: raise NotImplementedError() + def get_close_kademlia_peers(self, peer_info) -> typing.Generator[typing.Iterator['KademliaPeer'], None, None]: + for contact_triple in self.get_close_triples(): + node_id, address, udp_port = contact_triple + try: + yield make_kademlia_peer(node_id, address, udp_port) + except ValueError: + log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address, + peer_info.udp_port, address, udp_port) + class FindNodeResponse(FindResponse): def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]): @@ -125,13 +134,8 @@ class IterativeFinder(AsyncIterator): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) - for contact_triple in response.get_close_triples(): - node_id, address, udp_port = contact_triple - try: - self._add_active(make_kademlia_peer(node_id, address, udp_port)) - except ValueError: - log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address, - peer.udp_port, address, udp_port) + for new_peer in response.get_close_kademlia_peers(peer): + self._add_active(new_peer) self.check_result_ready(response) self._log_state(reason="check result") @@ -319,7 +323,7 @@ class IterativeValueFinder(IterativeFinder): decoded_peers = set() for compact_addr in parsed.found_compact_addresses: try: - decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)) + decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr)) except ValueError: log.warning("misbehaving 
peer %s:%i returned invalid peer for blob", peer.address, peer.udp_port) @@ -341,7 +345,7 @@ class IterativeValueFinder(IterativeFinder): def check_result_ready(self, response: FindValueResponse): if response.found: - blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) + blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] to_yield = [] for blob_peer in blob_peers: diff --git a/lbry/extras/daemon/client.py b/lbry/extras/daemon/client.py index 7f0997320..9e9b1694a 100644 --- a/lbry/extras/daemon/client.py +++ b/lbry/extras/daemon/client.py @@ -1,5 +1,5 @@ -from lbry.conf import Config from lbry.extras.cli import execute_command +from lbry.conf import Config def daemon_rpc(conf: Config, method: str, **kwargs): diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 7a76e1639..3aa872bcc 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -1,18 +1,21 @@ import datetime import logging import asyncio +import os.path +import random import time import typing from dataclasses import dataclass, astuple, replace from aiohttp import web -from prometheus_client import Gauge, generate_latest as prom_generate_latest +from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter import lbry.dht.error from lbry.dht.constants import generate_id from lbry.dht.node import Node -from lbry.dht.peer import make_kademlia_peer, PeerManager +from lbry.dht.peer import make_kademlia_peer, PeerManager, decode_tcp_peer_from_compact_address from lbry.dht.protocol.distance import Distance +from lbry.dht.protocol.iterative_find import FindValueResponse, FindNodeResponse, FindResponse from lbry.extras.daemon.storage import SQLiteMixin from lbry.conf import Config from lbry.utils import resolve_host @@ -22,6 +25,19 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(na log = logging.getLogger(__name__) +class SDHashSamples: + def __init__(self, samples_file_path): + with open(samples_file_path, "rb") as sample_file: + self._samples = sample_file.read() + assert len(self._samples) % 48 == 0 + self.size = len(self._samples) // 48 + + def read_samples(self, count=1): + for _ in range(count): + offset = 48 * random.randrange(0, self.size) + yield self._samples[offset:offset + 48] + + class PeerStorage(SQLiteMixin): CREATE_TABLES_QUERY = """ PRAGMA JOURNAL_MODE=WAL; @@ -139,11 +155,25 @@ class Crawler: "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node", labelnames=("host", "port") ) + probed_streams_metric = Counter( + "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node", + labelnames=("sd_hash",) + ) + announced_streams_metric = Counter( + "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node", + labelnames=("sd_hash",) + ) + working_streams_metric = Counter( + "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node", + labelnames=("sd_hash",) + ) - def __init__(self, db_path: str): + def __init__(self, db_path: str, sd_hash_samples: SDHashSamples): self.node = new_node() self.db = PeerStorage(db_path) + self.sd_hashes = sd_hash_samples self._memory_peers = {} + self._reachable_by_node_id = {} self._connections = {} async def open(self): @@ -151,6 +181,46 @@ class Crawler: self._memory_peers = { (peer.address, peer.udp_port): peer for peer in await self.db.all_peers() } + self.refresh_reachable_set() + + 
def refresh_reachable_set(self): + self._reachable_by_node_id = { + bytes.fromhex(peer.node_id): peer for peer in self._memory_peers.values() if (peer.latency or 0) > 0 + } + + async def probe_files(self): + if not self.sd_hashes: + return + while True: + for sd_hash in self.sd_hashes.read_samples(10_000): + self.refresh_reachable_set() + print(sd_hash.hex()) + distance = Distance(sd_hash) + node_ids = list(self._reachable_by_node_id.keys()) + node_ids.sort(key=lambda node_id: distance(node_id)) + k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]] + for response in asyncio.as_completed( + [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]): + response = await response + self.probed_streams_metric.labels(sd_hash).inc() + if response and response.found: + self.announced_streams_metric.labels(sd_hash).inc() + blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) + for compact_addr in response.found_compact_addresses] + print('FOUND', blob_peers, response.pages) + for blob_peer in blob_peers: + response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) + if response: + self.working_streams_metric.labels(sd_hash).inc() + print('ALIVE', blob_peer.address) + if response.found: + blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) + for compact_addr in response.found_compact_addresses] + print('REPLIED+FOUND', blob_peers, response.pages) + else: + print('DEAD', blob_peer.address, blob_peer.tcp_port) + else: + print('NOT FOUND', response) @property def refresh_limit(self): @@ -206,7 +276,10 @@ class Crawler: def set_latency(self, peer, latency=None): if latency: self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency) - db_peer = replace(self.get_from_peer(peer), latency=latency) + db_peer = self.get_from_peer(peer) + if not db_peer: + return + db_peer = replace(db_peer, latency=latency) if not db_peer.node_id and peer.node_id: db_peer = replace(db_peer, node_id=peer.node_id.hex()) if db_peer.first_online and latency is None: @@ -224,16 +297,22 @@ class Crawler: self._connections[self.get_from_peer(peer).peer_id] = [ self.get_from_peer(other_peer).peer_id for other_peer in other_peers] - async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: + async def request_peers(self, host, port, node_id, key=None) -> typing.Optional[FindResponse]: + key = key or node_id peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port) for attempt in range(3): try: req_start = time.perf_counter_ns() - response = await self.node.protocol.get_rpc_peer(peer).find_node(key) + if key == node_id: + response = await self.node.protocol.get_rpc_peer(peer).find_node(key) + response = FindNodeResponse(key, response) + else: + response = await self.node.protocol.get_rpc_peer(peer).find_value(key) + response = FindValueResponse(key, response) await asyncio.sleep(0.05) latency = time.perf_counter_ns() - req_start self.set_latency(peer, latency) - return [make_kademlia_peer(*peer_tuple) for peer_tuple in response] + return response except asyncio.TimeoutError: self.set_latency(peer, None) continue @@ -243,11 +322,10 @@ class Crawler: self.inc_errors(peer) self.set_latency(peer, None) continue - return [] async def crawl_routing_table(self, host, port, node_id=None): start = time.time() - log.info("querying %s:%d", host, port) + log.debug("querying %s:%d", host, port) address = await resolve_host(host, port, 'udp') key = node_id 
or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) peer = make_kademlia_peer(key, address, port) @@ -278,7 +356,8 @@ class Crawler: peers = set() factor = 2048 for i in range(200): - new_peers = await self.request_peers(address, port, key) + response = await self.request_peers(address, port, key) + new_peers = list(response.get_close_kademlia_peers(peer)) if response else None if not new_peers: break new_peers.sort(key=lambda peer: distance(peer.node_id)) @@ -298,8 +377,9 @@ class Crawler: else: key = far_key factor = 2048 - log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", - host, port, (time.time() - start), len(peers), i) + if peers: + log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", + host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) if peers: self.connections_found_metric.labels(host=host, port=port).set(len(peers)) @@ -384,14 +464,16 @@ async def test(): asyncio.get_event_loop().set_debug(True) metrics = SimpleMetrics('8080') await metrics.start() - crawler = Crawler("/tmp/a.db") + conf = Config() + hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None + crawler = Crawler("/tmp/a.db", hosting_samples) await crawler.open() await crawler.flush_to_db() await crawler.node.start_listening() - conf = Config() if crawler.active_peers_count < 100: for (host, port) in conf.known_dht_nodes: await crawler.crawl_routing_table(host, port) + probe_task = asyncio.ensure_future(crawler.probe_files()) await crawler.process() if __name__ == '__main__': From e2922a434febf19eda46b6d659fbb3f3fa93717f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 16 Jul 2022 00:43:56 -0300 Subject: [PATCH 29/45] add script to generate probe dataset --- scripts/sd_hash_sampler.py | 44 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 scripts/sd_hash_sampler.py diff --git a/scripts/sd_hash_sampler.py b/scripts/sd_hash_sampler.py new file mode 100644 index 000000000..f651cd1be --- /dev/null +++ b/scripts/sd_hash_sampler.py @@ -0,0 +1,44 @@ +import asyncio +from typing import Iterable + +from lbry.extras.daemon.client import daemon_rpc +from lbry.conf import Config +conf = Config() + + +async def sample_prefix(prefix: bytes): + result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page_size=50) + total_pages = result['total_pages'] + print(total_pages) + sd_hashes = set() + for page in range(1, total_pages + 1): + if page > 1: + result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page=page, page_size=50) + for item in result['items']: + sd_hash = item.get('value', {}).get('source', {}).get('sd_hash') + if not sd_hash: + print('err', item) + continue + sd_hashes.add(sd_hash) + print('page', page, len(sd_hashes)) + return sd_hashes + + +def save_sample(name: str, samples: Iterable[str]): + with open(name, 'wb') as outfile: + for sample in samples: + outfile.write(bytes.fromhex(sample)) + outfile.flush() + print(outfile.tell()) + + +async def main(): + samples = set() + futs = [asyncio.ensure_future(sample_prefix(bytes([i]))) for i in range(256)] + for i, completed in enumerate(asyncio.as_completed(futs)): + samples.update(await completed) + print(i, len(samples)) + print(save_sample("test.sample", samples)) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file From 43b45a939b9e9c5f1af5123e03d9a5c1a660a1f8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 21 Jul 2022 
15:51:43 -0300 Subject: [PATCH 30/45] format logging --- scripts/dht_crawler.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 3aa872bcc..d35ee7216 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -194,7 +194,7 @@ class Crawler: while True: for sd_hash in self.sd_hashes.read_samples(10_000): self.refresh_reachable_set() - print(sd_hash.hex()) + log.info("Querying stream %s for peers.", sd_hash[:8]) distance = Distance(sd_hash) node_ids = list(self._reachable_by_node_id.keys()) node_ids.sort(key=lambda node_id: distance(node_id)) @@ -207,20 +207,16 @@ class Crawler: self.announced_streams_metric.labels(sd_hash).inc() blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] - print('FOUND', blob_peers, response.pages) for blob_peer in blob_peers: response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) if response: self.working_streams_metric.labels(sd_hash).inc() - print('ALIVE', blob_peer.address) - if response.found: - blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) - for compact_addr in response.found_compact_addresses] - print('REPLIED+FOUND', blob_peers, response.pages) + log.info("Found responsive peer for %s: %s:%d(%d)", + sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) else: - print('DEAD', blob_peer.address, blob_peer.tcp_port) - else: - print('NOT FOUND', response) + log.info("Found dead peer for %s: %s:%d(%d)", + sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) + await asyncio.sleep(.5) @property def refresh_limit(self): From 9aa9ecdc0a90161e4ea407dc03e5c1ba89cf43f8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 21 Jul 2022 15:52:19 -0300 Subject: [PATCH 31/45] add arg for db path --- scripts/dht_crawler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index d35ee7216..64261f007 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -1,3 +1,4 @@ +import sys import datetime import logging import asyncio @@ -457,12 +458,13 @@ def dict_row_factory(cursor, row): async def test(): + db_path = "/tmp/peers.db" if len(sys.argv) == 1 else sys.argv[-1] asyncio.get_event_loop().set_debug(True) metrics = SimpleMetrics('8080') await metrics.start() conf = Config() hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None - crawler = Crawler("/tmp/a.db", hosting_samples) + crawler = Crawler(db_path, hosting_samples) await crawler.open() await crawler.flush_to_db() await crawler.node.start_listening() From df77392fe0067983b3b034aafc77e84a0041cb60 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 23 Jul 2022 14:58:02 -0300 Subject: [PATCH 32/45] dht crawler:improve logging, metrics, make startup concurrent --- scripts/dht_crawler.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 64261f007..3345a6852 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -158,15 +158,15 @@ class Crawler: ) probed_streams_metric = Counter( "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node", - labelnames=("sd_hash",) + labelnames=("scope",) ) announced_streams_metric = Counter( "announced_streams", "Amount of streams where announcements were found.", 
namespace="dht_crawler_node", - labelnames=("sd_hash",) + labelnames=("scope",) ) working_streams_metric = Counter( "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node", - labelnames=("sd_hash",) + labelnames=("scope",) ) def __init__(self, db_path: str, sd_hash_samples: SDHashSamples): @@ -195,7 +195,7 @@ class Crawler: while True: for sd_hash in self.sd_hashes.read_samples(10_000): self.refresh_reachable_set() - log.info("Querying stream %s for peers.", sd_hash[:8]) + log.info("Querying stream %s for peers.", sd_hash.hex()[:8]) distance = Distance(sd_hash) node_ids = list(self._reachable_by_node_id.keys()) node_ids.sort(key=lambda node_id: distance(node_id)) @@ -203,20 +203,20 @@ class Crawler: for response in asyncio.as_completed( [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]): response = await response - self.probed_streams_metric.labels(sd_hash).inc() + self.probed_streams_metric.labels("global").inc() if response and response.found: - self.announced_streams_metric.labels(sd_hash).inc() + self.announced_streams_metric.labels("global").inc() blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] for blob_peer in blob_peers: response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) if response: - self.working_streams_metric.labels(sd_hash).inc() + self.working_streams_metric.labels("global").inc() log.info("Found responsive peer for %s: %s:%d(%d)", - sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) + sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) else: log.info("Found dead peer for %s: %s:%d(%d)", - sd_hash[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) + sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) await asyncio.sleep(.5) @property @@ -469,8 +469,10 @@ async def test(): await crawler.flush_to_db() await crawler.node.start_listening() if crawler.active_peers_count < 100: + probes = [] for (host, port) in conf.known_dht_nodes: - await crawler.crawl_routing_table(host, port) + probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port))) + await asyncio.gather(*probes) probe_task = asyncio.ensure_future(crawler.probe_files()) await crawler.process() From da2ffb000ede4972052fec69480caee57481cc09 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 25 Jul 2022 09:27:45 -0300 Subject: [PATCH 33/45] skip peers with bad ports without raising --- scripts/dht_crawler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 3345a6852..a2f34d11b 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -206,8 +206,12 @@ class Crawler: self.probed_streams_metric.labels("global").inc() if response and response.found: self.announced_streams_metric.labels("global").inc() - blob_peers = [decode_tcp_peer_from_compact_address(compact_addr) - for compact_addr in response.found_compact_addresses] + blob_peers = [] + for compact_addr in response.found_compact_addresses: + try: + blob_peers.append(decode_tcp_peer_from_compact_address(compact_addr)) + except ValueError as e: + log.error("Error decoding compact peers: %s", e) for blob_peer in blob_peers: response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) if response: From b5c390ca0440a057e53b27a9abd8521724b8e159 Mon 
Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 25 Jul 2022 09:50:26 -0300 Subject: [PATCH 34/45] docker: add volume declaration --- docker/Dockerfile.dht_node | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node index 9399911d9..ce91ec961 100644 --- a/docker/Dockerfile.dht_node +++ b/docker/Dockerfile.dht_node @@ -2,6 +2,7 @@ FROM debian:10-slim ARG user=lbry ARG projects_dir=/home/$user +ARG db_dir=/database ARG DOCKER_TAG ARG DOCKER_COMMIT=docker @@ -27,6 +28,8 @@ RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user COPY . $projects_dir RUN chown -R $user:$user $projects_dir +RUN mkdir -p $db_dir +RUN chown -R $user:$user $db_dir USER $user WORKDIR $projects_dir @@ -35,5 +38,6 @@ RUN python3 -m pip install -U setuptools pip RUN make install RUN python3 docker/set_build.py RUN rm ~/.cache -rf +VOLUME $db_dir ENTRYPOINT ["python3", "scripts/dht_node.py"] From cc64789e96bd01cfdedc1bbe68dcf1b7ef9138ee Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 25 Jul 2022 11:28:26 -0300 Subject: [PATCH 35/45] dht_crawler: fix logging for missing ports --- scripts/dht_crawler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index a2f34d11b..f5107b9e9 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -217,10 +217,12 @@ class Crawler: if response: self.working_streams_metric.labels("global").inc() log.info("Found responsive peer for %s: %s:%d(%d)", - sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) + sd_hash.hex()[:8], blob_peer.address, + blob_peer.udp_port or -1, blob_peer.tcp_port or -1) else: log.info("Found dead peer for %s: %s:%d(%d)", - sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port, blob_peer.tcp_port) + sd_hash.hex()[:8], blob_peer.address, + blob_peer.udp_port or -1, blob_peer.tcp_port or -1) await asyncio.sleep(.5) @property From 5e58c2f2245b90fe1653972f8d5985abf951fc17 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 26 Jul 2022 23:35:53 -0300 Subject: [PATCH 36/45] fix hosting metrics, improve logging --- scripts/dht_crawler.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index f5107b9e9..658b0eeb6 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -195,17 +195,17 @@ class Crawler: while True: for sd_hash in self.sd_hashes.read_samples(10_000): self.refresh_reachable_set() - log.info("Querying stream %s for peers.", sd_hash.hex()[:8]) distance = Distance(sd_hash) node_ids = list(self._reachable_by_node_id.keys()) node_ids.sort(key=lambda node_id: distance(node_id)) k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]] + found = False + working = False for response in asyncio.as_completed( [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]): response = await response - self.probed_streams_metric.labels("global").inc() if response and response.found: - self.announced_streams_metric.labels("global").inc() + found = True blob_peers = [] for compact_addr in response.found_compact_addresses: try: @@ -215,7 +215,7 @@ class Crawler: for blob_peer in blob_peers: response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash) if response: - self.working_streams_metric.labels("global").inc() + working = True log.info("Found responsive peer for %s: %s:%d(%d)", sd_hash.hex()[:8], 
blob_peer.address, blob_peer.udp_port or -1, blob_peer.tcp_port or -1) @@ -223,6 +223,12 @@ class Crawler: log.info("Found dead peer for %s: %s:%d(%d)", sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port or -1, blob_peer.tcp_port or -1) + self.probed_streams_metric.labels("global").inc() + if found: + self.announced_streams_metric.labels("global").inc() + if working: + self.working_streams_metric.labels("global").inc() + log.info("Done querying stream %s for peers. Found: %s, working: %s", sd_hash.hex()[:8], found, working) await asyncio.sleep(.5) @property @@ -358,7 +364,7 @@ class Crawler: max_distance = int.from_bytes(bytes([0xff] * 48), 'big') peers = set() factor = 2048 - for i in range(200): + for i in range(1000): response = await self.request_peers(address, port, key) new_peers = list(response.get_close_kademlia_peers(peer)) if response else None if not new_peers: @@ -479,6 +485,7 @@ async def test(): for (host, port) in conf.known_dht_nodes: probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port))) await asyncio.gather(*probes) + await crawler.flush_to_db() probe_task = asyncio.ensure_future(crawler.probe_files()) await crawler.process() From f077e56cec29d2bfa6c63019ad0e16cbd8ced8cb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 27 Jul 2022 10:29:01 -0300 Subject: [PATCH 37/45] dht_crawler:only count latency during findNode --- scripts/dht_crawler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 658b0eeb6..b4c1043d3 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -315,21 +315,23 @@ class Crawler: if key == node_id: response = await self.node.protocol.get_rpc_peer(peer).find_node(key) response = FindNodeResponse(key, response) + latency = time.perf_counter_ns() - req_start + self.set_latency(peer, latency) else: response = await self.node.protocol.get_rpc_peer(peer).find_value(key) response = FindValueResponse(key, response) await asyncio.sleep(0.05) - latency = time.perf_counter_ns() - req_start - self.set_latency(peer, latency) return response except asyncio.TimeoutError: - self.set_latency(peer, None) + if key == node_id: + self.set_latency(peer, None) continue except lbry.dht.error.RemoteException as e: log.info('Peer errored: %s:%d attempt #%d - %s', host, port, (attempt + 1), str(e)) - self.inc_errors(peer) - self.set_latency(peer, None) + if key == node_id: + self.inc_errors(peer) + self.set_latency(peer, None) continue async def crawl_routing_table(self, host, port, node_id=None): From c38573d5de74ccfc9bfdae8775b4fbc33fadc1da Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 28 Jul 2022 21:40:01 -0300 Subject: [PATCH 38/45] dht_crawler: gather both loops, avoid task exceptions being hidden --- scripts/dht_crawler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index b4c1043d3..9da3283b3 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -488,8 +488,7 @@ async def test(): probes.append(asyncio.create_task(crawler.crawl_routing_table(host, port))) await asyncio.gather(*probes) await crawler.flush_to_db() - probe_task = asyncio.ensure_future(crawler.probe_files()) - await crawler.process() + await asyncio.gather(crawler.process(), crawler.probe_files()) if __name__ == '__main__': asyncio.run(test()) From d0497cf6b50a8e657d0f1bc02a3482d8ff0936fa Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 28 Jul 2022 21:40:55 -0300 Subject: [PATCH 39/45] 
dht_crawler: skip saving connections for now --- scripts/dht_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 9da3283b3..3766b3e99 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -277,7 +277,7 @@ class Crawler: await self.db.save_peers(*self._memory_peers.values()) connections_to_save = self._connections self._connections = {} - await self.db.save_connections(connections_to_save) + # await self.db.save_connections(connections_to_save) heavy call def get_from_peer(self, peer): return self._memory_peers.get((peer.address, peer.udp_port), None) From 0e7a1aee0ad17f086f7c9a55b68ba0f08ad18c80 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 28 Jul 2022 22:03:38 -0300 Subject: [PATCH 40/45] dht_crawler: clean in memory set for expired peers --- scripts/dht_crawler.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 3766b3e99..d7d2d1979 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -71,7 +71,10 @@ class PeerStorage(SQLiteMixin): self.db.writer_connection.row_factory = dict_row_factory async def all_peers(self): - return [DHTPeer(**peer) for peer in await self.db.execute_fetchall("select * from peer")] + return [ + DHTPeer(**peer) for peer in await self.db.execute_fetchall( + "select * from peer where latency > 0 or last_seen < datetime('now', '-1h')") + ] async def save_peers(self, *peers): log.info("Saving graph nodes (peers) to DB") @@ -263,6 +266,11 @@ class Crawler: to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit] return to_check + def remove_expired_peers(self): + for key, peer in list(self._memory_peers.items()): + if (peer.latency or 0) < 1 and peer.last_seen < self.refresh_limit: + del self._memory_peers[key] + def add_peers(self, *peers): for peer in peers: db_peer = self.get_from_peer(peer) @@ -278,6 +286,7 @@ class Crawler: connections_to_save = self._connections self._connections = {} # await self.db.save_connections(connections_to_save) heavy call + self.remove_expired_peers() def get_from_peer(self, peer): return self._memory_peers.get((peer.address, peer.udp_port), None) From ab67f417eeadb41cbe5978942f3b2ad2881b3d9e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 29 Jul 2022 11:08:56 -0300 Subject: [PATCH 41/45] dht_crawler: wait and retry during port switch --- scripts/dht_crawler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index d7d2d1979..e2bd0b5a1 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -335,6 +335,9 @@ class Crawler: if key == node_id: self.set_latency(peer, None) continue + except lbry.dht.error.TransportNotConnected: + log.info("Transport unavailable, waiting 1s to retry") + await asyncio.sleep(1) except lbry.dht.error.RemoteException as e: log.info('Peer errored: %s:%d attempt #%d - %s', host, port, (attempt + 1), str(e)) From 0b059a5445b5db6caa5df460081a374f6effbd4f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 11 Aug 2022 22:00:41 -0300 Subject: [PATCH 42/45] use a histogram for latency, remove labels --- scripts/dht_crawler.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index e2bd0b5a1..8d7290f5b 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -9,7 +9,7 @@ import 
typing from dataclasses import dataclass, astuple, replace from aiohttp import web -from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter +from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter, Histogram import lbry.dht.error from lbry.dht.constants import generate_id @@ -133,43 +133,37 @@ def new_node(address="0.0.0.0", udp_port=0, node_id=None): class Crawler: unique_total_hosts_metric = Gauge( "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node", - labelnames=("scope",) ) reachable_hosts_metric = Gauge( "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node", - labelnames=("scope",) ) total_historic_hosts_metric = Gauge( "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node", - labelnames=("scope",) ) pending_check_hosts_metric = Gauge( "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node", - labelnames=("scope",) ) hosts_with_errors_metric = Gauge( "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", - labelnames=("scope",) ) connections_found_metric = Gauge( "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", - labelnames=("host", "port") ) - host_latency_metric = Gauge( - "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node", - labelnames=("host", "port") + LATENCY_HISTOGRAM_BUCKETS = ( + 0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf') + ) + host_latency_metric = Histogram( + "host_latency", "Time spent on the last request, in milliseconds.", namespace="dht_crawler_node", + buckets=LATENCY_HISTOGRAM_BUCKETS ) probed_streams_metric = Counter( "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node", - labelnames=("scope",) ) announced_streams_metric = Counter( "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node", - labelnames=("scope",) ) working_streams_metric = Counter( "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node", - labelnames=("scope",) ) def __init__(self, db_path: str, sd_hash_samples: SDHashSamples): @@ -226,11 +220,11 @@ class Crawler: log.info("Found dead peer for %s: %s:%d(%d)", sd_hash.hex()[:8], blob_peer.address, blob_peer.udp_port or -1, blob_peer.tcp_port or -1) - self.probed_streams_metric.labels("global").inc() + self.probed_streams_metric.inc() if found: - self.announced_streams_metric.labels("global").inc() + self.announced_streams_metric.inc() if working: - self.working_streams_metric.labels("global").inc() + self.working_streams_metric.inc() log.info("Done querying stream %s for peers. 
Found: %s, working: %s", sd_hash.hex()[:8], found, working) await asyncio.sleep(.5) @@ -293,7 +287,7 @@ class Crawler: def set_latency(self, peer, latency=None): if latency: - self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency) + self.host_latency_metric.observe(latency / 1_000_000.0) db_peer = self.get_from_peer(peer) if not db_peer: return @@ -405,7 +399,7 @@ class Crawler: host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) if peers: - self.connections_found_metric.labels(host=host, port=port).set(len(peers)) + self.connections_found_metric.set(len(peers)) self.associate_peers(peer, peers) return peers @@ -426,11 +420,11 @@ class Crawler: submit(peer) await asyncio.sleep(.05) await asyncio.sleep(0) - self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count) - self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count) - self.total_historic_hosts_metric.labels("global").set(len(self._memory_peers)) - self.pending_check_hosts_metric.labels("global").set(len(to_check)) - self.hosts_with_errors_metric.labels("global").set(self.peers_with_errors_count) + self.unique_total_hosts_metric.set(self.checked_peers_count) + self.reachable_hosts_metric.set(self.checked_peers_count - self.unreachable_peers_count) + self.total_historic_hosts_metric.set(len(self._memory_peers)) + self.pending_check_hosts_metric.set(len(to_check)) + self.hosts_with_errors_metric.set(self.peers_with_errors_count) log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) From 8d9d2c76ae56828248b60dc312b07bb71bce1261 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 11 Aug 2022 22:17:58 -0300 Subject: [PATCH 43/45] routing table sizes as histogram --- scripts/dht_crawler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 8d7290f5b..c37188741 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -146,8 +146,12 @@ class Crawler: hosts_with_errors_metric = Gauge( "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", ) - connections_found_metric = Gauge( + ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS = ( + 0., 5., 10., 15., 20., 25., 30., 35., 40., 45., 60., 70., 80., 100., 200., 1000., 3000., float('inf') + ) + connections_found_metric = Histogram( "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", + buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS ) LATENCY_HISTOGRAM_BUCKETS = ( 0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf') From c6c0228970cd3c0cc35d27a8be1ef87357330a4b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 11 Aug 2022 22:56:46 -0300 Subject: [PATCH 44/45] fix crawler startup query --- scripts/dht_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index c37188741..0c52e7028 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -73,7 +73,7 @@ class PeerStorage(SQLiteMixin): async def all_peers(self): return [ DHTPeer(**peer) for peer in await self.db.execute_fetchall( - "select * from peer where latency > 0 or last_seen < datetime('now', '-1h')") + "select * from peer where latency > 0 or 
last_seen > datetime('now', '-1 hour')") ] async def save_peers(self, *peers): From c7c2d6fe5adf87136f8c2a9eb1c48b67d1d82b98 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 11 Aug 2022 22:57:11 -0300 Subject: [PATCH 45/45] collect connections reachability --- scripts/dht_crawler.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 0c52e7028..f39d9c1bf 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -146,13 +146,21 @@ class Crawler: hosts_with_errors_metric = Gauge( "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", ) - ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS = ( - 0., 5., 10., 15., 20., 25., 30., 35., 40., 45., 60., 70., 80., 100., 200., 1000., 3000., float('inf') + ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS = tuple(map(float, range(100))) + ( + 500., 1000., 2000., float('inf') ) connections_found_metric = Histogram( "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS ) + known_connections_found_metric = Histogram( + "known_connections_found", "Number of already known hosts returned by last contact.", + namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS + ) + reachable_connections_found_metric = Histogram( + "reachable_connections_found", "Number of reachable known hosts returned by last contact.", + namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS + ) LATENCY_HISTOGRAM_BUCKETS = ( 0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf') ) @@ -401,9 +409,17 @@ class Crawler: if peers: log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) - self.add_peers(*peers) if peers: - self.connections_found_metric.set(len(peers)) + self.connections_found_metric.observe(len(peers)) + known_peers = 0 + reachable_connections = 0 + for peer in peers: + known_peer = self.get_from_peer(peer) + known_peers += 1 if known_peer else 0 + reachable_connections += 1 if known_peer and (known_peer.latency or 0) > 0 else 0 + self.known_connections_found_metric.observe(known_peers) + self.reachable_connections_found_metric.observe(reachable_connections) + self.add_peers(*peers) self.associate_peers(peer, peers) return peers
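
Note (not part of the patch series): after PATCH 45/45 the crawler exports everything through the SimpleMetrics endpoint started on port 8080, so a quick way to sanity-check a running crawler is to scrape that endpoint and watch the gauges, counters and histograms added above. The sketch below is illustrative only; the metric names assume prometheus_client's usual exposition naming (namespace prefix "dht_crawler_node", "_total" suffix for counters, "_count"/"_bucket" for histograms), and the helper itself is not something the patches add.

    import asyncio
    from aiohttp import ClientSession

    # Metric series assumed to be exposed by scripts/dht_crawler.py under the
    # "dht_crawler_node" namespace (gauge, gauge, counter, histogram count).
    INTERESTING = (
        "dht_crawler_node_unique_total_hosts",
        "dht_crawler_node_reachable_hosts",
        "dht_crawler_node_probed_streams_total",
        "dht_crawler_node_connections_found_count",
    )

    async def dump_metrics(url="http://127.0.0.1:8080/metrics"):
        # Fetch the plain-text Prometheus exposition page served by SimpleMetrics.
        async with ClientSession() as session:
            async with session.get(url) as response:
                text = await response.text()
        # Print only the series we care about; startswith() accepts a tuple.
        for line in text.splitlines():
            if line.startswith(INTERESTING):
                print(line)

    if __name__ == "__main__":
        asyncio.run(dump_metrics())

Run it alongside the crawler itself (e.g. `python3 scripts/dht_crawler.py /path/to/peers.db`, with an optional `test.sample` produced by scripts/sd_hash_sampler.py in the working directory) to watch host reachability and stream-probe counters move between flushes.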