diff --git a/CHANGELOG.md b/CHANGELOG.md index 36afb0d5d..9dc7d21c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,14 +9,14 @@ at anytime. ## [Unreleased] ### Added - * + * Add file filters: `claim_id`, `outpoint`, and `rowid` * * ### Changed - * - * - * + * Change file filter `uri` to `name` and return field `lbry_uri` to `name` + * Refactor file_list, add `full_status` argument to populate resource intensive fields + * Remove deprecated file commands: `get_lbry_files`, `get_lbry_file`, and `file_get` ### Fixed * diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 9a4e63ac8..0cff44eb1 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -42,7 +42,7 @@ from lbrynet.core import log_support, utils, file_utils from lbrynet.core import system_info from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.Session import Session -from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage +from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage, ClaimOutpoint from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory @@ -92,6 +92,19 @@ PENDING_ID = "not set" SHORT_ID_LEN = 20 +class IterableContainer(object): + def __iter__(self): + for attr in dir(self): + if not attr.startswith("_"): + yield getattr(self, attr) + + def __contains__(self, item): + for attr in self: + if item == attr: + return True + return False + + class Checker: """The looping calls the daemon runs""" INTERNET_CONNECTION = 'internet_connection_checker' @@ -100,12 +113,18 @@ class Checker: PENDING_CLAIM = 'pending_claim_checker' -class FileID: +class _FileID(IterableContainer): """The different ways a file can be identified""" NAME = 'name' SD_HASH = 'sd_hash' FILE_NAME = 'file_name' STREAM_HASH = 'stream_hash' + CLAIM_ID = "claim_id" + OUTPOINT = "outpoint" + ROWID = "rowid" + + +FileID = _FileID() # TODO add login credentials in a conf file @@ -420,7 +439,7 @@ class Daemon(AuthJSONRPCServer): for name in self.pending_claims: log.info("Checking if new claim for lbry://%s is confirmed" % name) d = self._resolve_name(name, force_refresh=True) - d.addCallback(lambda _: self._get_lbry_file_by_uri(name)) + d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name)) d.addCallbacks( lambda lbry_file: _process_lbry_file(name, lbry_file), lambda _: re_add_to_pending_claims(name) @@ -1008,77 +1027,89 @@ class Daemon(AuthJSONRPCServer): return self.get_est_cost_using_known_size(name, size) return self.get_est_cost_from_name(name) - def _find_lbry_file_by_uri(self, uri): - for lbry_file in self.lbry_file_manager.lbry_files: - if uri == lbry_file.uri: - return lbry_file - raise UnknownNameError(uri) - - def _find_lbry_file_by_sd_hash(self, sd_hash): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.sd_hash == sd_hash: - return lbry_file - raise NoSuchSDHash(sd_hash) - - def _find_lbry_file_by_file_name(self, file_name): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.file_name == file_name: - return lbry_file - raise Exception("File %s not found" % file_name) - - def _find_lbry_file_by_stream_hash(self, stream_hash): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.stream_hash == stream_hash: - return lbry_file - raise NoSuchStreamHash(stream_hash) - @defer.inlineCallbacks - def _get_lbry_file_by_uri(self, name): + def _get_lbry_file_dict(self, lbry_file, full_status=False): + key = binascii.b2a_hex(lbry_file.key) if lbry_file.key else None + full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name) + mime_type = mimetypes.guess_type(full_path)[0] + if os.path.isfile(full_path): + with open(full_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + else: + written_bytes = False + + if full_status: + size = yield lbry_file.get_total_bytes() + file_status = yield lbry_file.status() + message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, + file_status.num_known, file_status.running_status) + else: + size = None + message = None + claim = yield self.session.wallet.get_claim_info(lbry_file.name, + lbry_file.txid, + lbry_file.nout) try: - stream_info = yield self._resolve_name(name) - sd_hash = stream_info['sources']['lbry_sd_hash'] - lbry_file = yield self._get_lbry_file_by_sd_hash(sd_hash) - except (UnknownNameError, NoSuchSDHash): - lbry_file = yield self._find_lbry_file_by_uri(name) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_sd_hash(self, sd_hash): - lbry_file = yield self._find_lbry_file_by_sd_hash(sd_hash) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_file_name(self, file_name): - lbry_file = yield self._get_lbry_file_by_file_name(file_name) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_stream_hash(self, stream_hash): - lbry_file = yield self._find_lbry_file_by_stream_hash(stream_hash) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file(self, search_by, val, return_json=True): - helper = _GetFileHelper(self, search_by, val, return_json) + metadata = claim['value'] + except: + metadata = None try: - lbry_file = yield helper.retrieve_file() - defer.returnValue(lbry_file) - except Exception as err: - # TODO: do something with the error, don't return None when a file isn't found - defer.returnValue(False) + outpoint = repr(ClaimOutpoint(lbry_file.txid, lbry_file.nout)) + except TypeError: + outpoint = None - def _get_lbry_files(self): - def safe_get(sd_hash): - d = self._get_lbry_file(FileID.SD_HASH, sd_hash) - d.addErrback(log.fail(), 'Failed to get file for hash: %s', sd_hash) - return d + defer.returnValue({ + 'completed': lbry_file.completed, + 'file_name': lbry_file.file_name, + 'download_directory': lbry_file.download_directory, + 'points_paid': lbry_file.points_paid, + 'stopped': lbry_file.stopped, + 'stream_hash': lbry_file.stream_hash, + 'stream_name': lbry_file.stream_name, + 'suggested_file_name': lbry_file.suggested_file_name, + 'sd_hash': lbry_file.sd_hash, + 'name': lbry_file.name, + 'outpoint': outpoint, + 'claim_id': lbry_file.claim_id, + 'download_path': full_path, + 'mime_type': mime_type, + 'key': key, + 'total_bytes': size, + 'written_bytes': written_bytes, + 'message': message, + 'metadata': metadata + }) - d = defer.DeferredList([ - safe_get(l.sd_hash) - for l in self.lbry_file_manager.lbry_files - ]) - return d + @defer.inlineCallbacks + def _get_lbry_file(self, search_by, val, return_json=True, full_status=False): + lbry_file = None + if search_by in FileID: + for l_f in self.lbry_file_manager.lbry_files: + if l_f.__dict__.get(search_by) == val: + lbry_file = l_f + break + else: + raise NoValidSearch('{} is not a valid search operation'.format(search_by)) + if return_json and lbry_file: + lbry_file = yield self._get_lbry_file_dict(lbry_file, full_status=full_status) + defer.returnValue(lbry_file) + @defer.inlineCallbacks + def _get_lbry_files(self, as_json=False, full_status=False, **kwargs): + lbry_files = list(self.lbry_file_manager.lbry_files) + if kwargs: + for search_type, value in iter_lbry_file_search_values(kwargs): + lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] + if as_json: + file_dicts = [] + for lbry_file in lbry_files: + lbry_file_dict = yield self._get_lbry_file_dict(lbry_file, full_status=full_status) + file_dicts.append(lbry_file_dict) + lbry_files = file_dicts + defer.returnValue(lbry_files) + + # TODO: do this and get_blobs_for_sd_hash in the stream info manager def get_blobs_for_stream_hash(self, stream_hash): def _iter_blobs(blob_hashes): for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: @@ -1112,7 +1143,6 @@ class Daemon(AuthJSONRPCServer): Args: session_status: bool - blockchain_status: bool Returns: daemon status """ @@ -1152,7 +1182,6 @@ class Daemon(AuthJSONRPCServer): 'managed_blobs': len(blobs), 'managed_streams': len(self.lbry_file_manager.lbry_files), } - defer.returnValue(response) def jsonrpc_get_best_blockhash(self): @@ -1426,90 +1455,55 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda _: reactor.callLater(0.0, reactor.stop)) return self._render_response("Shutting down") - def jsonrpc_get_lbry_files(self): + @defer.inlineCallbacks + def jsonrpc_file_list(self, **kwargs): """ - DEPRECATED. Use `file_list` instead. - """ - return self.jsonrpc_file_list() - - def jsonrpc_file_list(self): - """ - List files + List files limited by optional filters Args: - None + 'name' (optional): filter files by lbry name, + 'sd_hash' (optional): filter files by sd hash, + 'file_name' (optional): filter files by the name in the downloads folder, + 'stream_hash' (optional): filter files by stream hash, + 'claim_id' (optional): filter files by claim id, + 'outpoint' (optional): filter files by claim outpoint, + 'rowid' (optional): filter files by internal row id, + 'full_status': (optional): bool, if true populate the 'message' and 'size' fields + Returns: - List of files, with the following keys: - 'completed': bool - 'file_name': string - 'key': hex string - 'points_paid': float - 'stopped': bool - 'stream_hash': base 58 string - 'stream_name': string - 'suggested_file_name': string - 'sd_hash': string + [ + { + 'completed': bool, + 'file_name': str, + 'download_directory': str, + 'points_paid': float, + 'stopped': bool, + 'stream_hash': str (hex), + 'stream_name': str, + 'suggested_file_name': str, + 'sd_hash': str (hex), + 'name': str, + 'outpoint': str, (txid:nout) + 'claim_id': str (hex), + 'download_path': str, + 'mime_type': str, + 'key': str (hex), + 'total_bytes': int, None if full_status is False + 'written_bytes': int, + 'message': str, None if full_status is False + 'metadata': Metadata dict + } + ] """ - d = self._get_lbry_files() - d.addCallback(lambda r: self._render_response([d[1] for d in r if d[0]])) - - return d - - def jsonrpc_get_lbry_file(self, **kwargs): - """ - DEPRECATED. Use `file_get` instead. - """ - return self.jsonrpc_file_get(**kwargs) - - def jsonrpc_file_get(self, **kwargs): - """ - Get a file, if no matching file exists returns False - - Args: - 'name': get file by lbry uri, - 'sd_hash': get file by the hash in the name claim, - 'file_name': get file by its name in the downloads folder, - 'stream_hash': get file by its stream hash - Returns: - 'completed': bool, - 'file_name': str, - 'download_directory': str, - 'points_paid': float, - 'stopped': bool, - 'stream_hash': str (hex), - 'stream_name': str, - 'suggested_file_name': str, - 'sd_hash': str (hex), - 'lbry_uri': str, - 'txid': str (b58), - 'claim_id': str (b58), - 'download_path': str, - 'mime_type': str, - 'key': str (hex), - 'total_bytes': int, - 'written_bytes': int, - 'code': str, - 'message': str - 'metadata': Metadata dict if claim is valid, otherwise status str - } - """ - d = self._get_deferred_for_lbry_file(kwargs) - d.addCallback(lambda r: self._render_response(r)) - return d - - def _get_deferred_for_lbry_file(self, search_fields): - try: - searchtype, value = get_lbry_file_search_value(search_fields) - except NoValidSearch: - return defer.fail() - else: - return self._get_lbry_file(searchtype, value) + result = yield self._get_lbry_files(as_json=True, **kwargs) + response = yield self._render_response(result) + defer.returnValue(response) @defer.inlineCallbacks def jsonrpc_resolve_name(self, name, force=False): """ - Resolve stream info from a LBRY uri + Resolve stream info from a LBRY name Args: 'name': name to look up, string, do not include lbry:// prefix @@ -1538,7 +1532,7 @@ class Daemon(AuthJSONRPCServer): def jsonrpc_claim_show(self, name, txid=None, nout=None): """ - Resolve claim info from a LBRY uri + Resolve claim info from a LBRY name Args: 'name': name to look up, string, do not include lbry:// prefix @@ -1567,7 +1561,7 @@ class Daemon(AuthJSONRPCServer): self, name, file_name=None, stream_info=None, timeout=None, download_directory=None, wait_for_write=True): """ - Download stream from a LBRY uri. + Download stream from a LBRY name. Args: 'name': name to download, string @@ -1734,7 +1728,7 @@ class Daemon(AuthJSONRPCServer): Get estimated cost for a lbry stream Args: - 'name': lbry uri + 'name': lbry name 'size': stream size, in bytes. if provided an sd blob won't be downloaded. Returns: estimated cost @@ -2368,7 +2362,7 @@ class Daemon(AuthJSONRPCServer): if uri: metadata = yield self._resolve_name(uri) - sd_hash = get_sd_hash(metadata) + sd_hash = utils.get_sd_hash(metadata) blobs = yield self.get_blobs_for_sd_hash(sd_hash) elif stream_hash: try: @@ -2437,7 +2431,7 @@ class Daemon(AuthJSONRPCServer): Get stream availability for a winning claim Arg: - name (str): lbry uri + name (str): lbry name sd_timeout (int, optional): sd blob download timeout peer_timeout (int, optional): how long to look for peers @@ -2462,7 +2456,7 @@ class Daemon(AuthJSONRPCServer): return decoded_sd_blob metadata = yield self._resolve_name(name) - sd_hash = get_sd_hash(metadata) + sd_hash = utils.get_sd_hash(metadata) sd_timeout = sd_timeout or conf.settings['sd_download_timeout'] peer_timeout = peer_timeout or conf.settings['peer_search_timeout'] blobs = [] @@ -2574,6 +2568,7 @@ class _DownloadNameHelper(object): try: download_path = yield self.daemon.add_stream( self.name, self.timeout, self.download_directory, self.file_name, stream_info) + self.remove_from_wait(None) except (InsufficientFundsError, Exception) as err: if Failure(err).check(InsufficientFundsError): log.warning("Insufficient funds to download lbry://%s", self.name) @@ -2679,126 +2674,6 @@ class _ResolveNameHelper(object): return time_in_cache >= self.daemon.cache_time -class _GetFileHelper(object): - def __init__(self, daemon, search_by, val, return_json=True): - self.daemon = daemon - self.search_by = search_by - self.val = val - self.return_json = return_json - - def retrieve_file(self): - d = self.search_for_file() - if self.return_json: - d.addCallback(self._get_json) - return d - - def search_for_file(self): - if self.search_by == FileID.NAME: - return self.daemon._get_lbry_file_by_uri(self.val) - elif self.search_by == FileID.SD_HASH: - return self.daemon._get_lbry_file_by_sd_hash(self.val) - elif self.search_by == FileID.FILE_NAME: - return self.daemon._get_lbry_file_by_file_name(self.val) - elif self.search_by == FileID.STREAM_HASH: - return self.daemon._get_lbry_file_by_stream_hash(self.val) - raise Exception('{} is not a valid search operation'.format(self.search_by)) - - def _get_json(self, lbry_file): - if lbry_file: - d = lbry_file.get_total_bytes() - d.addCallback(self._generate_reply, lbry_file) - d.addCallback(self._add_metadata, lbry_file) - return d - else: - return False - - def _generate_reply(self, size, lbry_file): - written_bytes = self._get_written_bytes(lbry_file) - code, message = self._get_status(lbry_file) - - if code == DOWNLOAD_RUNNING_CODE: - d = lbry_file.status() - d.addCallback(self._get_msg_for_file_status) - d.addCallback( - lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size)) - else: - d = defer.succeed( - self._get_properties_dict(lbry_file, code, message, written_bytes, size)) - return d - - def _get_msg_for_file_status(self, file_status): - message = STREAM_STAGES[2][1] % ( - file_status.name, file_status.num_completed, file_status.num_known, - file_status.running_status) - return defer.succeed(message) - - def _get_key(self, lbry_file): - return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None - - def _full_path(self, lbry_file): - return os.path.join(lbry_file.download_directory, lbry_file.file_name) - - def _get_status(self, lbry_file): - if self.search_by == FileID.NAME: - if self.val in self.daemon.streams.keys(): - status = self.daemon.streams[self.val].code - elif lbry_file in self.daemon.lbry_file_manager.lbry_files: - status = STREAM_STAGES[2] - else: - status = [False, False] - else: - status = [False, False] - return status - - def _get_written_bytes(self, lbry_file): - full_path = self._full_path(lbry_file) - if os.path.isfile(full_path): - with open(full_path) as written_file: - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - else: - written_bytes = False - return written_bytes - - def _get_properties_dict(self, lbry_file, code, message, written_bytes, size): - key = self._get_key(lbry_file) - full_path = self._full_path(lbry_file) - mime_type = mimetypes.guess_type(full_path)[0] - return { - 'completed': lbry_file.completed, - 'file_name': lbry_file.file_name, - 'download_directory': lbry_file.download_directory, - 'points_paid': lbry_file.points_paid, - 'stopped': lbry_file.stopped, - 'stream_hash': lbry_file.stream_hash, - 'stream_name': lbry_file.stream_name, - 'suggested_file_name': lbry_file.suggested_file_name, - 'sd_hash': lbry_file.sd_hash, - 'lbry_uri': lbry_file.uri, - 'txid': lbry_file.txid, - 'claim_id': lbry_file.claim_id, - 'download_path': full_path, - 'mime_type': mime_type, - 'key': key, - 'total_bytes': size, - 'written_bytes': written_bytes, - 'code': code, - 'message': message - } - - def _add_metadata(self, message, lbry_file): - def _add_to_dict(metadata): - message['metadata'] = metadata - return defer.succeed(message) - - if lbry_file.txid: - d = self.daemon._resolve_name(lbry_file.uri) - d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation")) - else: - d = defer.succeed(message) - return d - - def loggly_time_string(dt): formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) @@ -2834,13 +2709,20 @@ def report_bug_to_slack(message, installation_id, platform_name, app_version): def get_lbry_file_search_value(search_fields): - for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME, FileID.STREAM_HASH): - value = search_fields.get(searchtype) - if value: + for searchtype in FileID: + value = search_fields.get(searchtype, None) + if value is not None: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) +def iter_lbry_file_search_values(search_fields): + for searchtype in FileID: + value = search_fields.get(searchtype, None) + if value is not None: + yield searchtype, value + + def get_blob_payment_rate_manager(session, payment_rate_manager=None): if payment_rate_manager: rate_managers = {