diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 96e8b18dc..466c477ea 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -48,6 +48,7 @@ DEFAULT_TIMEOUT = 30 DEFAULT_MAX_SEARCH_RESULTS = 25 DEFAULT_MAX_KEY_FEE = {'USD': {'amount': 25.0, 'address': ''}} DEFAULT_SEARCH_TIMEOUT = 3.0 +DEFAULT_SD_DOWNLOAD_TIMEOUT = 3 DEFAULT_CACHE_TIME = 3600 DEFAULT_UI_BRANCH = "master" diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index c8ff3f290..a53df0cc1 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -173,14 +173,23 @@ class LBRYSession(object): self.upnp_redirects.append((self.peer_port, 'TCP')) log.info("Set UPnP redirect for TCP port %d", self.peer_port) else: + # see comment below log.warning("UPnP redirect already set for TCP port %d", self.peer_port) + self.upnp_redirects.append((self.peer_port, 'TCP')) if self.dht_node_port is not None: if u.getspecificportmapping(self.dht_node_port, 'UDP') is None: u.addportmapping(self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port, 'LBRY DHT port', '') self.upnp_redirects.append((self.dht_node_port, 'UDP')) log.info("Set UPnP redirect for UPD port %d", self.dht_node_port) else: + # TODO: check that the existing redirect was put up by an old lbrynet session before grabbing it + # if such a disconnected redirect exists, then upnp won't work unless the redirect is appended + # or is torn down and set back up. a bad shutdown of lbrynet could leave such a redirect up + # and cause problems on the next start. + # this could be problematic if a previous lbrynet session didn't make the redirect, and it was + # made by another application log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port) + self.upnp_redirects.append((self.dht_node_port, 'UDP')) return True return False diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 58a11c69d..40f75156f 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -41,13 +41,14 @@ from lbrynet.lbrynet_daemon.LBRYUIManager import LBRYUIManager from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher from lbrynet.lbrynet_daemon.LBRYExchangeRateManager import ExchangeRateManager +from lbrynet.lbrynet_daemon.Lighthouse import LighthouseClient from lbrynet.core import utils from lbrynet.core.LBRYMetadata import verify_name_characters from lbrynet.core.utils import generate_id from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, \ DEFAULT_WALLET, DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, LOG_POST_URL, LOG_FILE_NAME, SOURCE_TYPES -from lbrynet.conf import SEARCH_SERVERS +from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT from lbrynet.conf import DEFAULT_TIMEOUT, WALLET_TYPES from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader from lbrynet.core.Session import LBRYSession @@ -160,6 +161,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.run_server = True self.session = None self.exchange_rate_manager = ExchangeRateManager() + self.lighthouse_client = LighthouseClient() self.waiting_on = {} self.streams = {} self.pending_claims = {} @@ -375,6 +377,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): f.write("rpcpassword=" + password) log.info("Done writing lbrycrd.conf") + def _responseFailed(self, err, call): + call.cancel() + def render(self, request): request.content.seek(0, 0) # Unmarshal the JSON-RPC data. @@ -414,6 +419,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = defer.maybeDeferred(function) else: d = defer.maybeDeferred(function, *args) + + # cancel the response if the connection is broken + request.notifyFinish().addErrback(self._responseFailed, d) + d.addErrback(self._ebRender, id) d.addCallback(self._cbRender, request, id, version) return server.NOT_DONE_YET @@ -438,6 +447,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): except: f = jsonrpclib.Fault(self.FAILURE, "can't serialize output") s = jsonrpclib.dumps(f, version=version) + request.setHeader("content-length", str(len(s))) request.write(s) request.finish() @@ -625,6 +635,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): # TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY. def _start_file(f): d = self.lbry_file_manager.toggle_lbry_file_running(f) + d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash)) return defer.succeed("Started LBRY file") def _get_and_start_file(name): @@ -1072,7 +1083,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) return defer.succeed(True) - def _download_sd_blob(self, sd_hash): + def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT): def cb(result): if not r.called: r.callback(result) @@ -1082,7 +1093,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): r.errback(Exception("sd timeout")) r = defer.Deferred(None) - reactor.callLater(3, eb) + reactor.callLater(timeout, eb) d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager)) d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) @@ -1444,8 +1455,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(None) def _search(self, search): - proxy = Proxy(random.choice(SEARCH_SERVERS)) - return proxy.callRemote('search', search) + return self.lighthouse_client.search(search) def _render_response(self, result, code): return defer.succeed({'result': result, 'code': code}) @@ -2250,8 +2260,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): sd blob, dict """ sd_hash = p['sd_hash'] + timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT) - d = self._download_sd_blob(sd_hash) + d = self._download_sd_blob(sd_hash, timeout) d.addCallbacks(lambda r: self._render_response(r, OK_CODE), lambda _: self._render_response(False, OK_CODE)) return d @@ -2401,6 +2412,23 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d + def jsonrpc_get_peers_for_hash(self, p): + """ + Get peers for blob hash + + Args: + 'blob_hash': blob hash + Returns: + List of contacts + """ + + blob_hash = p['blob_hash'] + + d = self.session.peer_finder.find_peers_for_blob(blob_hash) + d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" diff --git a/lbrynet/lbrynet_daemon/Lighthouse.py b/lbrynet/lbrynet_daemon/Lighthouse.py new file mode 100644 index 000000000..d12d87841 --- /dev/null +++ b/lbrynet/lbrynet_daemon/Lighthouse.py @@ -0,0 +1,28 @@ +import logging +import random +from txjsonrpc.web.jsonrpc import Proxy +from twisted.internet import defer +from lbrynet.conf import SEARCH_SERVERS + +log = logging.getLogger(__name__) + + +class LighthouseClient(object): + def __init__(self, servers=None): + self.servers = servers or SEARCH_SERVERS + + def _get_random_server(self): + return Proxy(random.choice(self.servers)) + + def _run_query(self, func, arg): + return self._get_random_server().callRemote(func, arg) + + def search(self, search): + return self._run_query('search', search) + + def announce_sd(self, sd_hash): + log.info("Announce sd to lighthouse") + return self._run_query('announce_sd', sd_hash) + + def check_available(self, sd_hash): + return self._run_query('check_available', sd_hash)