From a31b6b192f07504ecbad5a9f9946bedea062c7fb Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 28 Jul 2017 12:55:04 -0400 Subject: [PATCH 1/5] Create an optional way of downloading by head blob first in ConnectionManager --- lbrynet/conf.py | 1 + lbrynet/core/client/BlobRequester.py | 17 +++-- lbrynet/core/client/ConnectionManager.py | 63 ++++++++++--------- .../core/client/test_ConnectionManager.py | 44 ++++++++++++- 4 files changed, 88 insertions(+), 37 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index caa9ff134..9fde690ad 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -255,6 +255,7 @@ ADJUSTABLE_SETTINGS = { 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'lbryum_wallet_dir': (str, default_lbryum_dir), 'max_connections_per_stream': (int, 5), + 'seek_head_blob_first': (bool, False), # TODO: writing json on the cmd line is a pain, come up with a nicer # parser for this data structure. maybe 'USD:25' 'max_key_fee': (json.loads, {'currency': 'USD', 'amount': 50.0}), diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index d7492d041..d3d11163c 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -64,10 +64,19 @@ class BlobRequester(object): return defer.succeed(False) return self._send_next_request(peer, protocol) - def get_new_peers(self): - d = self._get_hash_for_peer_search() - d.addCallback(self._find_peers_for_hash) - return d + @defer.inlineCallbacks + def get_new_peers_for_head_blob(self): + """ look for peers for the head blob """ + head_blob_hash = self._download_manager.get_head_blob_hash() + peers = yield self._find_peers_for_hash(head_blob_hash) + defer.returnValue(peers) + + @defer.inlineCallbacks + def get_new_peers_for_next_unavailable(self): + """ look for peers for the next unavailable blob """ + blob_hash = yield self._get_hash_for_peer_search() + peers = yield self._find_peers_for_hash(blob_hash) + defer.returnValue(peers) 
######### internal calls ######### def should_send_next_request(self, peer): diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 29fc7be22..294172619 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -25,10 +25,12 @@ class ConnectionManager(object): def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): + self.downloader = downloader self.rate_limiter = rate_limiter self._primary_request_creators = primary_request_creators self._secondary_request_creators = secondary_request_creators + self.seek_head_blob_first = conf.settings['seek_head_blob_first'] self._peer_connections = {} # {Peer: PeerConnectionHandler} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._next_manage_call = None @@ -150,10 +152,7 @@ class ConnectionManager(object): log.debug("%s have %d connections, looking for %d", self._get_log_name(), len(self._peer_connections), conf.settings['max_connections_per_stream']) - ordered_request_creators = self._rank_request_creator_connections() - peers = yield self._get_new_peers(ordered_request_creators) - new_conns = conf.settings['max_connections_per_stream'] - len(self._peer_connections) - peers = self._pick_best_peers(peers, new_conns) + peers = yield self._get_new_peers() for peer in peers: self._connect_to_peer(peer) self._manage_deferred.callback(None) @@ -161,42 +160,46 @@ class ConnectionManager(object): if not self.stopped and schedule_next_call: self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage) - def _rank_request_creator_connections(self): - """Returns an ordered list of our request creators, ranked according - to which has the least number of connections open that it - likes - """ - def count_peers(request_creator): - return len([ - p for p in self._peer_connections.itervalues() - if request_creator in 
p.request_creators]) - - return sorted(self._primary_request_creators, key=count_peers) + def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed): + if peers is None: + # can happen if there is some error in the lookup + return [] + out = [peer for peer in peers if peer not in self._peer_connections] + random.shuffle(out) + out = out[0:new_conns_needed] + return out @defer.inlineCallbacks - def _get_new_peers(self, request_creators): + def _get_new_peers(self): + new_conns_needed = conf.settings['max_connections_per_stream'] - len(self._peer_connections) + if new_conns_needed < 1: + defer.returnValue([]) + # we always get the peer from the first request creator + # must be a type BlobRequester... + request_creator = self._primary_request_creators[0] log.debug("%s Trying to get a new peer to connect to", self._get_log_name()) - if not request_creators: - defer.returnValue(None) - new_peers = yield request_creators[0].get_new_peers() - if not new_peers: - new_peers = yield self._get_new_peers(request_creators[1:]) - defer.returnValue(new_peers) - def _pick_best_peers(self, peers, num_peers_to_pick): - # TODO: Eventually rank them based on past performance/reputation. 
For now - # TODO: just pick the first to which we don't have an open connection + # find peers for the head blob if configured to do so + if self.seek_head_blob_first is True: + peers = yield request_creator.get_new_peers_for_head_blob() + peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) + else: + peers = [] + + # we didn't find any new peers on the head blob, + # we have to look for the first unavailable blob + if len(peers) == 0: + peers = yield request_creator.get_new_peers_for_next_unavailable() + peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) + log.debug("%s Got a list of peers to choose from: %s", self._get_log_name(), peers) log.debug("%s Current connections: %s", self._get_log_name(), self._peer_connections.keys()) log.debug("%s List of connection states: %s", self._get_log_name(), [p_c_h.connection.state for p_c_h in self._peer_connections.values()]) - if peers is None: - return [] - out = [peer for peer in peers if peer not in self._peer_connections] - random.shuffle(out) - return out[0:num_peers_to_pick] + + defer.returnValue(peers) def _connect_to_peer(self, peer): diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py index b24011a46..fa7a5ee30 100644 --- a/tests/unit/core/client/test_ConnectionManager.py +++ b/tests/unit/core/client/test_ConnectionManager.py @@ -30,8 +30,9 @@ class MocDownloader(object): class MocRequestCreator(object): implements(IRequestCreator) - def __init__(self, peers_to_return): + def __init__(self, peers_to_return, peers_to_return_head_blob=[]): self.peers_to_return = peers_to_return + self.peers_to_return_head_blob = peers_to_return_head_blob self.sent_request = False def send_next_request(self, peer, protocol): @@ -53,9 +54,12 @@ class MocRequestCreator(object): if isinstance(err.value, NoResponseError): return err - def get_new_peers(self): + def get_new_peers_for_next_unavailable(self): return 
self.peers_to_return + def get_new_peers_for_head_blob(self): + return self.peers_to_return_head_blob + class MocFunctionalQueryHandler(object): implements(IQueryHandler) @@ -125,12 +129,17 @@ class TestIntegrationConnectionManager(unittest.TestCase): self.primary_request_creator = MocRequestCreator([self.TEST_PEER]) self.clock = task.Clock() utils.call_later = self.clock.callLater + self.server_port = None + + def _init_connection_manager(self, seek_head_blob_first=False): + # this import is required here so utils.call_later is replaced by self.clock.callLater from lbrynet.core.client.ConnectionManager import ConnectionManager self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, [self.primary_request_creator], []) + + self.connection_manager.seek_head_blob_first = seek_head_blob_first self.connection_manager._start() - self.server_port = None def tearDown(self): if self.server_port is not None: @@ -140,6 +149,7 @@ class TestIntegrationConnectionManager(unittest.TestCase): @defer.inlineCallbacks def test_success(self): + self._init_connection_manager() # test to see that if we setup a server, we get a connection self.server = MocServerProtocolFactory(self.clock) self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) @@ -153,6 +163,7 @@ class TestIntegrationConnectionManager(unittest.TestCase): @defer.inlineCallbacks def test_server_with_improper_reply(self): + self._init_connection_manager() self.server = MocServerProtocolFactory(self.clock, is_good=False) self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) yield self.connection_manager.manage(schedule_next_call=False) @@ -166,6 +177,8 @@ class TestIntegrationConnectionManager(unittest.TestCase): @defer.inlineCallbacks def test_non_existing_server(self): # Test to see that if we don't setup a server, we don't get a connection + + self._init_connection_manager() yield self.connection_manager.manage(schedule_next_call=False) 
self.assertEqual(1, self.connection_manager.num_peer_connections()) connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred @@ -180,6 +193,8 @@ class TestIntegrationConnectionManager(unittest.TestCase): def test_parallel_connections(self): # Test to see that we make two new connections at a manage call, # without it waiting for the connection to complete + + self._init_connection_manager() test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1) self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2] yield self.connection_manager.manage(schedule_next_call=False) @@ -203,6 +218,7 @@ class TestIntegrationConnectionManager(unittest.TestCase): # test to see that when we call stop, the ConnectionManager waits for the # current manage call to finish, closes connections, # and removes scheduled manage calls + self._init_connection_manager() self.connection_manager.manage(schedule_next_call=True) yield self.connection_manager.stop() self.assertEqual(0, self.TEST_PEER.success_count) @@ -212,6 +228,7 @@ class TestIntegrationConnectionManager(unittest.TestCase): @defer.inlineCallbacks def test_closed_connection_when_server_is_slow(self): + self._init_connection_manager() self.server = MocServerProtocolFactory(self.clock, has_moc_query_handler=True,is_delayed=True) self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) @@ -224,3 +241,24 @@ class TestIntegrationConnectionManager(unittest.TestCase): self.assertEqual(1, self.TEST_PEER.down_count) + """ test header first seeks """ + @defer.inlineCallbacks + def test_no_peer_for_head_blob(self): + # test that if we can't find blobs for the head blob, + # it looks at the next unavailable and makes connection + self._init_connection_manager(seek_head_blob_first=True) + self.server = MocServerProtocolFactory(self.clock) + self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) + + 
self.primary_request_creator.peers_to_return_head_blob = [] + self.primary_request_creator.peers_to_return = [self.TEST_PEER] + + yield self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertTrue(connection_made) + self.assertEqual(1, self.TEST_PEER.success_count) + self.assertEqual(0, self.TEST_PEER.down_count) + + From e2e28338f3e7fccf61fd5cd2c9b7931fcc88d205 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 2 Aug 2017 12:45:42 -0400 Subject: [PATCH 2/5] in ConnectionManager, be consistent and initialize conf values in __init__ --- lbrynet/core/client/ConnectionManager.py | 10 ++++++---- tests/unit/core/client/test_ConnectionManager.py | 8 +++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 294172619..98efe2fed 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -26,11 +26,13 @@ class ConnectionManager(object): def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): + self.seek_head_blob_first = conf.settings['seek_head_blob_first'] + self.max_connections_per_stream = conf.settings['max_connections_per_stream'] + self.downloader = downloader self.rate_limiter = rate_limiter self._primary_request_creators = primary_request_creators self._secondary_request_creators = secondary_request_creators - self.seek_head_blob_first = conf.settings['seek_head_blob_first'] self._peer_connections = {} # {Peer: PeerConnectionHandler} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._next_manage_call = None @@ -148,10 +150,10 @@ class ConnectionManager(object): 
@defer.inlineCallbacks def manage(self, schedule_next_call=True): self._manage_deferred = defer.Deferred() - if len(self._peer_connections) < conf.settings['max_connections_per_stream']: + if len(self._peer_connections) < self.max_connections_per_stream: log.debug("%s have %d connections, looking for %d", self._get_log_name(), len(self._peer_connections), - conf.settings['max_connections_per_stream']) + self.max_connections_per_stream) peers = yield self._get_new_peers() for peer in peers: self._connect_to_peer(peer) @@ -171,7 +173,7 @@ class ConnectionManager(object): @defer.inlineCallbacks def _get_new_peers(self): - new_conns_needed = conf.settings['max_connections_per_stream'] - len(self._peer_connections) + new_conns_needed = self.max_connections_per_stream - len(self._peer_connections) if new_conns_needed < 1: defer.returnValue([]) # we always get the peer from the first request creator diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py index fa7a5ee30..4fa508d19 100644 --- a/tests/unit/core/client/test_ConnectionManager.py +++ b/tests/unit/core/client/test_ConnectionManager.py @@ -136,8 +136,6 @@ class TestIntegrationConnectionManager(unittest.TestCase): from lbrynet.core.client.ConnectionManager import ConnectionManager self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, [self.primary_request_creator], []) - - self.connection_manager.seek_head_blob_first = seek_head_blob_first self.connection_manager._start() @@ -242,14 +240,14 @@ class TestIntegrationConnectionManager(unittest.TestCase): """ test header first seeks """ - @defer.inlineCallbacks + @defer.inlineCallbacks def test_no_peer_for_head_blob(self): - # test that if we can't find blobs for the head blob, + # test that if we can't find blobs for the head blob, # it looks at the next unavailable and makes connection self._init_connection_manager(seek_head_blob_first=True) self.server = 
MocServerProtocolFactory(self.clock) self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) - + self.primary_request_creator.peers_to_return_head_blob = [] self.primary_request_creator.peers_to_return = [self.TEST_PEER] From e0985695ac5894f7384c5122015044b4c2928e59 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 15 Aug 2017 20:05:03 -0400 Subject: [PATCH 3/5] some pep8 fixes --- lbrynet/core/client/ConnectionManager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 98efe2fed..594536006 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -168,8 +168,7 @@ class ConnectionManager(object): return [] out = [peer for peer in peers if peer not in self._peer_connections] random.shuffle(out) - out = out[0:new_conns_needed] - return out + return out[0:new_conns_needed] @defer.inlineCallbacks def _get_new_peers(self): @@ -182,7 +181,7 @@ class ConnectionManager(object): log.debug("%s Trying to get a new peer to connect to", self._get_log_name()) # find peers for the head blob if configured to do so - if self.seek_head_blob_first is True: + if self.seek_head_blob_first: peers = yield request_creator.get_new_peers_for_head_blob() peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) else: @@ -190,7 +189,7 @@ class ConnectionManager(object): # we didn't find any new peers on the head blob, # we have to look for the first unavailable blob - if len(peers) == 0: + if not peers: peers = yield request_creator.get_new_peers_for_next_unavailable() peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) From a9e6c896931d5ece6f9d1f51e9717cbc2166b80e Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 16 Aug 2017 15:04:50 -0400 Subject: [PATCH 4/5] clean up and clarify in docstring when/where None is returned while searching for peers --- 
lbrynet/core/client/BlobRequester.py | 40 ++++++++++++++---------- lbrynet/core/client/ConnectionManager.py | 5 +-- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index d3d11163c..95ffaa327 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -73,8 +73,13 @@ class BlobRequester(object): @defer.inlineCallbacks def get_new_peers_for_next_unavailable(self): - """ look for peers for the next unavailable blob """ + """ + Look for peers for the next unavailable blob, if we have + all blobs, return an empty list + """ blob_hash = yield self._get_hash_for_peer_search() + if blob_hash is None: + defer.returnValue([]) peers = yield self._find_peers_for_hash(blob_hash) defer.returnValue(peers) @@ -112,6 +117,10 @@ class BlobRequester(object): return defer.succeed(sent_request) def _get_hash_for_peer_search(self): + """ + Get next unavailable hash for blob, + returns None if there is nothing left to download + """ r = None blobs_to_download = self._blobs_to_download() if blobs_to_download: @@ -125,26 +134,23 @@ class BlobRequester(object): return defer.succeed(r) def _find_peers_for_hash(self, h): - if h is None: - return None - else: - d = self.peer_finder.find_peers_for_blob(h) + d = self.peer_finder.find_peers_for_blob(h) - def choose_best_peers(peers): - bad_peers = self._get_bad_peers() - without_bad_peers = [p for p in peers if not p in bad_peers] - without_maxed_out_peers = [ - p for p in without_bad_peers if p not in self._maxed_out_peers] - return without_maxed_out_peers + def choose_best_peers(peers): + bad_peers = self._get_bad_peers() + without_bad_peers = [p for p in peers if not p in bad_peers] + without_maxed_out_peers = [ + p for p in without_bad_peers if p not in self._maxed_out_peers] + return without_maxed_out_peers - d.addCallback(choose_best_peers) + d.addCallback(choose_best_peers) - def lookup_failed(err): - 
log.error("An error occurred looking up peers for a hash: %s", err.getTraceback()) - return [] + def lookup_failed(err): + log.error("An error occurred looking up peers for a hash: %s", err.getTraceback()) + return [] - d.addErrback(lookup_failed) - return d + d.addErrback(lookup_failed) + return d def _should_send_request_to(self, peer): if self._peers[peer] < -5.0: diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 594536006..ae04e8355 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -163,9 +163,6 @@ class ConnectionManager(object): self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage) def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed): - if peers is None: - # can happen if there is some error in the lookup - return [] out = [peer for peer in peers if peer not in self._peer_connections] random.shuffle(out) return out[0:new_conns_needed] @@ -204,7 +201,7 @@ class ConnectionManager(object): def _connect_to_peer(self, peer): - if peer is None or self.stopped: + if self.stopped: return log.debug("%s Trying to connect to %s", self._get_log_name(), peer) From ed62566981ef074ed90135eace672fd37ce64cae Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 2 Aug 2017 12:58:54 -0400 Subject: [PATCH 5/5] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5c655e29..f15f32a18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ at anytime. ### Added * Added option to announce head blob only if seeding + * Added option to download by seeking head blob first * ### Fixed