diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 0df2f3208..072d2c5a6 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -59,6 +59,9 @@ class HashBlobWriter(object): " %s to %s" % (str(self.len_so_far), str(self.length_getter()))))) else: + if self.write_handle is None: + log.debug("Tried to write to a write_handle that was None.") + return self.write_handle.write(data) if self.len_so_far == self.length_getter(): self.finished_cb(self) diff --git a/lbrynet/core/LBRYcrdWallet.py b/lbrynet/core/LBRYcrdWallet.py index aff9fd6ed..370fb03a9 100644 --- a/lbrynet/core/LBRYcrdWallet.py +++ b/lbrynet/core/LBRYcrdWallet.py @@ -612,7 +612,7 @@ class LBRYcrdAddressRequester(object): if not err.check(RequestCanceledError): log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", err.getErrorMessage(), str(peer)) - #return err + return err class LBRYcrdAddressQueryHandlerFactory(object): diff --git a/lbrynet/core/PTCWallet.py b/lbrynet/core/PTCWallet.py index 654d373ac..bf02a3b7e 100644 --- a/lbrynet/core/PTCWallet.py +++ b/lbrynet/core/PTCWallet.py @@ -266,7 +266,7 @@ class PointTraderKeyExchanger(object): if not err.check(RequestCanceledError): log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", err.getErrorMessage(), str(peer)) - #return err + return err class PointTraderKeyQueryHandlerFactory(object): diff --git a/lbrynet/core/RateLimiter.py b/lbrynet/core/RateLimiter.py index 0038cceaf..2063dd939 100644 --- a/lbrynet/core/RateLimiter.py +++ b/lbrynet/core/RateLimiter.py @@ -1,5 +1,6 @@ from zope.interface import implements from lbrynet.interfaces import IRateLimiter +from twisted.internet import task class DummyRateLimiter(object): @@ -10,22 +11,20 @@ class DummyRateLimiter(object): self.total_ul_bytes = 0 self.target_dl = 0 self.target_ul = 0 - self.ul_delay = 0.00 - self.dl_delay = 0.00 - self.next_tick = None + self.tick_call = None + + def start(self): + self.tick_call = task.LoopingCall(self.tick) + self.tick_call.start(1) def tick(self): - - from twisted.internet import reactor - self.dl_bytes_this_second = 0 self.ul_bytes_this_second = 0 - self.next_tick = reactor.callLater(1.0, self.tick) def stop(self): - if self.next_tick is not None: - self.next_tick.cancel() - self.next_tick = None + if self.tick_call is not None: + self.tick_call.stop() + self.tick_call = None def set_dl_limit(self, limit): pass @@ -33,12 +32,6 @@ class DummyRateLimiter(object): def set_ul_limit(self, limit): pass - def ul_wait_time(self): - return self.ul_delay - - def dl_wait_time(self): - return self.dl_delay - def report_dl_bytes(self, num_bytes): self.dl_bytes_this_second += num_bytes self.total_dl_bytes += num_bytes @@ -58,66 +51,32 @@ class RateLimiter(object): def __init__(self, max_dl_bytes=None, max_ul_bytes=None): self.max_dl_bytes = max_dl_bytes self.max_ul_bytes = max_ul_bytes - self.dl_bytes_this_second = 0 - self.ul_bytes_this_second = 0 + self.dl_bytes_this_interval = 0 + self.ul_bytes_this_interval = 0 self.total_dl_bytes = 0 self.total_ul_bytes = 0 - self.next_tick = None - self.next_unthrottle_dl = None - self.next_unthrottle_ul = None - - self.next_dl_check = None - self.next_ul_check = None - - self.dl_check_interval = 1.0 - self.ul_check_interval = 1.0 + self.tick_call = None + self.tick_interval = 0.1 self.dl_throttled = False self.ul_throttled = False self.protocols = [] + def start(self): + self.tick_call = task.LoopingCall(self.tick) + self.tick_call.start(self.tick_interval) + def tick(self): - - from twisted.internet import reactor - - # happens once per second - if self.next_dl_check is not None: - self.next_dl_check.cancel() - self.next_dl_check = None - if self.next_ul_check is not None: - self.next_ul_check.cancel() - self.next_ul_check = None - if self.max_dl_bytes is not None: - if self.dl_bytes_this_second == 0: - self.dl_check_interval = 1.0 - else: - self.dl_check_interval = min(1.0, self.dl_check_interval * - self.max_dl_bytes / self.dl_bytes_this_second) - self.next_dl_check = reactor.callLater(self.dl_check_interval, self.check_dl) - if self.max_ul_bytes is not None: - if self.ul_bytes_this_second == 0: - self.ul_check_interval = 1.0 - else: - self.ul_check_interval = min(1.0, self.ul_check_interval * - self.max_ul_bytes / self.ul_bytes_this_second) - self.next_ul_check = reactor.callLater(self.ul_check_interval, self.check_ul) - self.dl_bytes_this_second = 0 - self.ul_bytes_this_second = 0 + self.dl_bytes_this_interval = 0 + self.ul_bytes_this_interval = 0 self.unthrottle_dl() self.unthrottle_ul() - self.next_tick = reactor.callLater(1.0, self.tick) def stop(self): - if self.next_tick is not None: - self.next_tick.cancel() - self.next_tick = None - if self.next_dl_check is not None: - self.next_dl_check.cancel() - self.next_dl_check = None - if self.next_ul_check is not None: - self.next_ul_check.cancel() - self.next_ul_check = None + if self.tick_call is not None: + self.tick_call.stop() + self.tick_call = None def set_dl_limit(self, limit): self.max_dl_bytes = limit @@ -129,27 +88,15 @@ class RateLimiter(object): def check_dl(self): - from twisted.internet import reactor - - self.next_dl_check = None - - if self.dl_bytes_this_second > self.max_dl_bytes: - self.throttle_dl() - else: - self.next_dl_check = reactor.callLater(self.dl_check_interval, self.check_dl) - self.dl_check_interval = min(self.dl_check_interval * 2, 1.0) + if self.max_dl_bytes is not None and self.dl_bytes_this_interval > self.max_dl_bytes * self.tick_interval: + from twisted.internet import reactor + reactor.callLater(0, self.throttle_dl) def check_ul(self): - from twisted.internet import reactor - - self.next_ul_check = None - - if self.ul_bytes_this_second > self.max_ul_bytes: - self.throttle_ul() - else: - self.next_ul_check = reactor.callLater(self.ul_check_interval, self.check_ul) - self.ul_check_interval = min(self.ul_check_interval * 2, 1.0) + if self.max_ul_bytes is not None and self.ul_bytes_this_interval > self.max_ul_bytes * self.tick_interval: + from twisted.internet import reactor + reactor.callLater(0, self.throttle_ul) def throttle_dl(self): if self.dl_throttled is False: @@ -175,23 +122,17 @@ class RateLimiter(object): protocol.unthrottle_upload() self.ul_throttled = False - #deprecated - - def ul_wait_time(self): - return 0 - - def dl_wait_time(self): - return 0 - #called by protocols def report_dl_bytes(self, num_bytes): - self.dl_bytes_this_second += num_bytes + self.dl_bytes_this_interval += num_bytes self.total_dl_bytes += num_bytes + self.check_dl() def report_ul_bytes(self, num_bytes): - self.ul_bytes_this_second += num_bytes + self.ul_bytes_this_interval += num_bytes self.total_ul_bytes += num_bytes + self.check_ul() def register_protocol(self, protocol): if protocol not in self.protocols: diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 545415cd9..d919e33b3 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -243,7 +243,7 @@ class LBRYSession(object): else: self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) - self.rate_limiter.tick() + self.rate_limiter.start() d1 = self.blob_manager.setup() d2 = self.wallet.start() diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 39b214313..add4279a5 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -307,16 +307,13 @@ class BlobRequester(object): return if reason.check(NoResponseError): self._incompatible_peers.append(peer) - return log.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s", str(request_type), reason.getErrorMessage(), reason.type) self._update_local_score(peer, -10.0) - if isinstance(reason, InvalidResponseError): + if isinstance(reason, InvalidResponseError) or isinstance(reason, NoResponseError): peer.update_score(-10.0) else: peer.update_score(-2.0) if reason.check(ConnectionClosedBeforeResponseError): return - # Only unexpected errors should be returned, as they are indicative of real problems - # and may be shown to the user. return reason \ No newline at end of file diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 2b033500d..aa4381e3d 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -1,6 +1,6 @@ import json import logging -from twisted.internet import error, defer, reactor +from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ClientFactory from twisted.python import failure from lbrynet.conf import MAX_RESPONSE_INFO_SIZE as MAX_RESPONSE_SIZE @@ -111,6 +111,8 @@ class ClientProtocol(Protocol): def _ask_for_request(self): + log.debug("In _ask_for_request") + if self.connection_closed is True or self.connection_closing is True: return @@ -158,8 +160,9 @@ class ClientProtocol(Protocol): RequestCanceledError): log.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer), err.getErrorMessage()) - if not err.check(RequestCanceledError): - self.transport.loseConnection() + if not err.check(RequestCanceledError): # The connection manager is closing the connection, so + # there's no need to do it here. + return err def _handle_response(self, response): ds = [] @@ -181,9 +184,24 @@ class ClientProtocol(Protocol): d.addErrback(self._handle_response_error) ds.append(d) - dl = defer.DeferredList(ds) + dl = defer.DeferredList(ds, consumeErrors=True) - dl.addCallback(lambda _: self._ask_for_request()) + def get_next_request(results): + failed = False + for success, result in results: + if success is False: + failed = True + log.info("The connection is closing due to an error: %s", str(result.getTraceback())) + if failed is False: + log.debug("Asking for another request.") + from twisted.internet import reactor + reactor.callLater(0, self._ask_for_request) + #self._ask_for_request() + else: + log.debug("Not asking for another request.") + self.transport.loseConnection() + + dl.addCallback(get_next_request) def _downloading_finished(self, arg): log.debug("The blob has finished downloading") diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 2f167e93b..2ce3688b4 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -154,6 +154,6 @@ class BlobRequestHandler(object): self.currently_uploading = None self.file_sender = None if reason is not None and isinstance(reason, Failure): - log.warning("Upload has failed. Reason: %s", reason.getErrorMessage()) + log.info("Upload has failed. Reason: %s", reason.getErrorMessage()) return _send_file() \ No newline at end of file diff --git a/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py b/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py index f38528abb..33a3ad381 100644 --- a/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py +++ b/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py @@ -338,12 +338,9 @@ class LiveStreamMetadataHandler(object): return if reason.check(NoResponseError): self._incompatible_peers.append(peer) - return log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage()) self._update_local_score(peer, -5.0) peer.update_score(-10.0) if reason.check(ConnectionClosedBeforeResponseError): return - # Only unexpected errors should be returned, as they are indicative of real problems - # and may be shown to the user. return reason \ No newline at end of file diff --git a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindMetadataHandler.py b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindMetadataHandler.py index 906571ec4..c66c94c6d 100644 --- a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindMetadataHandler.py +++ b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindMetadataHandler.py @@ -290,15 +290,12 @@ class BlindMetadataHandler(object): return if reason.check(NoResponseError): self._incompatible_peers.append(peer) - return log.warning("Valuable blob info requester: a request of type %s has failed. Reason: %s", str(request_type), str(reason.getErrorMessage())) self._update_local_score(peer, -10.0) peer.update_score(-5.0) if reason.check(ConnectionClosedBeforeResponseError): return - # Only unexpected errors should be returned, as they are indicative of real problems - # and may be shown to the user. return reason def _search_for_peers(self): diff --git a/tests/functional_tests.py b/tests/functional_tests.py index f6516cdad..76af2b5ed 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -92,30 +92,17 @@ class FakeWallet(object): class FakePeerFinder(object): - def __init__(self, port, peer_manager): - self.peer_manager = peer_manager - - def find_peers_for_blob(self, *args): - return defer.succeed([self.peer_manager.get_peer("127.0.0.1", 5553)]) - - def run_manage_loop(self): - pass - - def stop(self): - pass - - -class FakeTwoPeerFinder(object): - def __init__(self, port, peer_manager): + def __init__(self, start_port, peer_manager, num_peers): + self.start_port = start_port self.peer_manager = peer_manager + self.num_peers = num_peers self.count = 0 def find_peers_for_blob(self, *args): - if self.count % 2 == 0: - peer_port = 5553 - else: - peer_port = 5554 + peer_port = self.start_port + self.count self.count += 1 + if self.count >= self.num_peers: + self.count = 0 return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) def run_manage_loop(self): @@ -208,7 +195,7 @@ test_create_stream_sd_file = { 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'} -def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): +def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None): sys.modules = sys.modules.copy() @@ -231,9 +218,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() + rate_limiter = RateLimiter() sd_identifier = StreamDescriptorIdentifier() db_dir = "server" @@ -247,6 +234,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): lbry_file_manager = LBRYFileManager(session, stream_info_manager, sd_identifier) + if ul_rate_limit is not None: + session.rate_limiter.set_ul_limit(ul_rate_limit) + def start_all(): d = session.setup() @@ -302,7 +292,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): return True def create_stream(): - test_file = GenFile(5209343, b''.join([chr(i) for i in xrange(0, 64, 6)])) + test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) d = create_lbry_file(session, lbry_file_manager, "test_file", test_file) return d @@ -319,6 +309,123 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): reactor.run() +def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None): + + sys.modules = sys.modules.copy() + + del sys.modules['twisted.internet.reactor'] + + import twisted.internet + + twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() + + sys.modules['twisted.internet.reactor'] = twisted.internet.reactor + + from twisted.internet import reactor + + logging.debug("Starting the uploader") + + Random.atfork() + + r = random.Random() + r.seed("start_lbry_uploader") + + wallet = FakeWallet() + peer_port = 5553 + n + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 1) + hash_announcer = FakeAnnouncer() + rate_limiter = RateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + db_dir = "server_" + str(n) + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n), + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=None, peer_port=peer_port, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + + stream_info_manager = TempLBRYFileMetadataManager() + + lbry_file_manager = LBRYFileManager(session, stream_info_manager, sd_identifier) + + if ul_rate_limit is not None: + session.rate_limiter.set_ul_limit(ul_rate_limit) + + def make_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories + chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + return factories[0].make_downloader(metadata, chosen_options, prm) + + def download_file(): + prm = PaymentRateManager(session.base_payment_rate_manager) + d = download_sd_blob(session, sd_hash, prm) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) + d.addCallback(make_downloader, prm) + d.addCallback(lambda downloader: downloader.start()) + return d + + def start_transfer(): + + logging.debug("Starting the transfer") + + d = session.setup() + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: lbry_file_manager.setup()) + d.addCallback(lambda _: download_file()) + + return d + + def start_server(): + + server_port = None + + query_handler_factories = { + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory(session.blob_manager, session.wallet, + PaymentRateManager(session.base_payment_rate_manager)): True, + session.wallet.get_wallet_info_query_handler_factory(): True, + } + + server_factory = ServerProtocolFactory(session.rate_limiter, + query_handler_factories, + session.peer_manager) + + server_port = reactor.listenTCP(peer_port, server_factory) + logging.debug("Started listening") + + def kill_server(): + ds = [] + ds.append(session.shut_down()) + ds.append(lbry_file_manager.stop()) + if server_port: + ds.append(server_port.stopListening()) + kill_check.stop() + dead_event.set() + dl = defer.DeferredList(ds) + dl.addCallback(lambda _: reactor.stop()) + return dl + + def check_for_kill(): + if kill_event.is_set(): + kill_server() + + kill_check = task.LoopingCall(check_for_kill) + kill_check.start(1.0) + ready_event.set() + logging.debug("set the ready event") + + d = task.deferLater(reactor, 1.0, start_transfer) + d.addCallback(lambda _: start_server()) + + reactor.run() + + def start_live_server(sd_hash_queue, kill_event, dead_event): sys.modules = sys.modules.copy() @@ -342,7 +449,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() @@ -481,7 +588,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5554, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = RateLimiter() @@ -601,29 +708,29 @@ class TestTransfer(TestCase): return d @staticmethod - def wait_for_dead_event(dead_event): + def wait_for_event(event, timeout): from twisted.internet import reactor d = defer.Deferred() def stop(): - dead_check.stop() + set_check.stop() if stop_call.active(): stop_call.cancel() d.callback(True) - def check_if_dead_event_set(): - if dead_event.is_set(): + def check_if_event_set(): + if event.is_set(): logging.debug("Dead event has been found set") stop() def done_waiting(): - logging.warning("Dead event has not been found set and timeout has expired") + logging.warning("Event has not been found set and timeout has expired") stop() - dead_check = task.LoopingCall(check_if_dead_event_set) - dead_check.start(.1) - stop_call = reactor.callLater(15, done_waiting) + set_check = task.LoopingCall(check_if_event_set) + set_check.start(.1) + stop_call = reactor.callLater(timeout, done_waiting) return d @staticmethod @@ -650,7 +757,7 @@ class TestTransfer(TestCase): sd_hash_queue = Queue() kill_event = Event() dead_event = Event() - uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) + uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) uploader.start() self.server_processes.append(uploader) @@ -658,7 +765,7 @@ class TestTransfer(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() @@ -717,7 +824,7 @@ class TestTransfer(TestCase): logging.debug("Client is stopping normally.") kill_event.set() logging.debug("Set the kill event") - d = self.wait_for_dead_event(dead_event) + d = self.wait_for_event(dead_event, 15) def print_shutting_down(): logging.info("Client is shutting down") @@ -744,7 +851,7 @@ class TestTransfer(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() @@ -815,7 +922,7 @@ class TestTransfer(TestCase): logging.debug("Client is stopping normally.") kill_event.set() logging.debug("Set the kill event") - d = self.wait_for_dead_event(dead_event) + d = self.wait_for_event(dead_event, 15) def print_shutting_down(): logging.info("Client is shutting down") @@ -847,7 +954,7 @@ class TestTransfer(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakeTwoPeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() @@ -896,8 +1003,8 @@ class TestTransfer(TestCase): logging.debug("Client is stopping normally.") kill_event.set() logging.debug("Set the kill event") - d1 = self.wait_for_dead_event(dead_event_1) - d2 = self.wait_for_dead_event(dead_event_2) + d1 = self.wait_for_event(dead_event_1, 15) + d2 = self.wait_for_event(dead_event_2, 15) dl = defer.DeferredList([d1, d2]) def print_shutting_down(): @@ -916,7 +1023,7 @@ class TestTransfer(TestCase): sd_hash_queue = Queue() kill_event = Event() dead_event = Event() - uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) + uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) uploader.start() self.server_processes.append(uploader) @@ -924,7 +1031,7 @@ class TestTransfer(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() @@ -1012,7 +1119,7 @@ class TestTransfer(TestCase): logging.debug("Client is stopping normally.") kill_event.set() logging.debug("Set the kill event") - d = self.wait_for_dead_event(dead_event) + d = self.wait_for_event(dead_event, 15) def print_shutting_down(): logging.info("Client is shutting down") @@ -1026,6 +1133,109 @@ class TestTransfer(TestCase): d.addBoth(stop) return d + def test_multiple_uploaders(self): + + sd_hash_queue = Queue() + num_uploaders = 3 + kill_event = Event() + dead_events = [Event() for _ in range(num_uploaders)] + ready_events = [Event() for _ in range(1, num_uploaders)] + uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0], + 9373419, 2**22)) + uploader.start() + self.server_processes.append(uploader) + + logging.debug("Testing multiple uploaders") + + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, num_uploaders) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + db_dir = "client" + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=None, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + + self.stream_info_manager = TempLBRYFileMetadataManager() + + self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) + + def start_additional_uploaders(sd_hash): + for i in range(1, num_uploaders): + uploader = Process(target=start_lbry_reuploader, + args=(sd_hash, kill_event, dead_events[i], ready_events[i-1], i, 2**10)) + uploader.start() + self.server_processes.append(uploader) + return defer.succeed(True) + + def wait_for_ready_events(): + return defer.DeferredList([self.wait_for_event(ready_event, 60) for ready_event in ready_events]) + + def make_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories + chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + return factories[0].make_downloader(metadata, chosen_options, prm) + + def download_file(sd_hash): + prm = PaymentRateManager(self.session.base_payment_rate_manager) + d = download_sd_blob(self.session, sd_hash, prm) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) + d.addCallback(make_downloader, prm) + d.addCallback(lambda downloader: downloader.start()) + return d + + def check_md5_sum(): + f = open('test_file') + hashsum = MD5.new() + hashsum.update(f.read()) + self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5") + + def start_transfer(sd_hash): + + logging.debug("Starting the transfer") + + d = start_additional_uploaders(sd_hash) + d.addCallback(lambda _: wait_for_ready_events()) + d.addCallback(lambda _: self.session.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: download_file(sd_hash)) + d.addCallback(lambda _: check_md5_sum()) + + return d + + def stop(arg): + if isinstance(arg, Failure): + logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) + else: + logging.debug("Client is stopping normally.") + kill_event.set() + logging.debug("Set the kill event") + d = defer.DeferredList([self.wait_for_event(dead_event, 15) for dead_event in dead_events]) + + def print_shutting_down(): + logging.info("Client is shutting down") + + d.addCallback(lambda _: print_shutting_down()) + d.addCallback(lambda _: arg) + return d + + d = self.wait_for_hash_from_queue(sd_hash_queue) + d.addCallback(start_transfer) + d.addBoth(stop) + + return d + class TestStreamify(TestCase): @@ -1057,7 +1267,7 @@ class TestStreamify(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakeTwoPeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() @@ -1109,7 +1319,7 @@ class TestStreamify(TestCase): wallet = FakeWallet() peer_manager = PeerManager() - peer_finder = FakeTwoPeerFinder(5553, peer_manager) + peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier()