From 46f576de46e6101d67180e88dccb98ed462b51aa Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 Dec 2021 04:29:29 -0300 Subject: [PATCH] add request received --- lbry/dht/protocol/protocol.py | 18 ++++++++++++++---- lbry/dht/protocol/routing_table.py | 12 ++++++------ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 6a13b5346..bd89f0426 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -33,7 +33,7 @@ OLD_PROTOCOL_ERRORS = { class KademliaRPC: - stored_blobs_metric = Gauge( + stored_blob_metric = Gauge( "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node", labelnames=("scope",), ) @@ -69,7 +69,7 @@ class KademliaRPC: self.protocol.data_store.add_peer_to_blob( rpc_contact, blob_hash ) - self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store)) + self.stored_blob_metric.labels("global").set(len(self.protocol.data_store)) return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: @@ -268,10 +268,14 @@ class PingQueue: class KademliaProtocol(DatagramProtocol): - requests_sent_metric = Counter( + request_sent_metric = Counter( "request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node", labelnames=("method",), ) + request_success_metric = Counter( + "request_success", "Number of successful requests", namespace="dht_node", + labelnames=("method",), + ) HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf') ) @@ -279,6 +283,10 @@ class KademliaProtocol(DatagramProtocol): "response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS, labelnames=("method",) ) + received_request_metric = Counter( + "received_request", "Number of received DHT RPC requests", namespace="dht_node", + labelnames=("method",), + ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT, @@ -468,6 +476,7 @@ class KademliaProtocol(DatagramProtocol): def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): # This is an RPC method request + self.received_request_metric.labels(method=request_datagram.method).inc() self.peer_manager.report_last_requested(address[0], address[1]) try: peer = self.routing_table.get_peer(request_datagram.node_id) @@ -596,11 +605,12 @@ class KademliaProtocol(DatagramProtocol): self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: - self.requests_sent_metric.labels(method=request.method).inc() + self.request_sent_metric.labels(method=request.method).inc() start = time.perf_counter() response = await asyncio.wait_for(response_fut, self.rpc_timeout) self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) self.peer_manager.report_last_replied(peer.address, peer.udp_port) + self.request_success_metric.labels(method=request.method).inc() return response except asyncio.CancelledError: if not response_fut.done(): diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 1dcfc9db0..4515b8adc 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -18,7 +18,7 @@ class KBucket: """ Kademlia K-bucket implementation. """ - peers_in_routing_table_metric = Gauge( + peer_in_routing_table_metric = Gauge( "peers_in_routing_table", "Number of peers on routing table", namespace="dht_node", labelnames=("scope",) ) @@ -65,7 +65,7 @@ class KBucket: return True if len(self.peers) < constants.K: self.peers.append(peer) - self.peers_in_routing_table_metric.labels("global").inc() + self.peer_in_routing_table_metric.labels("global").inc() return True else: return False @@ -132,7 +132,7 @@ class KBucket: def remove_peer(self, peer: 'KademliaPeer') -> None: self.peers.remove(peer) - self.peers_in_routing_table_metric.labels("global").dec() + self.peer_in_routing_table_metric.labels("global").dec() def key_in_range(self, key: bytes) -> bool: """ Tests whether the specified key (i.e. node ID) is in the range @@ -171,7 +171,7 @@ class TreeRoutingTable: ping RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. """ - buckets_in_routing_table_metric = Gauge( + bucket_in_routing_table_metric = Gauge( "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", labelnames=("scope",) ) @@ -292,7 +292,7 @@ class TreeRoutingTable: # ...and remove them from the old bucket for contact in new_bucket.peers: old_bucket.remove_peer(contact) - self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) def join_buckets(self): if len(self.buckets) == 1: @@ -316,7 +316,7 @@ class TreeRoutingTable: elif can_go_higher: self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min self.buckets.remove(bucket) - self.buckets_in_routing_table_metric.labels("global").set(len(self.buckets)) + self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets)) return self.join_buckets() def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool: