mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-08-28 16:01:28 +00:00
add kademlia store and expiration test
This commit is contained in:
parent
470ebe2de3
commit
d02ed29e50
3 changed files with 80 additions and 18 deletions
|
@ -23,18 +23,16 @@ class DictDataStore(UserDict.DictMixin):
|
||||||
|
|
||||||
def removeExpiredPeers(self):
|
def removeExpiredPeers(self):
|
||||||
now = int(self._getTime())
|
now = int(self._getTime())
|
||||||
|
|
||||||
def notExpired(peer):
|
|
||||||
if (now - peer[2]) > constants.dataExpireTimeout:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
for key in self._dict.keys():
|
for key in self._dict.keys():
|
||||||
unexpired_peers = filter(notExpired, self._dict[key])
|
unexpired_peers = filter(lambda peer: now - peer[2] < constants.dataExpireTimeout, self._dict[key])
|
||||||
|
if not unexpired_peers:
|
||||||
|
del self._dict[key]
|
||||||
|
else:
|
||||||
self._dict[key] = unexpired_peers
|
self._dict[key] = unexpired_peers
|
||||||
|
|
||||||
def hasPeersForBlob(self, key):
|
def hasPeersForBlob(self, key):
|
||||||
if key in self._dict and len(self._dict[key]) > 0:
|
if key in self._dict and len(filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
|
||||||
|
self._dict[key])):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -46,8 +44,10 @@ class DictDataStore(UserDict.DictMixin):
|
||||||
self._dict[key] = [(value, lastPublished, originallyPublished, originalPublisherID)]
|
self._dict[key] = [(value, lastPublished, originallyPublished, originalPublisherID)]
|
||||||
|
|
||||||
def getPeersForBlob(self, key):
|
def getPeersForBlob(self, key):
|
||||||
if key in self._dict:
|
return [] if key not in self._dict else [
|
||||||
return [val[0] for val in self._dict[key]]
|
val[0] for val in filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
|
||||||
|
self._dict[key])
|
||||||
|
]
|
||||||
|
|
||||||
def removePeer(self, value):
|
def removePeer(self, value):
|
||||||
for key in self._dict:
|
for key in self._dict:
|
||||||
|
|
|
@ -30,6 +30,14 @@ from iterativefind import iterativeFind
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def expand_peer(compact_peer_info):
|
||||||
|
host = ".".join([str(ord(d)) for d in compact_peer_info[:4]])
|
||||||
|
port, = struct.unpack('>H', compact_peer_info[4:6])
|
||||||
|
peer_node_id = compact_peer_info[6:]
|
||||||
|
return (peer_node_id, host, port)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def rpcmethod(func):
|
def rpcmethod(func):
|
||||||
""" Decorator to expose Node methods as remote procedure calls
|
""" Decorator to expose Node methods as remote procedure calls
|
||||||
|
|
||||||
|
@ -142,7 +150,7 @@ class Node(MockKademliaHelper):
|
||||||
self.old_token_secret = None
|
self.old_token_secret = None
|
||||||
self.externalIP = externalIP
|
self.externalIP = externalIP
|
||||||
self.peerPort = peerPort
|
self.peerPort = peerPort
|
||||||
self._dataStore = dataStore or datastore.DictDataStore()
|
self._dataStore = dataStore or datastore.DictDataStore(self.clock.seconds)
|
||||||
self.peer_manager = peer_manager or PeerManager()
|
self.peer_manager = peer_manager or PeerManager()
|
||||||
self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager)
|
self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager)
|
||||||
self._join_deferred = None
|
self._join_deferred = None
|
||||||
|
@ -428,11 +436,9 @@ class Node(MockKademliaHelper):
|
||||||
if find_result:
|
if find_result:
|
||||||
if key in find_result:
|
if key in find_result:
|
||||||
for peer in find_result[key]:
|
for peer in find_result[key]:
|
||||||
host = ".".join([str(ord(d)) for d in peer[:4]])
|
expanded = expand_peer(peer)
|
||||||
port, = struct.unpack('>H', peer[4:6])
|
if expanded not in expanded_peers:
|
||||||
peer_node_id = peer[6:]
|
expanded_peers.append(expanded)
|
||||||
if (host, port, peer_node_id) not in expanded_peers:
|
|
||||||
expanded_peers.append((peer_node_id, host, port))
|
|
||||||
# TODO: get this working
|
# TODO: get this working
|
||||||
# if 'closestNodeNoValue' in find_result:
|
# if 'closestNodeNoValue' in find_result:
|
||||||
# closest_node_without_value = find_result['closestNodeNoValue']
|
# closest_node_without_value = find_result['closestNodeNoValue']
|
||||||
|
@ -532,7 +538,7 @@ class Node(MockKademliaHelper):
|
||||||
raise TypeError('Invalid port')
|
raise TypeError('Invalid port')
|
||||||
|
|
||||||
compact_address = compact_ip + compact_port + rpc_contact.id
|
compact_address = compact_ip + compact_port + rpc_contact.id
|
||||||
now = int(time.time())
|
now = int(self.clock.seconds())
|
||||||
originallyPublished = now - age
|
originallyPublished = now - age
|
||||||
self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID)
|
self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID)
|
||||||
return 'OK'
|
return 'OK'
|
||||||
|
|
56
lbrynet/tests/functional/dht/test_store.py
Normal file
56
lbrynet/tests/functional/dht/test_store.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
import struct
|
||||||
|
from twisted.internet import defer
|
||||||
|
from lbrynet.dht import constants
|
||||||
|
from lbrynet.core.utils import generate_id
|
||||||
|
from dht_test_environment import TestKademliaBase
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
class TestStore(TestKademliaBase):
|
||||||
|
network_size = 40
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_store_and_expire(self):
|
||||||
|
blob_hash = generate_id()
|
||||||
|
announcing_node = self.nodes[20]
|
||||||
|
# announce the blob
|
||||||
|
announce_d = announcing_node.announceHaveBlob(blob_hash)
|
||||||
|
self.pump_clock(5)
|
||||||
|
storing_node_ids = yield announce_d
|
||||||
|
all_nodes = set(self.nodes).union(set(self._seeds))
|
||||||
|
|
||||||
|
# verify the nodes we think stored it did actually store it
|
||||||
|
storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
|
||||||
|
self.assertEquals(len(storing_nodes), len(storing_node_ids))
|
||||||
|
self.assertEquals(len(storing_nodes), constants.k)
|
||||||
|
for node in storing_nodes:
|
||||||
|
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
|
||||||
|
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||||
|
self.assertEquals(len(datastore_result), 1)
|
||||||
|
expanded_peers = []
|
||||||
|
for peer in datastore_result:
|
||||||
|
host = ".".join([str(ord(d)) for d in peer[:4]])
|
||||||
|
port, = struct.unpack('>H', peer[4:6])
|
||||||
|
peer_node_id = peer[6:]
|
||||||
|
if (host, port, peer_node_id) not in expanded_peers:
|
||||||
|
expanded_peers.append((peer_node_id, host, port))
|
||||||
|
self.assertEquals(expanded_peers[0],
|
||||||
|
(announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
|
||||||
|
|
||||||
|
# verify the announced blob expires in the storing nodes datastores
|
||||||
|
|
||||||
|
self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead
|
||||||
|
for node in storing_nodes:
|
||||||
|
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||||
|
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||||
|
self.assertEquals(len(datastore_result), 0)
|
||||||
|
self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet
|
||||||
|
|
||||||
|
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
|
||||||
|
for node in storing_nodes:
|
||||||
|
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
|
||||||
|
datastore_result = node._dataStore.getPeersForBlob(blob_hash)
|
||||||
|
self.assertEquals(len(datastore_result), 0)
|
||||||
|
self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired after
|
Loading…
Add table
Reference in a new issue