diff --git a/CHANGELOG.md b/CHANGELOG.md index f5aabf193..715b4a591 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ at anytime. * Fixed value error due to a race condition when saving to the claim cache (https://github.com/lbryio/lbry/issues/1013) * Fixed being unable to re-download updated content (#951) * Fixed sending error messages for failed api requests + * Fixed the file manager startup being slow when handling thousands of files + * Fixed handling decryption error for blobs encrypted with an invalid key + * Fixed handling stream with no data blob (https://github.com/lbryio/lbry/issues/905) ### Deprecated * `channel_list_mine`, replaced with `channel_list` diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 3fad69588..29fb2b327 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -566,7 +566,7 @@ class Config(object): if not self._installation_id: if os.path.isfile(install_id_filename): with open(install_id_filename, "r") as install_id_file: - self._installation_id = install_id_file.read() + self._installation_id = str(install_id_file.read()).strip() if not self._installation_id: self._installation_id = base58.b58encode(utils.generate_id()) with open(install_id_filename, "w") as install_id_file: @@ -578,7 +578,7 @@ class Config(object): if not self._node_id: if os.path.isfile(node_id_filename): with open(node_id_filename, "r") as node_id_file: - self._node_id = base58.b58decode(node_id_file.read()) + self._node_id = base58.b58decode(str(node_id_file.read()).strip()) if not self._node_id: self._node_id = utils.generate_id() with open(node_id_filename, "w") as node_id_file: diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 6c237c368..501c0d959 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -941,6 +941,7 @@ class Wallet(object): bid, certificate_id, claim_address, change_address) if not claim['success']: + log.error(claim) msg = 'Claim to name {} failed: {}'.format(name, claim['reason']) raise Exception(msg) @@ -1312,9 +1313,14 @@ class LBRYumWallet(Wallet): return defer.succeed(True) def _check_large_wallet(self): - if len(self.wallet.addresses(include_change=False)) > 1000: - log.warning(("Your wallet is excessively large, please follow instructions here: ", - "https://github.com/lbryio/lbry/issues/437 to reduce your wallet size")) + addr_count = len(self.wallet.addresses(include_change=False)) + if addr_count > 1000: + log.warning("Your wallet is excessively large (%i addresses), " + "please follow instructions here: " + "https://github.com/lbryio/lbry/issues/437 to reduce your wallet size", + addr_count) + else: + log.info("Wallet has %i addresses", addr_count) def _load_blockchain(self): blockchain_caught_d = defer.Deferred() diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index af15c509e..b781628fb 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -179,8 +179,12 @@ class ConnectionManager(object): # find peers for the head blob if configured to do so if self.seek_head_blob_first: - peers = yield request_creator.get_new_peers_for_head_blob() - peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) + try: + peers = yield request_creator.get_new_peers_for_head_blob() + peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed) + except KeyError: + log.warning("%s does not have a head blob", self._get_log_name()) + peers = [] else: peers = [] @@ -196,10 +200,8 @@ class ConnectionManager(object): self._get_log_name(), self._peer_connections.keys()) log.debug("%s List of connection states: %s", self._get_log_name(), [p_c_h.connection.state for p_c_h in self._peer_connections.values()]) - defer.returnValue(peers) - def _connect_to_peer(self, peer): if self.stopped: return diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index bc16fe560..9bfee80b5 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -154,7 +154,8 @@ class FullStreamProgressManager(StreamProgressManager): d.addCallback(lambda _: check_if_finished()) def log_error(err): - log.warning("Error occurred in the output loop. Error: %s", err) + log.warning("Error outputting blob %s: %s", blobs[current_blob_num].blob_hash, + err.getErrorMessage()) if self.outputting_d is not None and not self.outputting_d.called: self.outputting_d.callback(True) self.outputting_d = None diff --git a/lbrynet/cryptstream/client/CryptBlobHandler.py b/lbrynet/cryptstream/client/CryptBlobHandler.py index c8b52a473..3df94f5bd 100644 --- a/lbrynet/cryptstream/client/CryptBlobHandler.py +++ b/lbrynet/cryptstream/client/CryptBlobHandler.py @@ -1,5 +1,6 @@ import binascii from zope.interface import implements +from twisted.internet import defer from lbrynet.cryptstream.CryptBlob import StreamBlobDecryptor from lbrynet.interfaces import IBlobHandler @@ -14,7 +15,10 @@ class CryptBlobHandler(object): ######## IBlobHandler ######### def handle_blob(self, blob, blob_info): - blob_decryptor = StreamBlobDecryptor( - blob, self.key, binascii.unhexlify(blob_info.iv), blob_info.length) + try: + blob_decryptor = StreamBlobDecryptor(blob, self.key, binascii.unhexlify(blob_info.iv), + blob_info.length) + except ValueError as err: + return defer.fail(err) d = blob_decryptor.decrypt(self.write_func) return d diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index 9d2ba3b4f..706c12903 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -1,3 +1,4 @@ +import binascii import logging from zope.interface import implements from lbrynet.interfaces import IStreamDownloader @@ -37,8 +38,8 @@ class CryptStreamDownloader(object): implements(IStreamDownloader) - def __init__(self, peer_finder, rate_limiter, blob_manager, - payment_rate_manager, wallet): + def __init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet, + key, stream_name): """Initialize a CryptStreamDownloader @param peer_finder: An object which implements the IPeerFinder @@ -61,8 +62,8 @@ class CryptStreamDownloader(object): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.key = None - self.stream_name = None + self.key = binascii.unhexlify(key) + self.stream_name = binascii.unhexlify(stream_name) self.completed = False self.stopped = True self.stopping = False diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 7bdb66323..b628831a2 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -514,7 +514,7 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _setup_lbry_file_manager(self): - log.info('Starting to setup up file manager') + log.info('Starting the file manager') self.startup_status = STARTUP_STAGES[3] self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.lbry_file_manager = EncryptedFileManager( @@ -670,8 +670,7 @@ class Daemon(AuthJSONRPCServer): self.streams[sd_hash] = GetStream(self.sd_identifier, self.session, self.exchange_rate_manager, self.max_key_fee, self.disable_max_key_fee, - conf.settings['data_rate'], timeout, - file_name) + conf.settings['data_rate'], timeout) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) @@ -917,7 +916,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(lbry_file) @defer.inlineCallbacks - def _get_lbry_files(self, return_json=False, full_status=False, **kwargs): + def _get_lbry_files(self, return_json=False, full_status=True, **kwargs): lbry_files = list(self.lbry_file_manager.lbry_files) if kwargs: for search_type, value in iter_lbry_file_search_values(kwargs): @@ -2041,6 +2040,8 @@ class Daemon(AuthJSONRPCServer): 'claim_address': claim_address, 'change_address': change_address, 'claim_dict': claim_dict, + 'channel_id': channel_id, + 'channel_name': channel_name }) if channel_id: diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index e3820521d..0cc4f7454 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -31,15 +31,13 @@ log = logging.getLogger(__name__) class GetStream(object): def __init__(self, sd_identifier, session, exchange_rate_manager, - max_key_fee, disable_max_key_fee, data_rate=None, timeout=None, - file_name=None): + max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] self.disable_max_key_fee = disable_max_key_fee or conf.settings['disable_max_key_fee'] self.download_directory = conf.settings['download_directory'] - self.file_name = file_name self.timeout_counter = 0 self.code = None self.sd_hash = None @@ -126,7 +124,6 @@ class GetStream(object): [self.data_rate], self.payment_rate_manager, download_directory=self.download_directory, - file_name=self.file_name ) defer.returnValue(downloader) diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py index f4e6e5501..569fa64ec 100644 --- a/lbrynet/daemon/Publisher.py +++ b/lbrynet/daemon/Publisher.py @@ -6,6 +6,7 @@ from twisted.internet import defer from lbrynet.core import file_utils from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbry_file.StreamDescriptor import publish_sd_blob @@ -36,13 +37,15 @@ class Publisher(object): read_handle) sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash) - self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash) + status = ManagedEncryptedFileDownloader.STATUS_FINISHED + self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, + status=status) if 'source' not in claim_dict['stream']: claim_dict['stream']['source'] = {} claim_dict['stream']['source']['source'] = sd_hash claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash' claim_dict['stream']['source']['contentType'] = get_content_type(file_path) - claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here + claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) self.lbry_file.completed = True diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 38445cdbe..6e7491be6 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -27,28 +27,24 @@ def log_status(sd_hash, status): status_string = "finished" else: status_string = "unknown" - log.info("stream %s is %s", short_hash(sd_hash), status_string) + log.debug("stream %s is %s", short_hash(sd_hash), status_string) class ManagedEncryptedFileDownloader(EncryptedFileSaver): STATUS_RUNNING = "running" STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" - """ - These are started by EncryptedFileManager, aka, file_manager - """ - def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, - blob_manager, stream_info_manager, lbry_file_manager, - payment_rate_manager, wallet, download_directory, - file_name=None): + def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, + stream_info_manager, lbry_file_manager, payment_rate_manager, wallet, + download_directory, sd_hash=None, key=None, stream_name=None, + suggested_file_name=None): EncryptedFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, - download_directory, - file_name) - + download_directory, key, stream_name, suggested_file_name) + self.sd_hash = sd_hash self.rowid = rowid self.lbry_file_manager = lbry_file_manager self._saving_status = False @@ -57,22 +53,16 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def saving_status(self): return self._saving_status - @defer.inlineCallbacks - def restore(self): - - status = yield self.lbry_file_manager.get_lbry_file_status(self) - log_status(self.sd_hash, status) - + def restore(self, status): if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: # start returns self.finished_deferred # which fires when we've finished downloading the file # and we don't want to wait for the entire download self.start() elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - defer.returnValue(False) + pass elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: self.completed = True - defer.returnValue(True) else: raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status)) @@ -147,8 +137,7 @@ class ManagedEncryptedFileDownloaderFactory(object): return True @defer.inlineCallbacks - def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, - file_name=None): + def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None): assert len(options) == 1 data_rate = options[0] stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager, @@ -156,9 +145,11 @@ class ManagedEncryptedFileDownloaderFactory(object): if metadata.metadata_source == StreamMetadata.FROM_BLOB: yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash) - lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager, + lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, + metadata.source_blob_hash, + payment_rate_manager, data_rate, - download_directory, file_name) + download_directory) defer.returnValue(lbry_file) @staticmethod diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 5c6f2a349..9d022c1d7 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -12,7 +12,7 @@ from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType +from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType, get_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -50,8 +50,7 @@ class EncryptedFileManager(object): def setup(self): yield self.stream_info_manager.setup() yield self._add_to_sd_identifier() - # don't block on starting the lbry files - self._start_lbry_files() + yield self._start_lbry_files() log.info("Started file manager") def get_lbry_file_status(self, lbry_file): @@ -86,62 +85,11 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) - @defer.inlineCallbacks - def _check_stream_is_managed(self, stream_hash): - # check that all the streams in the stream_info_manager are also - # tracked by lbry_file_manager and fix any streams that aren't. - rowid = yield self._get_rowid_for_stream_hash(stream_hash) - if rowid is not None: - defer.returnValue(True) - rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate - key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash) - log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex')) - yield self._save_lbry_file(stream_hash, rate) - - @defer.inlineCallbacks - def _check_stream_info_manager(self): - def _iter_streams(stream_hashes): - for stream_hash in stream_hashes: - yield self._check_stream_is_managed(stream_hash) - - stream_hashes = yield self.stream_info_manager.get_all_streams() - log.debug("Checking %s streams", len(stream_hashes)) - yield defer.DeferredList(list(_iter_streams(stream_hashes))) - - @defer.inlineCallbacks - def _start_lbry_files(self): - yield self._check_stream_info_manager() - files_and_options = yield self._get_all_lbry_files() - yield defer.DeferredList([ - self._set_options_and_restore(rowid, stream_hash, options) - for rowid, stream_hash, options in files_and_options - ]) - - if self.auto_re_reflect is True: - safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) - log.info("Started %i lbry files", len(self.lbry_files)) - - @defer.inlineCallbacks - def _set_options_and_restore(self, rowid, stream_hash, options): - try: - b_prm = self.session.base_payment_rate_manager - payment_rate_manager = NegotiatedPaymentRateManager( - b_prm, self.session.blob_tracker) - downloader = yield self.start_lbry_file( - rowid, stream_hash, payment_rate_manager, blob_data_rate=options) - yield downloader.restore() - except Exception: - log.error('An error occurred while starting a lbry file (%s, %s, %s)', - rowid, stream_hash, options) - - @defer.inlineCallbacks - def start_lbry_file(self, rowid, stream_hash, - payment_rate_manager, blob_data_rate=None, - download_directory=None, file_name=None): - if not download_directory: - download_directory = self.download_directory - payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedEncryptedFileDownloader( + def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key, + stream_name, suggested_file_name, download_directory=None): + download_directory = download_directory or self.download_directory + payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager + return ManagedEncryptedFileDownloader( rowid, stream_hash, self.session.peer_finder, @@ -152,17 +100,46 @@ class EncryptedFileManager(object): payment_rate_manager, self.session.wallet, download_directory, - file_name=file_name + sd_hash=sd_hash, + key=key, + stream_name=stream_name, + suggested_file_name=suggested_file_name ) - yield lbry_file_downloader.set_stream_info() - self.lbry_files.append(lbry_file_downloader) - defer.returnValue(lbry_file_downloader) + + @defer.inlineCallbacks + def _start_lbry_files(self): + files_and_options = yield self._get_all_lbry_files() + stream_infos = yield self.stream_info_manager._get_all_stream_infos() + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) + log.info("Trying to start %i files", len(stream_infos)) + for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options): + if len(files_and_options) > 500 and i % 500 == 0: + log.info("Started %i/%i files", i, len(stream_infos)) + if stream_hash in stream_infos: + lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, + stream_infos[stream_hash]['sd_hash'], + stream_infos[stream_hash]['key'], + stream_infos[stream_hash]['stream_name'], + stream_infos[stream_hash]['suggested_file_name']) + log.info("initialized file %s", lbry_file.stream_name) + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(status) + self.lbry_files.append(lbry_file) + except Exception: + log.warning("Failed to start %i", rowid) + continue + log.info("Started %i lbry files", len(self.lbry_files)) + if self.auto_re_reflect is True: + safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) @defer.inlineCallbacks def _stop_lbry_file(self, lbry_file): def wait_for_finished(lbry_file, count=2): if count or lbry_file.saving_status is not False: - return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1) + return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, + count=count - 1) try: yield lbry_file.stop(change_status=False) self.lbry_files.remove(lbry_file) @@ -180,14 +157,18 @@ class EncryptedFileManager(object): yield self._stop_lbry_file(lbry_file) @defer.inlineCallbacks - def add_lbry_file(self, stream_hash, payment_rate_manager=None, blob_data_rate=None, - download_directory=None, file_name=None): - if not payment_rate_manager: - payment_rate_manager = self.session.payment_rate_manager + def add_lbry_file(self, stream_hash, sd_hash, payment_rate_manager=None, blob_data_rate=None, + download_directory=None, status=None): rowid = yield self._save_lbry_file(stream_hash, blob_data_rate) - lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, - blob_data_rate, download_directory, - file_name) + stream_metadata = yield get_sd_info(self.stream_info_manager, + stream_hash, False) + key = stream_metadata['key'] + stream_name = stream_metadata['stream_name'] + suggested_file_name = stream_metadata['suggested_file_name'] + lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, sd_hash, key, + stream_name, suggested_file_name, download_directory) + lbry_file.restore(status or ManagedEncryptedFileDownloader.STATUS_STOPPED) + self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @defer.inlineCallbacks diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index ce1340c82..be189fdb6 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -1,6 +1,6 @@ +import os import logging import sqlite3 -import os from twisted.internet import defer from twisted.python.failure import Failure from twisted.enterprise import adbapi @@ -207,6 +207,30 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(get_result) return d + @rerun_if_locked + @defer.inlineCallbacks + def _get_all_stream_infos(self): + file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files") + descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash " + "from lbry_file_descriptors") + response = {} + for (stream_hash, sd_hash) in descriptor_results: + if stream_hash in response: + log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16]) + continue + response[stream_hash] = { + 'sd_hash': sd_hash + } + for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results: + if stream_hash not in response: + log.warning("Missing sd hash for %s", stream_hash) + continue + response[stream_hash]['rowid'] = rowid + response[stream_hash]['key'] = key + response[stream_hash]['stream_name'] = stream_name + response[stream_hash]['suggested_file_name'] = suggested_file_name + defer.returnValue(response) + @rerun_if_locked def _check_if_stream_exists(self, stream_hash): d = self.db_conn.runQuery( @@ -321,8 +345,8 @@ class DBEncryptedFileMetadataManager(object): @rerun_if_locked def _get_all_lbry_files(self): - d = self.db_conn.runQuery("select rowid, stream_hash, " - "blob_data_rate from lbry_file_options") + d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status " + "from lbry_file_options") return d @rerun_if_locked diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index 9445ccaa4..17b3cf501 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -6,7 +6,6 @@ from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.StreamDescriptor import StreamMetadata -from lbrynet.core.Error import NoSuchStreamHash from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os @@ -22,29 +21,14 @@ class EncryptedFileDownloader(CryptStreamDownloader): """Classes which inherit from this class download LBRY files""" def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, - stream_info_manager, payment_rate_manager, wallet): + stream_info_manager, payment_rate_manager, wallet, key, stream_name, + suggested_file_name=None): CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager, - payment_rate_manager, wallet) + payment_rate_manager, wallet, key, stream_name) self.stream_hash = stream_hash self.stream_info_manager = stream_info_manager - self.suggested_file_name = None + self.suggested_file_name = binascii.unhexlify(suggested_file_name) self._calculated_total_bytes = None - self.sd_hash = None - - @defer.inlineCallbacks - def set_stream_info(self): - if self.key is None: - out = yield self.stream_info_manager.get_stream_info(self.stream_hash) - key, stream_name, suggested_file_name = out - self.key = binascii.unhexlify(key) - self.stream_name = binascii.unhexlify(stream_name) - self.suggested_file_name = binascii.unhexlify(suggested_file_name) - - out = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) - if out: - self.sd_hash = out[0] - else: - raise NoSuchStreamHash(self.stream_hash) def delete_data(self): d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) @@ -171,13 +155,13 @@ class EncryptedFileDownloaderFactory(object): class EncryptedFileSaver(EncryptedFileDownloader): def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet, download_directory, file_name=None): - EncryptedFileDownloader.__init__(self, stream_hash, - peer_finder, rate_limiter, - blob_manager, stream_info_manager, - payment_rate_manager, wallet) + payment_rate_manager, wallet, download_directory, key, stream_name, + suggested_file_name): + EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, + blob_manager, stream_info_manager, payment_rate_manager, + wallet, key, stream_name, suggested_file_name) self.download_directory = download_directory - self.file_name = file_name + self.file_name = os.path.basename(self.suggested_file_name) self.file_written_to = None self.file_handle = None @@ -187,19 +171,6 @@ class EncryptedFileSaver(EncryptedFileDownloader): else: return str(self.file_name) - def set_stream_info(self): - d = EncryptedFileDownloader.set_stream_info(self) - - def set_file_name(): - if self.file_name is None: - if self.suggested_file_name: - self.file_name = os.path.basename(self.suggested_file_name) - else: - self.file_name = os.path.basename(self.stream_name) - - d.addCallback(lambda _: set_file_name()) - return d - def stop(self, err=None): d = EncryptedFileDownloader.stop(self, err=err) d.addCallback(lambda _: self._delete_from_info_manager()) @@ -273,11 +244,14 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): self.download_directory = download_directory def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - return EncryptedFileSaver(stream_hash, self.peer_finder, - self.rate_limiter, self.blob_manager, - self.stream_info_manager, - payment_rate_manager, self.wallet, - self.download_directory) + stream_name = stream_info.raw_info['stream_name'] + key = stream_info.raw_info['key'] + suggested_file_name = stream_info.raw_info['suggested_file_name'] + return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter, + self.blob_manager, self.stream_info_manager, + payment_rate_manager, self.wallet, self.download_directory, + key=key, stream_name=stream_name, + suggested_file_name=suggested_file_name) @staticmethod def get_description(): diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 995c35141..74e457c1d 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -108,7 +108,7 @@ class ReflectorServer(Protocol): yield save_sd_info(self.stream_info_manager, sd_info) yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'], blob.blob_hash) - self.lbry_file_manager.add_lbry_file(sd_info['stream_hash']) + yield self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'], blob.blob_hash) should_announce = True # if we already have the head blob, set it to be announced now that we know it's diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 9fe4a29c1..31e5a7dad 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -144,9 +144,10 @@ class TestStreamify(TestCase): d = lbry_file.start() return d - def combine_stream(stream_hash): + def combine_stream(info): + stream_hash, sd_hash = info prm = self.session.payment_rate_manager - d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) + d = self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, prm) d.addCallback(start_lbry_file) def check_md5_sum(): @@ -163,8 +164,9 @@ class TestStreamify(TestCase): test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)])) stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, suggested_file_name="test_file") - yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager, stream_hash) - defer.returnValue(stream_hash) + sd_hash = yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager, + stream_hash) + defer.returnValue((stream_hash, sd_hash)) d = self.session.setup() d.addCallback(lambda _: self.stream_info_manager.setup()) diff --git a/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py b/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py index 036ab3a3e..bc5a65251 100644 --- a/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py +++ b/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py @@ -10,6 +10,7 @@ class TestEncryptedFileSaver(unittest.TestCase): @defer.inlineCallbacks def test_setup_output(self): file_name = 'encrypted_file_saver_test.tmp' + file_name_hex = file_name.encode('hex') self.assertFalse(os.path.isfile(file_name)) # create file in the temporary trial folder @@ -21,13 +22,13 @@ class TestEncryptedFileSaver(unittest.TestCase): payment_rate_manager = None wallet = None download_directory = '.' - upload_allowed = False - saver = EncryptedFileSaver( - stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet, download_directory, file_name) + key = '' + + saver = EncryptedFileSaver(stream_hash, peer_finder, rate_limiter, blob_manager, + stream_info_manager, payment_rate_manager, wallet, + download_directory, key, + file_name_hex, file_name_hex) yield saver._setup_output() self.assertTrue(os.path.isfile(file_name)) saver._close_output() - - diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py b/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py index 1a650f495..c8ef2feb4 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py @@ -103,7 +103,7 @@ class GetStreamTests(unittest.TestCase): DownloadTimeoutError is raised """ def download_sd_blob(self): - raise DownloadSDTimeout(self.file_name) + raise DownloadSDTimeout(self) getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream)