From 9cdcff0e1e28ec43e8183a277754fb4c47db2a9d Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sun, 28 Nov 2021 21:58:15 -0300
Subject: [PATCH 01/22] first attempt at crawling

---
 scripts/dht_node.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 360e00eac..f2e0f1197 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -56,11 +56,20 @@ class SimpleMetrics:
             writer.writerow({"blob_hash": blob.hex()})
         return web.Response(text=out.getvalue(), content_type='text/csv')
 
+    async def estimate_peers(self, request: web.Request):
+        amount = 2000
+        peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
+        close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
+        print(self.dht_node.protocol.node_id.hex())
+        print([cid.node_id.hex() for cid in close_ids])
+        return web.json_response({"total": len(peers), "close": len(close_ids)})
+
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
         prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
         prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
+        prom_app.router.add_get('/estimate', self.estimate_peers)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)

From e7d9079389c5dc4d87d59a85c7ead9bb1101235e Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 3 Dec 2021 10:56:22 -0500
Subject: [PATCH 02/22] improve script

---
 scripts/dht_node.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index f2e0f1197..efbb6a21a 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -64,12 +64,18 @@ class SimpleMetrics:
         print([cid.node_id.hex() for cid in close_ids])
         return web.json_response({"total": len(peers), "close": len(close_ids)})
 
+    async def peers_in_routing_table(self, request: web.Request):
+        total_peers = self.dht_node.protocol.routing_table.get_peers()
+        close_ids = [peer for peer in total_peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
+        return web.json_response({"total": len(total_peers), "close": len(close_ids), 'estimated_network_size': len(close_ids) * 256})
+
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
         prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
         prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
         prom_app.router.add_get('/estimate', self.estimate_peers)
+        prom_app.router.add_get('/count', self.peers_in_routing_table)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)

From 5cb4c06d0cb649caa00ac684953d5345e4ae1595 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 3 Dec 2021 14:52:30 -0300
Subject: [PATCH 03/22] add prefix_neighbors_count to routing table debug api

---
 lbry/extras/daemon/daemon.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py
index c67a5b7e4..195564c54 100644
--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -5149,10 +5149,12 @@ class Daemon(metaclass=JSONRPCServerType):
                 ]
             },
             "node_id": (str) the local dht node id
