cleanup tests to improve readability

This commit is contained in:
Job Evers-Meltzer 2016-10-20 09:23:39 -07:00
parent 3f5efb1fa3
commit bfcfa9dc3c
6 changed files with 322 additions and 433 deletions

View file

@ -1,10 +1,11 @@
import shutil import io
from multiprocessing import Process, Event, Queue
import logging import logging
from multiprocessing import Process, Event, Queue
import os
import platform import platform
import shutil
import sys import sys
import random import random
import io
import unittest import unittest
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
@ -14,7 +15,8 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, DBEncryptedFileMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
@ -28,17 +30,26 @@ from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure from twisted.python.failure import Failure
import os
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from tests.mocks import DummyBlobAvailabilityTracker
from lbrynet.core.PeerManager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
from tests import mocks
FakeNode = mocks.Node
FakeWallet = mocks.Wallet
FakePeerFinder = mocks.PeerFinder
FakeAnnouncer = mocks.Announcer
GenFile = mocks.GenFile
test_create_stream_sd_file = mocks.create_stream_sd_file
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s" log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.WARNING, format=log_format) logging.basicConfig(level=logging.WARNING, format=log_format)
@ -52,162 +63,7 @@ def require_system(system):
else: else:
return unittest.skip("Skipping. Test can only be run on " + system) return unittest.skip("Skipping. Test can only be run on " + system)
def use_epoll_on_linux():
class FakeNode(object):
def __init__(self, *args, **kwargs):
pass
def joinNetwork(self, *args):
pass
def stop(self):
pass
class FakeWallet(object):
def __init__(self):
self.private_key = RSA.generate(1024)
self.encoded_public_key = self.private_key.publickey().exportKey()
def start(self):
return defer.succeed(True)
def stop(self):
return defer.succeed(True)
def get_info_exchanger(self):
return PointTraderKeyExchanger(self)
def get_wallet_info_query_handler_factory(self):
return PointTraderKeyQueryHandlerFactory(self)
def reserve_points(self, *args):
return True
def cancel_point_reservation(self, *args):
pass
def send_points(self, *args):
return defer.succeed(True)
def add_expected_payment(self, *args):
pass
def get_balance(self):
return defer.succeed(1000)
def set_public_key_for_peer(self, peer, public_key):
pass
def get_claim_metadata_for_sd_hash(self, sd_hash):
return "fakeuri", "faketxid"
class FakePeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers):
self.start_port = start_port
self.peer_manager = peer_manager
self.num_peers = num_peers
self.count = 0
def find_peers_for_blob(self, *args):
peer_port = self.start_port + self.count
self.count += 1
if self.count >= self.num_peers:
self.count = 0
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
def run_manage_loop(self):
pass
def stop(self):
pass
class FakeAnnouncer(object):
def __init__(self, *args):
pass
def add_supplier(self, supplier):
pass
def immediate_announce(self, *args):
pass
def run_manage_loop(self):
pass
def stop(self):
pass
class GenFile(io.RawIOBase):
def __init__(self, size, pattern):
io.RawIOBase.__init__(self)
self.size = size
self.pattern = pattern
self.read_so_far = 0
self.buff = b''
self.last_offset = 0
def readable(self):
return True
def writable(self):
return False
def read(self, n=-1):
if n > -1:
bytes_to_read = min(n, self.size - self.read_so_far)
else:
bytes_to_read = self.size - self.read_so_far
output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(output)
while bytes_to_read > 0:
self.buff = self._generate_chunk()
new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(new_output)
output += new_output
self.read_so_far += len(output)
return output
def readall(self):
return self.read()
def _generate_chunk(self, n=2**10):
output = self.pattern[self.last_offset:self.last_offset + n]
n_left = n - len(output)
whole_patterns = n_left / len(self.pattern)
output += self.pattern * whole_patterns
self.last_offset = n - len(output)
output += self.pattern[:self.last_offset]
return output
test_create_stream_sd_file = {
'stream_name': '746573745f66696c65',
'blobs': [
{'length': 2097152, 'blob_num': 0,
'blob_hash':
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
'iv': '30303030303030303030303030303031'},
{'length': 2097152, 'blob_num': 1,
'blob_hash':
'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
'iv': '30303030303030303030303030303032'},
{'length': 1015056, 'blob_num': 2,
'blob_hash':
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
'iv': '30303030303030303030303030303033'},
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}],
'stream_type': 'lbryfile',
'key': '30313233343536373031323334353637',
'suggested_file_name': '746573745f66696c65',
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False):
if sys.platform.startswith("linux"): if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy() sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor'] del sys.modules['twisted.internet.reactor']
@ -215,47 +71,63 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
logging.debug("Starting the uploader") class LbryUploader(object):
def __init__(self, sd_hash_queue, kill_event, dead_event,
file_size, ul_rate_limit=None, is_generous=False):
self.sd_hash_queue = sd_hash_queue
self.kill_event = kill_event
self.dead_event = dead_event
self.file_size = file_size
self.ul_rate_limit = ul_rate_limit
self.is_generous = is_generous
# these attributes get defined in `start`
self.reactor = None
self.sd_identifier = None
self.session = None
self.lbry_file_manager = None
self.server_port = None
self.kill_check = None
Random.atfork() def start(self):
use_epoll_on_linux()
from twisted.internet import reactor
self.reactor = reactor
logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter()
self.sd_identifier = StreamDescriptorIdentifier()
db_dir = "server"
os.mkdir(db_dir)
self.session = Session(
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier)
if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
reactor.callLater(1, self.start_all)
if not reactor.running:
reactor.run()
r = random.Random() def start_all(self):
r.seed("start_lbry_uploader") d = self.session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
wallet = FakeWallet() d.addCallback(lambda _: self.lbry_file_manager.setup())
peer_manager = PeerManager() d.addCallback(lambda _: self.start_server())
peer_finder = FakePeerFinder(5553, peer_manager, 1) d.addCallback(lambda _: self.create_stream())
hash_announcer = FakeAnnouncer() d.addCallback(self.create_stream_descriptor)
rate_limiter = RateLimiter() d.addCallback(self.put_sd_hash_on_queue)
sd_identifier = StreamDescriptorIdentifier()
db_dir = "server"
os.mkdir(db_dir)
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=is_generous)
stream_info_manager = TempEncryptedFileMetadataManager()
lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
if ul_rate_limit is not None:
session.rate_limiter.set_ul_limit(ul_rate_limit)
def start_all():
d = session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: lbry_file_manager.setup())
d.addCallback(lambda _: start_server())
d.addCallback(lambda _: create_stream())
d.addCallback(create_stream_descriptor)
d.addCallback(put_sd_hash_on_queue)
def print_error(err): def print_error(err):
logging.critical("Server error: %s", err.getErrorMessage()) logging.critical("Server error: %s", err.getErrorMessage())
@ -263,71 +135,57 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
d.addErrback(print_error) d.addErrback(print_error)
return d return d
def start_server(): def start_server(self):
session = self.session
server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
server_factory = ServerProtocolFactory(session.rate_limiter, server_factory = ServerProtocolFactory(session.rate_limiter,
query_handler_factories, query_handler_factories,
session.peer_manager) session.peer_manager)
self.server_port = self.reactor.listenTCP(5553, server_factory)
server_port = reactor.listenTCP(5553, server_factory)
logging.debug("Started listening") logging.debug("Started listening")
self.kill_check = task.LoopingCall(self.check_for_kill)
def kill_server(): self.kill_check.start(1.0)
ds = []
ds.append(session.shut_down())
ds.append(lbry_file_manager.stop())
if server_port:
ds.append(server_port.stopListening())
kill_check.stop()
dead_event.set()
dl = defer.DeferredList(ds)
dl.addCallback(lambda _: reactor.stop())
return dl
def check_for_kill():
if kill_event.is_set():
kill_server()
kill_check = task.LoopingCall(check_for_kill)
kill_check.start(1.0)
return True return True
def create_stream(): def kill_server(self):
test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) session = self.session
d = create_lbry_file(session, lbry_file_manager, "test_file", test_file) ds = []
ds.append(session.shut_down())
ds.append(self.lbry_file_manager.stop())
if self.server_port:
ds.append(self.server_port.stopListening())
self.kill_check.stop()
self.dead_event.set()
dl = defer.DeferredList(ds)
dl.addCallback(lambda _: self.reactor.stop())
return dl
def check_for_kill(self):
if self.kill_event.is_set():
self.kill_server()
def create_stream(self):
test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file)
return d return d
def create_stream_descriptor(stream_hash): def create_stream_descriptor(self, stream_hash):
descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager) descriptor_writer = BlobStreamDescriptorWriter(self.session.blob_manager)
d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True) d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(descriptor_writer.create_descriptor) d.addCallback(descriptor_writer.create_descriptor)
return d return d
def put_sd_hash_on_queue(sd_hash): def put_sd_hash_on_queue(self, sd_hash):
sd_hash_queue.put(sd_hash) self.sd_hash_queue.put(sd_hash)
reactor.callLater(1, start_all)
if not reactor.running:
reactor.run()
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False): def start_lbry_reuploader(sd_hash, kill_event, dead_event,
ready_event, n, ul_rate_limit=None, is_generous=False):
if sys.platform.startswith("linux"): use_epoll_on_linux()
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor from twisted.internet import reactor
logging.debug("Starting the uploader") logging.debug("Starting the uploader")
@ -335,7 +193,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
Random.atfork() Random.atfork()
r = random.Random() r = random.Random()
r.seed("start_lbry_uploader") r.seed("start_lbry_reuploader")
wallet = FakeWallet() wallet = FakeWallet()
peer_port = 5553 + n peer_port = 5553 + n
@ -434,14 +292,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
def start_live_server(sd_hash_queue, kill_event, dead_event): def start_live_server(sd_hash_queue, kill_event, dead_event):
use_epoll_on_linux()
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor from twisted.internet import reactor
logging.debug("In start_server.") logging.debug("In start_server.")
@ -566,14 +417,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False): def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
use_epoll_on_linux()
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor from twisted.internet import reactor
logging.debug("Starting the uploader") logging.debug("Starting the uploader")
@ -751,7 +595,8 @@ class TestTransfer(TestCase):
sd_hash_queue = Queue() sd_hash_queue = Queue()
kill_event = Event() kill_event = Event()
dead_event = Event() dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
uploader = Process(target=lbry_uploader.start)
uploader.start() uploader.start()
self.server_processes.append(uploader) self.server_processes.append(uploader)
@ -770,21 +615,25 @@ class TestTransfer(TestCase):
os.mkdir(db_dir) os.mkdir(db_dir)
os.mkdir(blob_dir) os.mkdir(blob_dir)
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(
peer_finder=peer_finder, hash_announcer=hash_announcer, MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
blob_dir=blob_dir, peer_port=5553, peer_finder=peer_finder, hash_announcer=hash_announcer,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, blob_dir=blob_dir, peer_port=5553,
dht_node_class=Node, is_generous=self.is_generous) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
def make_downloader(metadata, prm): def make_downloader(metadata, prm):
info_validator = metadata.validator info_validator = metadata.validator
options = metadata.options options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] chosen_options = [
o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm) return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash): def download_file(sd_hash):
@ -856,10 +705,12 @@ class TestTransfer(TestCase):
db_dir = "client" db_dir = "client"
os.mkdir(db_dir) os.mkdir(db_dir)
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node
)
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
@ -869,7 +720,8 @@ class TestTransfer(TestCase):
info_validator = metadata.validator info_validator = metadata.validator
options = metadata.options options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] chosen_options = [
o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm) return factories[0].make_downloader(metadata, chosen_options, prm)
def start_lbry_file(lbry_file): def start_lbry_file(lbry_file):
@ -928,7 +780,6 @@ class TestTransfer(TestCase):
return d return d
def test_last_blob_retrieval(self): def test_last_blob_retrieval(self):
kill_event = Event() kill_event = Event()
dead_event_1 = Event() dead_event_1 = Event()
blob_hash_queue_1 = Queue() blob_hash_queue_1 = Queue()
@ -957,10 +808,12 @@ class TestTransfer(TestCase):
os.mkdir(db_dir) os.mkdir(db_dir)
os.mkdir(blob_dir) os.mkdir(blob_dir)
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(
peer_finder=peer_finder, hash_announcer=hash_announcer, MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
blob_dir=blob_dir, peer_port=5553, peer_finder=peer_finder, hash_announcer=hash_announcer,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker)
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -974,8 +827,8 @@ class TestTransfer(TestCase):
def download_blob(blob_hash): def download_blob(blob_hash):
prm = self.session.payment_rate_manager prm = self.session.payment_rate_manager
downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder, downloader = StandaloneBlobDownloader(
rate_limiter, prm, wallet) blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet)
d = downloader.download() d = downloader.download()
return d return d
@ -1000,23 +853,20 @@ class TestTransfer(TestCase):
d1 = self.wait_for_event(dead_event_1, 15) d1 = self.wait_for_event(dead_event_1, 15)
d2 = self.wait_for_event(dead_event_2, 15) d2 = self.wait_for_event(dead_event_2, 15)
dl = defer.DeferredList([d1, d2]) dl = defer.DeferredList([d1, d2])
def print_shutting_down(): def print_shutting_down():
logging.info("Client is shutting down") logging.info("Client is shutting down")
dl.addCallback(lambda _: print_shutting_down()) dl.addCallback(lambda _: print_shutting_down())
dl.addCallback(lambda _: arg) dl.addCallback(lambda _: arg)
return dl return dl
d.addBoth(stop) d.addBoth(stop)
return d return d
def test_double_download(self): def test_double_download(self):
sd_hash_queue = Queue() sd_hash_queue = Queue()
kill_event = Event() kill_event = Event()
dead_event = Event() dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343)) lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
uploader = Process(target=lbry_uploader.start)
uploader.start() uploader.start()
self.server_processes.append(uploader) self.server_processes.append(uploader)
@ -1132,8 +982,9 @@ class TestTransfer(TestCase):
kill_event = Event() kill_event = Event()
dead_events = [Event() for _ in range(num_uploaders)] dead_events = [Event() for _ in range(num_uploaders)]
ready_events = [Event() for _ in range(1, num_uploaders)] ready_events = [Event() for _ in range(1, num_uploaders)]
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0], lbry_uploader = LbryUploader(
9373419, 2**22)) sd_hash_queue, kill_event, dead_events[0], 5209343, 9373419, 2**22)
uploader = Process(target=lbry_uploader.start)
uploader.start() uploader.start()
self.server_processes.append(uploader) self.server_processes.append(uploader)
@ -1228,140 +1079,3 @@ class TestTransfer(TestCase):
d.addBoth(stop) d.addBoth(stop)
return d return d
class TestStreamify(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"):
os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
return d
def test_create_and_combine_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
def start_lbry_file(lbry_file):
logging.debug("Calling lbry_file.start()")
d = lbry_file.start()
return d
def combine_stream(stream_hash):
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(start_lbry_file)
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d.addCallback(lambda _: check_md5_sum())
return d
def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
return create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
suggested_file_name="test_file")
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d

