diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index aa21d434a..8c7eede53 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -1,5 +1,3 @@ -import binascii -import webbrowser from lbrynet.core.Error import UnknownNameError from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory @@ -20,29 +18,24 @@ from datetime import datetime import logging import os import sys -import sqlite3 import json +import binascii +import webbrowser log = logging.getLogger(__name__) #TODO add login credentials in a conf file +#issues with delete: +#TODO when stream is complete the generated file doesn't delete, but blobs do +#TODO when stream is stopped the generated file is deleted -class DummyDownloader(object): - def __init__(self, directory, file_name): - self.download_directory = directory - self.file_name = file_name - - -class DummyStream(object): - def __init__(self, row): - download_directory = os.path.join(*row[2].split('/')[:-1]) - file_name = row[2].split('/')[len(row[2].split('/')) - 1] - - self.stream_hash = row[0] - self.downloader = DummyDownloader(download_directory, file_name) - self.is_dummy = True +#functions to add: +#TODO publish +#TODO send credits to address +#TODO get new address +#TODO alert if your copy of a lbry file is out of date with the name record class LBRYDaemon(xmlrpc.XMLRPC): @@ -62,7 +55,6 @@ class LBRYDaemon(xmlrpc.XMLRPC): self.peer_port = 3333 self.dht_node_port = 4444 self.first_run = False - self.current_db_revision = 1 if os.name == "nt": from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current) @@ -106,8 +98,6 @@ class LBRYDaemon(xmlrpc.XMLRPC): self.session_settings = None self.data_rate = 0.5 self.max_key_fee = 100.0 - self.db = None - self.cur = None return defer.succeed(None) def _disp_startup(): @@ -119,14 +109,13 @@ class LBRYDaemon(xmlrpc.XMLRPC): d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory)) d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) - d.addCallback(lambda _: self.get_lbrycrdd_path()) + d.addCallback(lambda _: self._get_lbrycrdd_path()) d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_fetcher()) - d.addCallback(lambda _: self._setup_daemon_db()) d.addCallback(lambda _: _disp_startup()) d.callback(None) @@ -184,38 +173,11 @@ class LBRYDaemon(xmlrpc.XMLRPC): return d return defer.succeed(True) - def _setup_daemon_db(self): - self.db = sqlite3.connect(os.path.join(self.db_dir, 'daemon.sqlite')) - self.cur = self.db.cursor() - - query = "create table if not exists history \ - (stream_hash char(96) primary key not null,\ - uri text not null, \ - path text not null);" - - self.cur.execute(query) - self.db.commit() - - r = self.cur.execute("select * from history") - files = r.fetchall() - - print "Checking files in download history still exist, pruning records of those that don't" - - for file in files: - if not os.path.isfile(file[2]): - print "Couldn't find", file[2], ", removing record" - self.cur.execute("delete from history where stream_hash='" + file[0] + "'") - self.db.commit() - - print "Done checking records" - - return defer.succeed(None) - def _get_settings(self): d = self.settings.start() d.addCallback(lambda _: self.settings.get_lbryid()) d.addCallback(self.set_lbryid) - d.addCallback(lambda _: self.get_lbrycrdd_path()) + d.addCallback(lambda _: self._get_lbrycrdd_path()) return d def set_lbryid(self, lbryid): @@ -285,7 +247,7 @@ class LBRYDaemon(xmlrpc.XMLRPC): dl.addCallback(lambda _: self.session.setup()) return dl - def get_lbrycrdd_path(self): + def _get_lbrycrdd_path(self): def get_lbrycrdd_path_conf_file(): lbrycrdd_path_conf_path = os.path.join(os.path.expanduser("~"), ".lbrycrddpath.conf") if not os.path.exists(lbrycrdd_path_conf_path): @@ -336,91 +298,107 @@ class LBRYDaemon(xmlrpc.XMLRPC): self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) return defer.succeed(True) - def _download_name(self, history, name): - def _disp(stream): - print '[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash) - log.debug('[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash)) - return defer.succeed(None) - - if history == 'UnknownNameError': - return 'UnknownNameError' - - if not history: - stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager, - max_key_fee=self.max_key_fee, data_rate=self.data_rate) - - self.downloads.append(stream) - - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(lambda stream_info: stream.start(stream_info)) - d.addCallback(lambda _: _disp(stream)) - d.addCallback(lambda _: {'ts': datetime.now(),'name': name}) - d.addErrback(lambda err: str(err.getTraceback())) + def _download_name(self, name): + def _disp_file(file): + print '[' + str(datetime.now()) + ']' + ' Already downloaded: ' + str(file.stream_hash) + d = self._path_from_lbry_file(file) return d - else: - self.downloads.append(DummyStream(history[0])) - return defer.succeed(None) + def _get_stream(name): + def _disp(stream): + print '[' + str(datetime.now()) + ']' + ' Start stream: ' + stream['stream_hash'] + return stream + + d = self.session.wallet.get_stream_info_for_name(name) + stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager, + max_key_fee=self.max_key_fee, data_rate=self.data_rate) + d.addCallback(_disp) + d.addCallback(lambda stream_info: stream.start(stream_info)) + d.addCallback(lambda _: self._path_from_name(name)) + + return d + + d = self._check_history(name) + d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file)) + d.addCallback(lambda _: self._check_history(name)) + d.addCallback(lambda lbry_file: self._path_from_lbry_file(lbry_file) if lbry_file else 'Not found') + d.addErrback(lambda err: str(err)) - def _path_from_name(self, name): - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(lambda stream_info: stream_info['stream_hash']) - d.addCallback(lambda stream_hash: [{'stream_hash': stream.stream_hash, - 'path': os.path.join(stream.downloader.download_directory, - stream.downloader.file_name)} - for stream in self.downloads if stream.stream_hash == stream_hash][0]) - d.addErrback(lambda _: 'UnknownNameError') return d - def _get_downloads(self): - downloads = [] - for stream in self.downloads: - try: - downloads.append({'stream_hash': stream.stream_hash, - 'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)}) - except: - pass - return downloads - def _resolve_name(self, name): d = defer.Deferred() d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name)) - d.addErrback(lambda _: 'UnknownNameError') - d.callback(None) + d.addErrback(lambda _: defer.fail(UnknownNameError)) + return d - def _check_history(self, name, metadata): - if metadata == 'UnknownNameError': - return 'UnknownNameError' - r = self.cur.execute("select * from history where stream_hash='" + metadata['stream_hash'] + "'") - files = r.fetchall() + def _resolve_name_wc(self, name): + d = defer.Deferred() + d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name)) + d.addErrback(lambda _: defer.fail(UnknownNameError)) + d.callback(None) - if files: - if not os.path.isfile(files[0][2]): - print "[" + str(datetime.now()) + "] Couldn't find", files[0][2], ", trying to redownload it" - self.cur.execute("delete from history where stream_hash='" + files[0][0] + "'") - self.db.commit() - return [] + return d + + def _check_history(self, name): + def _get_lbry_file(path): + f = open(path, 'r') + l = json.loads(f.read()) + f.close() + file_name = l['stream_name'].decode('hex') + lbry_file = [file for file in self.lbry_file_manager.lbry_files if file.stream_name == file_name][0] + return lbry_file + + def _check(info): + stream_hash = info['stream_hash'] + path = os.path.join(self.blobfile_dir, stream_hash) + if os.path.isfile(path): + print "[" + str(datetime.now()) + "] Search for lbry_file, returning: " + stream_hash + return defer.succeed(_get_lbry_file(path)) else: - return files + print "[" + str(datetime.now()) + "] Search for lbry_file didn't return anything" + return defer.succeed(False) + + d = self._resolve_name(name) + d.addCallbacks(_check, lambda _: False) + d.callback(None) + + return d + + def _delete_lbry_file(self, lbry_file): + d = self.lbry_file_manager.delete_lbry_file(lbry_file) + + def finish_deletion(lbry_file): + d = lbry_file.delete_data() + d.addCallback(lambda _: _delete_stream_data(lbry_file)) + return d + + def _delete_stream_data(lbry_file): + s_h = lbry_file.stream_hash + d = self.lbry_file_manager.get_count_for_stream_hash(s_h) + # TODO: could possibly be a timing issue here + d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) + return d + + d.addCallback(lambda _: finish_deletion(lbry_file)) + return d + + def _path_from_name(self, name): + d = self._check_history(name) + d.addCallback(lambda lbry_file: {'stream_hash': lbry_file.stream_hash, + 'path': os.path.join(self.download_directory, lbry_file.file_name)} + if lbry_file else defer.fail(UnknownNameError)) + return d + + def _path_from_lbry_file(self, lbry_file): + if lbry_file: + r = {'stream_hash': lbry_file.stream_hash, + 'path': os.path.join(self.download_directory, lbry_file.file_name)} + return defer.succeed(r) else: - return files - - def _add_to_history(self, name, path): - if path == 'UnknownNameError': - return 'UnknownNameError' - - r = self.cur.execute("select * from history where stream_hash='" + path['stream_hash'] + "'") - files = r.fetchall() - if not files: - vals = path['stream_hash'], name, path['path'] - self.cur.execute("insert into history values (?, ?, ?)", vals) - self.db.commit() - else: - print '[' + str(datetime.now()) + '] Already downloaded', path['stream_hash'], '-->', path['path'] - - return path + return defer.fail(UnknownNameError) def xmlrpc_get_settings(self): """ @@ -483,7 +461,6 @@ class LBRYDaemon(xmlrpc.XMLRPC): print 'Shutting down lbrynet daemon' d = self._shutdown() - d.addCallback(lambda _: self.db.close()) d.addCallback(lambda _: _disp_shutdown()) d.addCallback(lambda _: reactor.stop()) d.callback(None) @@ -523,95 +500,53 @@ class LBRYDaemon(xmlrpc.XMLRPC): """ def _disp(info): - log.debug('[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info)) - print '[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info) + log.debug('[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info['stream_hash'])) + print '[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info['stream_hash']) return info - d = defer.Deferred() - d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name)) + d = self._resolve_name(name) d.addCallbacks(_disp, lambda _: str('UnknownNameError')) d.callback(None) return d - def xmlrpc_get_downloads(self): - """ - Get files downloaded in this session - - @return: [{stream_hash, path}] - """ - - downloads = [] - - for stream in self.downloads: - try: - downloads.append({'stream_hash': stream.stream_hash, - 'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)}) - except: - pass - - print '[' + str(datetime.now()) + '] Get downloads' - return downloads - - def xmlrpc_download_name(self, name): + def xmlrpc_get(self, name): """ Download stream from a LBRY uri @param: name """ - def _disp(stream): - print '[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash) - log.debug('[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash)) - return defer.succeed(None) + d = self._download_name(name) - stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager, - max_key_fee=self.max_key_fee, data_rate=self.data_rate) - - self.downloads.append(stream) - - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(lambda stream_info: stream.start(stream_info)) - d.addCallback(lambda _: _disp(stream)) - d.addCallback(lambda _: {'ts': datetime.now(),'name': name}) - d.addErrback(lambda err: str(err.getTraceback())) return d - def xmlrpc_path_from_name(self, name): - """ - Get file path for a downloaded name + def xmlrpc_stop_lbry_file(self, stream_hash): + try: + lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0] + except IndexError: + return defer.fail(UnknownNameError) - @param: name - @return: {stream_hash, path}: - """ + if not lbry_file.stopped: + d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) + d.addCallback(lambda _: 'Stream has been stopped') + d.addErrback(lambda err: str(err)) + return d + else: + return defer.succeed('Stream was already stopped') - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(lambda stream_info: stream_info['stream_hash']) - d.addCallback(lambda stream_hash: [{'stream_hash': stream.stream_hash, - 'path': os.path.join(stream.downloader.download_directory, - stream.downloader.file_name)} - for stream in self.downloads if stream.stream_hash == stream_hash][0]) - d.addErrback(lambda _: 'UnknownNameError') - return d + def xmlrpc_start_lbry_file(self, stream_hash): + try: + lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0] + except IndexError: + return defer.fail(UnknownNameError) - def xmlrpc_get(self, name): - """ - Download a name and return the path of the resulting file + if lbry_file.stopped: + d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) + d.callback(None) + return defer.succeed('Stream started') + else: + return defer.succeed('Stream was already running') - @param: name: - @return: {stream_hash, path}: - """ - - d = self._resolve_name(name) - d.addCallback(lambda metadata: self._check_history(name, metadata)) - d.addCallback(lambda hist: self._download_name(hist, name)) - d.addCallback(lambda _: self._path_from_name(name)) - d.addCallback(lambda path: self._add_to_history(name, path)) - return d - - def xmlrpc_toggle_lbry_file_status(self, stream_hash): - d = self.lbry_file_manager.toggle_lbry_file_running(stream_hash) - d.addErrback(lambda err: str(err)) - return d def xmlrpc_render_html(self, html): def _make_file(html, path): @@ -690,7 +625,7 @@ class LBRYDaemon(xmlrpc.XMLRPC): filtered_results = [n for n in self.rpc_conn.getnametrie() if n['name'].startswith(search)] filtered_results = [n for n in filtered_results if 'txid' in n.keys()] - resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name(n['name'])]) for n in filtered_results] + resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name'])]) for n in filtered_results] d = defer.DeferredList(resolved_results) d.addCallback(_clean) @@ -699,6 +634,23 @@ class LBRYDaemon(xmlrpc.XMLRPC): return d + def xmlrpc_delete_lbry_file(self, file_name): + def _disp(file_name): + print '[' + str(datetime.now()) + '] Deleted: ' + file_name + return defer.succeed(str('Deleted: ' + file_name)) + + lbry_files = [self._delete_lbry_file(f) for f in self.lbry_file_manager.lbry_files if file_name == f.file_name] + d = defer.DeferredList(lbry_files) + d.addCallback(lambda _: _disp(file_name)) + return d + + def xmlrpc_check(self, name): + d = self._check_history(name) + d.addCallback(lambda lbry_file: self._path_from_lbry_file(lbry_file) if lbry_file else 'Not found') + d.addErrback(lambda err: str(err)) + + return d + def main(): daemon = LBRYDaemon() diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index 15c5eafde..6895ded68 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -13,31 +13,23 @@ log = logging.getLogger(__name__) class GetStream(object): def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5): - self.finished_deferred = defer.Deferred(None) self.wallet = wallet self.resolved_name = None self.description = None self.key_fee = None self.key_fee_address = None + self.data_rate = data_rate + self.pay_key = pay_key self.name = None self.session = session self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) - self.loading_metadata_deferred = defer.Deferred() self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.metadata = None - self.loading_failed = False - self.resolved_name = None - self.description = None - self.key_fee = None - self.key_fee_address = None self.stream_hash = None self.max_key_fee = max_key_fee self.stream_info = None self.stream_info_manager = None - self.downloader = None - self.data_rate = data_rate - self.pay_key = pay_key + def start(self, stream_info): self.stream_info = stream_info @@ -52,18 +44,9 @@ class GetStream(object): else: self.key_fee = None self.key_fee_address = None - self.stream_hash = self.stream_info['stream_hash'] - elif 'stream_hash' in json.loads(self.stream_info['value']): - self.resolved_name = self.stream_info.get('name', None) - self.description = json.loads(self.stream_info['value']).get('description', None) - try: - if 'key_fee' in json.loads(self.stream_info['value']): - self.key_fee = float(json.loads(self.stream_info['value'])['key_fee']) - except ValueError: - self.key_fee = None - self.key_fee_address = json.loads(self.stream_info['value']).get('key_fee_address', None) - self.stream_hash = json.loads(self.stream_info['value'])['stream_hash'] + self.stream_hash = self.stream_info['stream_hash'] + else: print 'InvalidStreamInfoError' raise InvalidStreamInfoError(self.stream_info) @@ -72,71 +55,42 @@ class GetStream(object): if self.pay_key: print "Key fee (" + str(self.key_fee) + ") above limit of " + str( self.max_key_fee) + ", didn't download lbry://" + str(self.resolved_name) - return self.finished_deferred.callback(None) + return defer.fail(None) else: pass - def _get_downloader_for_return(): - return defer.succeed(self.downloader) + d = defer.Deferred(None) + d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager)) + d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) + d.addCallback(lambda metadata: + metadata.factories[1].make_downloader(metadata, [self.data_rate, True], self.payment_rate_manager)) + d.addErrback(lambda err: err.trap(defer.CancelledError)) + d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())) + d.addCallback(self._start_download) + d.callback(None) + + return d + + def _start_download(self, downloader): + def _pay_key_fee(): + if self.key_fee is not None and self.key_fee_address is not None: + reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee) + if reserved_points is None: + return defer.fail(InsufficientFundsError()) + print 'Key fee: ' + str(self.key_fee) + ' | ' + str(self.key_fee_address) + return self.wallet.send_points_to_address(reserved_points, self.key_fee) + return defer.succeed(None) - self.loading_metadata_deferred = defer.Deferred(None) - self.loading_metadata_deferred.addCallback( - lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager)) - self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self.loading_metadata_deferred.addCallback(self._handle_metadata) - self.loading_metadata_deferred.addErrback(self._handle_load_canceled) - self.loading_metadata_deferred.addErrback(self._handle_load_failed) if self.pay_key: - self.loading_metadata_deferred.addCallback(lambda _: self._pay_key_fee()) - self.loading_metadata_deferred.addCallback(lambda _: self._make_downloader()) - self.loading_metadata_deferred.addCallback(lambda _: self.downloader.start()) - self.loading_metadata_deferred.addErrback(self._handle_download_error) - self.loading_metadata_deferred.addCallback(lambda _: _get_downloader_for_return()) - self.loading_metadata_deferred.callback(None) - - return defer.succeed(None) - - def _pay_key_fee(self): - if self.key_fee is not None and self.key_fee_address is not None: - reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee) - if reserved_points is None: - return defer.fail(InsufficientFundsError()) - print 'Key fee: ' + str(self.key_fee) + ' | ' + str(self.key_fee_address) - return self.wallet.send_points_to_address(reserved_points, self.key_fee) - return defer.succeed(None) - - def _handle_load_canceled(self, err): - err.trap(defer.CancelledError) - self.finished_deferred.callback(None) - - def _handle_load_failed(self, err): - self.loading_failed = True - log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()) - print 'Load Failed: ', err.getTraceback() - self.finished_deferred.callback(None) - - def _handle_metadata(self, metadata): - self.metadata = metadata - self.factory = self.metadata.factories[1] - return defer.succeed(None) - - def _handle_download_error(self, err): - if err.check(InsufficientFundsError): - print "Download stopped due to insufficient funds." + d = _pay_key_fee() else: - print "Autoaddstream: An unexpected error has caused the download to stop: ", err.getTraceback() + d = defer.Deferred() - def _make_downloader(self): + downloader.start() - def _set_downloader(downloader): - self.downloader = downloader - print "Downloading", self.stream_hash, "-->", os.path.join(self.downloader.download_directory, - self.downloader.file_name) - return self.downloader + print "Downloading", self.stream_hash, "-->", os.path.join(downloader.download_directory, downloader.file_name) - downloader = self.factory.make_downloader(self.metadata, [self.data_rate, True], self.payment_rate_manager) - downloader.addCallback(_set_downloader) - return downloader + return d class FetcherDaemon(object):