From bfcfa9dc3c6d8b587d0f61a469acf7e52fdd3e24 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 20 Oct 2016 09:23:39 -0700 Subject: [PATCH] cleanup tests to improve readability --- tests/functional/test_misc.py | 570 +++++------------- tests/functional/test_reflector.py | 2 +- tests/functional/test_streamify.py | 172 ++++++ tests/mocks.py | 7 +- .../core/server/test_BlobRequestHandler.py | 2 +- tests/unit/core/test_Strategy.py | 2 +- 6 files changed, 322 insertions(+), 433 deletions(-) create mode 100644 tests/functional/test_streamify.py diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 8b8ec0de8..d88acc8f2 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -1,10 +1,11 @@ -import shutil -from multiprocessing import Process, Event, Queue +import io import logging +from multiprocessing import Process, Event, Queue +import os import platform +import shutil import sys import random -import io import unittest from Crypto.PublicKey import RSA @@ -14,7 +15,8 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager -from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, DBEncryptedFileMetadataManager +from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager +from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.Session import Session @@ -28,17 +30,26 @@ from lbrynet.lbryfile.StreamDescriptor import get_sd_info from twisted.internet import defer, threads, task from twisted.trial.unittest import TestCase from twisted.python.failure import Failure -import os + from lbrynet.dht.node import Node -from tests.mocks import DummyBlobAvailabilityTracker from lbrynet.core.PeerManager import PeerManager from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory + from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier +from tests import mocks + +FakeNode = mocks.Node +FakeWallet = mocks.Wallet +FakePeerFinder = mocks.PeerFinder +FakeAnnouncer = mocks.Announcer +GenFile = mocks.GenFile +test_create_stream_sd_file = mocks.create_stream_sd_file +DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.WARNING, format=log_format) @@ -52,162 +63,7 @@ def require_system(system): else: return unittest.skip("Skipping. Test can only be run on " + system) - -class FakeNode(object): - def __init__(self, *args, **kwargs): - pass - - def joinNetwork(self, *args): - pass - - def stop(self): - pass - - -class FakeWallet(object): - def __init__(self): - self.private_key = RSA.generate(1024) - self.encoded_public_key = self.private_key.publickey().exportKey() - - def start(self): - return defer.succeed(True) - - def stop(self): - return defer.succeed(True) - - def get_info_exchanger(self): - return PointTraderKeyExchanger(self) - - def get_wallet_info_query_handler_factory(self): - return PointTraderKeyQueryHandlerFactory(self) - - def reserve_points(self, *args): - return True - - def cancel_point_reservation(self, *args): - pass - - def send_points(self, *args): - return defer.succeed(True) - - def add_expected_payment(self, *args): - pass - - def get_balance(self): - return defer.succeed(1000) - - def set_public_key_for_peer(self, peer, public_key): - pass - - def get_claim_metadata_for_sd_hash(self, sd_hash): - return "fakeuri", "faketxid" - - -class FakePeerFinder(object): - def __init__(self, start_port, peer_manager, num_peers): - self.start_port = start_port - self.peer_manager = peer_manager - self.num_peers = num_peers - self.count = 0 - - def find_peers_for_blob(self, *args): - peer_port = self.start_port + self.count - self.count += 1 - if self.count >= self.num_peers: - self.count = 0 - return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) - - def run_manage_loop(self): - pass - - def stop(self): - pass - - -class FakeAnnouncer(object): - - def __init__(self, *args): - pass - - def add_supplier(self, supplier): - pass - - def immediate_announce(self, *args): - pass - - def run_manage_loop(self): - pass - - def stop(self): - pass - - -class GenFile(io.RawIOBase): - def __init__(self, size, pattern): - io.RawIOBase.__init__(self) - self.size = size - self.pattern = pattern - self.read_so_far = 0 - self.buff = b'' - self.last_offset = 0 - - def readable(self): - return True - - def writable(self): - return False - - def read(self, n=-1): - if n > -1: - bytes_to_read = min(n, self.size - self.read_so_far) - else: - bytes_to_read = self.size - self.read_so_far - output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] - bytes_to_read -= len(output) - while bytes_to_read > 0: - self.buff = self._generate_chunk() - new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] - bytes_to_read -= len(new_output) - output += new_output - self.read_so_far += len(output) - return output - - def readall(self): - return self.read() - - def _generate_chunk(self, n=2**10): - output = self.pattern[self.last_offset:self.last_offset + n] - n_left = n - len(output) - whole_patterns = n_left / len(self.pattern) - output += self.pattern * whole_patterns - self.last_offset = n - len(output) - output += self.pattern[:self.last_offset] - return output - - -test_create_stream_sd_file = { - 'stream_name': '746573745f66696c65', - 'blobs': [ - {'length': 2097152, 'blob_num': 0, - 'blob_hash': - 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', - 'iv': '30303030303030303030303030303031'}, - {'length': 2097152, 'blob_num': 1, - 'blob_hash': - 'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', - 'iv': '30303030303030303030303030303032'}, - {'length': 1015056, 'blob_num': 2, - 'blob_hash': - '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', - 'iv': '30303030303030303030303030303033'}, - {'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}], - 'stream_type': 'lbryfile', - 'key': '30313233343536373031323334353637', - 'suggested_file_name': '746573745f66696c65', - 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'} - - -def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False): +def use_epoll_on_linux(): if sys.platform.startswith("linux"): sys.modules = sys.modules.copy() del sys.modules['twisted.internet.reactor'] @@ -215,47 +71,63 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() sys.modules['twisted.internet.reactor'] = twisted.internet.reactor - from twisted.internet import reactor - logging.debug("Starting the uploader") +class LbryUploader(object): + def __init__(self, sd_hash_queue, kill_event, dead_event, + file_size, ul_rate_limit=None, is_generous=False): + self.sd_hash_queue = sd_hash_queue + self.kill_event = kill_event + self.dead_event = dead_event + self.file_size = file_size + self.ul_rate_limit = ul_rate_limit + self.is_generous = is_generous + # these attributes get defined in `start` + self.reactor = None + self.sd_identifier = None + self.session = None + self.lbry_file_manager = None + self.server_port = None + self.kill_check = None - Random.atfork() + def start(self): + use_epoll_on_linux() + from twisted.internet import reactor + self.reactor = reactor + logging.debug("Starting the uploader") + Random.atfork() + r = random.Random() + r.seed("start_lbry_uploader") + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 1) + hash_announcer = FakeAnnouncer() + rate_limiter = RateLimiter() + self.sd_identifier = StreamDescriptorIdentifier() + db_dir = "server" + os.mkdir(db_dir) + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, + dht_node_class=Node, is_generous=self.is_generous) + stream_info_manager = TempEncryptedFileMetadataManager() + self.lbry_file_manager = EncryptedFileManager( + self.session, stream_info_manager, self.sd_identifier) + if self.ul_rate_limit is not None: + self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) + reactor.callLater(1, self.start_all) + if not reactor.running: + reactor.run() - r = random.Random() - r.seed("start_lbry_uploader") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = RateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - - db_dir = "server" - os.mkdir(db_dir) - - session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=is_generous) - - stream_info_manager = TempEncryptedFileMetadataManager() - - lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier) - - if ul_rate_limit is not None: - session.rate_limiter.set_ul_limit(ul_rate_limit) - - def start_all(): - - d = session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: lbry_file_manager.setup()) - d.addCallback(lambda _: start_server()) - d.addCallback(lambda _: create_stream()) - d.addCallback(create_stream_descriptor) - d.addCallback(put_sd_hash_on_queue) + def start_all(self): + d = self.session.setup() + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: self.start_server()) + d.addCallback(lambda _: self.create_stream()) + d.addCallback(self.create_stream_descriptor) + d.addCallback(self.put_sd_hash_on_queue) def print_error(err): logging.critical("Server error: %s", err.getErrorMessage()) @@ -263,71 +135,57 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat d.addErrback(print_error) return d - def start_server(): - - server_port = None - + def start_server(self): + session = self.session query_handler_factories = { BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, } - server_factory = ServerProtocolFactory(session.rate_limiter, query_handler_factories, session.peer_manager) - - server_port = reactor.listenTCP(5553, server_factory) + self.server_port = self.reactor.listenTCP(5553, server_factory) logging.debug("Started listening") - - def kill_server(): - ds = [] - ds.append(session.shut_down()) - ds.append(lbry_file_manager.stop()) - if server_port: - ds.append(server_port.stopListening()) - kill_check.stop() - dead_event.set() - dl = defer.DeferredList(ds) - dl.addCallback(lambda _: reactor.stop()) - return dl - - def check_for_kill(): - if kill_event.is_set(): - kill_server() - - kill_check = task.LoopingCall(check_for_kill) - kill_check.start(1.0) + self.kill_check = task.LoopingCall(self.check_for_kill) + self.kill_check.start(1.0) return True - def create_stream(): - test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) - d = create_lbry_file(session, lbry_file_manager, "test_file", test_file) + def kill_server(self): + session = self.session + ds = [] + ds.append(session.shut_down()) + ds.append(self.lbry_file_manager.stop()) + if self.server_port: + ds.append(self.server_port.stopListening()) + self.kill_check.stop() + self.dead_event.set() + dl = defer.DeferredList(ds) + dl.addCallback(lambda _: self.reactor.stop()) + return dl + + def check_for_kill(self): + if self.kill_event.is_set(): + self.kill_server() + + def create_stream(self): + test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) + d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file) return d - def create_stream_descriptor(stream_hash): - descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager) - d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True) + def create_stream_descriptor(self, stream_hash): + descriptor_writer = BlobStreamDescriptorWriter(self.session.blob_manager) + d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) d.addCallback(descriptor_writer.create_descriptor) return d - def put_sd_hash_on_queue(sd_hash): - sd_hash_queue.put(sd_hash) - - reactor.callLater(1, start_all) - if not reactor.running: - reactor.run() + def put_sd_hash_on_queue(self, sd_hash): + self.sd_hash_queue.put(sd_hash) -def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False): - - if sys.platform.startswith("linux"): - sys.modules = sys.modules.copy() - del sys.modules['twisted.internet.reactor'] - import twisted.internet - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor - +def start_lbry_reuploader(sd_hash, kill_event, dead_event, + ready_event, n, ul_rate_limit=None, is_generous=False): + use_epoll_on_linux() from twisted.internet import reactor logging.debug("Starting the uploader") @@ -335,7 +193,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra Random.atfork() r = random.Random() - r.seed("start_lbry_uploader") + r.seed("start_lbry_reuploader") wallet = FakeWallet() peer_port = 5553 + n @@ -434,14 +292,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra def start_live_server(sd_hash_queue, kill_event, dead_event): - - if sys.platform.startswith("linux"): - sys.modules = sys.modules.copy() - del sys.modules['twisted.internet.reactor'] - import twisted.internet - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor - + use_epoll_on_linux() from twisted.internet import reactor logging.debug("In start_server.") @@ -566,14 +417,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False): - - if sys.platform.startswith("linux"): - sys.modules = sys.modules.copy() - del sys.modules['twisted.internet.reactor'] - import twisted.internet - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor - + use_epoll_on_linux() from twisted.internet import reactor logging.debug("Starting the uploader") @@ -751,7 +595,8 @@ class TestTransfer(TestCase): sd_hash_queue = Queue() kill_event = Event() dead_event = Event() - uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) + lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) + uploader = Process(target=lbry_uploader.start) uploader.start() self.server_processes.append(uploader) @@ -770,21 +615,25 @@ class TestTransfer(TestCase): os.mkdir(db_dir) os.mkdir(blob_dir) - self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, + dht_node_class=Node, is_generous=self.is_generous) self.stream_info_manager = TempEncryptedFileMetadataManager() - self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) + self.lbry_file_manager = EncryptedFileManager( + self.session, self.stream_info_manager, sd_identifier) def make_downloader(metadata, prm): info_validator = metadata.validator options = metadata.options factories = metadata.factories - chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + chosen_options = [ + o.default_value for o in options.get_downloader_options(info_validator, prm)] return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): @@ -856,10 +705,12 @@ class TestTransfer(TestCase): db_dir = "client" os.mkdir(db_dir) - self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, - peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, + peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node + ) self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) @@ -869,7 +720,8 @@ class TestTransfer(TestCase): info_validator = metadata.validator options = metadata.options factories = metadata.factories - chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + chosen_options = [ + o.default_value for o in options.get_downloader_options(info_validator, prm)] return factories[0].make_downloader(metadata, chosen_options, prm) def start_lbry_file(lbry_file): @@ -928,7 +780,6 @@ class TestTransfer(TestCase): return d def test_last_blob_retrieval(self): - kill_event = Event() dead_event_1 = Event() blob_hash_queue_1 = Queue() @@ -957,10 +808,12 @@ class TestTransfer(TestCase): os.mkdir(db_dir) os.mkdir(blob_dir) - self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker) d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -974,8 +827,8 @@ class TestTransfer(TestCase): def download_blob(blob_hash): prm = self.session.payment_rate_manager - downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder, - rate_limiter, prm, wallet) + downloader = StandaloneBlobDownloader( + blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet) d = downloader.download() return d @@ -1000,23 +853,20 @@ class TestTransfer(TestCase): d1 = self.wait_for_event(dead_event_1, 15) d2 = self.wait_for_event(dead_event_2, 15) dl = defer.DeferredList([d1, d2]) - def print_shutting_down(): logging.info("Client is shutting down") - dl.addCallback(lambda _: print_shutting_down()) dl.addCallback(lambda _: arg) return dl - d.addBoth(stop) - return d def test_double_download(self): sd_hash_queue = Queue() kill_event = Event() dead_event = Event() - uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) + lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) + uploader = Process(target=lbry_uploader.start) uploader.start() self.server_processes.append(uploader) @@ -1132,8 +982,9 @@ class TestTransfer(TestCase): kill_event = Event() dead_events = [Event() for _ in range(num_uploaders)] ready_events = [Event() for _ in range(1, num_uploaders)] - uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0], - 9373419, 2**22)) + lbry_uploader = LbryUploader( + sd_hash_queue, kill_event, dead_events[0], 5209343, 9373419, 2**22) + uploader = Process(target=lbry_uploader.start) uploader.start() self.server_processes.append(uploader) @@ -1228,140 +1079,3 @@ class TestTransfer(TestCase): d.addBoth(stop) return d - - -class TestStreamify(TestCase): - - def setUp(self): - self.session = None - self.stream_info_manager = None - self.lbry_file_manager = None - self.addCleanup(self.take_down_env) - self.is_generous = True - - def take_down_env(self): - - d = defer.succeed(True) - if self.lbry_file_manager is not None: - d.addCallback(lambda _: self.lbry_file_manager.stop()) - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) - if self.stream_info_manager is not None: - d.addCallback(lambda _: self.stream_info_manager.stop()) - - def delete_test_env(): - shutil.rmtree('client') - if os.path.exists("test_file"): - os.remove("test_file") - - d.addCallback(lambda _: threads.deferToThread(delete_test_env)) - return d - - def test_create_stream(self): - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 2) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - - db_dir = "client" - blob_dir = os.path.join(db_dir, "blobfiles") - os.mkdir(db_dir) - os.mkdir(blob_dir) - - self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous) - - self.stream_info_manager = TempEncryptedFileMetadataManager() - - self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) - - d = self.session.setup() - d.addCallback(lambda _: self.stream_info_manager.setup()) - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - - def verify_equal(sd_info): - self.assertEqual(sd_info, test_create_stream_sd_file) - - def verify_stream_descriptor_file(stream_hash): - d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) - d.addCallback(verify_equal) - return d - - def iv_generator(): - iv = 0 - while 1: - iv += 1 - yield "%016d" % iv - - def create_stream(): - test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) - d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, - key="0123456701234567", iv_generator=iv_generator()) - return d - - d.addCallback(lambda _: create_stream()) - d.addCallback(verify_stream_descriptor_file) - return d - - def test_create_and_combine_stream(self): - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 2) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - db_dir = "client" - blob_dir = os.path.join(db_dir, "blobfiles") - os.mkdir(db_dir) - os.mkdir(blob_dir) - - self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) - - self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) - - self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) - - def start_lbry_file(lbry_file): - logging.debug("Calling lbry_file.start()") - d = lbry_file.start() - return d - - def combine_stream(stream_hash): - - prm = self.session.payment_rate_manager - d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(start_lbry_file) - - def check_md5_sum(): - f = open('test_file') - hashsum = MD5.new() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") - - d.addCallback(lambda _: check_md5_sum()) - return d - - def create_stream(): - test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)])) - return create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, - suggested_file_name="test_file") - - d = self.session.setup() - d.addCallback(lambda _: self.stream_info_manager.setup()) - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: create_stream()) - d.addCallback(combine_stream) - return d diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 7dd0ad6e7..942684b46 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -93,7 +93,7 @@ class TestReflector(unittest.TestCase): use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=mocks.DummyBlobAvailabilityTracker, + blob_tracker_class=mocks.BlobAvailabilityTracker, dht_node_class=Node ) diff --git a/tests/functional/test_streamify.py b/tests/functional/test_streamify.py new file mode 100644 index 000000000..436455f3b --- /dev/null +++ b/tests/functional/test_streamify.py @@ -0,0 +1,172 @@ +import logging +import os +import shutil + +from Crypto.Hash import MD5 +from twisted.trial.unittest import TestCase +from twisted.internet import defer, threads + +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE +from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager +from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager +from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager +from lbrynet.core.Session import Session +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier +from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file +from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.lbryfile.StreamDescriptor import get_sd_info +from lbrynet.core.PeerManager import PeerManager +from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter + +from tests import mocks + + +FakeNode = mocks.Node +FakeWallet = mocks.Wallet +FakePeerFinder = mocks.PeerFinder +FakeAnnouncer = mocks.Announcer +GenFile = mocks.GenFile +test_create_stream_sd_file = mocks.create_stream_sd_file +DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker + + +class TestStreamify(TestCase): + def setUp(self): + self.session = None + self.stream_info_manager = None + self.lbry_file_manager = None + self.addCleanup(self.take_down_env) + self.is_generous = True + + def take_down_env(self): + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + + def delete_test_env(): + shutil.rmtree('client') + if os.path.exists("test_file"): + os.remove("test_file") + + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + return d + + def test_create_stream(self): + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + + db_dir = "client" + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, + is_generous=self.is_generous + ) + + self.stream_info_manager = TempEncryptedFileMetadataManager() + + self.lbry_file_manager = EncryptedFileManager( + self.session, self.stream_info_manager, sd_identifier) + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + + def verify_equal(sd_info): + self.assertEqual(sd_info, test_create_stream_sd_file) + + def verify_stream_descriptor_file(stream_hash): + d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) + d.addCallback(verify_equal) + return d + + def iv_generator(): + iv = 0 + while 1: + iv += 1 + yield "%016d" % iv + + def create_stream(): + test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) + d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, + key="0123456701234567", iv_generator=iv_generator()) + return d + + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + return d + + def test_create_and_combine_stream(self): + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + db_dir = "client" + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + self.session = Session( + MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker + ) + + self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) + + self.lbry_file_manager = EncryptedFileManager( + self.session, self.stream_info_manager, sd_identifier) + + def start_lbry_file(lbry_file): + logging.debug("Calling lbry_file.start()") + d = lbry_file.start() + return d + + def combine_stream(stream_hash): + prm = self.session.payment_rate_manager + d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) + d.addCallback(start_lbry_file) + + def check_md5_sum(): + f = open('test_file') + hashsum = MD5.new() + hashsum.update(f.read()) + self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") + + d.addCallback(lambda _: check_md5_sum()) + return d + + def create_stream(): + test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)])) + return create_lbry_file( + self.session, self.lbry_file_manager, "test_file", test_file, + suggested_file_name="test_file") + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: create_stream()) + d.addCallback(combine_stream) + return d diff --git a/tests/mocks.py b/tests/mocks.py index 1dbb3fdf3..da43e86fa 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -5,7 +5,7 @@ from decimal import Decimal from twisted.internet import defer, threads, task, error from lbrynet.core import PTCWallet -from lbrynet.core.BlobAvailability import BlobAvailabilityTracker +from lbrynet.core import BlobAvailability class Node(object): @@ -54,6 +54,9 @@ class Wallet(object): def set_public_key_for_peer(self, peer, public_key): pass + def get_claim_metadata_for_sd_hash(self, sd_hash): + return "fakeuri", "faketxid" + class PeerFinder(object): def __init__(self, start_port, peer_manager, num_peers): @@ -136,7 +139,7 @@ class GenFile(io.RawIOBase): return output -class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): +class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker): """ Class to track peer counts for known blobs, and to discover new popular blobs diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index 31d7e48ee..066abc850 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -8,7 +8,7 @@ from twisted.trial import unittest from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager -from tests.mocks import DummyBlobAvailabilityTracker +from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py index 62e18d7f7..1cf5bb39e 100644 --- a/tests/unit/core/test_Strategy.py +++ b/tests/unit/core/test_Strategy.py @@ -5,7 +5,7 @@ import mock from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Offer import Offer -from tests.mocks import DummyBlobAvailabilityTracker +from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker MAX_NEGOTIATION_TURNS = 10 random.seed(12345)