View file

@ -93,7 +93,7 @@ class TestReflector(unittest.TestCase):
use_upnp=False, use_upnp=False,
rate_limiter=rate_limiter, rate_limiter=rate_limiter,
wallet=wallet, wallet=wallet,
blob_tracker_class=mocks.DummyBlobAvailabilityTracker, blob_tracker_class=mocks.BlobAvailabilityTracker,
dht_node_class=Node dht_node_class=Node
) )

View file

@ -0,0 +1,172 @@
import logging
import os
import shutil
from Crypto.Hash import MD5
from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from tests import mocks
FakeNode = mocks.Node
FakeWallet = mocks.Wallet
FakePeerFinder = mocks.PeerFinder
FakeAnnouncer = mocks.Announcer
GenFile = mocks.GenFile
test_create_stream_sd_file = mocks.create_stream_sd_file
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
class TestStreamify(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"):
os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous
)
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
return d
def test_create_and_combine_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session(
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker
)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
def start_lbry_file(lbry_file):
logging.debug("Calling lbry_file.start()")
d = lbry_file.start()
return d
def combine_stream(stream_hash):
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(start_lbry_file)
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d.addCallback(lambda _: check_md5_sum())
return d
def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
return create_lbry_file(
self.session, self.lbry_file_manager, "test_file", test_file,
suggested_file_name="test_file")
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d

View file

@ -5,7 +5,7 @@ from decimal import Decimal
from twisted.internet import defer, threads, task, error from twisted.internet import defer, threads, task, error
from lbrynet.core import PTCWallet from lbrynet.core import PTCWallet
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker from lbrynet.core import BlobAvailability
class Node(object): class Node(object):
@ -54,6 +54,9 @@ class Wallet(object):
def set_public_key_for_peer(self, peer, public_key): def set_public_key_for_peer(self, peer, public_key):
pass pass
def get_claim_metadata_for_sd_hash(self, sd_hash):
return "fakeuri", "faketxid"
class PeerFinder(object): class PeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers): def __init__(self, start_port, peer_manager, num_peers):
@ -136,7 +139,7 @@ class GenFile(io.RawIOBase):
return output return output
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
""" """
Class to track peer counts for known blobs, and to discover new popular blobs Class to track peer counts for known blobs, and to discover new popular blobs

View file

@ -8,7 +8,7 @@ from twisted.trial import unittest
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from tests.mocks import DummyBlobAvailabilityTracker from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerQueries(unittest.TestCase):

View file

@ -5,7 +5,7 @@ import mock
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from tests.mocks import DummyBlobAvailabilityTracker from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
MAX_NEGOTIATION_TURNS = 10 MAX_NEGOTIATION_TURNS = 10
random.seed(12345) random.seed(12345)