+            "prefix_neighbors_count": (int) the number of peers sharing the same byte prefix as the local node id
         }
         """
         result = {
-            'buckets': {}
+            'buckets': {},
+            'prefix_neighbors_count': 0
         }
 
         for i, _ in enumerate(self.dht_node.protocol.routing_table.buckets):
@@ -5165,6 +5167,7 @@ class Daemon(metaclass=JSONRPCServerType):
                     "node_id": hexlify(peer.node_id).decode(),
                 }
                 result['buckets'][i].append(host)
+                result['prefix_neighbors_count'] += 1 if peer.node_id[0] == self.dht_node.protocol.node_id[0] else 0
 
         result['node_id'] = hexlify(self.dht_node.protocol.node_id).decode()
         return result

From 87ff3f95ffb8981ed75d363b10dfa9ae630fc46b Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 3 Dec 2021 15:40:19 -0300
Subject: [PATCH 04/22] better endpoint names, small docs

---
 scripts/dht_node.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index efbb6a21a..a9f5ea80d 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -56,26 +56,30 @@ class SimpleMetrics:
             writer.writerow({"blob_hash": blob.hex()})
         return web.Response(text=out.getvalue(), content_type='text/csv')
 
-    async def estimate_peers(self, request: web.Request):
-        amount = 2000
+    async def active_estimation(self, request: web.Request):
+        # - "crawls" the network for peers close to our node id (not a full aggressive crawler yet)
+        # given everything is random, the odds of a peer having the same X prefix bits matching ours is roughly 1/(2^X)
+        # we use that to estimate the network size, see issue #3463 for related papers and details
+        amount = 20_000
         peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
         close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
-        print(self.dht_node.protocol.node_id.hex())
-        print([cid.node_id.hex() for cid in close_ids])
         return web.json_response({"total": len(peers), "close": len(close_ids)})
 
-    async def peers_in_routing_table(self, request: web.Request):
+    async def passive_estimation(self, request: web.Request):
+        # same method as above but instead we use the routing table and assume our implementation was able to add
+        # all the reachable close peers, which should be usable for seed nodes since they are super popular
         total_peers = self.dht_node.protocol.routing_table.get_peers()
         close_ids = [peer for peer in total_peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
-        return web.json_response({"total": len(total_peers), "close": len(close_ids), 'estimated_network_size': len(close_ids) * 256})
+        return web.json_response(
+            {"total": len(total_peers), "close": len(close_ids), 'estimated_network_size': len(close_ids) * 256})
 
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
         prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
         prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
-        prom_app.router.add_get('/estimate', self.estimate_peers)
-        prom_app.router.add_get('/count', self.peers_in_routing_table)
+        prom_app.router.add_get('/active_estimation', self.active_estimation)
+        prom_app.router.add_get('/passive_estimation', self.passive_estimation)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)

From 1c857b8dd880bc145d089b8a76bc050ff9d7c4a7 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 3 Dec 2021 15:42:20 -0300
Subject: [PATCH 05/22] be explicit about ignoring params

---
 scripts/dht_node.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index a9f5ea80d..229c8fc45 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -30,7 +30,7 @@ class SimpleMetrics:
         self.prometheus_port = port
         self.dht_node: Node = node
 
-    async def handle_metrics_get_request(self, request: web.Request):
+    async def handle_metrics_get_request(self, _):
         try:
             return web.Response(
                 text=prom_generate_latest().decode(),
@@ -40,7 +40,7 @@ class SimpleMetrics:
             log.exception('could not generate prometheus data')
             raise
 
-    async def handle_peers_csv(self, request: web.Request):
+    async def handle_peers_csv(self, _):
         out = StringIO()
         writer = csv.DictWriter(out, fieldnames=["ip", "port", "dht_id"])
         writer.writeheader()
@@ -48,7 +48,7 @@ class SimpleMetrics:
             writer.writerow({"ip": peer.address, "port": peer.udp_port, "dht_id": peer.node_id.hex()})
         return web.Response(text=out.getvalue(), content_type='text/csv')
 
-    async def handle_blobs_csv(self, request: web.Request):
+    async def handle_blobs_csv(self, _):
         out = StringIO()
         writer = csv.DictWriter(out, fieldnames=["blob_hash"])
         writer.writeheader()
@@ -56,7 +56,7 @@ class SimpleMetrics:
             writer.writerow({"blob_hash": blob.hex()})
         return web.Response(text=out.getvalue(), content_type='text/csv')
 
-    async def active_estimation(self, request: web.Request):
+    async def active_estimation(self, _):
         # - "crawls" the network for peers close to our node id (not a full aggressive crawler yet)
         # given everything is random, the odds of a peer having the same X prefix bits matching ours is roughly 1/(2^X)
         # we use that to estimate the network size, see issue #3463 for related papers and details
@@ -65,7 +65,7 @@ class SimpleMetrics:
         close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
         return web.json_response({"total": len(peers), "close": len(close_ids)})
 
-    async def passive_estimation(self, request: web.Request):
+    async def passive_estimation(self, _):
         # same method as above but instead we use the routing table and assume our implementation was able to add
         # all the reachable close peers, which should be usable for seed nodes since they are super popular
         total_peers = self.dht_node.protocol.routing_table.get_peers()

From a6ca7a6f38406d73ed548e0c7e571ad33addea3a Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 3 Dec 2021 16:38:35 -0300
Subject: [PATCH 06/22] same api across different estimation methods

---
 scripts/dht_node.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 229c8fc45..9045789d2 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -63,15 +63,20 @@ class SimpleMetrics:
         amount = 20_000
         peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
         close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
-        return web.json_response({"total": len(peers), "close": len(close_ids)})
+        return web.json_response(
+            {"total_peers_found_during_estimation": len(peers),
+             "peers_with_the_same_byte_prefix": len(close_ids),
+             'estimated_network_size': len(close_ids) * 256})
 
     async def passive_estimation(self, _):
         # same method as above but instead we use the routing table and assume our implementation was able to add
         # all the reachable close peers, which should be usable for seed nodes since they are super popular
-        total_peers = self.dht_node.protocol.routing_table.get_peers()
-        close_ids = [peer for peer in total_peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
+        peers = self.dht_node.protocol.routing_table.get_peers()
+        close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
         return web.json_response(
-            {"total": len(total_peers), "close": len(close_ids), 'estimated_network_size': len(close_ids) * 256})
+            {"total_peers_found_during_estimation": len(peers),
+             "peers_with_the_same_byte_prefix": len(close_ids),
+             'estimated_network_size': len(close_ids) * 256})

From 7ed5fe8f66a8cfe31eac63d87851c4fb2f772b89 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 3 Dec 2021 16:41:18 -0300
Subject: [PATCH 07/22] add semaphore on active estimation to avoid abuse

---
 scripts/dht_node.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 9045789d2..1c96e5458 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -29,6 +29,7 @@ class SimpleMetrics:
     def __init__(self, port, node):
         self.prometheus_port = port
         self.dht_node: Node = node
+        self.active_estimation_semaphore = asyncio.Semaphore(1)
 
     async def handle_metrics_get_request(self, _):
         try:
             return web.Response(
@@ -61,7 +62,8 @@ class SimpleMetrics:
         # given everything is random, the odds of a peer having the same X prefix bits matching ours is roughly 1/(2^X)
         # we use that to estimate the network size, see issue #3463 for related papers and details
         amount = 20_000
-        peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
+        with self.active_estimation_semaphore:  # this is resource intensive, limit concurrency to 1
+            peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
         close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]

From 371df6e6c29e6166cf2e0eb577778a7420ebda54 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 6 Dec 2021 11:07:55 -0300
Subject: [PATCH 08/22] keep same node id between runs

---
 scripts/dht_node.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 1c96e5458..8a5e18561 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -2,6 +2,7 @@ import asyncio
 import argparse
 import logging
 import csv
+import os.path
 from io import StringIO
 from typing import Optional
 from aiohttp import web
@@ -96,6 +97,16 @@ class SimpleMetrics:
 async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int):
     loop = asyncio.get_event_loop()
     conf = Config()
+    if not db_file_path.startswith(':memory:'):
+        node_id_file_path = db_file_path + 'node_id'
+        if os.path.exists(node_id_file_path):
+            with open(node_id_file_path, 'rb') as node_id_file:
+                node_id = node_id_file.read()
+        else:
+            with open(node_id_file_path, 'wb') as node_id_file:
+                node_id = generate_id()
+                node_id_file.write(node_id)
+
     storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
     if bootstrap_node:
         nodes = bootstrap_node.split(':')
@@ -104,13 +115,14 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
         nodes = conf.known_dht_nodes
     await storage.open()
     node = Node(
-        loop, PeerManager(loop), generate_id(), port, port, 3333, None,
+        loop, PeerManager(loop), node_id, port, port, 3333, None,
        storage=storage
    )
    if prometheus_port > 0:
        metrics = SimpleMetrics(prometheus_port, node)
        await metrics.start()
    node.start(host, nodes)
+    log.info("Peer with id %s started", node_id.hex())
    while True:
        await asyncio.sleep(10)
        PEERS.labels('main').set(len(node.protocol.routing_table.get_peers()))

From a22f50aa84ce8f00e1e2a3548064dcf68db433c0 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 6 Dec 2021 11:34:09 -0300
Subject: [PATCH 09/22] fix missing async

---
 scripts/dht_node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 8a5e18561..2686b0e9d 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -63,7 +63,7 @@ class SimpleMetrics:
         # given everything is random, the odds of a peer having the same X prefix bits matching ours is roughly 1/(2^X)
         # we use that to estimate the network size, see issue #3463 for related papers and details
         amount = 20_000
-        with self.active_estimation_semaphore:  # this is resource intensive, limit concurrency to 1
+        async with self.active_estimation_semaphore:  # this is resource intensive, limit concurrency to 1
             peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
         close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]

From 470ee7246284516c1a702366dc0cbe4cdda00fa5 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 6 Dec 2021 11:40:35 -0300
Subject: [PATCH 10/22] add passive estimation to prometheus

---
 scripts/dht_node.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 2686b0e9d..945b8988b 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -24,6 +24,10 @@ PEERS = Gauge(
     "known_peers", "Number of peers on routing table", namespace="dht_node",
     labelnames=("method",)
 )
+ESTIMATED_SIZE = Gauge(
+    "passively_estimated_network_size", "Estimated network size from routing table", namespace="dht_node",
+    labelnames=("method",)
+)
 
 
 class SimpleMetrics:
@@ -126,6 +130,9 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
     while True:
         await asyncio.sleep(10)
         PEERS.labels('main').set(len(node.protocol.routing_table.get_peers()))
+        peers = node.protocol.routing_table.get_peers()
+        close_ids = [peer for peer in peers if peer.node_id[0] == node.protocol.node_id[0]]
+        ESTIMATED_SIZE.labels('main').set(len(close_ids) * 256)
         BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts()))
         log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
                  len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),

From 2d9130b4e0913df68c3b13d2f551c70edf0d15d2 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 10 Dec 2021 02:44:51 -0300
Subject: [PATCH 11/22] prometheus: move blobs_stored and peers to SDK. add
 buckets_in_routing_table

---
 lbry/dht/protocol/protocol.py      |  8 ++++++++
 lbry/dht/protocol/routing_table.py | 17 ++++++++++++++++-
 scripts/dht_node.py                | 19 +------------------
 3 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 66165740b..718e339f5 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -8,6 +8,8 @@ import random
 from asyncio.protocols import DatagramProtocol
 from asyncio.transports import DatagramTransport
 
+from prometheus_client import Gauge
+
 from lbry.dht import constants
 from lbry.dht.serialization.bencoding import DecodeError
 from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
@@ -30,6 +32,11 @@ OLD_PROTOCOL_ERRORS = {
 
 
 class KademliaRPC:
+    stored_blobs_metric = Gauge(
+        "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
+        labelnames=("scope",),
+    )
+
     def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
         self.protocol = protocol
         self.loop = loop
@@ -61,6 +68,7 @@ class KademliaRPC:
             self.protocol.data_store.add_peer_to_blob(
                 rpc_contact, blob_hash
             )
+        self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store))
         return b'OK'
 
     def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 4fea1266f..1dcfc9db0 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -4,6 +4,8 @@ import logging
 import typing
 import itertools
 
+from prometheus_client import Gauge
+
 from lbry.dht import constants
 from lbry.dht.protocol.distance import Distance
 if typing.TYPE_CHECKING:
@@ -13,8 +15,13 @@ log = logging.getLogger(__name__)
 
 
 class KBucket:
-    """ Description - later
     """
+    Kademlia K-bucket implementation.
+    """
+    peers_in_routing_table_metric = Gauge(
+        "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
+        labelnames=("scope",)
+    )
 
     def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
         """
@@ -58,6 +65,7 @@ class KBucket:
                 return True
         if len(self.peers) < constants.K:
             self.peers.append(peer)
+            self.peers_in_routing_table_metric.labels("global").inc()
             return True
         else:
             return False
@@ -124,6 +132,7 @@ class KBucket:
 
     def remove_peer(self, peer: 'KademliaPeer') -> None:
         self.peers.remove(peer)
+        self.peers_in_routing_table_metric.labels("global").dec()
 
     def key_in_range(self, key: bytes) -> bool:
         """ Tests whether the specified key (i.e. node ID) is in the range
@@ -162,6 +171,10 @@ class TreeRoutingTable:
     ping RPC-based k-bucket eviction algorithm described in section 2.2 of
     that paper.
""" + buckets_in_routing_table_metric = Gauge( + "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): @@ -279,6 +292,7 @@ class TreeRoutingTable: # ...and remove them from the old bucket for contact in new_bucket.peers: old_bucket.remove_peer(contact) + self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) def join_buckets(self): if len(self.buckets) == 1: @@ -302,6 +316,7 @@ class TreeRoutingTable: elif can_go_higher: self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets.remove(bucket) + self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) return self.join_buckets() def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 945b8988b..7343b0979 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -6,7 +6,7 @@ import os.path from io import StringIO from typing import Optional from aiohttp import web -from prometheus_client import generate_latest as prom_generate_latest, Gauge +from prometheus_client import generate_latest as prom_generate_latest from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -16,18 +16,6 @@ from lbry.conf import Config logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) -BLOBS_STORED = Gauge( - "blobs_stored", "Number of blob info received", namespace="dht_node", - labelnames=("method",) -) -PEERS = Gauge( - "known_peers", "Number of peers on routing table", namespace="dht_node", - labelnames=("method",) -) -ESTIMATED_SIZE = Gauge( - "passively_estimated_network_size", "Estimated network size from routing table", namespace="dht_node", - labelnames=("method",) -) class SimpleMetrics: @@ -129,11 +117,6 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional log.info("Peer with id %s started", node_id.hex()) while True: await asyncio.sleep(10) - PEERS.labels('main').set(len(node.protocol.routing_table.get_peers())) - peers = node.protocol.routing_table.get_peers() - close_ids = [peer for peer in peers if peer.node_id[0] == node.protocol.node_id[0]] - ESTIMATED_SIZE.labels('main').set(len(close_ids) * 256) - BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts())) log.info("Known peers: %d. 
Storing contact information for %d blobs from %d peers.", len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store), len(node.protocol.data_store.get_storing_contacts())) From a22f50aa84ce8f00e1e2a3548064dcf68db433c0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 10 Dec 2021 03:48:33 -0300 Subject: [PATCH 12/22] add storing_peers and peer_manager_keys --- lbry/dht/node.py | 11 ++++++++++- lbry/dht/peer.py | 13 +++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 300c1a774..d9217529b 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -2,6 +2,9 @@ import logging import asyncio import typing import socket + +from prometheus_client import Gauge + from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -17,6 +20,10 @@ log = logging.getLogger(__name__) class Node: + storing_peers_metric = Gauge( + "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node", + labelnames=("scope",), + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, @@ -44,7 +51,9 @@ class Node: # add all peers in the routing table total_peers.extend(self.protocol.routing_table.get_peers()) # add all the peers who have announced blobs to us - total_peers.extend(self.protocol.data_store.get_storing_contacts()) + storing_peers = self.protocol.data_store.get_storing_contacts() + self.storing_peers_metric.labels("global").set(len(storing_peers)) + total_peers.extend(storing_peers) # get ids falling in the midpoint of each bucket that hasn't been recently updated node_ids = self.protocol.routing_table.get_refresh_list(0, True) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index ae4fe2191..9413eec94 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -3,6 +3,9 @@ import asyncio import logging from dataclasses import dataclass, field from functools import lru_cache + +from prometheus_client import Gauge + from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache from lbry.dht import constants from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address @@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False): class PeerManager: + peer_manager_keys_metric = Gauge( + "peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop): self._loop = loop self._rpc_failures: typing.Dict[ @@ -38,6 +45,11 @@ class PeerManager: self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE) self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE) + def count_cache_keys(self): + return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len( + self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len( + self._node_tokens) + def reset(self): for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested): statistic.clear() @@ -86,6 +98,7 @@ class PeerManager: self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id)) self._node_id_mapping[(address, udp_port)] = node_id 
         self._node_id_reverse_mapping[node_id] = (address, udp_port)
+        self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
 
     def prune(self):  # TODO: periodically call this
         now = self._loop.time()

From 7b09c34fce9bfe26c70df58df99a62181bd10c25 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 10 Dec 2021 04:10:15 -0300
Subject: [PATCH 13/22] add request_sent and request_time metric on dht

---
 lbry/dht/protocol/protocol.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 718e339f5..6a13b5346 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -3,12 +3,13 @@ import socket
 import functools
 import hashlib
 import asyncio
+import time
 import typing
 import random
 from asyncio.protocols import DatagramProtocol
 from asyncio.transports import DatagramTransport
 
-from prometheus_client import Gauge
+from prometheus_client import Gauge, Counter, Histogram
 
 from lbry.dht import constants
 from lbry.dht.serialization.bencoding import DecodeError
@@ -267,6 +268,18 @@ class PingQueue:
 
 
 class KademliaProtocol(DatagramProtocol):
+    requests_sent_metric = Counter(
+        "request_sent", "Number of requests sent from DHT RPC protocol", namespace="dht_node",
+        labelnames=("method",),
+    )
+    HISTOGRAM_BUCKETS = (
+        .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
+    )
+    response_time_metric = Histogram(
+        "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
+        labelnames=("method",)
+    )
+
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
                  udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
                  split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
@@ -583,7 +596,10 @@ class KademliaProtocol(DatagramProtocol):
             self._send(peer, request)
             response_fut = self.sent_messages[request.rpc_id][1]
             try:
+                self.requests_sent_metric.labels(method=request.method).inc()
+                start = time.perf_counter()
                 response = await asyncio.wait_for(response_fut, self.rpc_timeout)
+                self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
                 self.peer_manager.report_last_replied(peer.address, peer.udp_port)
                 return response
             except asyncio.CancelledError:

From 46f576de46e6101d67180e88dccb98ed462b51aa Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 13 Dec 2021 04:29:29 -0300
Subject: [PATCH 14/22] add request received

---
 lbry/dht/protocol/protocol.py      | 18 ++++++++++++++----
 lbry/dht/protocol/routing_table.py | 12 ++++++------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 6a13b5346..bd89f0426 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -33,7 +33,7 @@ OLD_PROTOCOL_ERRORS = {
 
 
 class KademliaRPC:
-    stored_blobs_metric = Gauge(
+    stored_blob_metric = Gauge(
         "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
         labelnames=("scope",),
     )
@@ -69,7 +69,7 @@ class KademliaRPC:
             self.protocol.data_store.add_peer_to_blob(
                 rpc_contact, blob_hash
             )
-        self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store))
+        self.stored_blob_metric.labels("global").set(len(self.protocol.data_store))
         return b'OK'
 
     def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
@@ -268,10 +268,14 @@ class PingQueue:
 
 
 class KademliaProtocol(DatagramProtocol):
-    requests_sent_metric = Counter(
+    request_sent_metric = Counter(
         "request_sent", "Number of requests sent from DHT RPC protocol", namespace="dht_node",
         labelnames=("method",),
     )
+    request_success_metric = Counter(
+        "request_success", "Number of successful requests", namespace="dht_node",
+        labelnames=("method",),
+    )
     HISTOGRAM_BUCKETS = (
         .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
     )
@@ -279,6 +283,10 @@ class KademliaProtocol(DatagramProtocol):
         "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
         labelnames=("method",)
     )
+    received_request_metric = Counter(
+        "received_request", "Number of received DHT RPC requests", namespace="dht_node",
+        labelnames=("method",),
+    )
 
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
                  udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
@@ -468,6 +476,7 @@ class KademliaProtocol(DatagramProtocol):
 
     def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
         # This is an RPC method request
+        self.received_request_metric.labels(method=request_datagram.method).inc()
         self.peer_manager.report_last_requested(address[0], address[1])
         try:
             peer = self.routing_table.get_peer(request_datagram.node_id)
@@ -596,11 +605,12 @@ class KademliaProtocol(DatagramProtocol):
             self._send(peer, request)
             response_fut = self.sent_messages[request.rpc_id][1]
             try:
-                self.requests_sent_metric.labels(method=request.method).inc()
+                self.request_sent_metric.labels(method=request.method).inc()
                 start = time.perf_counter()
                 response = await asyncio.wait_for(response_fut, self.rpc_timeout)
                 self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
                 self.peer_manager.report_last_replied(peer.address, peer.udp_port)
+                self.request_success_metric.labels(method=request.method).inc()
                 return response
             except asyncio.CancelledError:
                 if not response_fut.done():
diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 1dcfc9db0..4515b8adc 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -18,7 +18,7 @@ class KBucket:
     """
     Kademlia K-bucket implementation.
     """
-    peers_in_routing_table_metric = Gauge(
+    peer_in_routing_table_metric = Gauge(
         "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
         labelnames=("scope",)
     )
@@ -65,7 +65,7 @@ class KBucket:
             return True
         if len(self.peers) < constants.K:
             self.peers.append(peer)
-            self.peers_in_routing_table_metric.labels("global").inc()
+            self.peer_in_routing_table_metric.labels("global").inc()
             return True
         else:
             return False
@@ -132,7 +132,7 @@ class KBucket:
 
     def remove_peer(self, peer: 'KademliaPeer') -> None:
         self.peers.remove(peer)
-        self.peers_in_routing_table_metric.labels("global").dec()
+        self.peer_in_routing_table_metric.labels("global").dec()
 
     def key_in_range(self, key: bytes) -> bool:
         """ Tests whether the specified key (i.e. node ID) is in the range
