diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index 8ab620385..1b55d486e 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -1,3 +1,4 @@ +import datetime import logging import asyncio import time @@ -8,13 +9,50 @@ from lbry.dht.constants import generate_id from lbry.dht.node import Node from lbry.dht.peer import make_kademlia_peer, PeerManager from lbry.dht.protocol.distance import Distance -from lbry.extras.daemon.storage import SQLiteStorage +from lbry.extras.daemon.storage import SQLiteMixin from lbry.conf import Config from lbry.utils import resolve_host +from sqlalchemy.orm import declarative_base, relationship +import sqlalchemy as sqla + + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) +Base = declarative_base() + + +class DHTPeer(Base): + __tablename__ = "peer" + peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True) + node_id = sqla.Column(sqla.String(96)) + address = sqla.Column(sqla.String()) + udp_port = sqla.Column(sqla.Integer()) + tcp_port = sqla.Column(sqla.Integer()) + first_online = sqla.Column(sqla.DateTime()) + errors = sqla.Column(sqla.Integer(), default=0) + last_churn = sqla.Column(sqla.Integer()) + added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow) + last_check = sqla.Column(sqla.DateTime()) + last_seen = sqla.Column(sqla.DateTime()) + latency = sqla.Column(sqla.Integer()) + __table_args__ = (sqla.UniqueConstraint("node_id", "udp_port"),) + + @classmethod + def from_kad_peer(cls, peer): + return DHTPeer(node_id=peer.node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port) + + def to_kad_peer(self): + return make_kademlia_peer(self.node_id, self.address, self.udp_port, self.tcp_port) + + +class DHTConnection(Base): + __tablename__ = "connection" + from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) 
+ connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id)) + to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True) + connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id)) def new_node(address="0.0.0.0", udp_port=4444, node_id=None): @@ -24,13 +62,85 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None): class Crawler: - def __init__(self): + def __init__(self, db_path: str): self.node = new_node() - self.crawled = set() - self.known_peers = set() - self.unreachable = set() - self.error = set() - self.semaphore = asyncio.Semaphore(10) + self.semaphore = asyncio.Semaphore(20) + engine = sqla.create_engine(f"sqlite:///{db_path}") + Base.metadata.create_all(engine) + session = sqla.orm.sessionmaker(engine) + self.db = session() + + @property + def recent_peers_query(self): + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return self.db.query(DHTPeer).filter(DHTPeer.last_seen > half_hour_ago) + + @property + def all_peers(self): + return set([peer.to_kad_peer() for peer in self.recent_peers_query.all()]) + + @property + def unreachable_peers_count(self): + return self.recent_peers_query.filter(DHTPeer.latency == None).count() + + @property + def peers_with_errors_count(self): + return self.recent_peers_query.filter(DHTPeer.errors > 0).count() + + def get_peers_needing_check(self): + half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter( + sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()]) + + def add_peers(self, *peers): + for peer in peers: + db_peer = self.get_from_peer(peer) + if db_peer and db_peer.node_id is None and peer.node_id: + db_peer.node_id = peer.node_id + self.db.add(db_peer) + elif not db_peer: + self.db.add(DHTPeer.from_kad_peer(peer)) + 
self.db.commit() + + def get_from_peer(self, peer): + return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first() + + def set_latency(self, peer, latency=None): + db_peer = self.get_from_peer(peer) + db_peer.latency = latency + if not db_peer.node_id: + db_peer.node_id = peer.node_id + if db_peer.first_online and latency is None: + db_peer.last_churn = int((datetime.datetime.utcnow() - db_peer.first_online).total_seconds()) + elif latency is not None and db_peer.first_online is None: + db_peer.first_online = datetime.datetime.utcnow() + db_peer.last_check = datetime.datetime.utcnow() + self.db.add(db_peer) + self.db.commit() + + def inc_errors(self, peer): + db_peer = self.get_from_peer(peer) + db_peer.errors += 1 + self.db.add(db_peer) + self.db.commit() + + def count_peers(self): + return self.db.query(DHTPeer).count() + + def associate_peers(self, target_peer, peers): + db_peer = self.get_from_peer(target_peer) + connections = [ + DHTConnection( + from_peer_id=db_peer.peer_id, + to_peer_id=self.get_from_peer(peer).peer_id) + for peer in peers + ] + for peer in peers: + self.db.query(DHTPeer).filter(DHTPeer.address == peer.address, DHTPeer.udp_port == peer.udp_port).update( + {DHTPeer.last_seen: datetime.datetime.utcnow()}) + self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete() + self.db.add_all(connections) + self.db.commit() async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']: async with self.semaphore: @@ -45,30 +155,32 @@ class Crawler: except lbry.dht.error.RemoteException as e: log.info('Previously responding peer errored: %s:%d attempt #%d - %s', host, port, (attempt + 1), str(e)) - self.error.add((host, port)) + self.inc_errors(peer) continue return [] async def crawl_routing_table(self, host, port): start = time.time() log.info("querying %s:%d", host, port) - self.known_peers.add((host, port)) - self.crawled.add((host, port)) address = await 
resolve_host(host, port, 'udp') + self.add_peers(make_kademlia_peer(None, address, port)) key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - if not key: - for _ in range(3): - try: - async with self.semaphore: - await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() - key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) - except asyncio.TimeoutError: - pass - except lbry.dht.error.RemoteException: - self.error.add((host, port)) - if not key: - self.unreachable.add((host, port)) - return set() + latency = None + for _ in range(3): + try: + async with self.semaphore: + ping_start = time.perf_counter_ns() + await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping() + key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port) + latency = time.perf_counter_ns() - ping_start + except asyncio.TimeoutError: + pass + except lbry.dht.error.RemoteException: + self.inc_errors(make_kademlia_peer(None, address, port)) + pass + self.set_latency(make_kademlia_peer(key, address, port), latency) + if not latency: + return set() node_id = key distance = Distance(key) max_distance = int.from_bytes(bytes([0xff] * 48), 'big') @@ -98,7 +210,8 @@ class Crawler: factor = 2048 log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) - self.crawled.update(peers) + self.add_peers(*peers) + self.associate_peers(make_kademlia_peer(key, address, port), peers) return peers async def process(self): @@ -109,21 +222,31 @@ class Crawler: to_process[_peer] = f f.add_done_callback(lambda _: to_process.pop(_peer)) - while to_process or len(self.known_peers) < len(self.crawled): - log.info("%d known, %d unreachable, %d error.. 
%d processing", - len(self.known_peers), len(self.unreachable), len(self.error), len(to_process)) - for peer in self.crawled.difference(self.known_peers): - self.known_peers.add(peer) - submit(peer) + to_check = self.get_peers_needing_check() + while True: + for peer in to_check: + if peer not in to_process: + submit(peer) + if len(to_process) > 20: + break + await asyncio.sleep(0) + log.info("%d known, %d unreachable, %d error, %d processing, %d on queue", + self.recent_peers_query.count(), self.unreachable_peers_count, self.peers_with_errors_count, + len(to_process), len(to_check)) await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED) + to_check = self.get_peers_needing_check() + while not to_check and not to_process: + log.info("Idle, sleeping a minute.") + await asyncio.sleep(60.0) + to_check = self.get_peers_needing_check() async def test(): - crawler = Crawler() + crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config() - for (host, port) in conf.known_dht_nodes: - await crawler.crawl_routing_table(host, port) + #for (host, port) in conf.known_dht_nodes: + # await crawler.crawl_routing_table(host, port) await crawler.process() if __name__ == '__main__':