From cfe5c8de8aa90f026f69cf238a8e7adcd2f67a90 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 17 Jun 2022 06:09:57 -0300 Subject: [PATCH] dht_crawler: serve prometheus metrics at 7070 --- scripts/dht_crawler.py | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/scripts/dht_crawler.py b/scripts/dht_crawler.py index c4ea6c75f..8acd6c6e6 100644 --- a/scripts/dht_crawler.py +++ b/scripts/dht_crawler.py @@ -4,6 +4,9 @@ import asyncio import time import typing +from aiohttp import web +from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest + import lbry.dht.error from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -71,6 +74,34 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None): class Crawler: + unique_total_hosts_metric = Gauge( + "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node", + labelnames=("scope",) + ) + reachable_hosts_metric = Gauge( + "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node", + labelnames=("scope",) + ) + total_historic_hosts_metric = Gauge( + "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + pending_check_hosts_metric = Gauge( + "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + hosts_with_errors_metric = Gauge( + "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node", + labelnames=("scope",) + ) + connections_found_metric = Gauge( + "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node", + labelnames=("host", "port") + ) + host_latency_metric = Gauge( + "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node", + labelnames=("host", "port") + ) def __init__(self, db_path: str): self.node = new_node() self.semaphore = asyncio.Semaphore(200) @@ -134,6 +165,8 @@ class Crawler: return self._memory_peers.get((peer.address, peer.udp_port), None) def set_latency(self, peer, latency=None): + if latency: + self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency) db_peer = self.get_from_peer(peer) db_peer.latency = latency if not db_peer.node_id and peer.node_id: @@ -227,6 +260,7 @@ class Crawler: log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.", host, port, (time.time() - start), len(peers), i) self.add_peers(*peers) + self.connections_found_metric.labels(host=host, port=port).set(len(peers)) #self.associate_peers(this_peer_id, db_peer_ids) self.db.commit() return peers @@ -250,6 +284,11 @@ class Crawler: if len(to_process) > 100: break await asyncio.sleep(0) + self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count) + self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count) + self.total_historic_hosts_metric.labels("global").set(len(self._memory_peers)) + self.pending_check_hosts_metric.labels("global").set(len(to_check)) + self.hosts_with_errors_metric.labels("global").set(self.peers_with_errors_count) log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue", self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count, self.peers_with_errors_count, len(to_process), len(to_check)) @@ -266,7 +305,32 @@ class Crawler: to_check = self.get_peers_needing_check() +class SimpleMetrics: + def __init__(self, port): + self.prometheus_port = port + + async def handle_metrics_get_request(self, _): + try: + return web.Response( + text=prom_generate_latest().decode(), + content_type='text/plain; version=0.0.4' + ) + except Exception: + log.exception('could not generate prometheus data') + raise + + async def start(self): + prom_app = web.Application() + prom_app.router.add_get('/metrics', self.handle_metrics_get_request) + metrics_runner = web.AppRunner(prom_app) + await metrics_runner.setup() + prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port) + await prom_site.start() + + async def test(): + metrics = SimpleMetrics('7070') + await metrics.start() crawler = Crawler("/tmp/a.db") await crawler.node.start_listening() conf = Config()