diff --git a/CHANGELOG.md b/CHANGELOG.md index 89e8eabe2..139dcba1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,16 @@ at anytime. ## [0.8.3rc0] - 2017-02-10 ### Changed - * add uri to stream reflector to de-obfuscate reflector logs + * Convert EncryptedFileDownloader to inlineCallbacks + * Convert EncryptedFileManager to use inlineCallbacks + * Convert Daemon._delete_lbry_file to inlineCallbacks + * Add uri to stream reflector to de-obfuscate reflector logs + * Simplify lbrynet.lbrynet_daemon.Publisher + * Reflect streams in file manager looping call rather than in each file + * Convert GetStream to inclineCallbacks + * Change callback condition in GetStream to the first data blob completing + * Add local and remote heights to blockchain status + ### Fixed * Fix recursion depth error upon failed blob * Call stopProducing in reflector client file_sender when uploading is done @@ -24,6 +33,10 @@ at anytime. * Fixed file_delete not deleting data from stream_info_manager [#470](https://github.com/lbryio/lbry/issues/470) * Fixed upload of bug reports to Slack ([#472](https://github.com/lbryio/lbry/issues/472)) * Fixed claim updates [#473](https://github.com/lbryio/lbry/issues/473) + * Handle ConnectionLost error in reflector client + * Fix updating a claim where the stream doesn't change + * Fix claim_abandon + ## [0.8.1] - 2017-02-01 ### Changed diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index f02a4dcfa..0aefe563e 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -482,7 +482,7 @@ class Wallet(object): return defer.succeed(None) if 'error' in result: - log.warning("Got an error looking up a name: %s", result['error']) + log.warning("Got an error looking up lbry://%s: %s", name, result['error']) return Failure(UnknownNameError(name)) _check_result_fields(result) try: @@ -657,18 +657,20 @@ class Wallet(object): yield self._save_name_metadata(name, claim_outpoint, _metadata['sources']['lbry_sd_hash']) defer.returnValue(claim) + @defer.inlineCallbacks def abandon_claim(self, txid, nout): def _parse_abandon_claim_out(claim_out): if not claim_out['success']: - msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['resason']) + msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['reason']) raise Exception(msg) claim_out = self._process_claim_out(claim_out) + log.info("Abandoned claim tx %s (n: %i) --> %s", txid, nout, claim_out) return defer.succeed(claim_out) claim_outpoint = ClaimOutpoint(txid, nout) - d = self._abandon_claim(claim_outpoint) - d.addCallback(lambda claim_out: _parse_abandon_claim_out(claim_out)) - return d + claim_out = yield self._abandon_claim(claim_outpoint) + result = yield _parse_abandon_claim_out(claim_out) + defer.returnValue(result) def support_claim(self, name, claim_id, amount): def _parse_support_claim_out(claim_out): diff --git a/lbrynet/core/file_utils.py b/lbrynet/core/file_utils.py index d779fa674..b7611763a 100644 --- a/lbrynet/core/file_utils.py +++ b/lbrynet/core/file_utils.py @@ -1,6 +1,8 @@ import os import sys import subprocess +from contextlib import contextmanager + def start(path): """ @@ -33,3 +35,18 @@ def reveal(path): subprocess.Popen(['xdg-open', os.path.dirname(path)]) elif sys.platform == 'win32': subprocess.Popen(['explorer', '/select', path]) + + +@contextmanager +def get_read_handle(path): + """ + Get os independent read handle for a file + """ + + if os.name == "nt": + file_mode = 'rb' + else: + file_mode = 'r' + read_handle = open(path, file_mode) + yield read_handle + read_handle.close() diff --git a/lbrynet/lbryfile/StreamDescriptor.py b/lbrynet/lbryfile/StreamDescriptor.py index 464c763c8..a114acc5f 100644 --- a/lbrynet/lbryfile/StreamDescriptor.py +++ b/lbrynet/lbryfile/StreamDescriptor.py @@ -80,19 +80,13 @@ def get_sd_info(stream_info_manager, stream_hash, include_blobs): return d +@defer.inlineCallbacks def publish_sd_blob(stream_info_manager, blob_manager, stream_hash): descriptor_writer = BlobStreamDescriptorWriter(blob_manager) - - d = get_sd_info(stream_info_manager, stream_hash, True) - d.addCallback(descriptor_writer.create_descriptor) - - def add_sd_blob_to_stream(sd_blob_hash): - d = stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) - d.addCallback(lambda _: sd_blob_hash) - return d - - d.addCallback(add_sd_blob_to_stream) - return d + sd_info = yield get_sd_info(stream_info_manager, stream_hash, True) + sd_blob_hash = yield descriptor_writer.create_descriptor(sd_info) + yield stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) + defer.returnValue(sd_blob_hash) def create_plain_sd(stream_info_manager, stream_hash, file_name, overwrite_existing=False): diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 85bc46dd2..5b69d982d 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -1,7 +1,6 @@ """ Download LBRY Files from LBRYnet and save them to disk. """ -import random import logging from zope.interface import implements @@ -14,8 +13,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileDownloa from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from lbrynet.reflector import reupload -from lbrynet import conf log = logging.getLogger(__name__) @@ -48,57 +45,28 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def saving_status(self): return self._saving_status + @defer.inlineCallbacks def restore(self): - d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) - - def _save_stream_info(sd_hash): - if sd_hash: - self.sd_hash = sd_hash[0] - d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1], r[2])) - return d - else: - return None - - def _save_claim_id(claim_id): - self.claim_id = claim_id - return defer.succeed(None) - - def _notify_bad_claim(name, txid, nout): - err_msg = "Error loading name claim for lbry file: \ - lbry://%s, tx %s output %i does not contain a valid claim, deleting it" - log.error(err_msg, name, txid, nout) - return self.lbry_file_manager.delete_lbry_file(self) - - def _save_claim(name, txid, nout): - self.uri = name - self.txid = txid - self.nout = nout - d = self.wallet.get_claimid(name, txid, nout) - d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout)) - return d - - d.addCallback(_save_stream_info) - d.addCallback(lambda _: self._reupload()) - d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) - - def restore_status(status): - if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: - return self.start() - elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - return defer.succeed(False) - elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: - self.completed = True - return defer.succeed(True) - - d.addCallback(restore_status) - return d - - def _reupload(self): - if not conf.settings['reflector_reupload']: - return - reflector_server = random.choice(conf.settings['reflector_servers']) - return reupload.check_and_restore_availability(self, reflector_server) + sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + if sd_hash: + self.sd_hash = sd_hash[0] + else: + raise Exception("No sd hash for stream hash %s", self.stream_hash) + claim_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) + if claim_metadata is None: + raise Exception("A claim doesn't exist for sd %s" % self.sd_hash) + self.uri, self.txid, self.nout = claim_metadata + self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout) + status = yield self.lbry_file_manager.get_lbry_file_status(self) + if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: + yield self.start() + elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: + defer.returnValue(False) + elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: + self.completed = True + defer.returnValue(True) + else: + raise Exception("Unknown status for stream %s: %s", self.stream_hash, status) @defer.inlineCallbacks def stop(self, err=None, change_status=True): @@ -107,34 +75,24 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: status = yield self._save_status() + defer.returnValue(status) + @defer.inlineCallbacks def status(self): - def find_completed_blobhashes(blobs): - blobhashes = [b[0] for b in blobs if b[0] is not None] + blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes) + num_blobs_completed = len(completed_blobs) + num_blobs_known = len(blob_hashes) - def get_num_completed(completed_blobs): - return len(completed_blobs), len(blobhashes) - - inner_d = self.blob_manager.completed_blobs(blobhashes) - inner_d.addCallback(get_num_completed) - return inner_d - - def make_full_status(progress): - num_completed = progress[0] - num_known = progress[1] - if self.completed is True: - s = "completed" - elif self.stopped is True: - s = "stopped" - else: - s = "running" - status = EncryptedFileStatusReport(self.file_name, num_completed, num_known, s) - return status - - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(find_completed_blobhashes) - d.addCallback(make_full_status) - return d + if self.completed: + status = "completed" + elif self.stopped: + status = "stopped" + else: + status = "running" + defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed, + num_blobs_known, status)) @defer.inlineCallbacks def _start(self): @@ -166,8 +124,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): status = ManagedEncryptedFileDownloader.STATUS_STOPPED else: status = ManagedEncryptedFileDownloader.STATUS_RUNNING - yield self.lbry_file_manager.change_lbry_file_status(self, status) + status = yield self.lbry_file_manager.change_lbry_file_status(self, status) self._saving_status = False + defer.returnValue(status) def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, @@ -181,35 +140,23 @@ class ManagedEncryptedFileDownloaderFactory(object): self.lbry_file_manager = lbry_file_manager def can_download(self, sd_validator): + # TODO: add a sd_validator for non live streams, use it return True - def make_downloader(self, metadata, options, payment_rate_manager, - download_directory=None, file_name=None): + @defer.inlineCallbacks + def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, + file_name=None): data_rate = options[0] upload_allowed = options[1] - - def save_source_if_blob(stream_hash): - if metadata.metadata_source == StreamMetadata.FROM_BLOB: - # TODO: should never have to dig this deep into a another classes - # members. lbry_file_manager should have a - # save_sd_blob_hash_to_stream method - d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream( - stream_hash, metadata.source_blob_hash) - else: - d = defer.succeed(True) - d.addCallback(lambda _: stream_hash) - return d - - d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info) - d.addCallback(save_source_if_blob) - d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file( - stream_hash, - payment_rate_manager, - data_rate, - upload_allowed, - download_directory=download_directory, - file_name=file_name)) - return d + stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager, + metadata.validator.raw_info) + if metadata.metadata_source == StreamMetadata.FROM_BLOB: + yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash, + metadata.source_blob_hash) + lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager, + data_rate, upload_allowed, + download_directory, file_name) + defer.returnValue(lbry_file) @staticmethod def get_description(): diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index 7bf21c769..1fca82272 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -9,6 +9,7 @@ from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure +from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -21,6 +22,16 @@ from lbrynet.core.sqlite_helpers import rerun_if_locked log = logging.getLogger(__name__) +def safe_start_looping_call(looping_call, seconds=3600): + if not looping_call.running: + looping_call.start(seconds) + + +def safe_stop_looping_call(looping_call): + if looping_call.running: + looping_call.stop() + + class EncryptedFileManager(object): """Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata. @@ -32,11 +43,13 @@ class EncryptedFileManager(object): self.stream_info_manager = stream_info_manager self.sd_identifier = sd_identifier self.lbry_files = [] + self.lbry_files_setup_deferred = None self.sql_db = None if download_directory: self.download_directory = download_directory else: self.download_directory = os.getcwd() + self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory)) @defer.inlineCallbacks @@ -44,6 +57,7 @@ class EncryptedFileManager(object): yield self._open_db() yield self._add_to_sd_identifier() yield self._start_lbry_files() + safe_start_looping_call(self.lbry_file_reflector) def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -69,6 +83,9 @@ class EncryptedFileManager(object): dl.addCallback(filter_failures) return dl + def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash): + return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash) + def _add_to_sd_identifier(self): downloader_factory = ManagedEncryptedFileDownloaderFactory(self) self.sd_identifier.add_stream_downloader_factory( @@ -96,65 +113,78 @@ class EncryptedFileManager(object): log.debug("Checking %s streams", len(stream_hashes)) yield defer.DeferredList(list(_iter_streams(stream_hashes))) + @defer.inlineCallbacks + def _restore_lbry_file(self, lbry_file): + try: + yield lbry_file.restore() + except Exception as err: + log.error("Failed to start stream: %s, error: %s", lbry_file.stream_hash, err) + self.lbry_files.remove(lbry_file) + # TODO: delete stream without claim instead of just removing from manager? + @defer.inlineCallbacks def _start_lbry_files(self): - def set_options_and_restore(rowid, stream_hash, options): - b_prm = self.session.base_payment_rate_manager - payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - - d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, - blob_data_rate=options) - d.addCallback(lambda downloader: downloader.restore()) - return d - - def log_error(err, rowid, stream_hash, options): - log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage()) - log.debug(rowid) - log.debug(stream_hash) - log.debug(options) - - def start_lbry_files(lbry_files_and_options): - for rowid, stream_hash, options in lbry_files_and_options: - d = set_options_and_restore(rowid, stream_hash, options) - d.addErrback(lambda err: log_error(err, rowid, stream_hash, options)) - log.info("Started %i lbry files", len(self.lbry_files)) - return True - + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) yield self._check_stream_info_manager() - files_and_options = yield self._get_all_lbry_files() - yield start_lbry_files(files_and_options) + lbry_files_and_options = yield self._get_all_lbry_files() + dl = [] + for rowid, stream_hash, options in lbry_files_and_options: + lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate=options) + dl.append(self._restore_lbry_file(lbry_file)) + log.debug("Started %s", lbry_file) + self.lbry_files_setup_deferred = defer.DeferredList(dl) + log.info("Started %i lbry files", len(self.lbry_files)) + defer.returnValue(True) + @defer.inlineCallbacks def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, download_directory=None, file_name=None): if not download_directory: download_directory = self.download_directory payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedEncryptedFileDownloader(rowid, stream_hash, - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, self, - payment_rate_manager, self.session.wallet, - download_directory, - upload_allowed, - file_name=file_name) - self.lbry_files.append(lbry_file_downloader) - d = lbry_file_downloader.set_stream_info() - d.addCallback(lambda _: lbry_file_downloader) - return d + lbry_file = ManagedEncryptedFileDownloader(rowid, stream_hash, self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, + self.stream_info_manager, + self, payment_rate_manager, self.session.wallet, + download_directory, upload_allowed, + file_name=file_name) + yield lbry_file.set_stream_info() + self.lbry_files.append(lbry_file) + defer.returnValue(lbry_file) - def add_lbry_file(self, stream_hash, payment_rate_manager, - blob_data_rate=None, - upload_allowed=True, - download_directory=None, - file_name=None): - d = self._save_lbry_file(stream_hash, blob_data_rate) - d.addCallback( - lambda rowid: self.start_lbry_file( - rowid, stream_hash, payment_rate_manager, - blob_data_rate, upload_allowed, download_directory, file_name)) - return d + @defer.inlineCallbacks + def _stop_lbry_file(self, lbry_file): + def wait_for_finished(lbry_file, count=2): + if count or lbry_file.saving_status is not False: + return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1) + try: + yield lbry_file.stop(change_status=False) + self.lbry_files.remove(lbry_file) + except CurrentlyStoppingError: + yield wait_for_finished(lbry_file) + except AlreadyStoppedError: + pass + finally: + defer.returnValue(None) + + def _stop_lbry_files(self): + log.info("Stopping %i lbry files", len(self.lbry_files)) + lbry_files = self.lbry_files + for lbry_file in lbry_files: + yield self._stop_lbry_file(lbry_file) + + @defer.inlineCallbacks + def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, + upload_allowed=True, download_directory=None, file_name=None): + rowid = yield self._save_lbry_file(stream_hash, blob_data_rate) + lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate, upload_allowed, download_directory, + file_name) + defer.returnValue(lbry_file) def delete_lbry_file(self, lbry_file): for l in self.lbry_files: @@ -192,31 +222,22 @@ class EncryptedFileManager(object): else: return defer.fail(Failure(ValueError("Could not find that LBRY file"))) - def stop(self): - log.info('Stopping %s', self) - ds = [] - - def wait_for_finished(lbry_file, count=2): - if count <= 0 or lbry_file.saving_status is False: - return True - else: - return task.deferLater(reactor, 1, wait_for_finished, lbry_file, count=count - 1) - - def ignore_stopped(err, lbry_file): - err.trap(AlreadyStoppedError, CurrentlyStoppingError) - return wait_for_finished(lbry_file) - + def _reflect_lbry_files(self): for lbry_file in self.lbry_files: - d = lbry_file.stop(change_status=False) - d.addErrback(ignore_stopped, lbry_file) - ds.append(d) - dl = defer.DeferredList(ds) + yield reflect_stream(lbry_file) - def close_db(): - self.db = None + @defer.inlineCallbacks + def reflect_lbry_files(self): + yield defer.DeferredList(list(self._reflect_lbry_files())) - dl.addCallback(lambda _: close_db()) - return dl + @defer.inlineCallbacks + def stop(self): + safe_stop_looping_call(self.lbry_file_reflector) + yield defer.DeferredList(list(self._stop_lbry_files())) + yield self.sql_db.close() + self.sql_db = None + log.info("Stopped %s", self) + defer.returnValue(True) def get_count_for_stream_hash(self, stream_hash): return self._get_count_for_stream_hash(stream_hash) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index cf4d15ef5..8b54148ba 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -2,7 +2,6 @@ import binascii import logging.handlers import mimetypes import os -import random import re import base58 import requests @@ -16,15 +15,16 @@ from twisted.web import server from twisted.internet import defer, threads, error, reactor, task from twisted.internet.task import LoopingCall from twisted.python.failure import Failure -from jsonschema import ValidationError # TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import __version__ as lbrynet_version -from lbrynet import conf, reflector, analytics +from lbrynet import conf, analytics from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET +from lbrynet.reflector import reupload +from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.metadata.Fee import FeeValidator -from lbrynet.metadata.Metadata import Metadata, verify_name_characters +from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier @@ -369,7 +369,7 @@ class Daemon(AuthJSONRPCServer): self.connection_status_code = CONNECTION_STATUS_NETWORK # claim_out is dictionary containing 'txid' and 'nout' - def _add_to_pending_claims(self, name, claim_out): + def _add_to_pending_claims(self, claim_out, name): txid = claim_out['txid'] nout = claim_out['nout'] log.info("Adding lbry://%s to pending claims, txid %s nout %d" % (name, txid, nout)) @@ -397,7 +397,7 @@ class Daemon(AuthJSONRPCServer): log.warning("Re-add %s to pending claims", name) txid, nout = self.pending_claims.pop(name) claim_out = {'txid': txid, 'nout': nout} - self._add_to_pending_claims(name, claim_out) + self._add_to_pending_claims(claim_out, name) def _process_lbry_file(name, lbry_file): # lbry_file is an instance of ManagedEncryptedFileDownloader or None @@ -438,13 +438,13 @@ class Daemon(AuthJSONRPCServer): if self.run_reflector_server: log.info("Starting reflector server") if self.reflector_port is not None: - reflector_factory = reflector.ServerFactory( + reflector_factory = reflector_server_factory( self.session.peer_manager, self.session.blob_manager ) try: - self.reflector_server_port = reactor.listenTCP( - self.reflector_port, reflector_factory) + self.reflector_server_port = reactor.listenTCP(self.reflector_port, + reflector_factory) log.info('Started reflector on port %s', self.reflector_port) except error.CannotListenError as e: log.exception("Couldn't bind reflector to port %d", self.reflector_port) @@ -763,8 +763,12 @@ class Daemon(AuthJSONRPCServer): """ timeout = timeout if timeout is not None else conf.settings['download_timeout'] - helper = _DownloadNameHelper( - self, name, timeout, download_directory, file_name, wait_for_write) + try: + helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name, + wait_for_write) + except Exception as err: + log.exception(err) + raise err if not stream_info: self.waiting_on[name] = True @@ -774,6 +778,28 @@ class Daemon(AuthJSONRPCServer): sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) defer.returnValue((sd_hash, file_path)) + @defer.inlineCallbacks + def _publish_stream(self, name, bid, metadata, file_path=None, fee=None): + publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet) + verify_name_characters(name) + if bid <= 0.0: + raise Exception("Invalid bid") + if fee: + metadata = yield publisher.add_fee_to_metadata(metadata, fee) + if not file_path: + claim_out = yield publisher.update_stream(name, bid, metadata) + else: + claim_out = yield publisher.publish_stream(name, file_path, bid, metadata) + d = reupload.reflect_stream(publisher.lbry_file) + d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), + log.exception) + + log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], + claim_out['nout']) + yield self._add_to_pending_claims(claim_out, name) + self.looping_call_manager.start(Checker.PENDING_CLAIM, 30) + defer.returnValue(claim_out) + def add_stream(self, name, timeout, download_directory, file_name, stream_info): """Makes, adds and starts a stream""" self.streams[name] = GetStream(self.sd_identifier, @@ -786,8 +812,7 @@ class Daemon(AuthJSONRPCServer): timeout=timeout, download_directory=download_directory, file_name=file_name) - d = self.streams[name].start(stream_info, name) - return d + return self.streams[name].start(stream_info, name) def _get_long_count_timestamp(self): dt = utils.utcnow() - utils.datetime_obj(year=2012, month=12, day=21) @@ -811,33 +836,24 @@ class Daemon(AuthJSONRPCServer): helper = _ResolveNameHelper(self, name, force_refresh) return helper.get_deferred() + @defer.inlineCallbacks def _delete_lbry_file(self, lbry_file, delete_file=True): - d = self.lbry_file_manager.delete_lbry_file(lbry_file) + stream_hash = lbry_file.stream_hash + filename = os.path.join(self.download_directory, lbry_file.file_name) - def finish_deletion(lbry_file): - d = lbry_file.delete_data() - d.addCallback(lambda _: _delete_stream_data(lbry_file)) - return d - - def _delete_stream_data(lbry_file): - s_h = lbry_file.stream_hash - d = self.lbry_file_manager.get_count_for_stream_hash(s_h) - # TODO: could possibly be a timing issue here - d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) - if delete_file: - def remove_if_file(): - filename = os.path.join(self.download_directory, lbry_file.file_name) - if os.path.isfile(filename): - os.remove(filename) - else: - return defer.succeed(None) - - d.addCallback(lambda _: remove_if_file) - return d - - d.addCallback(lambda _: finish_deletion(lbry_file)) - d.addCallback(lambda _: log.info("Delete lbry file")) - return d + yield self.lbry_file_manager.delete_lbry_file(lbry_file) + yield lbry_file.delete_data() + stream_count = yield self.lbry_file_manager.get_count_for_stream_hash(stream_hash) + if stream_count == 0: + yield self.stream_info_manager.delete_stream(stream_hash) + else: + log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count) + if delete_file: + if os.path.isfile(filename): + os.remove(filename) + log.info("Deleted file %s", filename) + log.info("Deleted stream %s", stream_hash) + defer.returnValue(True) def _get_or_download_sd_blob(self, blob, sd_hash): if blob: @@ -981,29 +997,6 @@ class Daemon(AuthJSONRPCServer): ]) return d - def _reflect(self, lbry_file): - if not lbry_file: - return defer.fail(Exception("no lbry file given to reflect")) - if lbry_file.stream_hash is None: - return defer.fail(Exception("no stream hash")) - factory = reflector.ClientFactory( - self.session.blob_manager, - self.lbry_file_manager.stream_info_manager, - lbry_file.stream_hash, - lbry_file.uri - ) - return run_reflector_factory(factory) - - def _reflect_blobs(self, blob_hashes): - if not blob_hashes: - return defer.fail(Exception("no lbry file given to reflect")) - log.info("Reflecting %i blobs" % len(blob_hashes)) - factory = reflector.BlobClientFactory( - self.session.blob_manager, - blob_hashes - ) - return run_reflector_factory(factory) - ############################################################################ # # # JSON-RPC API methods start here # @@ -1026,7 +1019,7 @@ class Daemon(AuthJSONRPCServer): 'lbry_id': base58.b58encode(self.lbryid)[:SHORT_ID_LEN], 'installation_id': conf.settings.get_installation_id()[:SHORT_ID_LEN], 'is_running': self.announced_startup, - 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, + 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, 'startup_status': { 'code': self.startup_status[0], 'message': self.startup_status[1], @@ -1056,6 +1049,8 @@ class Daemon(AuthJSONRPCServer): local_height = self.session.wallet.network.get_local_height() remote_height = self.session.wallet.network.get_server_height() response['blocks_behind'] = remote_height - local_height + response['local_height'] = local_height + response['remote_height'] = remote_height best_hash = yield self.session.wallet.get_best_blockhash() response['blockchain_status'] = {'best_blockhash': best_hash} defer.returnValue(response) @@ -1502,8 +1497,7 @@ class Daemon(AuthJSONRPCServer): max_tries = 3 while tries <= max_tries: try: - log.info( - 'Making try %s / %s to start download of %s', tries, max_tries, name) + log.info('Making try %s / %s to start download of %s', tries, max_tries, name) new_sd_hash, file_path = yield self._download_name( name=name, timeout=timeout, @@ -1514,10 +1508,10 @@ class Daemon(AuthJSONRPCServer): ) break except Exception as e: - log.exception('Failed to get %s', name) + log.warning('Failed to get %s', name) if tries == max_tries: self.analytics_manager.send_download_errored(download_id, name, stream_info) - response = yield self._render_response(str(e)) + response = yield self._render_response(e.message) defer.returnValue(response) tries += 1 # TODO: should stream_hash key be changed to sd_hash? @@ -1527,7 +1521,7 @@ class Daemon(AuthJSONRPCServer): } stream = self.streams.get(name) if stream: - stream.downloader.finished_deferred.addCallback( + stream.finished_deferred.addCallback( lambda _: self.analytics_manager.send_download_finished( download_id, name, stream_info) ) @@ -1640,16 +1634,17 @@ class Daemon(AuthJSONRPCServer): return d @AuthJSONRPCServer.auth_required - def jsonrpc_publish(self, name, file_path, bid, metadata, fee=None): + def jsonrpc_publish(self, name, bid, metadata, file_path=None, fee=None): """ Make a new name claim and publish associated data to lbrynet Args: - 'name': name to be claimed, string - 'file_path': path to file to be associated with name, string - 'bid': amount of credits to commit in this claim, float - 'metadata': metadata dictionary - optional 'fee' + 'name': str, name to be claimed, string + 'bid': float, amount of credits to commit in this claim, + 'metadata': dict, Metadata compliant (can be missing sources if a file is provided) + 'file_path' (optional): str, path to file to be associated with name, if not given + the stream from your existing claim for the name will be used + 'fee' (optional): dict, FeeValidator compliant Returns: 'success' : True if claim was succesful , False otherwise 'reason' : if not succesful, give reason @@ -1659,17 +1654,6 @@ class Daemon(AuthJSONRPCServer): 'claim_id' : claim id of the resulting transaction """ - def _set_address(address, currency, m): - log.info("Generated new address for key fee: " + str(address)) - m['fee'][currency]['address'] = address - return m - - def _reflect_if_possible(sd_hash, claim_out): - d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) - d.addCallback(self._reflect) - d.addCallback(lambda _: claim_out) - return d - log.info("Publish: %s", { 'name': name, 'file_path': file_path, @@ -1677,49 +1661,9 @@ class Daemon(AuthJSONRPCServer): 'metadata': metadata, 'fee': fee, }) - verify_name_characters(name) - if bid <= 0.0: - return defer.fail(Exception("Invalid bid")) - - try: - metadata = Metadata(metadata) - make_lbry_file = False - sd_hash = metadata['sources']['lbry_sd_hash'] - log.info("Update publish for %s using existing stream", name) - except ValidationError: - make_lbry_file = True - sd_hash = None - if not file_path: - raise Exception("No file given to publish") - if not os.path.isfile(file_path): - raise Exception("Specified file for publish doesnt exist: %s" % file_path) - - self.looping_call_manager.start(Checker.PENDING_CLAIM, 30) - - d = self._resolve_name(name, force_refresh=True) - d.addErrback(lambda _: None) - - if fee is not None: - metadata['fee'] = fee - assert len(metadata['fee']) == 1, "Too many fees" - for c in metadata['fee']: - if 'address' not in metadata['fee'][c]: - d.addCallback(lambda _: self.session.wallet.get_new_address()) - d.addCallback(lambda addr: _set_address(addr, c, metadata)) - else: - d.addCallback(lambda _: metadata) - if make_lbry_file: - pub = Publisher(self.session, self.lbry_file_manager, self.session.wallet) - d.addCallback(lambda meta: pub.start(name, file_path, bid, meta)) - else: - d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta)) - if sd_hash: - d.addCallback(lambda claim_out: _reflect_if_possible(sd_hash, claim_out)) - - d.addCallback(lambda claim_out: self._add_to_pending_claims(name, claim_out)) + d = self._publish_stream(name, bid, metadata, file_path, fee) d.addCallback(lambda r: self._render_response(r)) - return d @AuthJSONRPCServer.auth_required @@ -1730,6 +1674,7 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_claim_abandon(**kwargs) @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks def jsonrpc_claim_abandon(self, txid, nout): """ Abandon a name and reclaim credits from the claim @@ -1741,15 +1686,14 @@ class Daemon(AuthJSONRPCServer): txid : txid of resulting transaction if succesful fee : fee paid for the transaction if succesful """ - def _disp(x): - log.info("Abandoned name claim tx " + str(x)) - return self._render_response(x) - d = defer.Deferred() - d.addCallback(lambda _: self.session.wallet.abandon_claim(txid, nout)) - d.addCallback(_disp) - d.callback(None) # TODO: is this line necessary??? - return d + try: + abandon_claim_tx = yield self.session.wallet.abandon_claim(txid, nout) + response = yield self._render_response(abandon_claim_tx) + except Exception as err: + log.warning(err) + response = yield self._render_response(err) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_abandon_name(self, **kwargs): @@ -2279,7 +2223,7 @@ class Daemon(AuthJSONRPCServer): """ d = self.session.blob_manager.get_all_verified_blobs() - d.addCallback(self._reflect_blobs) + d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager) d.addCallback(lambda r: self._render_response(r)) return d @@ -2388,9 +2332,7 @@ def get_sd_hash(stream_info): class _DownloadNameHelper(object): - def __init__(self, daemon, name, - timeout=None, - download_directory=None, file_name=None, + def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None, wait_for_write=True): self.daemon = daemon self.name = name @@ -2440,16 +2382,24 @@ class _DownloadNameHelper(object): @defer.inlineCallbacks def _get_stream(self, stream_info): - was_successful, sd_hash, download_path = yield self.daemon.add_stream( - self.name, self.timeout, self.download_directory, self.file_name, stream_info) - if not was_successful: - log.warning("lbry://%s timed out, removing from streams", self.name) + try: + download_path = yield self.daemon.add_stream( + self.name, self.timeout, self.download_directory, self.file_name, stream_info) + except (InsufficientFundsError, Exception) as err: + if Failure(err).check(InsufficientFundsError): + log.warning("Insufficient funds to download lbry://%s", self.name) + self.remove_from_wait("Insufficient funds") + else: + log.warning("lbry://%s timed out, removing from streams", self.name) + self.remove_from_wait("Timed out") + if self.daemon.streams[self.name].downloader is not None: + yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader) del self.daemon.streams[self.name] - self.remove_from_wait("Timed out") - raise Exception("Timed out") + raise err + if self.wait_for_write: yield self._wait_for_write() - defer.returnValue((sd_hash, download_path)) + defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path)) def _wait_for_write(self): d = defer.succeed(None) @@ -2699,13 +2649,3 @@ def get_lbry_file_search_value(search_fields): if value: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) - - -def run_reflector_factory(factory): - reflector_server = random.choice(conf.settings['reflector_servers']) - reflector_address, reflector_port = reflector_server - log.info("Start reflector client") - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index af2beb4bd..abcc351ac 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -1,7 +1,5 @@ import logging import os - -from copy import deepcopy from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -15,7 +13,6 @@ INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' -# TODO: is this ever used? DOWNLOAD_STOPPED_CODE = 'stopped' STREAM_STAGES = [ (INITIALIZING_CODE, 'Initializing'), @@ -29,144 +26,160 @@ STREAM_STAGES = [ log = logging.getLogger(__name__) +def safe_start(looping_call): + if not looping_call.running: + looping_call.start(1) + + +def safe_stop(looping_call): + if looping_call.running: + looping_call.stop() + + class GetStream(object): - def __init__(self, sd_identifier, session, wallet, - lbry_file_manager, exchange_rate_manager, - max_key_fee, data_rate=0.5, timeout=None, - download_directory=None, file_name=None): - if timeout is None: - timeout = conf.settings['download_timeout'] - self.wallet = wallet - self.resolved_name = None - self.description = None - self.fee = None - self.data_rate = data_rate + def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager, + max_key_fee, data_rate=None, timeout=None, download_directory=None, + file_name=None): + self.timeout = timeout or conf.settings['download_timeout'] + self.data_rate = data_rate or conf.settings['data_rate'] + self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] + self.download_directory = download_directory or conf.settings['download_directory'] self.file_name = file_name + self.timeout_counter = 0 + self.code = None + self.sd_hash = None + self.wallet = wallet self.session = session self.exchange_rate_manager = exchange_rate_manager self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.sd_hash = None - self.max_key_fee = max_key_fee - self.stream_info = None - self.stream_info_manager = None - self._d = defer.Deferred(None) - self.timeout = timeout - self.timeout_counter = 0 - self.download_directory = download_directory - self.download_path = None self.downloader = None - # fired after the metadata has been downloaded and the - # actual file has been started - self.finished = defer.Deferred(None) self.checker = LoopingCall(self.check_status) - self.code = STREAM_STAGES[0] + + # fired when the download is complete + self.finished_deferred = defer.Deferred(None) + # fired after the metadata and the first data blob have been downloaded + self.data_downloading_deferred = defer.Deferred(None) + + @property + def download_path(self): + return os.path.join(self.download_directory, self.downloader.file_name) + + def _check_status(self, status): + if status.num_completed and not self.data_downloading_deferred.called: + self.data_downloading_deferred.callback(True) + if self.data_downloading_deferred.called: + safe_stop(self.checker) + else: + log.info("Downloading stream data (%i seconds)", self.timeout_counter) def check_status(self): + """ + Check if we've got the first data blob in the stream yet + """ + self.timeout_counter += 1 + if self.timeout_counter >= self.timeout: + if not self.data_downloading_deferred.called: + self.data_downloading_deferred.errback(Exception("Timeout")) + safe_stop(self.checker) + elif self.downloader: + d = self.downloader.status() + d.addCallback(self._check_status) + else: + log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) - # download_path is set after the sd blob has been downloaded - if self.download_path: - self.checker.stop() - self.finished.callback((True, self.sd_hash, self.download_path)) - - elif self.timeout_counter >= self.timeout: - log.info("Timeout downloading lbry://%s", self.resolved_name) - self.checker.stop() - self._d.cancel() - self.code = STREAM_STAGES[4] - self.finished.callback((False, None, None)) - - def _convert_max_fee(self): + def convert_max_fee(self): max_fee = FeeValidator(self.max_key_fee) if max_fee.currency_symbol == "LBC": return max_fee.amount return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount + def set_status(self, status, name): + log.info("Download lbry://%s status changed to %s" % (name, status)) + self.code = next(s for s in STREAM_STAGES if s[0] == status) + + def check_fee(self, fee): + validated_fee = FeeValidator(fee) + max_key_fee = self.convert_max_fee() + converted_fee = self.exchange_rate_manager.to_lbc(validated_fee).amount + if converted_fee > self.wallet.get_balance(): + raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee) + if converted_fee > max_key_fee: + raise KeyFeeAboveMaxAllowed('Key fee %s above max allowed %s' % (converted_fee, + max_key_fee)) + return validated_fee + + def get_downloader_factory(self, factories): + for factory in factories: + if isinstance(factory, ManagedEncryptedFileDownloaderFactory): + return factory + raise Exception('No suitable factory was found in {}'.format(factories)) + + @defer.inlineCallbacks + def get_downloader(self, factory, stream_metadata): + downloader_options = [self.data_rate, True] + downloader = yield factory.make_downloader(stream_metadata, downloader_options, + self.payment_rate_manager, + download_directory=self.download_directory, + file_name=self.file_name) + defer.returnValue(downloader) + + def _pay_key_fee(self, address, fee_lbc, name): + log.info("Pay key fee %f --> %s", fee_lbc, address) + reserved_points = self.wallet.reserve_points(address, fee_lbc) + if reserved_points is None: + raise InsufficientFundsError('Unable to pay the key fee of %s for %s' % (fee_lbc, name)) + return self.wallet.send_points_to_address(reserved_points, fee_lbc) + + @defer.inlineCallbacks + def pay_key_fee(self, fee, name): + if fee is not None: + fee_lbc = self.exchange_rate_manager.to_lbc(fee).amount + yield self._pay_key_fee(fee.address, fee_lbc, name) + else: + defer.returnValue(None) + + @defer.inlineCallbacks + def finish(self, results, name): + self.set_status(DOWNLOAD_STOPPED_CODE, name) + log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], + self.download_path) + safe_stop(self.checker) + status = yield self.downloader.status() + self._check_status(status) + defer.returnValue(self.download_path) + + @defer.inlineCallbacks + def download(self, stream_info, name): + self.set_status(INITIALIZING_CODE, name) + self.sd_hash = stream_info['sources']['lbry_sd_hash'] + if 'fee' in stream_info: + fee = self.check_fee(stream_info['fee']) + else: + fee = None + + self.set_status(DOWNLOAD_METADATA_CODE, name) + sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager) + stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) + factory = self.get_downloader_factory(stream_metadata.factories) + self.downloader = yield self.get_downloader(factory, stream_metadata) + + self.set_status(DOWNLOAD_RUNNING_CODE, name) + if fee: + yield self.pay_key_fee(fee, name) + log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) + self.finished_deferred = self.downloader.start() + self.finished_deferred.addCallback(self.finish, name) + yield self.data_downloading_deferred + + @defer.inlineCallbacks def start(self, stream_info, name): - def _cancel(err): - # this callback sequence gets cancelled in check_status if - # it takes too long when that happens, we want the logic - # to live in check_status - if err.check(defer.CancelledError): - return - if self.checker: - self.checker.stop() - self.finished.errback(err) - - def _set_status(x, status): - log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status)) - self.code = next(s for s in STREAM_STAGES if s[0] == status) - return x - - def get_downloader_factory(metadata): - for factory in metadata.factories: - if isinstance(factory, ManagedEncryptedFileDownloaderFactory): - return factory, metadata - raise Exception('No suitable factory was found in {}'.format(metadata.factories)) - - def make_downloader(args): - factory, metadata = args - return factory.make_downloader(metadata, - [self.data_rate, True], - self.payment_rate_manager, - download_directory=self.download_directory, - file_name=self.file_name) - - self.resolved_name = name - self.stream_info = deepcopy(stream_info) - self.description = self.stream_info['description'] - self.sd_hash = self.stream_info['sources']['lbry_sd_hash'] - - if 'fee' in self.stream_info: - self.fee = FeeValidator(self.stream_info['fee']) - max_key_fee = self._convert_max_fee() - converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount - if converted_fee > self.wallet.get_balance(): - msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format( - self.resolved_name, converted_fee, self.wallet.get_balance()) - raise InsufficientFundsError(msg) - if converted_fee > max_key_fee: - msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format( - converted_fee, max_key_fee, self.resolved_name) - raise KeyFeeAboveMaxAllowed(msg) - log.info( - "Key fee %f below limit of %f, downloading lbry://%s", - converted_fee, max_key_fee, self.resolved_name) - - self.checker.start(1) - - self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) - self._d.addCallback(lambda _: download_sd_blob( - self.session, self.sd_hash, self.payment_rate_manager)) - self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) - self._d.addCallback(get_downloader_factory) - self._d.addCallback(make_downloader) - self._d.addCallbacks(self._start_download, _cancel) - self._d.callback(None) - - return self.finished - - def _start_download(self, downloader): - log.info('Starting download for %s', self.resolved_name) - self.downloader = downloader - self.download_path = os.path.join(downloader.download_directory, downloader.file_name) - - d = self._pay_key_fee() - d.addCallback(lambda _: log.info( - "Downloading %s --> %s", self.sd_hash, self.downloader.file_name)) - d.addCallback(lambda _: self.downloader.start()) - - def _pay_key_fee(self): - if self.fee is not None: - fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount - reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) - if reserved_points is None: - log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name) - # TODO: If we get here, nobody will know that there was an error - # as nobody actually cares about self._d - return defer.fail(InsufficientFundsError()) - return self.wallet.send_points_to_address(reserved_points, fee_lbc) - return defer.succeed(None) + try: + safe_start(self.checker) + yield self.download(stream_info, name) + defer.returnValue(self.download_path) + except Exception as err: + safe_stop(self.checker) + raise err diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index 39dcb98e6..ad7078714 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -1,16 +1,13 @@ import logging import mimetypes import os -import random - -from twisted.internet import threads, defer, reactor +from twisted.internet import defer +from lbrynet.core import file_utils from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob from lbrynet.metadata.Metadata import Metadata -from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader -from lbrynet import reflector -from lbrynet import conf +from lbrynet.metadata.Fee import FeeValidator log = logging.getLogger(__name__) @@ -21,154 +18,52 @@ class Publisher(object): self.session = session self.lbry_file_manager = lbry_file_manager self.wallet = wallet - self.received_file_name = False - self.file_path = None - self.file_name = None - self.publish_name = None - self.bid_amount = None - self.verified = False self.lbry_file = None - self.txid = None - self.nout = None - self.claim_id = None - self.fee = None - self.stream_hash = None - # TODO: this needs to be passed into the constructor - reflector_server = random.choice(conf.settings['reflector_servers']) - self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1] - self.metadata = {} - def start(self, name, file_path, bid, metadata): + @defer.inlineCallbacks + def add_fee_to_metadata(self, metadata, fee): + metadata['fee'] = FeeValidator(fee) + assert len(fee) == 1, "Too many fees" + for currency in fee: + if 'address' not in fee[currency]: + new_address = yield self.session.wallet.get_new_address() + fee[currency]['address'] = new_address + metadata['fee'] = FeeValidator(fee) + defer.returnValue(metadata) + + @defer.inlineCallbacks + def publish_stream(self, name, file_path, bid, metadata): log.info('Starting publish for %s', name) - - def _show_result(): - log.info( - "Success! Published %s --> lbry://%s txid: %s nout: %d", - self.file_name, self.publish_name, self.txid, self.nout - ) - return defer.succeed({ - 'nout': self.nout, - 'txid': self.txid, - 'claim_id': self.claim_id, - 'fee': self.fee, - }) - - self.publish_name = name - self.file_path = file_path - self.bid_amount = bid - self.metadata = metadata - - # TODO: we cannot have this sort of code scattered throughout - # our code base. Use polymorphism instead - if os.name == "nt": - file_mode = 'rb' - else: - file_mode = 'r' - - d = self._check_file_path(self.file_path) - # TODO: ensure that we aren't leaving this resource open - d.addCallback(lambda _: create_lbry_file(self.session, self.lbry_file_manager, - self.file_name, open(self.file_path, file_mode))) - d.addCallback(self.add_to_lbry_files) - d.addCallback(lambda _: self._create_sd_blob()) - d.addCallback(lambda _: self._claim_name()) - d.addCallback(lambda _: self.set_status()) - d.addCallback(lambda _: self.start_reflector()) - d.addCallbacks( - lambda _: _show_result(), - errback=log.fail(self._throw_publish_error), - errbackArgs=( - "An error occurred publishing %s to %s", self.file_name, self.publish_name) - ) - return d - - def start_reflector(self): - # TODO: is self.reflector_server unused? - reflector_server = random.choice(conf.settings['reflector_servers']) - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - log.info("Reflecting new publication") - factory = reflector.ClientFactory( - self.session.blob_manager, - self.lbry_file_manager.stream_info_manager, - self.stream_hash, - self.publish_name - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d - - def _check_file_path(self, file_path): - def check_file_threaded(): - f = open(file_path) - f.close() - self.file_name = os.path.basename(self.file_path) - return True - return threads.deferToThread(check_file_threaded) - - def set_lbry_file(self, lbry_file_downloader): - self.lbry_file = lbry_file_downloader - return defer.succeed(None) - - def add_to_lbry_files(self, stream_hash): - self.stream_hash = stream_hash + file_name = os.path.basename(file_path) + with file_utils.get_read_handle(file_path) as read_handle: + stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name, + read_handle) prm = self.session.payment_rate_manager - d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_lbry_file) - return d + self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, prm) + sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager, + self.session.blob_manager, self.lbry_file.stream_hash) + if 'sources' not in metadata: + metadata['sources'] = {} + metadata['sources']['lbry_sd_hash'] = sd_hash + metadata['content_type'] = get_content_type(file_path) + metadata['ver'] = Metadata.current_version + claim_out = yield self.make_claim(name, bid, metadata) + defer.returnValue(claim_out) - def _create_sd_blob(self): - log.debug('Creating stream descriptor blob') - d = publish_sd_blob(self.lbry_file_manager.stream_info_manager, - self.session.blob_manager, - self.lbry_file.stream_hash) + @defer.inlineCallbacks + def update_stream(self, name, bid, metadata): + my_claim = yield self.wallet.get_my_claim(name) + updated_metadata = my_claim['value'] + for meta_key in metadata: + updated_metadata[meta_key] = metadata[meta_key] + claim_out = yield self.make_claim(name, bid, updated_metadata) + defer.returnValue(claim_out) - def set_sd_hash(sd_hash): - log.debug('stream descriptor hash: %s', sd_hash) - if 'sources' not in self.metadata: - self.metadata['sources'] = {} - self.metadata['sources']['lbry_sd_hash'] = sd_hash - - d.addCallback(set_sd_hash) - return d - - def set_status(self): - log.debug('Setting status') - d = self.lbry_file_manager.change_lbry_file_status( - self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED) - d.addCallback(lambda _: self.lbry_file.restore()) - return d - - def _claim_name(self): - log.debug('Claiming name') - self._update_metadata() - m = Metadata(self.metadata) - - def set_claim_out(claim_out): - log.debug('Name claimed using txid: %s, nout: %d, claim_id: %s, fee :%f', - claim_out['txid'], claim_out['nout'], - claim_out['claim_id'], claim_out['fee']) - self.txid = claim_out['txid'] - self.nout = claim_out['nout'] - self.claim_id = claim_out['claim_id'] - self.fee = claim_out['fee'] - - d = self.wallet.claim_name(self.publish_name, self.bid_amount, m) - d.addCallback(set_claim_out) - return d - - def _update_metadata(self): - filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name) - self.metadata['content_type'] = get_content_type(filename) - self.metadata['ver'] = Metadata.current_version - - def _throw_publish_error(self, err): - # TODO: I'm not a fan of the log and re-throw, especially when - # the new exception is more generic. Look over this to - # see if there is a reason not to remove the errback - # handler and allow the original exception to move up - # the stack. - raise Exception("Publish failed") + @defer.inlineCallbacks + def make_claim(self, name, bid, metadata): + validated_metadata = Metadata(metadata) + claim_out = yield self.wallet.claim_name(name, bid, validated_metadata) + defer.returnValue(claim_out) def get_content_type(filename): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index bc9b2e728..b67ca1b1a 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -16,7 +16,6 @@ class EncryptedFileReflectorClient(Protocol): # Protocol stuff def connectionMade(self): log.debug("Connected to reflector") - self.blob_manager = self.factory.blob_manager self.response_buff = '' self.outgoing_buff = '' self.blob_hashes_to_send = [] @@ -25,8 +24,6 @@ class EncryptedFileReflectorClient(Protocol): self.read_handle = None self.sent_stream_info = False self.received_descriptor_response = False - self.protocol_version = self.factory.protocol_version - self.lbry_uri = "lbry://%s" % self.factory.lbry_uri self.received_server_version = False self.server_version = None self.stream_descriptor = None @@ -41,6 +38,26 @@ class EncryptedFileReflectorClient(Protocol): d.addErrback( lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) + @property + def lbry_uri(self): + return "lbry://%s" % self.factory.lbry_uri + + @property + def blob_manager(self): + return self.factory.blob_manager + + @property + def protocol_version(self): + return self.factory.protocol_version + + @property + def stream_info_manager(self): + return self.factory.stream_info_manager + + @property + def stream_hash(self): + return self.factory.stream_hash + def dataReceived(self, data): self.response_buff += data try: @@ -56,16 +73,22 @@ class EncryptedFileReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): if not self.needed_blobs: - log.info("Reflector has all blobs for %s", self.lbry_uri) + log.info("Reflector has all blobs for %s (%s)", + self.lbry_uri, self.stream_descriptor) elif not self.reflected_blobs: - log.info("No more completed blobs for %s to reflect, %i are still needed", - self.lbry_uri, len(self.needed_blobs)) + log.info("No more completed blobs for %s (%s) to reflect, %i are still needed", + self.lbry_uri, self.stream_descriptor, len(self.needed_blobs)) else: - log.info('Finished sending reflector %i blobs for %s', - len(self.reflected_blobs), self.lbry_uri) + log.info('Finished sending reflector %i blobs for %s (%s)', + len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor) + self.factory.finished_deferred.callback(self.reflected_blobs) + elif reason.check(error.ConnectionLost): + log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri, + self.stream_descriptor, len(self.reflected_blobs)) self.factory.finished_deferred.callback(self.reflected_blobs) else: - log.info('Reflector finished for %s: %s', self.lbry_uri, reason) + log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor, + reason) self.factory.finished_deferred.callback(reason) # IConsumer stuff @@ -118,6 +141,7 @@ class EncryptedFileReflectorClient(Protocol): [blob for blob in filtered if blob.blob_hash in self.needed_blobs]) d.addCallback(_show_missing_blobs) d.addCallback(self.set_blobs_to_send) + d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading()) return d def send_request(self, request_dict): @@ -158,8 +182,9 @@ class EncryptedFileReflectorClient(Protocol): self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle = None self.next_blob_to_send = None - self.file_sender.stopProducing() - self.file_sender = None + if self.file_sender is not None: + self.file_sender.stopProducing() + self.file_sender = None return defer.succeed(None) def start_transfer(self): @@ -292,15 +317,31 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient - def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri): - self.protocol_version = REFLECTOR_V2 - self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager - self.stream_hash = stream_hash - self.lbry_uri = lbry_uri + def __init__(self, lbry_file): + self._lbry_file = lbry_file self.p = None self.finished_deferred = defer.Deferred() + @property + def blob_manager(self): + return self._lbry_file.blob_manager + + @property + def stream_info_manager(self): + return self._lbry_file.stream_info_manager + + @property + def stream_hash(self): + return self._lbry_file.stream_hash + + @property + def lbry_uri(self): + return self._lbry_file.uri + + @property + def protocol_version(self): + return REFLECTOR_V2 + def buildProtocol(self, addr): p = self.protocol() p.factory = self diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 1a64f8a86..9c7432f48 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -1,45 +1,33 @@ -import logging -from twisted.internet import reactor -from twisted.internet.error import ConnectionLost, ConnectionDone -from lbrynet.reflector import BlobClientFactory, ClientFactory +import random -log = logging.getLogger(__name__) - - -def _check_if_reflector_has_stream(lbry_file, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = BlobClientFactory( - lbry_file.blob_manager, - [lbry_file.sd_hash] - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda _: not factory.sent_blobs) - return d +from twisted.internet import reactor, defer +from lbrynet import conf +from lbrynet.reflector import ClientFactory, BlobClientFactory +@defer.inlineCallbacks def _reflect_stream(lbry_file, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory( - lbry_file.blob_manager, - lbry_file.stream_info_manager, - lbry_file.stream_hash, - lbry_file.uri - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d + factory = ClientFactory(lbry_file) + ip = yield reactor.resolve(reflector_address) + yield reactor.connectTCP(ip, reflector_port, factory) + yield factory.finished_deferred -def _catch_error(err, uri): - msg = "An error occurred while checking availability for lbry://%s: %s" - log.error(msg, uri, err.getTraceback()) +@defer.inlineCallbacks +def _reflect_blobs(blob_manager, blob_hashes, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory(blob_manager, blob_hashes) + ip = yield reactor.resolve(reflector_address) + yield reactor.connectTCP(ip, reflector_port, factory) + yield factory.finished_deferred -def check_and_restore_availability(lbry_file, reflector_server): - d = _reflect_stream(lbry_file, reflector_server) - d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost)) - d.addErrback(_catch_error, lbry_file.uri) - return d +def reflect_stream(lbry_file): + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_stream(lbry_file, reflector_server) + + +def reflect_blob_hashes(blob_hashes, blob_manager): + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_blobs(blob_manager, blob_hashes, reflector_server) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 4d88842ab..7067d9776 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -170,12 +170,10 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - factory = reflector.ClientFactory( - self.session.blob_manager, - self.stream_info_manager, - self.stream_hash, - "fake_uri" - ) + fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, + self.stream_info_manager, + self.stream_hash) + factory = reflector.ClientFactory(fake_lbry_file) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) diff --git a/tests/mocks.py b/tests/mocks.py index 96845ad43..69f89b15d 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -10,6 +10,15 @@ from lbrynet import conf KB = 2**10 + +class FakeLBRYFile(object): + def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"): + self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager + self.stream_hash = stream_hash + self.uri = "fake_uri" + + class Node(object): def __init__(self, *args, **kwargs): pass