@@ -171,7 +171,7 @@ class TreeRoutingTable:
     ping RPC-based k-bucket eviction algorithm described in section 2.2 of
     that paper.
""" - buckets_in_routing_table_metric = Gauge( + bucket_in_routing_table_metric = Gauge( "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", labelnames=("scope",) ) @@ -292,7 +292,7 @@ class TreeRoutingTable: # ...and remove them from the old bucket for contact in new_bucket.peers: old_bucket.remove_peer(contact) - self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) def join_buckets(self): if len(self.buckets) == 1: @@ -316,7 +316,7 @@ class TreeRoutingTable: elif can_go_higher: self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets.remove(bucket) - self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) return self.join_buckets() def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: From ff36bdc8024063104c3ae91f88abc56b12673002 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 Dec 2021 04:40:44 -0300 Subject: [PATCH 15/22] add requests in flight and error --- lbry/dht/protocol/protocol.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index bd89f0426..805d567de 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -276,6 +276,14 @@ class KademliaProtocol(DatagramProtocol): "request_success", "Number of successful requests", namespace="dht_node", labelnames=("method",), ) + request_flight_metric = Gauge( + "request_flight", "Number of ongoing requests", namespace="dht_node", + labelnames=("method",), + ) + request_error_metric = Counter( + "request_error", "Number of errors returned from request to other peers", namespace="dht_node", + labelnames=("method",), + ) HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf') ) @@ -606,6 +614,7 @@ class KademliaProtocol(DatagramProtocol): response_fut = self.sent_messages[request.rpc_id][1] try: self.request_sent_metric.labels(method=request.method).inc() + self.request_flight_metric.labels(method=request.method).inc() start = time.perf_counter() response = await asyncio.wait_for(response_fut, self.rpc_timeout) self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) @@ -617,10 +626,13 @@ class KademliaProtocol(DatagramProtocol): response_fut.cancel() raise except (asyncio.TimeoutError, RemoteException): + self.request_error_metric.labels(method=request.method).inc() self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: self.remove_peer(peer) raise + finally: + self.request_flight_metric.labels(method=request.method).dec() def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): self._send(peer, response) From 06e94640b5c025b14474e02a685a07852c7c39cd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 Dec 2021 04:56:25 -0300 Subject: [PATCH 16/22] add counter for peers with colliding bytes --- lbry/dht/protocol/routing_table.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 4515b8adc..742b1aa03 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -4,7 +4,7 @@ import logging import typing import itertools -from 
prometheus_client import Gauge +from prometheus_client import Gauge, Counter from lbry.dht import constants from lbry.dht.protocol.distance import Distance @@ -22,6 +22,10 @@ class KBucket: "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", labelnames=("scope",) ) + peer_with_x_byte_colliding_metric = Counter( + "peer_x_byte_colliding", "Number of peers with at least X bytes colliding with this node id", + namespace="dht_node", labelnames=("amount",) + ) def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes): """ @@ -66,6 +70,9 @@ class KBucket: if len(self.peers) < constants.K: self.peers.append(peer) self.peer_in_routing_table_metric.labels("global").inc() + if self._node_id[0] == peer.node_id[0]: + amount = 2 if self._node_id[1] == peer.node_id[1] else 1 + self.peer_with_x_byte_colliding_metric.labels(amount=amount).inc() return True else: return False From b44e2c0b38d2f05f066939fb34aa805553f84d52 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 15 Dec 2021 04:36:24 -0300 Subject: [PATCH 17/22] count bit collisions between 8 and 16 --- lbry/dht/protocol/routing_table.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 742b1aa03..c0e1f77b9 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -22,8 +22,8 @@ class KBucket: "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", labelnames=("scope",) ) - peer_with_x_byte_colliding_metric = Counter( - "peer_x_byte_colliding", "Number of peers with at least X bytes colliding with this node id", + peer_with_x_bit_colliding_metric = Counter( + "peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id", namespace="dht_node", labelnames=("amount",) ) @@ -70,9 +70,9 @@ class KBucket: if len(self.peers) < constants.K: self.peers.append(peer) self.peer_in_routing_table_metric.labels("global").inc() - if self._node_id[0] == peer.node_id[0]: - amount = 2 if self._node_id[1] == peer.node_id[1] else 1 - self.peer_with_x_byte_colliding_metric.labels(amount=amount).inc() + if peer.node_id[0] == self._node_id[0]: + bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() + self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc() return True else: return False From beb8583436ccf33c56d1a31bd261cb9e5bf93ccf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 16 Dec 2021 16:50:14 -0300 Subject: [PATCH 18/22] change colliding bits metric to gauge --- lbry/dht/protocol/routing_table.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index c0e1f77b9..344158a95 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -4,7 +4,7 @@ import logging import typing import itertools -from prometheus_client import Gauge, Counter +from prometheus_client import Gauge from lbry.dht import constants from lbry.dht.protocol.distance import Distance @@ -22,7 +22,7 @@ class KBucket: "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", labelnames=("scope",) ) - peer_with_x_bit_colliding_metric = Counter( + peer_with_x_bit_colliding_metric = Gauge( "peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id", namespace="dht_node", labelnames=("amount",) ) @@ -140,6 +140,9 @@ 
class KBucket: def remove_peer(self, peer: 'KademliaPeer') -> None: self.peers.remove(peer) self.peer_in_routing_table_metric.labels("global").dec() + if peer.node_id[0] == self._node_id[0]: + bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() + self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec() def key_in_range(self, key: bytes) -> bool: """ Tests whether the specified key (i.e. node ID) is in the range From fd9dcbf9a8e79db1deac99da61d0b89f75d01833 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 22 Dec 2021 17:53:24 -0300 Subject: [PATCH 19/22] add granular metric for stored blob prefix, for network announcements calculation --- lbry/dht/node.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index d9217529b..345662460 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -24,6 +24,10 @@ class Node: "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node", labelnames=("scope",), ) + stored_blob_with_x_bytes_colliding = Gauge( + "stored_blobs_x_bytes_colliding", "Number of blobs with at least X bytes colliding with this node id prefix", + namespace="dht_node", labelnames=("amount",) + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, @@ -55,6 +59,15 @@ class Node: self.storing_peers_metric.labels("global").set(len(storing_peers)) total_peers.extend(storing_peers) + counts = {0: 0, 1: 0, 2: 0} + node_id = self.protocol.node_id + for blob_hash in self.protocol.data_store.keys(): + bytes_colliding = 0 if blob_hash[0] != node_id[0] else 2 if blob_hash[1] == node_id[1] else 1 + counts[bytes_colliding] += 1 + self.stored_blob_with_x_bytes_colliding.labels(amount=0).set(counts[0]) + self.stored_blob_with_x_bytes_colliding.labels(amount=1).set(counts[1]) + self.stored_blob_with_x_bytes_colliding.labels(amount=2).set(counts[2]) + # get ids falling in the midpoint of each bucket that hasn't been recently updated node_ids = self.protocol.routing_table.get_refresh_list(0, True) # if we have 3 or fewer populated buckets get two random ids in the range of each to try and From 8a1a1a40002405c516902c389105fe374f904b48 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 24 Dec 2021 10:54:03 -0300 Subject: [PATCH 20/22] remove estimation endpoints as that is done over prometheus metrics now --- scripts/dht_node.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 7343b0979..406aaf569 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -22,7 +22,6 @@ class SimpleMetrics: def __init__(self, port, node): self.prometheus_port = port self.dht_node: Node = node - self.active_estimation_semaphore = asyncio.Semaphore(1) async def handle_metrics_get_request(self, _): try: @@ -50,36 +49,11 @@ class SimpleMetrics: writer.writerow({"blob_hash": blob.hex()}) return web.Response(text=out.getvalue(), content_type='text/csv') - async def active_estimation(self, _): - # - "crawls" the network for peers close to our node id (not a full aggressive crawler yet) - # given everything is random, the odds of a peer having the same X prefix bits matching ours is roughly 1/(2^X) - # we use that to estimate the network size, see issue #3463 for related papers and details 
-        amount = 20_000
-        async with self.active_estimation_semaphore:  # this is resource intensive, limit concurrency to 1
-            peers = await self.dht_node.peer_search(self.dht_node.protocol.node_id, count=amount, max_results=amount)
-        close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
-        return web.json_response(
-            {"total_peers_found_during_estimation": len(peers),
-             "peers_with_the_same_byte_prefix": len(close_ids),
-             'estimated_network_size': len(close_ids) * 256})
-
-    async def passive_estimation(self, _):
-        # same method as above but instead we use the routing table and assume our implementation was able to add
-        # all the reachable close peers, which should be usable for seed nodes since they are super popular
-        peers = self.dht_node.protocol.routing_table.get_peers()
-        close_ids = [peer for peer in peers if peer.node_id[0] == self.dht_node.protocol.node_id[0]]
-        return web.json_response(
-            {"total_peers_found_during_estimation": len(peers),
-             "peers_with_the_same_byte_prefix": len(close_ids),
-             'estimated_network_size': len(close_ids) * 256})
-
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
         prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
         prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
-        prom_app.router.add_get('/active_estimation', self.active_estimation)
-        prom_app.router.add_get('/passive_estimation', self.passive_estimation)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)

From 8e6fa3490ca0509b134de04f199dfbdea055b23b Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 24 Dec 2021 11:47:33 -0300
Subject: [PATCH 21/22] disable CSV endpoints by default

---
 scripts/dht_node.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/scripts/dht_node.py b/scripts/dht_node.py
index 406aaf569..3dd6dca82 100644
--- a/scripts/dht_node.py
+++ b/scripts/dht_node.py
@@ -52,15 +52,16 @@ class SimpleMetrics:
     async def start(self):
         prom_app = web.Application()
         prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
-        prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
-        prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
+        if self.dht_node:
+            prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
+            prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
         metrics_runner = web.AppRunner(prom_app)
         await metrics_runner.setup()
         prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)
         await prom_site.start()
 
 
-async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int):
+async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int, export: bool):
     loop = asyncio.get_event_loop()
     conf = Config()
     if not db_file_path.startswith(':memory:'):
@@ -85,7 +86,7 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
         storage=storage
     )
     if prometheus_port > 0:
-        metrics = SimpleMetrics(prometheus_port, node)
+        metrics = SimpleMetrics(prometheus_port, node if export else None)
         await metrics.start()
     node.start(host, nodes)
     log.info("Peer with id %s started", node_id.hex())
@@ -105,6 +106,7 @@ if __name__ == '__main__':
     parser.add_argument("--bootstrap_node", default=None, type=str,
                         help="Node to connect for bootstraping this node. Leave unset to use the default ones. "
                              "Format: host:port Example: lbrynet1.lbry.com:4444")
-    parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus and raw CSV metrics. 0 to disable. Default: 0")
+    parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus metrics. 0 to disable. Default: 0")
+    parser.add_argument("--enable_csv_export", action='store_true', help="Enable CSV endpoints on metrics server.")
     args = parser.parse_args()
-    asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port))
+    asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port, args.enable_csv_export))

From 0618053bd42035781031495850209b9b216f5127 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 12 Jan 2022 12:41:04 -0300
Subject: [PATCH 22/22] remove request_flight metric

---
 lbry/dht/protocol/protocol.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 805d567de..6741fa828 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -276,10 +276,6 @@ class KademliaProtocol(DatagramProtocol):
         "request_success", "Number of successful requests", namespace="dht_node",
         labelnames=("method",),
     )
-    request_flight_metric = Gauge(
-        "request_flight", "Number of ongoing requests", namespace="dht_node",
-        labelnames=("method",),
-    )
     request_error_metric = Counter(
         "request_error", "Number of errors returned from request to other peers", namespace="dht_node",
         labelnames=("method",),
     )
@@ -614,7 +610,6 @@ class KademliaProtocol(DatagramProtocol):
             response_fut = self.sent_messages[request.rpc_id][1]
             try:
                 self.request_sent_metric.labels(method=request.method).inc()
-                self.request_flight_metric.labels(method=request.method).inc()
                 start = time.perf_counter()
                 response = await asyncio.wait_for(response_fut, self.rpc_timeout)
                 self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
@@ -631,8 +626,6 @@ class KademliaProtocol(DatagramProtocol):
                 if self.peer_manager.peer_is_good(peer) is False:
                     self.remove_peer(peer)
                 raise
-            finally:
-                self.request_flight_metric.labels(method=request.method).dec()
 
     def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
         self._send(peer, response)