diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 300c1a774..345662460 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -2,6 +2,9 @@ import logging import asyncio import typing import socket + +from prometheus_client import Gauge + from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -17,6 +20,14 @@ log = logging.getLogger(__name__) class Node: + storing_peers_metric = Gauge( + "storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node", + labelnames=("scope",), + ) + stored_blob_with_x_bytes_colliding = Gauge( + "stored_blobs_x_bytes_colliding", "Number of blobs with at least X bytes colliding with this node id prefix", + namespace="dht_node", labelnames=("amount",) + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, @@ -44,7 +55,18 @@ class Node: # add all peers in the routing table total_peers.extend(self.protocol.routing_table.get_peers()) # add all the peers who have announced blobs to us - total_peers.extend(self.protocol.data_store.get_storing_contacts()) + storing_peers = self.protocol.data_store.get_storing_contacts() + self.storing_peers_metric.labels("global").set(len(storing_peers)) + total_peers.extend(storing_peers) + + counts = {0: 0, 1: 0, 2: 0} + node_id = self.protocol.node_id + for blob_hash in self.protocol.data_store.keys(): + bytes_colliding = 0 if blob_hash[0] != node_id[0] else 2 if blob_hash[1] == node_id[1] else 1 + counts[bytes_colliding] += 1 + self.stored_blob_with_x_bytes_colliding.labels(amount=0).set(counts[0]) + self.stored_blob_with_x_bytes_colliding.labels(amount=1).set(counts[1]) + self.stored_blob_with_x_bytes_colliding.labels(amount=2).set(counts[2]) # get ids falling in the midpoint of 
each bucket that hasn't been recently updated node_ids = self.protocol.routing_table.get_refresh_list(0, True) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index ae4fe2191..9413eec94 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -3,6 +3,9 @@ import asyncio import logging from dataclasses import dataclass, field from functools import lru_cache + +from prometheus_client import Gauge + from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache from lbry.dht import constants from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address @@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False): class PeerManager: + peer_manager_keys_metric = Gauge( + "peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop): self._loop = loop self._rpc_failures: typing.Dict[ @@ -38,6 +45,11 @@ class PeerManager: self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE) self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE) + def count_cache_keys(self): + return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len( + self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len( + self._node_tokens) + def reset(self): for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested): statistic.clear() @@ -86,6 +98,7 @@ class PeerManager: self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id)) self._node_id_mapping[(address, udp_port)] = node_id self._node_id_reverse_mapping[node_id] = (address, udp_port) + self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys()) def prune(self): # TODO: periodically call this now = self._loop.time() diff --git a/lbry/dht/protocol/protocol.py 
b/lbry/dht/protocol/protocol.py index 66165740b..6741fa828 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -3,11 +3,14 @@ import socket import functools import hashlib import asyncio +import time import typing import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport +from prometheus_client import Gauge, Counter, Histogram + from lbry.dht import constants from lbry.dht.serialization.bencoding import DecodeError from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram @@ -30,6 +33,11 @@ OLD_PROTOCOL_ERRORS = { class KademliaRPC: + stored_blob_metric = Gauge( + "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node", + labelnames=("scope",), + ) + def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333): self.protocol = protocol self.loop = loop @@ -61,6 +69,7 @@ class KademliaRPC: self.protocol.data_store.add_peer_to_blob( rpc_contact, blob_hash ) + self.stored_blob_metric.labels("global").set(len(self.protocol.data_store)) return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: @@ -259,6 +268,30 @@ class PingQueue: class KademliaProtocol(DatagramProtocol): + request_sent_metric = Counter( + "request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node", + labelnames=("method",), + ) + request_success_metric = Counter( + "request_success", "Number of successful requests", namespace="dht_node", + labelnames=("method",), + ) + request_error_metric = Counter( + "request_error", "Number of errors returned from request to other peers", namespace="dht_node", + labelnames=("method",), + ) + HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf') + ) + response_time_metric = Histogram( + "response_time", 
"Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS, + labelnames=("method",) + ) + received_request_metric = Counter( + "received_request", "Number of received DHT RPC requests", namespace="dht_node", + labelnames=("method",), + ) + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): @@ -447,6 +480,7 @@ class KademliaProtocol(DatagramProtocol): def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): # This is an RPC method request + self.received_request_metric.labels(method=request_datagram.method).inc() self.peer_manager.report_last_requested(address[0], address[1]) try: peer = self.routing_table.get_peer(request_datagram.node_id) @@ -575,14 +609,19 @@ class KademliaProtocol(DatagramProtocol): self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: + self.request_sent_metric.labels(method=request.method).inc() + start = time.perf_counter() response = await asyncio.wait_for(response_fut, self.rpc_timeout) + self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) self.peer_manager.report_last_replied(peer.address, peer.udp_port) + self.request_success_metric.labels(method=request.method).inc() return response except asyncio.CancelledError: if not response_fut.done(): response_fut.cancel() raise except (asyncio.TimeoutError, RemoteException): + self.request_error_metric.labels(method=request.method).inc() self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: self.remove_peer(peer) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 4fea1266f..344158a95 100644 --- a/lbry/dht/protocol/routing_table.py +++ 
b/lbry/dht/protocol/routing_table.py @@ -4,6 +4,8 @@ import logging import typing import itertools +from prometheus_client import Gauge + from lbry.dht import constants from lbry.dht.protocol.distance import Distance if typing.TYPE_CHECKING: @@ -13,8 +15,17 @@ log = logging.getLogger(__name__) class KBucket: - """ Description - later """ + Kademlia K-bucket implementation. + """ + peer_in_routing_table_metric = Gauge( + "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", + labelnames=("scope",) + ) + peer_with_x_bit_colliding_metric = Gauge( + "peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id", + namespace="dht_node", labelnames=("amount",) + ) def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes): """ @@ -58,6 +69,10 @@ class KBucket: return True if len(self.peers) < constants.K: self.peers.append(peer) + self.peer_in_routing_table_metric.labels("global").inc() + if peer.node_id[0] == self._node_id[0]: + bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() + self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc() return True else: return False @@ -124,6 +139,10 @@ class KBucket: def remove_peer(self, peer: 'KademliaPeer') -> None: self.peers.remove(peer) + self.peer_in_routing_table_metric.labels("global").dec() + if peer.node_id[0] == self._node_id[0]: + bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() + self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec() def key_in_range(self, key: bytes) -> bool: """ Tests whether the specified key (i.e. node ID) is in the range @@ -162,6 +181,10 @@ class TreeRoutingTable: ping RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. 
""" + bucket_in_routing_table_metric = Gauge( + "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", + labelnames=("scope",) + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): @@ -279,6 +302,7 @@ class TreeRoutingTable: # ...and remove them from the old bucket for contact in new_bucket.peers: old_bucket.remove_peer(contact) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) def join_buckets(self): if len(self.buckets) == 1: @@ -302,6 +326,7 @@ class TreeRoutingTable: elif can_go_higher: self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets.remove(bucket) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) return self.join_buckets() def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index c67a5b7e4..195564c54 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -5149,10 +5149,12 @@ class Daemon(metaclass=JSONRPCServerType): ] }, "node_id": (str) the local dht node id + "prefix_neighbors_count": (int) the amount of peers sharing the same byte prefix of the local node id } """ result = { - 'buckets': {} + 'buckets': {}, + 'prefix_neighbors_count': 0 } for i, _ in enumerate(self.dht_node.protocol.routing_table.buckets): @@ -5165,6 +5167,7 @@ class Daemon(metaclass=JSONRPCServerType): "node_id": hexlify(peer.node_id).decode(), } result['buckets'][i].append(host) + result['prefix_neighbors_count'] += 1 if peer.node_id[0] == self.dht_node.protocol.node_id[0] else 0 result['node_id'] = hexlify(self.dht_node.protocol.node_id).decode() return result diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 360e00eac..3dd6dca82 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -2,10 +2,11 @@ 
import asyncio import argparse import logging import csv +import os.path from io import StringIO from typing import Optional from aiohttp import web -from prometheus_client import generate_latest as prom_generate_latest, Gauge +from prometheus_client import generate_latest as prom_generate_latest from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -15,14 +16,6 @@ from lbry.conf import Config logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") log = logging.getLogger(__name__) -BLOBS_STORED = Gauge( - "blobs_stored", "Number of blob info received", namespace="dht_node", - labelnames=("method",) -) -PEERS = Gauge( - "known_peers", "Number of peers on routing table", namespace="dht_node", - labelnames=("method",) -) class SimpleMetrics: @@ -30,7 +23,7 @@ class SimpleMetrics: self.prometheus_port = port self.dht_node: Node = node - async def handle_metrics_get_request(self, request: web.Request): + async def handle_metrics_get_request(self, _): try: return web.Response( text=prom_generate_latest().decode(), @@ -40,7 +33,7 @@ class SimpleMetrics: log.exception('could not generate prometheus data') raise - async def handle_peers_csv(self, request: web.Request): + async def handle_peers_csv(self, _): out = StringIO() writer = csv.DictWriter(out, fieldnames=["ip", "port", "dht_id"]) writer.writeheader() @@ -48,7 +41,7 @@ class SimpleMetrics: writer.writerow({"ip": peer.address, "port": peer.udp_port, "dht_id": peer.node_id.hex()}) return web.Response(text=out.getvalue(), content_type='text/csv') - async def handle_blobs_csv(self, request: web.Request): + async def handle_blobs_csv(self, _): out = StringIO() writer = csv.DictWriter(out, fieldnames=["blob_hash"]) writer.writeheader() @@ -59,17 +52,28 @@ class SimpleMetrics: async def start(self): prom_app = web.Application() prom_app.router.add_get('/metrics', self.handle_metrics_get_request) - prom_app.router.add_get('/peers.csv', 
self.handle_peers_csv) - prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv) + if self.dht_node: + prom_app.router.add_get('/peers.csv', self.handle_peers_csv) + prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv) metrics_runner = web.AppRunner(prom_app) await metrics_runner.setup() prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port) await prom_site.start() -async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int): +async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int, export: bool): loop = asyncio.get_event_loop() conf = Config() + node_id = generate_id() # fallback id: ':memory:' databases have no node_id file to load from + if not db_file_path.startswith(':memory:'): + node_id_file_path = db_file_path + 'node_id' + if os.path.exists(node_id_file_path): + with open(node_id_file_path, 'rb') as node_id_file: + node_id = node_id_file.read() + else: + with open(node_id_file_path, 'wb') as node_id_file: + node_id_file.write(node_id) + storage = SQLiteStorage(conf, db_file_path, loop, loop.time) if bootstrap_node: nodes = bootstrap_node.split(':') @@ -78,17 +82,16 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional nodes = conf.known_dht_nodes await storage.open() node = Node( - loop, PeerManager(loop), generate_id(), port, port, 3333, None, + loop, PeerManager(loop), node_id, port, port, 3333, None, storage=storage ) if prometheus_port > 0: - metrics = SimpleMetrics(prometheus_port, node) + metrics = SimpleMetrics(prometheus_port, node if export else None) await metrics.start() node.start(host, nodes) + log.info("Peer with id %s started", node_id.hex()) while True: await asyncio.sleep(10) - PEERS.labels('main').set(len(node.protocol.routing_table.get_peers())) - BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts())) log.info("Known peers: %d. 
Storing contact information for %d blobs from %d peers.", len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store), len(node.protocol.data_store.get_storing_contacts())) @@ -103,6 +106,7 @@ if __name__ == '__main__': parser.add_argument("--bootstrap_node", default=None, type=str, help="Node to connect for bootstraping this node. Leave unset to use the default ones. " "Format: host:port Example: lbrynet1.lbry.com:4444") - parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus and raw CSV metrics. 0 to disable. Default: 0") + parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus metrics. 0 to disable. Default: 0") + parser.add_argument("--enable_csv_export", action='store_true', help="Enable CSV endpoints on metrics server.") args = parser.parse_args() - asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port)) + asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port, args.enable_csv_export))