From 5aecd02668c68eff15e7cb1640304a6ab3c9595e Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Fri, 15 Jul 2016 11:09:20 -0500
Subject: [PATCH 1/5] Refactor jsonrpc_get.

Move parameter handling into its own function and make better use of
the dictionary `.get()` method. Early return on the failed checks is
more readable. The lambda in the callback was long and hard to read,
so it has been moved out into its own function. (A standalone sketch
of this parameter-handling pattern appears at the end of the series.)
---
 .pylintrc                            |   2 +-
 lbrynet/lbrynet_daemon/LBRYDaemon.py | 107 +++++++++++++++------------
 2 files changed, 60 insertions(+), 49 deletions(-)

diff --git a/.pylintrc b/.pylintrc
index 5e5aab35c..e49732469 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -298,7 +298,7 @@ ignored-classes=twisted.internet,RequestMessage
 # List of members which are set dynamically and missed by pylint inference
 # system, and so shouldn't trigger E1101 when accessed. Python regular
 # expressions are accepted.
-generated-members=
+generated-members=lbrynet.lbrynet_daemon.LBRYDaemon.Parameters
 
 
 [IMPORTS]
diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py
index 8e2c98cc5..c5280bfd2 100644
--- a/lbrynet/lbrynet_daemon/LBRYDaemon.py
+++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py
@@ -138,6 +138,11 @@ OK_CODE = 200
 REMOTE_SERVER = "www.google.com"
 
 
+class Parameters(object):
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
+
+
 class LBRYDaemon(jsonrpc.JSONRPC):
     """
     LBRYnet daemon, a jsonrpc interface to lbry functions
@@ -1651,65 +1656,53 @@ class LBRYDaemon(jsonrpc.JSONRPC):
         d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure)
         return d
 
+    def _process_get_parameters(self, p):
+        """Extract info from input parameters and fill in default values for `get` call."""
+        # TODO: this process can be abstracted s.t. each method
+        #       can spec what parameters it expects and how to set default values
+        timeout = p.get('timeout', self.download_timeout)
+        download_directory = p.get('download_directory', self.download_directory)
+        file_name = p.get('file_name')
+        stream_info = p.get('stream_info')
+        sd_hash = get_sd_hash(stream_info)
+        wait_for_write = p.get('wait_for_write', True)
+        name = p.get('name')
+        return Parameters(
+            timeout=timeout,
+            download_directory=download_directory,
+            file_name=file_name,
+            stream_info=stream_info,
+            sd_hash=sd_hash,
+            wait_for_write=wait_for_write,
+            name=name
+        )
+
     def jsonrpc_get(self, p):
-        """
-        Download stream from a LBRY uri
+        """Download stream from a LBRY uri.
 
         Args:
             'name': name to download, string
             'download_directory': optional, path to directory where file will be saved, string
             'file_name': optional, a user specified name for the downloaded file
             'stream_info': optional, specified stream info overrides name
+            'timeout': optional
         Returns:
             'stream_hash': hex string
             'path': path of download
         """
-
-        if 'timeout' not in p.keys():
-            timeout = self.download_timeout
-        else:
-            timeout = p['timeout']
-
-        if 'download_directory' not in p.keys():
-            download_directory = self.download_directory
-        else:
-            download_directory = p['download_directory']
-
-        if 'file_name' in p.keys():
-            file_name = p['file_name']
-        else:
-            file_name = None
-
-        if 'stream_info' in p.keys():
-            stream_info = p['stream_info']
-            if 'sources' in stream_info.keys():
-                sd_hash = stream_info['sources']['lbry_sd_hash']
-            else:
-                sd_hash = stream_info['stream_hash']
-        else:
-            stream_info = None
-
-        if 'wait_for_write' in p.keys():
-            wait_for_write = p['wait_for_write']
-        else:
-            wait_for_write = True
-
-        if 'name' in p.keys():
-            name = p['name']
-            if p['name'] not in self.waiting_on.keys():
-                d = self._download_name(name=name, timeout=timeout, download_directory=download_directory,
-                                        stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write)
-                d.addCallback(lambda l: {'stream_hash': sd_hash,
-                                         'path': os.path.join(self.download_directory, l.file_name)}
-                                        if stream_info else
-                                        {'stream_hash': l.sd_hash,
-                                         'path': os.path.join(self.download_directory, l.file_name)})
-                d.addCallback(lambda message: self._render_response(message, OK_CODE))
-            else:
-                d = server.failure
-        else:
-            d = server.failure
-
+        params = self._process_get_parameters(p)
+        if not params.name:
+            return server.failure
+        if params.name in self.waiting_on:
+            return server.failure
+        d = self._download_name(name=params.name,
+                                timeout=params.timeout,
+                                download_directory=params.download_directory,
+                                stream_info=params.stream_info,
+                                file_name=params.file_name,
+                                wait_for_write=params.wait_for_write)
+        d.addCallback(get_output_callback(params))
+        d.addCallback(lambda message: self._render_response(message, OK_CODE))
         return d
 
     def jsonrpc_stop_lbry_file(self, p):
@@ -2261,3 +2254,21 @@ class LBRYDaemon(jsonrpc.JSONRPC):
 
         d.addCallback(lambda _: self._render_response(True, OK_CODE))
         return d
+
+
+def get_sd_hash(stream_info):
+    if not stream_info:
+        return None
+    try:
+        return stream_info['sources']['lbry_sd_hash']
+    except KeyError:
+        return stream_info.get('stream_hash')
+
+
+def get_output_callback(params):
+    def callback(l):
+        return {
+            'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
+            'path': os.path.join(params.download_directory, l.file_name)
+        }
+    return callback

From 563896b126564d6e249cea3486cc193e52db0fb2 Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Fri, 15 Jul 2016 11:33:38 -0500
Subject: [PATCH 2/5] fix bug in reveal code

---
 lbrynet/lbrynet_daemon/LBRYDaemon.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py
index c5280bfd2..6fa942b22 100644
--- a/lbrynet/lbrynet_daemon/LBRYDaemon.py
+++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py
@@ -2249,7 +2249,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
             d = threads.deferToThread(subprocess.Popen, ['open', '-R', path])
         else:
             # No easy way to reveal specific files on Linux, so just open the containing directory
-            d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)])
+            d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)])
 
         d.addCallback(lambda _: self._render_response(True, OK_CODE))
         return d

From a90029ec50bbb94ec4625a4d9b57f367edd1b7b4 Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Fri, 15 Jul 2016 11:37:04 -0500
Subject: [PATCH 3/5] update doc string

---
 lbrynet/lbrynet_daemon/LBRYDaemon.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py
index 6fa942b22..0c2ce0303 100644
--- a/lbrynet/lbrynet_daemon/LBRYDaemon.py
+++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py
@@ -1686,6 +1686,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
             'file_name': optional, a user specified name for the downloaded file
             'stream_info': optional, specified stream info overrides name
             'timeout': optional
+            'wait_for_write': optional, defaults to True
         Returns:
             'stream_hash': hex string
             'path': path of download

From a15d7ca54311f3248427bcf3a1b4eb42d1fc889a Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Fri, 15 Jul 2016 12:42:26 -0500
Subject: [PATCH 4/5] Refactor _download_name

Nested functions are the devil, especially ones that use variables
from the outer scope. Refactoring _download_name to use a helper class
helps make the scoping more explicit and will undoubtedly prevent bugs
in the future. I think this makes _download_name drastically more
readable. (The helper-class pattern is sketched after the PATCH 5/5
description.)

Also cleaned up some duplicated code and made download_directory
respect the passed-in parameter instead of always falling back to the
default.
---
 lbrynet/lbrynet_daemon/LBRYDaemon.py | 189 +++++++++++++++------------
 1 file changed, 109 insertions(+), 80 deletions(-)

diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py
index 0c2ce0303..fad2583bd 100644
--- a/lbrynet/lbrynet_daemon/LBRYDaemon.py
+++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py
@@ -999,97 +999,37 @@ class LBRYDaemon(jsonrpc.JSONRPC):
         return defer.succeed(True)
 
     def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
-                        file_name=None, stream_info=None, wait_for_write=True):
+                       file_name=None, stream_info=None, wait_for_write=True):
         """
         Add a lbry file to the file manager, start the download, and return the new lbry file.
        If it already exists in the file manager, return the existing lbry file
         """
-
-        if not download_directory:
-            download_directory = self.download_directory
-        elif not os.path.isdir(download_directory):
-            download_directory = self.download_directory
-
-        def _remove_from_wait(r):
-            del self.waiting_on[name]
-            return r
-
-        def _setup_stream(stream_info):
-            if 'sources' in stream_info.keys():
-                stream_hash = stream_info['sources']['lbry_sd_hash']
-            else:
-                stream_hash = stream_info['stream_hash']
-
-            d = self._get_lbry_file_by_sd_hash(stream_hash)
-            def _add_results(l):
-                if l:
-                    if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
-                        return defer.succeed((stream_info, l))
-                return defer.succeed((stream_info, None))
-            d.addCallback(_add_results)
-            return d
-
-        def _wait_on_lbry_file(f):
-            if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
-                written_file = file(os.path.join(self.download_directory, f.file_name))
-                written_file.seek(0, os.SEEK_END)
-                written_bytes = written_file.tell()
-                written_file.close()
-            else:
-                written_bytes = False
-
-            if not written_bytes:
-                d = defer.succeed(None)
-                d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f))
-                return d
-            else:
-                return defer.succeed(_disp_file(f))
-
-        def _disp_file(f):
-            file_path = os.path.join(self.download_directory, f.file_name)
-            log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
-            return f
-
-        def _get_stream(stream_info):
-            def _wait_for_write():
-                try:
-                    if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)):
-                        written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name))
-                        written_file.seek(0, os.SEEK_END)
-                        written_bytes = written_file.tell()
-                        written_file.close()
-                    else:
-                        written_bytes = False
-                except:
-                    written_bytes = False
-
-                if not written_bytes:
-                    d = defer.succeed(None)
-                    d.addCallback(lambda _: reactor.callLater(1, _wait_for_write))
-                    return d
-                else:
-                    return defer.succeed(None)
-
-            self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
-                                           self.lbry_file_manager, max_key_fee=self.max_key_fee,
-                                           data_rate=self.data_rate, timeout=timeout,
-                                           download_directory=download_directory, file_name=file_name)
-            d = self.streams[name].start(stream_info, name)
-            if wait_for_write:
-                d.addCallback(lambda _: _wait_for_write())
-            d.addCallback(lambda _: self.streams[name].downloader)
-
-            return d
+        helper = _DownloadNameHelper(
+            self, name, timeout, download_directory, file_name, wait_for_write)
 
         if not stream_info:
             self.waiting_on[name] = True
             d = self._resolve_name(name)
         else:
             d = defer.succeed(stream_info)
-        d.addCallback(_setup_stream)
-        d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file))
+        d.addCallback(helper._setup_stream)
+        d.addCallback(helper.wait_or_get_stream)
         if not stream_info:
-            d.addCallback(_remove_from_wait)
+            d.addCallback(helper._remove_from_wait)
+        return d
+
+    def add_stream(self, name, timeout, download_directory, file_name, stream_info):
+        """Makes, adds and starts a stream"""
+        self.streams[name] = GetStream(self.sd_identifier,
+                                       self.session,
+                                       self.session.wallet,
+                                       self.lbry_file_manager,
+                                       max_key_fee=self.max_key_fee,
+                                       data_rate=self.data_rate,
+                                       timeout=timeout,
+                                       download_directory=download_directory,
+                                       file_name=file_name)
+        d = self.streams[name].start(stream_info, name)
         return d
 
     def _get_long_count_timestamp(self):
@@ -2273,3 +2213,92 @@ def get_output_callback(params):
             'path': os.path.join(params.download_directory, l.file_name)
         }
     return callback
+
+
+class _DownloadNameHelper(object):
+    def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
+                 file_name=None, wait_for_write=True):
+        self.daemon = daemon
+        self.name = name
+        self.timeout = timeout
+        if not download_directory or not os.path.isdir(download_directory):
+            self.download_directory = daemon.download_directory
+        else:
+            self.download_directory = download_directory
+        self.file_name = file_name
+        self.wait_for_write = wait_for_write
+
+    def _setup_stream(self, stream_info):
+        stream_hash = get_sd_hash(stream_info)
+        d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
+        d.addCallback(self._add_results_callback(stream_info))
+        return d
+
+    def _add_results_callback(self, stream_info):
+        def add_results(l):
+            if l:
+                if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
+                    return defer.succeed((stream_info, l))
+            return defer.succeed((stream_info, None))
+        return add_results
+
+    def wait_or_get_stream(self, args):
+        stream_info, lbry_file = args
+        if lbry_file:
+            return self._wait_on_lbry_file(lbry_file)
+        else:
+            return self._get_stream(stream_info)
+
+    def _get_stream(self, stream_info):
+        d = self.daemon.add_stream(
+            self.name, self.timeout, self.download_directory, self.file_name, stream_info)
+        if self.wait_for_write:
+            d.addCallback(lambda _: self._wait_for_write())
+        d.addCallback(lambda _: self.daemon.streams[self.name].downloader)
+        return d
+
+    def _wait_for_write(self):
+        file_name = self.daemon.streams[self.name].downloader.file_name
+        written_bytes = self.get_written_bytes(file_name)
+        d = defer.succeed(None)
+        if not written_bytes:
+            d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
+        return d
+
+    def _wait_on_lbry_file(self, f):
+        written_bytes = self.get_written_bytes(f.file_name)
+        if not written_bytes:
+            d = defer.succeed(None)
+            d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f))
+            return d
+        else:
+            return defer.succeed(self._disp_file(f))
+
+    def get_written_bytes(self, file_name):
+        try:
+            file_path = os.path.join(self.download_directory, file_name)
+            if os.path.isfile(file_path):
+                written_file = file(file_path)
+                written_file.seek(0, os.SEEK_END)
+                written_bytes = written_file.tell()
+                written_file.close()
+            else:
+                written_bytes = False
+        except Exception:
+            written_bytes = False
+        return written_bytes
+
+    def _disp_file(self, f):
+        file_path = os.path.join(self.download_directory, f.file_name)
+        log.info("[%s] Already downloaded: %s --> %s", datetime.now(), f.sd_hash, file_path)
+        return f
+
+    def _remove_from_wait(self, r):
+        del self.daemon.waiting_on[self.name]
+        return r
+
+
+
+
+
+

From 172f275bc70a696f491d62d654304f4402be831f Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Sat, 16 Jul 2016 09:24:27 -0500
Subject: [PATCH 5/5] Refactor _resolve_name.

Continue using the delegation/helper pattern for the daemon.
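
In rough sketch form, the pattern looks like this (illustrative names
only: _ExampleHelper, run and _on_result are not code from this
change). State that the old nested functions captured implicitly from
the enclosing scope becomes explicit attributes on a small helper
object, and each closure becomes a bound method that can be handed
straight to addCallback:

    from twisted.internet import defer

    class _ExampleHelper(object):
        """Holds what the old closures captured from the outer scope."""
        def __init__(self, daemon, name):
            self.daemon = daemon  # collaborators are explicit attributes
            self.name = name

        def run(self):
            d = defer.succeed(self.name)
            d.addCallback(self._on_result)  # bound method instead of a lambda
            return d

        def _on_result(self, result):
            # a former nested function, now testable in isolation
            return {'name': result}
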
---
 lbrynet/lbrynet_daemon/LBRYDaemon.py | 86 ++++++++++++++++++----------
 1 file changed, 55 insertions(+), 31 deletions(-)

diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py
index fad2583bd..828cbd65f 100644
--- a/lbrynet/lbrynet_daemon/LBRYDaemon.py
+++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py
@@ -1042,38 +1042,16 @@ class LBRYDaemon(jsonrpc.JSONRPC):
         return defer.succeed(True)
 
     def _resolve_name(self, name, force_refresh=False):
-        def _cache_stream_info(stream_info):
-            def _add_txid(txid):
-                self.name_cache[name]['txid'] = txid
-                return defer.succeed(None)
+        """Resolves a name. Checks the cache first before going out to the blockchain.
 
-            self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()}
-            d = self.session.wallet.get_txid_for_name(name)
-            d.addCallback(_add_txid)
-            d.addCallback(lambda _: self._update_claim_cache())
-            d.addCallback(lambda _: self.name_cache[name]['claim_metadata'])
-
-            return d
-
-        if not force_refresh:
-            if name in self.name_cache.keys():
-                if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time:
-                    log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name)
-                    d = defer.succeed(self.name_cache[name]['claim_metadata'])
-                else:
-                    log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name)
-                    d = self.session.wallet.get_stream_info_for_name(name)
-                    d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
-            else:
-                log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
-                d = self.session.wallet.get_stream_info_for_name(name)
-                d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
-        else:
-            log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
-            d = self.session.wallet.get_stream_info_for_name(name)
-            d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
-
-        return d
+        Args:
+            name: the name to resolve, without the lbry:// prefix
+            force_refresh: if True, always go out to the blockchain to resolve.
+        """
+        if name.startswith('lbry://'):
+            raise ValueError('name %s should not start with lbry://' % name)
+        helper = _ResolveNameHelper(self, name, force_refresh)
+        return helper.get_deferred()
 
     def _delete_lbry_file(self, lbry_file, delete_file=True):
         d = self.lbry_file_manager.delete_lbry_file(lbry_file)
@@ -2297,8 +2275,54 @@ class _DownloadNameHelper(object):
         del self.daemon.waiting_on[self.name]
         return r
 
 
+class _ResolveNameHelper(object):
+    def __init__(self, daemon, name, force_refresh):
+        self.daemon = daemon
+        self.name = name
+        self.force_refresh = force_refresh
+    def get_deferred(self):
+        if self.need_fresh_stream():
+            log.info("Resolving stream info for lbry://%s", self.name)
+            d = self.wallet.get_stream_info_for_name(self.name)
+            d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError))
+        else:
+            log.info("Returning cached stream info for lbry://%s", self.name)
+            d = defer.succeed(self.name_data['claim_metadata'])
+        return d
+    @property
+    def name_data(self):
+        return self.daemon.name_cache[self.name]
+    @property
+    def wallet(self):
+        return self.daemon.session.wallet
+    def now(self):
+        return self.daemon._get_long_count_timestamp()
+    def _add_txid(self, txid):
+        self.name_data['txid'] = txid
+        return defer.succeed(None)
+
+    def _cache_stream_info(self, stream_info):
+        self.daemon.name_cache[self.name] = {
+            'claim_metadata': stream_info,
+            'timestamp': self.now()
+        }
+        d = self.wallet.get_txid_for_name(self.name)
+        d.addCallback(self._add_txid)
+        d.addCallback(lambda _: self.daemon._update_claim_cache())
+        d.addCallback(lambda _: self.name_data['claim_metadata'])
+        return d
+
+    def need_fresh_stream(self):
+        return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
+
+    def is_in_cache(self):
+        return self.name in self.daemon.name_cache
+
+    def is_cached_name_expired(self):
+        time_in_cache = self.now() - self.name_data['timestamp']
+        return time_in_cache >= self.daemon.cache_time
 
 
 
 