diff --git a/lbry/conf.py b/lbry/conf.py index 40eaabce2..3f868f643 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -511,7 +511,7 @@ class Config(CLIConfig): download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) - node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout) + node_rpc_timeout = Float("Timeout when making a DHT request", constants.RPC_TIMEOUT) # blob announcement and download save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True) diff --git a/lbry/dht/constants.py b/lbry/dht/constants.py index a9754c2b6..07dcec18a 100644 --- a/lbry/dht/constants.py +++ b/lbry/dht/constants.py @@ -1,31 +1,31 @@ import hashlib import os -hash_class = hashlib.sha384 -hash_length = hash_class().digest_size -hash_bits = hash_length * 8 -alpha = 5 -k = 8 -split_buckets_under_index = 1 -replacement_cache_size = 8 -rpc_timeout = 5.0 -rpc_attempts = 5 -rpc_attempts_pruning_window = 600 -iterative_lookup_delay = rpc_timeout / 2.0 # TODO: use config val / 2 if rpc timeout is provided -refresh_interval = 3600 # 1 hour -replicate_interval = refresh_interval -data_expiration = 86400 # 24 hours -token_secret_refresh_interval = 300 # 5 minutes -maybe_ping_delay = 300 # 5 minutes -check_refresh_interval = refresh_interval / 5 -rpc_id_length = 20 -protocol_version = 1 -bottom_out_limit = 3 -msg_size_limit = 1400 +HASH_CLASS = hashlib.sha384 # pylint: disable=invalid-name +HASH_LENGTH = HASH_CLASS().digest_size +HASH_BITS = HASH_LENGTH * 8 +ALPHA = 5 +K = 8 +SPLIT_BUCKETS_UNDER_INDEX = 1 +REPLACEMENT_CACHE_SIZE = 8 +RPC_TIMEOUT = 5.0 +RPC_ATTEMPTS = 5 +RPC_ATTEMPTS_PRUNING_WINDOW = 600 +ITERATIVE_LOOKUP_DELAY = RPC_TIMEOUT / 2.0 # TODO: use config val / 2 if rpc timeout is provided +REFRESH_INTERVAL = 3600 # 1 hour +REPLICATE_INTERVAL = REFRESH_INTERVAL +DATA_EXPIRATION = 86400 # 24 hours +TOKEN_SECRET_REFRESH_INTERVAL = 300 # 5 minutes +MAYBE_PING_DELAY = 300 # 5 minutes +CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5 +RPC_ID_LENGTH = 20 +PROTOCOL_VERSION = 1 +BOTTOM_OUT_LIMIT = 3 +MSG_SIZE_LIMIT = 1400 def digest(data: bytes) -> bytes: - h = hash_class() + h = HASH_CLASS() h.update(data) return h.digest() @@ -38,4 +38,4 @@ def generate_id(num=None) -> bytes: def generate_rpc_id(num=None) -> bytes: - return generate_id(num)[:rpc_id_length] + return generate_id(num)[:RPC_ID_LENGTH] diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 9f439e25d..31dc10bac 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -19,8 +19,8 @@ log = logging.getLogger(__name__) class Node: def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, - internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout, - split_buckets_under_index: int = constants.split_buckets_under_index, + internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, + split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, storage: typing.Optional['SQLiteStorage'] = None): self.loop = loop self.internal_udp_port = internal_udp_port @@ -62,7 +62,7 @@ class Node: if force_once: break fut = asyncio.Future(loop=self.loop) - self.loop.call_later(constants.refresh_interval // 4, fut.set_result, None) + self.loop.call_later(constants.REFRESH_INTERVAL // 4, fut.set_result, None) await fut continue @@ -76,12 +76,12 @@ class Node: break fut = asyncio.Future(loop=self.loop) - self.loop.call_later(constants.refresh_interval, fut.set_result, None) + self.loop.call_later(constants.REFRESH_INTERVAL, fut.set_result, None) await fut async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: hash_value = binascii.unhexlify(blob_hash.encode()) - assert len(hash_value) == constants.hash_length + assert len(hash_value) == constants.HASH_LENGTH peers = await self.peer_search(hash_value) if not self.protocol.external_ip: @@ -175,8 +175,8 @@ class Node: self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, - bottom_out_limit: int = constants.bottom_out_limit, - max_results: int = constants.k) -> IterativeNodeFinder: + bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT, + max_results: int = constants.K) -> IterativeNodeFinder: return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, key, bottom_out_limit, max_results, None, shortlist) @@ -188,7 +188,7 @@ class Node: return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, key, bottom_out_limit, max_results, None, shortlist) - async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2, + async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2, bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None ) -> typing.List['KademliaPeer']: peers = [] diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index 0e302a6b9..1958447b9 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -85,7 +85,7 @@ class PeerManager: def get_node_token(self, node_id: bytes) -> typing.Optional[bytes]: ts, token = self._node_tokens.get(node_id, (0, None)) - if ts and ts > self._loop.time() - constants.token_secret_refresh_interval: + if ts and ts > self._loop.time() - constants.TOKEN_SECRET_REFRESH_INTERVAL: return token def get_last_replied(self, address: str, udp_port: int) -> typing.Optional[float]: @@ -108,13 +108,13 @@ class PeerManager: now = self._loop.time() to_pop = [] for (address, udp_port), (_, last_failure) in self._rpc_failures.items(): - if last_failure and last_failure < now - constants.rpc_attempts_pruning_window: + if last_failure and last_failure < now - constants.RPC_ATTEMPTS_PRUNING_WINDOW: to_pop.append((address, udp_port)) while to_pop: del self._rpc_failures[to_pop.pop()] to_pop = [] for node_id, (age, token) in self._node_tokens.items(): - if age < now - constants.token_secret_refresh_interval: + if age < now - constants.TOKEN_SECRET_REFRESH_INTERVAL: to_pop.append(node_id) while to_pop: del self._node_tokens[to_pop.pop()] @@ -124,7 +124,7 @@ class PeerManager: :return: False if peer is bad, None if peer is unknown, or True if peer is good """ - delay = self._loop.time() - constants.check_refresh_interval + delay = self._loop.time() - constants.CHECK_REFRESH_INTERVAL # fixme: find a way to re-enable that without breaking other parts # if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: @@ -170,7 +170,7 @@ class KademliaPeer: def __post_init__(self): if self._node_id is not None: - if not len(self._node_id) == constants.hash_length: + if not len(self._node_id) == constants.HASH_LENGTH: raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) if self.udp_port is not None and not 1 <= self.udp_port <= 65535: raise ValueError("invalid udp port") diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index 73e69c80e..e6880a18f 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -22,7 +22,7 @@ class DictDataStore: for key in keys: to_remove = [] for (peer, ts) in self._data_store[key]: - if ts + constants.data_expiration < now or self._peer_manager.peer_is_good(peer) is False: + if ts + constants.DATA_EXPIRATION < now or self._peer_manager.peer_is_good(peer) is False: to_remove.append((peer, ts)) for item in to_remove: self._data_store[key].remove(item) @@ -43,7 +43,7 @@ class DictDataStore: """ now = self.loop.time() for (peer, ts) in self._data_store.get(key, []): - if ts + constants.data_expiration > now: + if ts + constants.DATA_EXPIRATION > now: yield peer def has_peers_for_blob(self, key: bytes) -> bool: diff --git a/lbry/dht/protocol/distance.py b/lbry/dht/protocol/distance.py index 6834d9d5e..5a0b17e53 100644 --- a/lbry/dht/protocol/distance.py +++ b/lbry/dht/protocol/distance.py @@ -9,13 +9,13 @@ class Distance: """ def __init__(self, key: bytes): - if len(key) != constants.hash_length: + if len(key) != constants.HASH_LENGTH: raise ValueError(f"invalid key length: {len(key)}") self.key = key self.val_key_one = int.from_bytes(key, 'big') def __call__(self, key_two: bytes) -> int: - if len(key_two) != constants.hash_length: + if len(key_two) != constants.HASH_LENGTH: raise ValueError(f"invalid length of key to compare: {len(key_two)}") val_key_two = int.from_bytes(key_two, 'big') return self.val_key_one ^ val_key_two diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index 77bbfe190..d0162eefa 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -67,7 +67,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes, :param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no peers in the routing table. During bootstrap the shortlist is set to be the seed nodes. """ - if len(key) != constants.hash_length: + if len(key) != constants.HASH_LENGTH: raise ValueError("invalid key length: %i" % len(key)) return shortlist or routing_table.find_close_peers(key) @@ -75,10 +75,10 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes, class IterativeFinder: def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, + bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): - if len(key) != constants.hash_length: + if len(key) != constants.HASH_LENGTH: raise ValueError("invalid key length: %i" % len(key)) self.loop = loop self.peer_manager = peer_manager @@ -185,7 +185,7 @@ class IterativeFinder: to_probe = list(self.active - self.contacted) to_probe.sort(key=lambda peer: self.distance(self.key)) for peer in to_probe: - if added >= constants.alpha: + if added >= constants.ALPHA: break origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude: @@ -216,7 +216,7 @@ class IterativeFinder: t.add_done_callback(callback) self.running_probes.add(t) - async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay): + async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): try: if self.running: await self._search_round() @@ -263,7 +263,7 @@ class IterativeFinder: class IterativeNodeFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, + bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, @@ -286,7 +286,7 @@ class IterativeNodeFinder(IterativeFinder): and self.peer_manager.peer_is_good(peer) is not False ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) - to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] + to_yield = not_yet_yielded[:min(constants.K, len(not_yet_yielded))] if to_yield: self.yielded_peers.update(to_yield) self.iteration_queue.put_nowait(to_yield) @@ -314,7 +314,7 @@ class IterativeNodeFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, + bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, @@ -348,7 +348,7 @@ class IterativeValueFinder(IterativeFinder): if len(self.discovered_peers[peer]) != already_known + len(parsed.found_compact_addresses): log.warning("misbehaving peer %s:%i returned duplicate peers for blob", peer.address, peer.udp_port) parsed.found_compact_addresses.clear() - elif len(parsed.found_compact_addresses) >= constants.k and self.peer_pages[peer] < parsed.pages: + elif len(parsed.found_compact_addresses) >= constants.K and self.peer_pages[peer] < parsed.pages: # the peer returned a full page and indicates it has more self.peer_pages[peer] += 1 if peer in self.contacted: diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 7cbaad3e5..472b26e3b 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -48,13 +48,13 @@ class KademliaRPC: return b'pong' def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes: - if len(blob_hash) != constants.hash_bits // 8: + if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not 0 < port < 65535: raise ValueError(f"invalid tcp port: {port}") rpc_contact.update_tcp_port(port) if not self.verify_token(token, rpc_contact.compact_ip()): - if self.loop.time() - self.protocol.started_listening_time < constants.token_secret_refresh_interval: + if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL: pass else: raise ValueError("Invalid token") @@ -64,19 +64,19 @@ class KademliaRPC: return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: - if len(key) != constants.hash_length: + if len(key) != constants.HASH_LENGTH: raise ValueError("invalid contact node_id length: %i" % len(key)) contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id) contact_triples = [] - for contact in contacts[:constants.k * 2]: + for contact in contacts[:constants.K * 2]: contact_triples.append((contact.node_id, contact.address, contact.udp_port)) return contact_triples def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): page = page if page > 0 else 0 - if len(key) != constants.hash_length: + if len(key) != constants.HASH_LENGTH: raise ValueError("invalid blob_exchange hash length: %i" % len(key)) response = { @@ -84,7 +84,7 @@ class KademliaRPC: } if not page: - response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.k] + response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K] if self.protocol.protocol_version: response[b'protocolVersion'] = self.protocol.protocol_version @@ -96,16 +96,16 @@ class KademliaRPC: if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] # if we don't have k storing peers to return and we have this hash locally, include our contact information - if len(peers) < constants.k and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs: + if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) if not peers: response[PAGE_KEY] = 0 else: - response[PAGE_KEY] = (len(peers) // (constants.k + 1)) + 1 # how many pages of peers we have for the blob - if len(peers) > constants.k: + response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1 # how many pages of peers we have for the blob + if len(peers) > constants.K: random.Random(self.protocol.node_id).shuffle(peers) - if page * constants.k < len(peers): - response[key] = peers[page * constants.k:page * constants.k + constants.k] + if page * constants.K < len(peers): + response[key] = peers[page * constants.K:page * constants.K + constants.K] return response def refresh_token(self): # TODO: this needs to be called periodically @@ -154,7 +154,7 @@ class RemoteKademliaRPC: :param blob_hash: blob hash as bytes :return: b'OK' """ - if len(blob_hash) != constants.hash_bits // 8: + if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535: raise ValueError(f"invalid tcp port: {self.protocol.peer_port}") @@ -171,7 +171,7 @@ class RemoteKademliaRPC: """ :return: [(node_id, address, udp_port), ...] """ - if len(key) != constants.hash_bits // 8: + if len(key) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of find node key: {len(key)}") response = await self.protocol.send_request( self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key) @@ -186,7 +186,7 @@ class RemoteKademliaRPC: : [ constants.msg_size_limit: + if len(data) > constants.MSG_SIZE_LIMIT: log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)", - constants.msg_size_limit, len(data)) + constants.MSG_SIZE_LIMIT, len(data)) log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode()) raise ValueError( - f"cannot send datagram larger than {constants.msg_size_limit} bytes (packet is {len(data)} bytes)" + f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)" ) if isinstance(message, (RequestDatagram, ResponseDatagram)): assert message.node_id == self.node_id, message @@ -637,10 +637,10 @@ class KademliaProtocol(DatagramProtocol): return constants.digest(self.token_secret + compact_ip) def verify_token(self, token, compact_ip): - h = constants.hash_class() + h = constants.HASH_CLASS() h.update(self.token_secret + compact_ip) if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? - h = constants.hash_class() + h = constants.HASH_CLASS() h.update(self.old_token_secret + compact_ip) if not token == h.digest(): return False diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index d0d1415bf..e936f5cc0 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -56,7 +56,7 @@ class KBucket: self.peers.remove(p) self.peers.append(peer) return True - if len(self.peers) < constants.k: + if len(self.peers) < constants.K: self.peers.append(peer) return True else: @@ -101,8 +101,8 @@ class KBucket: current_len = len(peers) # If count greater than k - return only k contacts - if count > constants.k: - count = constants.k + if count > constants.K: + count = constants.K if not current_len: return peers @@ -164,14 +164,14 @@ class TreeRoutingTable: """ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, - split_buckets_under_index: int = constants.split_buckets_under_index): + split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): self._loop = loop self._peer_manager = peer_manager self._parent_node_id = parent_node_id self._split_buckets_under_index = split_buckets_under_index self.buckets: typing.List[KBucket] = [ KBucket( - self._peer_manager, range_min=0, range_max=2 ** constants.hash_bits, node_id=self._parent_node_id + self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id ) ] @@ -185,7 +185,7 @@ class TreeRoutingTable: contacts = self.get_peers() distance = Distance(self._parent_node_id) contacts.sort(key=lambda c: distance(c.node_id)) - kth_contact = contacts[-1] if len(contacts) < constants.k else contacts[constants.k - 1] + kth_contact = contacts[-1] if len(contacts) < constants.K else contacts[constants.K - 1] return distance(to_add) < distance(kth_contact.node_id) def find_close_peers(self, key: bytes, count: typing.Optional[int] = None, @@ -193,7 +193,7 @@ class TreeRoutingTable: exclude = [self._parent_node_id] if sender_node_id: exclude.append(sender_node_id) - count = count or constants.k + count = count or constants.K distance = Distance(key) contacts = self.get_peers() contacts = [c for c in contacts if c.node_id not in exclude] @@ -214,7 +214,7 @@ class TreeRoutingTable: refresh_ids = [] now = int(self._loop.time()) for bucket in self.buckets[start_index:]: - if force or now - bucket.last_accessed >= constants.refresh_interval: + if force or now - bucket.last_accessed >= constants.REFRESH_INTERVAL: to_search = self.midpoint_id_in_bucket_range(bucket_index) refresh_ids.append(to_search) bucket_index += 1 @@ -248,13 +248,13 @@ class TreeRoutingTable: random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max)) return Distance( self._parent_node_id - )(random_id.to_bytes(constants.hash_length, 'big')).to_bytes(constants.hash_length, 'big') + )(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big') def midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes: half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2) return Distance(self._parent_node_id)( - int(self.buckets[bucket_index].range_min + half).to_bytes(constants.hash_length, 'big') - ).to_bytes(constants.hash_length, 'big') + int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big') + ).to_bytes(constants.HASH_LENGTH, 'big') def split_bucket(self, old_bucket_index: int) -> None: """ Splits the specified k-bucket into two new buckets which together diff --git a/lbry/dht/serialization/datagram.py b/lbry/dht/serialization/datagram.py index d84401228..cddff67df 100644 --- a/lbry/dht/serialization/datagram.py +++ b/lbry/dht/serialization/datagram.py @@ -34,9 +34,9 @@ class KademliaDatagramBase: self.packet_type = packet_type if self.expected_packet_type != packet_type: raise ValueError(f"invalid packet type: {packet_type}, expected {self.expected_packet_type}") - if len(rpc_id) != constants.rpc_id_length: + if len(rpc_id) != constants.RPC_ID_LENGTH: raise ValueError(f"invalid rpc node_id: {len(rpc_id)} bytes (expected 20)") - if not len(node_id) == constants.hash_length: + if not len(node_id) == constants.HASH_LENGTH: raise ValueError(f"invalid node node_id: {len(node_id)} bytes (expected 48)") self.rpc_id = rpc_id self.node_id = node_id @@ -77,18 +77,18 @@ class RequestDatagram(KademliaDatagramBase): @classmethod def make_ping(cls, from_node_id: bytes, rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': - rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] + rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH] return cls(REQUEST_TYPE, rpc_id, from_node_id, b'ping') @classmethod def make_store(cls, from_node_id: bytes, blob_hash: bytes, token: bytes, port: int, rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': - rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] - if len(blob_hash) != constants.hash_bits // 8: + rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH] + if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid blob hash length: {len(blob_hash)}") if not 0 < port < 65536: raise ValueError(f"invalid port: {port}") - if len(token) != constants.hash_bits // 8: + if len(token) != constants.HASH_BITS // 8: raise ValueError(f"invalid token length: {len(token)}") store_args = [blob_hash, token, port, from_node_id, 0] return cls(REQUEST_TYPE, rpc_id, from_node_id, b'store', store_args) @@ -96,16 +96,16 @@ class RequestDatagram(KademliaDatagramBase): @classmethod def make_find_node(cls, from_node_id: bytes, key: bytes, rpc_id: typing.Optional[bytes] = None) -> 'RequestDatagram': - rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] - if len(key) != constants.hash_bits // 8: + rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH] + if len(key) != constants.HASH_BITS // 8: raise ValueError(f"invalid key length: {len(key)}") return cls(REQUEST_TYPE, rpc_id, from_node_id, b'findNode', [key]) @classmethod def make_find_value(cls, from_node_id: bytes, key: bytes, rpc_id: typing.Optional[bytes] = None, page: int = 0) -> 'RequestDatagram': - rpc_id = rpc_id or constants.generate_id()[:constants.rpc_id_length] - if len(key) != constants.hash_bits // 8: + rpc_id = rpc_id or constants.generate_id()[:constants.RPC_ID_LENGTH] + if len(key) != constants.HASH_BITS // 8: raise ValueError(f"invalid key length: {len(key)}") if page < 0: raise ValueError(f"cannot request a negative page ({page})") @@ -179,7 +179,7 @@ def make_compact_address(node_id: bytes, address: str, port: int) -> bytearray: compact_ip = make_compact_ip(address) if not 0 < port < 65536: raise ValueError(f'Invalid port: {port}') - if len(node_id) != constants.hash_bits // 8: + if len(node_id) != constants.HASH_BITS // 8: raise ValueError(f"invalid node node_id length") return compact_ip + port.to_bytes(2, 'big') + node_id @@ -190,6 +190,6 @@ def decode_compact_address(compact_address: bytes) -> typing.Tuple[bytes, str, i node_id = compact_address[6:] if not 0 < port < 65536: raise ValueError(f'Invalid port: {port}') - if len(node_id) != constants.hash_bits // 8: + if len(node_id) != constants.HASH_BITS // 8: raise ValueError(f"invalid node node_id length") return node_id, address, port diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 268f249f6..3db530e76 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -10,7 +10,7 @@ from lbry.conf import Config from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies from lbry.wallet.transaction import Transaction from lbry.schema.claim import Claim -from lbry.dht.constants import data_expiration +from lbry.dht.constants import DATA_EXPIRATION from lbry.blob.blob_info import BlobInfo if typing.TYPE_CHECKING: @@ -386,7 +386,7 @@ class SQLiteStorage(SQLiteMixin): return transaction.executemany( "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 " "where blob_hash=?", - ((int(last_announced + (data_expiration / 2)), int(last_announced), blob_hash) + ((int(last_announced + (DATA_EXPIRATION / 2)), int(last_announced), blob_hash) for blob_hash in blob_hashes) ).fetchall() return self.db.run(_update_last_announced_blobs) diff --git a/tests/unit/dht/protocol/test_kbucket.py b/tests/unit/dht/protocol/test_kbucket.py index 0063d51bd..13b907a54 100644 --- a/tests/unit/dht/protocol/test_kbucket.py +++ b/tests/unit/dht/protocol/test_kbucket.py @@ -26,7 +26,7 @@ class TestKBucket(AsyncioTestCase): self.loop = asyncio.get_event_loop() self.address_generator = address_generator() self.peer_manager = PeerManager(self.loop) - self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id()) + self.kbucket = KBucket(self.peer_manager, 0, 2 ** constants.HASH_BITS, generate_id()) def test_add_peer(self): peer = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444) @@ -58,7 +58,7 @@ class TestKBucket(AsyncioTestCase): # Test if contacts can be added to empty list # Add k contacts to bucket - for i in range(constants.k): + for i in range(constants.K): peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) self.assertEqual(peer, self.kbucket.peers[i]) @@ -130,7 +130,7 @@ class TestKBucket(AsyncioTestCase): added = [] # Add couple contacts - for i in range(constants.k-2): + for i in range(constants.K - 2): peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) added.append(peer) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 34be7c9aa..30a6dbceb 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -147,13 +147,13 @@ class TestBlobAnnouncer(AsyncioTestCase): self.assertNotIn(blob_hash, peers_for_blob) self.assertEqual(peers_for_blob[b'p'], 0) else: - self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.k)) + self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K)) self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) - if i - 1 > constants.k: - self.assertEqual(len(peers_for_blob[b'contacts']), constants.k) - self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.k + 1)) + 1) + if i - 1 > constants.K: + self.assertEqual(len(peers_for_blob[b'contacts']), constants.K) + self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1) seen = set(peers_for_blob[blob_hash]) - self.assertEqual(len(seen), constants.k) + self.assertEqual(len(seen), constants.K) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) for pg in range(1, peers_for_blob[b'p']):