From 6215d855811a71ed309f4ae3b521d78211976e4d Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 2 Aug 2016 18:34:48 -0400 Subject: [PATCH 1/8] add LighthouseClient, announce sd hash after publishing --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 6 ++++-- lbrynet/lbrynet_daemon/Lighthouse.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 lbrynet/lbrynet_daemon/Lighthouse.py diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 58a11c69d..13e417c9d 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -41,6 +41,7 @@ from lbrynet.lbrynet_daemon.LBRYUIManager import LBRYUIManager from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher from lbrynet.lbrynet_daemon.LBRYExchangeRateManager import ExchangeRateManager +from lbrynet.lbrynet_daemon.Lighthouse import LighthouseClient from lbrynet.core import utils from lbrynet.core.LBRYMetadata import verify_name_characters from lbrynet.core.utils import generate_id @@ -160,6 +161,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.run_server = True self.session = None self.exchange_rate_manager = ExchangeRateManager() + self.lighthouse_client = LighthouseClient() self.waiting_on = {} self.streams = {} self.pending_claims = {} @@ -625,6 +627,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): # TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY. def _start_file(f): d = self.lbry_file_manager.toggle_lbry_file_running(f) + d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash)) return defer.succeed("Started LBRY file") def _get_and_start_file(name): @@ -1444,8 +1447,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(None) def _search(self, search): - proxy = Proxy(random.choice(SEARCH_SERVERS)) - return proxy.callRemote('search', search) + return self.lighthouse_client.search(search) def _render_response(self, result, code): return defer.succeed({'result': result, 'code': code}) diff --git a/lbrynet/lbrynet_daemon/Lighthouse.py b/lbrynet/lbrynet_daemon/Lighthouse.py new file mode 100644 index 000000000..245c0ae36 --- /dev/null +++ b/lbrynet/lbrynet_daemon/Lighthouse.py @@ -0,0 +1,28 @@ +import logging +import random +from txjsonrpc.web.jsonrpc import Proxy +from twisted.internet import defer +from lbrynet.conf import SEARCH_SERVERS + +log = logging.getLogger(__name__) + + +class LighthouseClient(object): + def __init__(self): + self.servers = SEARCH_SERVERS + + def _get_random_server(self): + return Proxy(random.choice(self.servers)) + + def _run_query(self, func, arg): + return self._get_random_server().callRemote(func, arg) + + def search(self, search): + return self._run_query('search', search) + + def announce_sd(self, sd_hash): + log.info("Announce sd to lighthouse") + return self._run_query('announce_sd', sd_hash) + + def check_available(self, sd_hash): + return self._run_query('check_available', sd_hash) From 3af8b7bd0ce518cead7d4aa56300bdaf223f66ac Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 01:27:40 -0400 Subject: [PATCH 2/8] use upnp redirects if they're already set --- lbrynet/core/Session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index c8ff3f290..ee8ae26c2 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -174,6 +174,7 @@ class LBRYSession(object): log.info("Set UPnP redirect for TCP port %d", self.peer_port) else: log.warning("UPnP redirect already set for TCP port %d", self.peer_port) + self.upnp_redirects.append((self.peer_port, 'TCP')) if self.dht_node_port is not None: if u.getspecificportmapping(self.dht_node_port, 'UDP') is None: u.addportmapping(self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port, 'LBRY DHT port', '') @@ -181,6 +182,7 @@ class LBRYSession(object): log.info("Set UPnP redirect for UPD port %d", self.dht_node_port) else: log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port) + self.upnp_redirects.append((self.dht_node_port, 'UDP')) return True return False From 85d610bcdfdd0d2bd2968b9c8d22943fe1eaf604 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 03:16:06 -0400 Subject: [PATCH 3/8] add get_peers_for_hash --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 13e417c9d..f3b5cc658 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -2403,6 +2403,23 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d + def jsonrpc_get_peers_for_hash(self, p): + """ + Get peers for blob hash + + Args: + 'blob_hash': blob hash + Returns: + List of contacts + """ + + blob_hash = p['blob_hash'] + + d = self.session.peer_finder.find_peers_for_blob(blob_hash) + d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" From 9fedf14e55d9e3e5ba1db47e2b03c511403307a4 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 12:46:46 -0400 Subject: [PATCH 4/8] request.notifyFinish --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index f3b5cc658..801736c16 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -377,6 +377,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): f.write("rpcpassword=" + password) log.info("Done writing lbrycrd.conf") + def _responseFailed(self, err, call): + call.cancel() + def render(self, request): request.content.seek(0, 0) # Unmarshal the JSON-RPC data. @@ -416,6 +419,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = defer.maybeDeferred(function) else: d = defer.maybeDeferred(function, *args) + request.notifyFinish().addErrback(self._responseFailed, d) d.addErrback(self._ebRender, id) d.addCallback(self._cbRender, request, id, version) return server.NOT_DONE_YET @@ -440,6 +444,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): except: f = jsonrpclib.Fault(self.FAILURE, "can't serialize output") s = jsonrpclib.dumps(f, version=version) + request.setHeader("content-length", str(len(s))) request.write(s) request.finish() From e8d04b19215ded304615eca87ba3fac745dedde6 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 17:44:32 -0400 Subject: [PATCH 5/8] specify a lighthouse server --- lbrynet/lbrynet_daemon/Lighthouse.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Lighthouse.py b/lbrynet/lbrynet_daemon/Lighthouse.py index 245c0ae36..d12d87841 100644 --- a/lbrynet/lbrynet_daemon/Lighthouse.py +++ b/lbrynet/lbrynet_daemon/Lighthouse.py @@ -2,14 +2,14 @@ import logging import random from txjsonrpc.web.jsonrpc import Proxy from twisted.internet import defer -from lbrynet.conf import SEARCH_SERVERS +from lbrynet.conf import SEARCH_SERVERS log = logging.getLogger(__name__) class LighthouseClient(object): - def __init__(self): - self.servers = SEARCH_SERVERS + def __init__(self, servers=None): + self.servers = servers or SEARCH_SERVERS def _get_random_server(self): return Proxy(random.choice(self.servers)) From 64e9ce2bfaf21621bba4b3e94f01f400acbf7225 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 17:44:41 -0400 Subject: [PATCH 6/8] comments about the upnp redirects --- lbrynet/core/Session.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index ee8ae26c2..a53df0cc1 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -173,6 +173,7 @@ class LBRYSession(object): self.upnp_redirects.append((self.peer_port, 'TCP')) log.info("Set UPnP redirect for TCP port %d", self.peer_port) else: + # see comment below log.warning("UPnP redirect already set for TCP port %d", self.peer_port) self.upnp_redirects.append((self.peer_port, 'TCP')) if self.dht_node_port is not None: @@ -181,6 +182,12 @@ class LBRYSession(object): self.upnp_redirects.append((self.dht_node_port, 'UDP')) log.info("Set UPnP redirect for UPD port %d", self.dht_node_port) else: + # TODO: check that the existing redirect was put up by an old lbrynet session before grabbing it + # if such a disconnected redirect exists, then upnp won't work unless the redirect is appended + # or is torn down and set back up. a bad shutdown of lbrynet could leave such a redirect up + # and cause problems on the next start. + # this could be problematic if a previous lbrynet session didn't make the redirect, and it was + # made by another application log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port) self.upnp_redirects.append((self.dht_node_port, 'UDP')) return True From fc50a3242f434bfe973e6f2e83e93045c526eda3 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 17:49:13 -0400 Subject: [PATCH 7/8] add note about request.notifyFinish --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 801736c16..7370a33d6 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -419,7 +419,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = defer.maybeDeferred(function) else: d = defer.maybeDeferred(function, *args) + + # cancel the response if the connection is broken request.notifyFinish().addErrback(self._responseFailed, d) + d.addErrback(self._ebRender, id) d.addCallback(self._cbRender, request, id, version) return server.NOT_DONE_YET From f68fd9655544264bf40807b186b4ddd61f1ebb00 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 3 Aug 2016 17:57:26 -0400 Subject: [PATCH 8/8] break out sd timeout into a variable --- lbrynet/conf.py | 1 + lbrynet/lbrynet_daemon/LBRYDaemon.py | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 96e8b18dc..466c477ea 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -48,6 +48,7 @@ DEFAULT_TIMEOUT = 30 DEFAULT_MAX_SEARCH_RESULTS = 25 DEFAULT_MAX_KEY_FEE = {'USD': {'amount': 25.0, 'address': ''}} DEFAULT_SEARCH_TIMEOUT = 3.0 +DEFAULT_SD_DOWNLOAD_TIMEOUT = 3 DEFAULT_CACHE_TIME = 3600 DEFAULT_UI_BRANCH = "master" diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 7370a33d6..40f75156f 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -48,7 +48,7 @@ from lbrynet.core.utils import generate_id from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, \ DEFAULT_WALLET, DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, LOG_POST_URL, LOG_FILE_NAME, SOURCE_TYPES -from lbrynet.conf import SEARCH_SERVERS +from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT from lbrynet.conf import DEFAULT_TIMEOUT, WALLET_TYPES from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader from lbrynet.core.Session import LBRYSession @@ -1083,7 +1083,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) return defer.succeed(True) - def _download_sd_blob(self, sd_hash): + def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT): def cb(result): if not r.called: r.callback(result) @@ -1093,7 +1093,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): r.errback(Exception("sd timeout")) r = defer.Deferred(None) - reactor.callLater(3, eb) + reactor.callLater(timeout, eb) d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager)) d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) @@ -2260,8 +2260,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): sd blob, dict """ sd_hash = p['sd_hash'] + timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT) - d = self._download_sd_blob(sd_hash) + d = self._download_sd_blob(sd_hash, timeout) d.addCallbacks(lambda r: self._render_response(r, OK_CODE), lambda _: self._render_response(False, OK_CODE)) return d