From 9a9b2f47ceba13ad371032d6fb7ef8d88a41e11c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 29 Mar 2018 09:45:36 -0400 Subject: [PATCH] pass sd_hash to reflector client factory instead of looking it up --- CHANGELOG.md | 1 + lbrynet/reflector/client/client.py | 16 ++++++++-------- lbrynet/reflector/reupload.py | 11 +++++++---- lbrynet/tests/functional/test_reflector.py | 4 ++-- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbbd467e2..9605f478b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ at anytime. * raise the default number of concurrent blob announcers to 100 * dht logging to be more verbose with errors and warnings * added `single_announce` and `last_announced_time` columns to the `blob` table in sqlite + * pass the sd hash to reflector ClientFactory instead of looking it up ### Added * virtual kademlia network and mock udp transport for dht integration tests diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 7c5bc3e3e..329eeb5e0 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -37,6 +37,7 @@ class EncryptedFileReflectorClient(Protocol): self.blob_manager = self.factory.blob_manager self.protocol_version = self.factory.protocol_version self.stream_hash = self.factory.stream_hash + self.sd_hash = self.factory.sd_hash d = self.load_descriptor() d.addCallback(lambda _: self.send_handshake()) @@ -135,14 +136,12 @@ class EncryptedFileReflectorClient(Protocol): def send_handshake(self): self.send_request({'version': self.protocol_version}) + @defer.inlineCallbacks def load_descriptor(self): - def _save_descriptor_blob(sd_blob): - self.stream_descriptor = sd_blob - - d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash) - d.addCallback(self.factory.blob_manager.get_blob) - d.addCallback(_save_descriptor_blob) - return d + if self.sd_hash: + self.stream_descriptor = yield self.factory.blob_manager.get_blob(self.sd_hash) + else: + raise ValueError("no sd hash for stream %s" % self.stream_hash) def parse_response(self, buff): try: @@ -305,9 +304,10 @@ class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient protocol_version = REFLECTOR_V2 - def __init__(self, blob_manager, stream_hash): + def __init__(self, blob_manager, stream_hash, sd_hash): self.blob_manager = blob_manager self.stream_hash = stream_hash + self.sd_hash = sd_hash self.p = None self.finished_deferred = defer.Deferred() diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 5f31b5c32..e770cc883 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -24,9 +24,9 @@ def resolve(host): @defer.inlineCallbacks -def _reflect_stream(blob_manager, stream_hash, reflector_server): +def _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory(blob_manager, stream_hash) + factory = ClientFactory(blob_manager, stream_hash, sd_hash) ip = yield resolve(reflector_address) yield reactor.connectTCP(ip, reflector_port, factory) result = yield factory.finished_deferred @@ -34,7 +34,7 @@ def _reflect_stream(blob_manager, stream_hash, reflector_server): def _reflect_file(lbry_file, reflector_server): - return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server) + return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, lbry_file.sd_hash, reflector_server) @defer.inlineCallbacks @@ -59,6 +59,7 @@ def reflect_file(lbry_file, reflector_server=None): return _reflect_file(lbry_file, reflector_server) +@defer.inlineCallbacks def reflect_stream(blob_manager, stream_hash, reflector_server=None): if reflector_server: if len(reflector_server.split(":")) == 2: @@ -68,7 +69,9 @@ def reflect_stream(blob_manager, stream_hash, reflector_server=None): reflector_server = reflector_server, 5566 else: reflector_server = random.choice(conf.settings['reflector_servers']) - return _reflect_stream(blob_manager, stream_hash, reflector_server) + sd_hash = yield blob_manager.storage.get_sd_blob_hash_for_stream(stream_hash) + result = yield _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server) + defer.returnValue(result) def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 9cebda795..73aaaf361 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -213,7 +213,7 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash, self.sd_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) @@ -346,7 +346,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def send_to_server_as_stream(result): - factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash, self.sd_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory)