From 5d212a0f824b56937f29c95e2a1f93c4fa9ed8b3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 10 Mar 2019 21:55:33 -0400 Subject: [PATCH] time to first bytes analytics --- lbrynet/error.py | 3 +- lbrynet/extras/daemon/Components.py | 16 +- lbrynet/extras/daemon/Daemon.py | 2 +- lbrynet/extras/daemon/analytics.py | 88 +++++-- lbrynet/stream/managed_stream.py | 5 +- lbrynet/stream/stream_manager.py | 299 ++++++++++++----------- lbrynet/utils.py | 10 + tests/unit/stream/test_stream_manager.py | 48 ++-- 8 files changed, 268 insertions(+), 203 deletions(-) diff --git a/lbrynet/error.py b/lbrynet/error.py index 33e36aaef..fe7e627ed 100644 --- a/lbrynet/error.py +++ b/lbrynet/error.py @@ -28,8 +28,7 @@ class DownloadTimeoutError(Exception): class DownloadDataTimeout(Exception): def __init__(self, download): - super().__init__('Failed to download data blobs for sd hash ' - '{} within timeout'.format(download)) + super().__init__(f'Failed to download data blobs for sd hash {download} within timeout ') self.download = download diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index df67dea8c..ad803a292 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -51,16 +51,6 @@ async def gather_dict(tasks: dict): ))) -async def get_external_ip(): # used if upnp is disabled or non-functioning - try: - async with utils.aiohttp_request("get", "https://api.lbry.io/ip") as resp: - response = await resp.json() - if response['success']: - return response['data']['ip'] - except Exception as e: - pass - - class DatabaseComponent(Component): component_name = DATABASE_COMPONENT @@ -358,7 +348,7 @@ class DHTComponent(Component): external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") - external_ip = await get_external_ip() + external_ip = await utils.get_external_ip() if not external_ip: log.warning("failed to get external ip") @@ -523,7 +513,7 @@ class UPnPComponent(Component): if external_ip == "0.0.0.0" or not external_ip: log.warning("unable to get external ip from UPnP, checking lbry.io fallback") - external_ip = await get_external_ip() + external_ip = await utils.get_external_ip() if self.external_ip and self.external_ip != external_ip: log.info("external ip changed from %s to %s", self.external_ip, external_ip) self.external_ip = external_ip @@ -574,7 +564,7 @@ class UPnPComponent(Component): async def start(self): log.info("detecting external ip") if not self.use_upnp: - self.external_ip = await get_external_ip() + self.external_ip = await utils.get_external_ip() return success = False await self._maintain_redirects() diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 178f7ab83..7b43880e2 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -416,7 +416,7 @@ class Daemon(metaclass=JSONRPCServerType): self.ensure_wallet_dir() self.ensure_download_dir() if not self.analytics_manager.is_started: - self.analytics_manager.start() + await self.analytics_manager.start() self.component_startup_task = asyncio.create_task(self.component_manager.start()) await self.component_startup_task diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index 8a90bc6e1..cd33bd0a7 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -3,7 +3,6 @@ import collections import logging import aiohttp import typing -import binascii from lbrynet import utils from lbrynet.conf import Config from lbrynet.extras import system_info @@ -26,6 +25,10 @@ UPNP_SETUP = "UPnP Setup" BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' + +TIME_TO_FIRST_BYTES = "Time To First Bytes" + + log = logging.getLogger(__name__) @@ -39,13 +42,40 @@ def _event_properties(installation_id: str, session_id: str, return properties -def _download_properties(download_id: str, name: str, sd_hash: str) -> typing.Dict: - p = { - 'download_id': download_id, +def _download_properties(conf: Config, external_ip: str, resolve_duration: float, + total_duration: typing.Optional[float], download_id: str, name: str, + outpoint: str, active_peer_count: int, tried_peers_count: int, + sd_hash: str, sd_download_duration: typing.Optional[float] = None, + head_blob_hash: typing.Optional[str] = None, + head_blob_length: typing.Optional[int] = None, + head_blob_download_duration: typing.Optional[float] = None, + error: typing.Optional[str] = None) -> typing.Dict: + return { + "external_ip": external_ip, + "download_id": download_id, + "total_duration": round(total_duration, 4), + "resolve_duration": None if not resolve_duration else round(resolve_duration, 4), + "error": error, 'name': name, - 'stream_info': sd_hash + "outpoint": outpoint, + + "node_rpc_timeout": conf.node_rpc_timeout, + "peer_connect_timeout": conf.peer_connect_timeout, + "blob_download_timeout": conf.blob_download_timeout, + "use_fixed_peers": len(conf.reflector_servers) > 0, + "fixed_peer_delay": conf.fixed_peer_delay, + "added_fixed_peers": (conf.fixed_peer_delay < total_duration) and len(conf.reflector_servers) > 0, + + "active_peer_count": active_peer_count, + "tried_peers_count": tried_peers_count, + + "sd_blob_hash": sd_hash, + "sd_blob_duration": None if not sd_download_duration else round(sd_download_duration, 4), + + "head_blob_hash": head_blob_hash, + "head_blob_length": head_blob_length, + "head_blob_duration": None if not head_blob_download_duration else round(head_blob_download_duration, 4) } - return p def _make_context(platform): @@ -71,6 +101,7 @@ def _make_context(platform): class AnalyticsManager: def __init__(self, conf: Config, installation_id: str, session_id: str): + self.conf = conf self.cookies = {} self.url = ANALYTICS_ENDPOINT self._write_key = utils.deobfuscate(ANALYTICS_TOKEN) @@ -80,19 +111,22 @@ class AnalyticsManager: self.installation_id = installation_id self.session_id = session_id self.task: asyncio.Task = None + self.external_ip: typing.Optional[str] = None @property def is_started(self): return self.task is not None - def start(self): + async def start(self): if self._enabled and self.task is None: + self.external_ip = await utils.get_external_ip() self.task = asyncio.create_task(self.run()) async def run(self): while True: await self._send_heartbeat() await asyncio.sleep(1800) + self.external_ip = await utils.get_external_ip() def stop(self): if self.task is not None and not self.task.done(): @@ -111,7 +145,7 @@ class AnalyticsManager: async with utils.aiohttp_request(**request_kwargs) as response: self.cookies.update(response.cookies) except Exception as e: - log.exception('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e) + log.debug('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e) async def track(self, event: typing.Dict): """Send a single tracking event""" @@ -136,23 +170,31 @@ class AnalyticsManager: async def send_server_startup_error(self, message): await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) - async def send_download_started(self, download_id, name, sd_hash): - await self.track( - self._event(DOWNLOAD_STARTED, _download_properties(download_id, name, sd_hash)) - ) + async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float], + total_duration: typing.Optional[float], download_id: str, + name: str, outpoint: str, found_peers_count: int, + tried_peers_count: int, sd_hash: str, + sd_download_duration: typing.Optional[float] = None, + head_blob_hash: typing.Optional[str] = None, + head_blob_length: typing.Optional[int] = None, + head_blob_duration: typing.Optional[int] = None, + error: typing.Optional[str] = None): + await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties( + self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint, + found_peers_count, tried_peers_count, sd_hash, sd_download_duration, head_blob_hash, head_blob_length, + head_blob_duration, error + ))) async def send_download_finished(self, download_id, name, sd_hash): - await self.track(self._event(DOWNLOAD_FINISHED, _download_properties(download_id, name, sd_hash))) - - async def send_download_errored(self, error: Exception, name: str): - await self.track(self._event(DOWNLOAD_ERRORED, { - 'download_id': binascii.hexlify(utils.generate_id()).decode(), - 'name': name, - 'stream_info': None, - 'error': type(error).__name__, - 'reason': str(error), - 'report': None - })) + await self.track( + self._event( + DOWNLOAD_FINISHED, { + 'download_id': download_id, + 'name': name, + 'stream_info': sd_hash + } + ) + ) async def send_claim_action(self, action): await self.track(self._event(CLAIM_ACTION, {'action': action})) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 176c96deb..53f01ad00 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -25,7 +25,8 @@ class ManagedStream: def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int, descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str], downloader: typing.Optional[StreamDownloader] = None, - status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None): + status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None, + download_id: typing.Optional[str] = None): self.loop = loop self.blob_manager = blob_manager self.rowid = rowid @@ -39,7 +40,7 @@ class ManagedStream: self.fully_reflected = asyncio.Event(loop=self.loop) self.tx = None - self.download_id = binascii.hexlify(generate_id()).decode() + self.download_id = download_id or binascii.hexlify(generate_id()).decode() @property def file_name(self) -> typing.Optional[str]: diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 0214d1299..fa9328886 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -6,6 +6,7 @@ import logging import random from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError, \ DownloadDataTimeout, DownloadSDTimeout +from lbrynet.utils import generate_id from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.managed_stream import ManagedStream @@ -275,101 +276,6 @@ class StreamManager: if delete_file and stream.output_file_exists: os.remove(stream.full_path) - def wait_for_stream_finished(self, stream: ManagedStream): - async def _wait_for_stream_finished(): - if stream.downloader and stream.running: - await stream.downloader.stream_finished_event.wait() - stream.update_status(ManagedStream.STATUS_FINISHED) - if self.analytics_manager: - self.loop.create_task(self.analytics_manager.send_download_finished( - stream.download_id, stream.claim_name, stream.sd_hash - )) - - task = self.loop.create_task(_wait_for_stream_finished()) - self.update_stream_finished_futs.append(task) - task.add_done_callback( - lambda _: None if task not in self.update_stream_finished_futs else - self.update_stream_finished_futs.remove(task) - ) - - async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, - file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: - - claim = smart_decode(claim_info['value']) - downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(), - download_directory, file_name) - try: - downloader.download(node) - await downloader.got_descriptor.wait() - log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name']) - except (asyncio.TimeoutError, asyncio.CancelledError): - log.info("stream timeout") - downloader.stop() - log.info("stopped stream") - raise DownloadSDTimeout(downloader.sd_hash) - rowid = await self._store_stream(downloader) - await self.storage.save_content_claim( - downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" - ) - stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory, - file_name, downloader, ManagedStream.STATUS_RUNNING) - stream.set_claim(claim_info, claim) - self.streams.add(stream) - try: - await stream.downloader.wrote_bytes_event.wait() - self.wait_for_stream_finished(stream) - return stream - except asyncio.CancelledError: - downloader.stop() - log.debug("stopped stream") - await self.stop_stream(stream) - raise DownloadDataTimeout(downloader.sd_hash) - - async def _store_stream(self, downloader: StreamDownloader) -> int: - file_name = os.path.basename(downloader.output_path) - download_directory = os.path.dirname(downloader.output_path) - if not await self.storage.stream_exists(downloader.sd_hash): - await self.storage.store_stream(downloader.sd_blob, downloader.descriptor) - if not await self.storage.file_exists(downloader.sd_hash): - return await self.storage.save_downloaded_file( - downloader.descriptor.stream_hash, file_name, download_directory, - 0.0 - ) - else: - return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash) - - async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict, - file_name: typing.Optional[str] = None, - timeout: typing.Optional[float] = 60, - fee_amount: typing.Optional[float] = 0.0, - fee_address: typing.Optional[str] = None, - should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]: - claim = ClaimDict.load_dict(claim_info['value']) - sd_hash = claim.source_hash.decode() - if sd_hash in self.starting_streams: - return await self.starting_streams[sd_hash] - already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams)) - if already_started: - return already_started[0] - self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop) - stream_task = self.loop.create_task( - self._download_stream_from_claim(node, self.config.download_dir, claim_info, file_name) - ) - try: - await asyncio.wait_for(stream_task, timeout or self.config.download_timeout) - stream = await stream_task - self.starting_streams[sd_hash].set_result(stream) - if should_pay and fee_address and fee_amount: - stream.tx = await self.wallet.send_amount_to_address( - lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')) - return stream - except asyncio.TimeoutError as e: - if stream_task.exception(): - raise stream_task.exception() - finally: - if sd_hash in self.starting_streams: - del self.starting_streams[sd_hash] - def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]: streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams)) if streams: @@ -411,30 +317,111 @@ class StreamManager: streams.reverse() return streams + def wait_for_stream_finished(self, stream: ManagedStream): + async def _wait_for_stream_finished(): + if stream.downloader and stream.running: + await stream.downloader.stream_finished_event.wait() + stream.update_status(ManagedStream.STATUS_FINISHED) + if self.analytics_manager: + self.loop.create_task(self.analytics_manager.send_download_finished( + stream.download_id, stream.claim_name, stream.sd_hash + )) + + task = self.loop.create_task(_wait_for_stream_finished()) + self.update_stream_finished_futs.append(task) + task.add_done_callback( + lambda _: None if task not in self.update_stream_finished_futs else + self.update_stream_finished_futs.remove(task) + ) + + async def _store_stream(self, downloader: StreamDownloader) -> int: + file_name = os.path.basename(downloader.output_path) + download_directory = os.path.dirname(downloader.output_path) + if not await self.storage.stream_exists(downloader.sd_hash): + await self.storage.store_stream(downloader.sd_blob, downloader.descriptor) + if not await self.storage.file_exists(downloader.sd_hash): + return await self.storage.save_downloaded_file( + downloader.descriptor.stream_hash, file_name, download_directory, + 0.0 + ) + else: + return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash) + + async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: ClaimDict) -> typing.Tuple[ + typing.Optional[ManagedStream], typing.Optional[ManagedStream]]: + existing = self.get_filtered_streams(outpoint=outpoint) + if existing: + await self.start_stream(existing[0]) + return existing[0], None + existing = self.get_filtered_streams(sd_hash=claim.source_hash.decode()) + if existing and existing[0].claim_id != claim_id: + raise ResolveError(f"stream for {existing[0].claim_id} collides with existing " + f"download {claim_id}") + if existing: + log.info("claim contains a metadata only update to a stream we have") + await self.storage.save_content_claim( + existing[0].stream_hash, outpoint + ) + await self._update_content_claim(existing[0]) + await self.start_stream(existing[0]) + return existing[0], None + else: + existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id) + if existing_for_claim_id: + log.info("claim contains an update to a stream we have, downloading it") + return None, existing_for_claim_id[0] + return None, None + + async def start_downloader(self, got_descriptor_time: asyncio.Future, downloader: StreamDownloader, + download_id: str, outpoint: str, claim: ClaimDict, resolved: typing.Dict, + file_name: typing.Optional[str] = None) -> ManagedStream: + start_time = self.loop.time() + downloader.download(self.node) + await downloader.got_descriptor.wait() + got_descriptor_time.set_result(self.loop.time() - start_time) + rowid = await self._store_stream(downloader) + await self.storage.save_content_claim( + downloader.descriptor.stream_hash, outpoint + ) + stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, self.config.download_dir, + file_name, downloader, ManagedStream.STATUS_RUNNING, download_id=download_id) + stream.set_claim(resolved, claim) + await stream.downloader.wrote_bytes_event.wait() + self.streams.add(stream) + return stream + async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', file_name: typing.Optional[str] = None, - timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: - timeout = timeout or self.config.download_timeout + timeout: typing.Optional[float] = None) -> ManagedStream: + start_time = self.loop.time() parsed_uri = parse_lbry_uri(uri) if parsed_uri.is_channel: raise ResolveError("cannot download a channel claim, specify a /path") + # resolve the claim resolved = (await self.wallet.resolve(uri)).get(uri, {}) resolved = resolved if 'value' in resolved else resolved.get('claim') if not resolved: - raise ResolveError( - "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")) - ) + raise ResolveError(f"Failed to resolve stream at '{uri}'") if 'error' in resolved: raise ResolveError(f"error resolving stream: {resolved['error']}") claim = ClaimDict.load_dict(resolved['value']) + outpoint = f"{resolved['txid']}:{resolved['nout']}" + resolved_time = self.loop.time() - start_time + + # resume or update an existing stream, if the stream changed download it and delete the old one after + updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) + if updated_stream: + return updated_stream + + # check that the fee is payable fee_amount, fee_address = None, None if claim.has_fee: fee_amount = round(exchange_rate_manager.convert_currency( - claim.source_fee.currency, "LBC", claim.source_fee.amount - ), 5) + claim.source_fee.currency, "LBC", claim.source_fee.amount + ), 5) max_fee_amount = round(exchange_rate_manager.convert_currency( self.config.max_key_fee['currency'], "LBC", self.config.max_key_fee['amount'] ), 5) @@ -448,52 +435,76 @@ class StreamManager: log.warning(msg) raise InsufficientFundsError(msg) fee_address = claim.source_fee.address.decode() - outpoint = f"{resolved['txid']}:{resolved['nout']}" - existing = self.get_filtered_streams(outpoint=outpoint) - if not existing: - existing.extend(self.get_filtered_streams(sd_hash=claim.source_hash.decode())) - if existing and existing[0].claim_id != resolved['claim_id']: - raise Exception(f"stream for {existing[0].claim_id} collides with existing " - f"download {resolved['claim_id']}") - existing.extend(self.get_filtered_streams(claim_id=resolved['claim_id'])) - if existing and existing[0].sd_hash != claim.source_hash.decode(): - log.info("claim contains an update to a stream we have, downloading it") - stream = await self.download_stream_from_claim( - self.node, resolved, file_name, timeout, fee_amount, fee_address, False - ) - log.info("started new stream, deleting old one") - if self.analytics_manager: - self.loop.create_task(self.analytics_manager.send_download_started( - stream.download_id, parsed_uri.name, claim.source_hash.decode() - )) - await self.delete_stream(existing[0]) - return stream - elif existing: - log.info("already have matching stream for %s", uri) - stream = existing[0] - await self.start_stream(stream) - return stream - else: - stream = existing[0] - await self.start_stream(stream) - return stream - log.info("download stream from %s", uri) - stream = await self.download_stream_from_claim( - self.node, resolved, file_name, timeout, fee_amount, fee_address - ) + # download the stream + download_id = binascii.hexlify(generate_id()).decode() + downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(), + self.config.download_dir, file_name) + + stream = None + descriptor_time_fut = self.loop.create_future() + start_download_time = self.loop.time() + time_to_descriptor = None + time_to_first_bytes = None + error = None + try: + stream = await asyncio.wait_for( + asyncio.ensure_future( + self.start_downloader(descriptor_time_fut, downloader, download_id, outpoint, claim, resolved, + file_name) + ), timeout + ) + time_to_descriptor = await descriptor_time_fut + time_to_first_bytes = self.loop.time() - start_download_time + self.wait_for_stream_finished(stream) + if fee_address and fee_amount and not to_replace: + stream.tx = await self.wallet.send_amount_to_address( + lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')) + elif to_replace: # delete old stream now that the replacement has started downloading + await self.delete_stream(to_replace) + except asyncio.TimeoutError: + if descriptor_time_fut.done(): + time_to_descriptor = descriptor_time_fut.result() + error = DownloadDataTimeout(downloader.descriptor.blobs[0].blob_hash) + else: + descriptor_time_fut.cancel() + error = DownloadSDTimeout(downloader.sd_hash) + if stream: + await self.stop_stream(stream) + elif downloader: + downloader.stop() + if error: + log.warning(error) if self.analytics_manager: - self.loop.create_task(self.analytics_manager.send_download_started( - stream.download_id, parsed_uri.name, claim.source_hash.decode() - )) + self.loop.create_task( + self.analytics_manager.send_time_to_first_bytes( + resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint, + None if not stream else len(stream.downloader.blob_downloader.active_connections), + None if not stream else len(stream.downloader.blob_downloader.scores), + claim.source_hash.decode(), time_to_descriptor, + None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, + None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, + time_to_first_bytes, None if not error else error.__class__.__name__ + ) + ) + if error: + raise error return stream async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', file_name: typing.Optional[str] = None, - timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: + timeout: typing.Optional[float] = None) -> ManagedStream: + if uri in self.starting_streams: + return await self.starting_streams[uri] + fut = asyncio.Future(loop=self.loop) + self.starting_streams[uri] = fut try: - return await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout) - except Exception as e: - if self.analytics_manager: - await self.analytics_manager.send_download_errored(e, uri) - raise e + stream = await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout) + fut.set_result(stream) + return stream + except Exception as err: + fut.set_exception(err) + try: + return await fut + finally: + del self.starting_streams[uri] diff --git a/lbrynet/utils.py b/lbrynet/utils.py index 86d917b25..f7fc11f2e 100644 --- a/lbrynet/utils.py +++ b/lbrynet/utils.py @@ -173,3 +173,13 @@ async def aiohttp_request(method, url, **kwargs) -> typing.AsyncContextManager[a async with aiohttp.ClientSession() as session: async with session.request(method, url, ssl=get_ssl_context(), **kwargs) as response: yield response + + +async def get_external_ip() -> typing.Optional[str]: # used if upnp is disabled or non-functioning + try: + async with aiohttp_request("get", "https://api.lbry.io/ip") as resp: + response = await resp.json() + if response['success']: + return response['data']['ip'] + except Exception as e: + return diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index bf966c0ea..8a1ff02aa 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -3,11 +3,12 @@ import binascii from unittest import mock import asyncio import time -import typing +import json from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager from lbrynet.utils import generate_id -from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout +from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, \ + DownloadDataTimeout from lbrynet.extras.wallet.manager import LbryWalletManager from lbrynet.extras.daemon.analytics import AnalyticsManager from lbrynet.stream.stream_manager import StreamManager @@ -100,8 +101,9 @@ class TestStreamManager(BlobExchangeTestBase): file_path = os.path.join(self.server_dir, "test_file") with open(file_path, 'wb') as f: f.write(os.urandom(20000000)) - descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path, - old_sort=old_sort) + descriptor = await StreamDescriptor.create_stream( + self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort + ) self.sd_hash = descriptor.sd_hash self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, @@ -114,7 +116,7 @@ class TestStreamManager(BlobExchangeTestBase): async def test_download_stop_resume_delete(self): await self.setup_stream_manager() received = [] - expected_events = ['Download Started', 'Download Finished'] + expected_events = ['Time To First Bytes', 'Download Finished'] async def check_post(event): received.append(event['event']) @@ -145,7 +147,7 @@ class TestStreamManager(BlobExchangeTestBase): await self.stream_manager.start_stream(stream) await stream.downloader.stream_finished_event.wait() - await asyncio.sleep(0.01) + await asyncio.sleep(0, loop=self.loop) self.assertTrue(stream.finished) self.assertFalse(stream.running) self.assertTrue(os.path.isfile(os.path.join(self.client_dir, "test_file"))) @@ -163,17 +165,20 @@ class TestStreamManager(BlobExchangeTestBase): self.assertEqual(stored_status, None) self.assertListEqual(expected_events, received) - async def _test_download_error(self, expected_error): + async def _test_download_error_on_start(self, expected_error, timeout=None): + with self.assertRaises(expected_error): + await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout=timeout) + + async def _test_download_error_analytics_on_start(self, expected_error, timeout=None): received = [] async def check_post(event): - self.assertEqual("Download Errored", event['event']) + self.assertEqual("Time To First Bytes", event['event']) received.append(event['properties']['error']) self.stream_manager.analytics_manager._post = check_post - - with self.assertRaises(expected_error): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self._test_download_error_on_start(expected_error, timeout) + await asyncio.sleep(0, loop=self.loop) self.assertListEqual([expected_error.__name__], received) async def test_insufficient_funds(self): @@ -184,7 +189,7 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(10.0, fee) - await self._test_download_error(InsufficientFundsError) + await self._test_download_error_on_start(InsufficientFundsError) async def test_fee_above_max_allowed(self): fee = { @@ -194,22 +199,28 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(1000000.0, fee) - await self._test_download_error(KeyFeeAboveMaxAllowed) + await self._test_download_error_on_start(KeyFeeAboveMaxAllowed) async def test_resolve_error(self): await self.setup_stream_manager() self.uri = "fake" - await self._test_download_error(ResolveError) + await self._test_download_error_on_start(ResolveError) - async def test_download_timeout(self): + async def test_download_sd_timeout(self): self.server.stop_server() - self.client_config.download_timeout = 1.0 await self.setup_stream_manager() - await self._test_download_error(DownloadSDTimeout) + await self._test_download_error_analytics_on_start(DownloadSDTimeout, timeout=1) + + async def test_download_data_timeout(self): + await self.setup_stream_manager() + with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf: + head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash'] + self.server_blob_manager.delete_blob(head_blob_hash) + await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1) async def test_download_then_recover_stream_on_startup(self, old_sort=False): expected_analytics_events = [ - 'Download Started', + 'Time To First Bytes', 'Download Finished' ] received_events = [] @@ -223,6 +234,7 @@ class TestStreamManager(BlobExchangeTestBase): self.assertSetEqual(self.stream_manager.streams, set()) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) await stream.downloader.stream_finished_event.wait() + await asyncio.sleep(0, loop=self.loop) self.stream_manager.stop() self.client_blob_manager.stop() os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))