From 508bdb8e94f1b22860066ff71d2c035a2803565b Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 15 Jun 2022 11:28:17 -0300
Subject: [PATCH] dht_crawler: keep working set in memory, flush to db on
 intervals

---
 scripts/dht_crawler.py | 77 +++++++++++++++++++++---------------------
 1 file changed, 39 insertions(+), 38 deletions(-)

diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py
index e0ffa071b..39a07441f 100644
--- a/scripts/dht_crawler.py
+++ b/scripts/dht_crawler.py
@@ -52,7 +52,7 @@ class DHTPeer(Base):
         return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port)
 
     def to_kad_peer(self):
-        node_id = bytes.fromhex(self.node_id.hex()) if self.node_id else None
+        node_id = bytes.fromhex(self.node_id) if self.node_id else None
         return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port)
 
@@ -73,40 +73,46 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
 class Crawler:
     def __init__(self, db_path: str):
         self.node = new_node()
-        self.semaphore = asyncio.Semaphore(20)
+        self.semaphore = asyncio.Semaphore(200)
         engine = sqla.create_engine(f"sqlite:///{db_path}")
         Base.metadata.create_all(engine)
         session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False)
         self.db = session()
+        self._memory_peers = {
+            (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all()
+        }
 
     @property
     def refresh_limit(self):
         return datetime.datetime.utcnow() - datetime.timedelta(hours=1)
 
     @property
-    def active_peers_query(self):
-        return self.db.query(DHTPeer).filter(sqla.or_(DHTPeer.last_seen > self.refresh_limit, DHTPeer.latency > 0))
+    def all_peers(self):
+        return [
+            peer for peer in self._memory_peers.values()
+            if (peer.last_seen and peer.last_seen > self.refresh_limit) or (peer.latency or 0) > 0
+        ]
 
     @property
-    def all_peers(self):
-        return set([peer.to_kad_peer() for peer in self.active_peers_query.all()])
+    def active_peers_count(self):
+        return len(self.all_peers)
 
     @property
     def checked_peers_count(self):
-        return self.active_peers_query.filter(DHTPeer.last_check > self.refresh_limit).count()
+        return len([peer for peer in self.all_peers if peer.last_check and peer.last_check > self.refresh_limit])
 
     @property
     def unreachable_peers_count(self):
-        return self.active_peers_query.filter(DHTPeer.latency == None, DHTPeer.last_check > self.refresh_limit).count()
+        return len([peer for peer in self.all_peers
+                    if peer.last_check and peer.last_check > self.refresh_limit and not peer.latency])
 
     @property
     def peers_with_errors_count(self):
-        return self.active_peers_query.filter(DHTPeer.errors > 0).count()
+        return len([peer for peer in self.all_peers if (peer.errors or 0) > 0])
 
     def get_peers_needing_check(self):
-        return set([peer.to_kad_peer() for peer in self.active_peers_query.filter(
-            sqla.or_(DHTPeer.last_check == None,
-                     DHTPeer.last_check < self.refresh_limit)).order_by(DHTPeer.last_seen.desc()).all()])
+        to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit]
+        return to_check
 
     def add_peers(self, *peers):
         db_peers = []
@@ -116,13 +122,16 @@ class Crawler:
                 db_peer.node_id = peer.node_id.hex()
             elif not db_peer:
                 db_peer = DHTPeer.from_kad_peer(peer)
-                self.db.add(db_peer)
+                self._memory_peers[(peer.address, peer.udp_port)] = db_peer
+            db_peer.last_seen = datetime.datetime.utcnow()
             db_peers.append(db_peer)
-        self.db.flush()
-        return [dbp.peer_id for dbp in db_peers]
+
+    def flush_to_db(self):
+        self.db.add_all(self._memory_peers.values())
+        self.db.commit()
 
     def get_from_peer(self, peer):
-        return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
+        return self._memory_peers.get((peer.address, peer.udp_port), None)
 
     def set_latency(self, peer, latency=None):
         db_peer = self.get_from_peer(peer)
@@ -134,27 +143,13 @@ class Crawler:
         elif latency is not None and db_peer.first_online is None:
             db_peer.first_online = datetime.datetime.utcnow()
         db_peer.last_check = datetime.datetime.utcnow()
-        self.db.add(db_peer)
 
     def inc_errors(self, peer):
         db_peer = self.get_from_peer(peer)
-        db_peer.errors += 1
-        self.db.add(db_peer)
-
-    def count_peers(self):
-        return self.db.query(DHTPeer).count()
+        db_peer.errors = (db_peer.errors or 0) + 1
 
     def associate_peers(self, target_peer_id, db_peer_ids):
-        connections = {
-            DHTConnection(
-                from_peer_id=target_peer_id,
-                to_peer_id=peer_id)
-            for peer_id in db_peer_ids
-        }
-        self.db.query(DHTPeer).filter(DHTPeer.peer_id.in_(set(db_peer_ids))).update(
-            {DHTPeer.last_seen: datetime.datetime.utcnow()})
-        self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == target_peer_id).delete()
-        self.db.add_all(connections)
+        return # todo
 
     async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
         async with self.semaphore:
@@ -177,7 +172,7 @@ class Crawler:
         start = time.time()
         log.info("querying %s:%d", host, port)
         address = await resolve_host(host, port, 'udp')
-        this_peer_id, = self.add_peers(make_kademlia_peer(None, address, port))
+        self.add_peers(make_kademlia_peer(None, address, port))
        key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
         latency = None
         for _ in range(3):
@@ -225,8 +220,8 @@ class Crawler:
                 factor = 2048
         log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
                  host, port, (time.time() - start), len(peers), i)
-        db_peer_ids = self.add_peers(*peers)
-        self.associate_peers(this_peer_id, db_peer_ids)
+        self.add_peers(*peers)
+        #self.associate_peers(this_peer_id, db_peer_ids)
         self.db.commit()
         return peers
 
@@ -239,19 +234,25 @@ class Crawler:
             f.add_done_callback(lambda _: to_process.pop(_peer))
 
         to_check = self.get_peers_needing_check()
+        last_flush = datetime.datetime.utcnow()
         while True:
             for peer in to_check:
                 if peer not in to_process:
                     submit(peer)
-                if len(to_process) > 20:
+                    await asyncio.sleep(.1)
+                if len(to_process) > 100:
                     break
                 await asyncio.sleep(0)
             log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue",
-                     self.active_peers_query.count(), self.checked_peers_count, self.unreachable_peers_count,
+                     self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count,
                      self.peers_with_errors_count, len(to_process), len(to_check))
             if to_process:
                 await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
             to_check = self.get_peers_needing_check()
+            if (datetime.datetime.utcnow() - last_flush).seconds > 60:
+                log.info("flushing to db")
+                self.flush_to_db()
+                last_flush = datetime.datetime.utcnow()
             while not to_check and not to_process:
                 log.info("Idle, sleeping a minute.")
                 await asyncio.sleep(60.0)
@@ -262,7 +263,7 @@ async def test():
     crawler = Crawler("/tmp/a.db")
     await crawler.node.start_listening()
     conf = Config()
-    if crawler.active_peers_query.count() < 100:
+    if crawler.active_peers_count < 100:
         for (host, port) in conf.known_dht_nodes:
             await crawler.crawl_routing_table(host, port)
     await crawler.process()
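
Below is a small stand-alone sketch of the pattern this patch adopts: mutate peer records in an in-memory working set and persist the whole set on a timer, instead of querying and flushing the database on every update. It is illustrative only; the class, table, and field names are made up for the example, and it uses stdlib sqlite3 rather than the crawler's SQLAlchemy models.

    import asyncio
    import datetime
    import sqlite3

    FLUSH_INTERVAL = datetime.timedelta(seconds=60)


    class PeerWorkingSet:
        """Illustrative only: in-memory working set flushed to SQLite on demand."""

        def __init__(self, db_path: str):
            self.db = sqlite3.connect(db_path)
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS peer ("
                "address TEXT, udp_port INTEGER, last_seen TEXT, "
                "PRIMARY KEY (address, udp_port))")
            # keyed by (address, udp_port), mirroring the patch's _memory_peers dict
            self.peers = {}

        def touch(self, address: str, udp_port: int):
            # cheap in-memory update; nothing hits the database here
            self.peers[(address, udp_port)] = datetime.datetime.utcnow()

        def flush(self):
            # write the entire working set back in a single transaction
            self.db.executemany(
                "INSERT OR REPLACE INTO peer (address, udp_port, last_seen) VALUES (?, ?, ?)",
                [(addr, port, seen.isoformat()) for (addr, port), seen in self.peers.items()])
            self.db.commit()


    async def crawl(ws: PeerWorkingSet):
        last_flush = datetime.datetime.utcnow()
        for i in range(1000):                  # stand-in for the real crawl loop
            ws.touch("127.0.0.1", 4444 + i % 10)
            await asyncio.sleep(0)
            if datetime.datetime.utcnow() - last_flush > FLUSH_INTERVAL:
                ws.flush()
                last_flush = datetime.datetime.utcnow()
        ws.flush()                             # final flush on shutdown


    if __name__ == "__main__":
        asyncio.run(crawl(PeerWorkingSet(":memory:")))

The patch itself checks (utcnow() - last_flush).seconds > 60, which behaves the same for intervals under a day; comparing the full timedelta as above sidesteps the .seconds wrap-around if a longer interval were ever used.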