diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 4a930b050..df67dea8c 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -441,7 +441,7 @@ class StreamManagerComponent(Component): log.info('Starting the file manager') loop = asyncio.get_event_loop() self.stream_manager = StreamManager( - loop, self.conf, blob_manager, wallet, storage, node, + loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager ) await self.stream_manager.start() log.info('Done setting up file manager') diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index c581169da..36fb3bd5c 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -250,7 +250,7 @@ class Daemon(metaclass=JSONRPCServerType): self._node_id = None self._installation_id = None self.session_id = base58.b58encode(utils.generate_id()).decode() - self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id) + self.analytics_manager = analytics.AnalyticsManager(conf, self.installation_id, self.session_id) self.component_manager = component_manager or ComponentManager( conf, analytics_manager=self.analytics_manager, skip_components=conf.components_to_skip or [] diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index db4115c29..1ef9fc3ed 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -1,9 +1,9 @@ import asyncio import collections import logging - import aiohttp - +import typing +import binascii from lbrynet import utils from lbrynet.conf import Config from lbrynet.extras import system_info @@ -22,7 +22,6 @@ HEARTBEAT = 'Heartbeat' CLAIM_ACTION = 'Claim Action' # publish/create/update/abandon NEW_CHANNEL = 'New Channel' CREDITS_SENT = 'Credits Sent' -NEW_DOWNLOAD_STAT = 'Download' UPNP_SETUP = "UPnP Setup" BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' @@ -30,7 +29,46 @@ BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' log = logging.getLogger(__name__) -class Manager: +def _event_properties(installation_id: str, session_id: str, + event_properties: typing.Optional[typing.Dict]) -> typing.Dict: + properties = { + 'lbry_id': installation_id, + 'session_id': session_id, + } + properties.update(event_properties or {}) + return properties + + +def _download_properties(download_id: str, name: str, sd_hash: str) -> typing.Dict: + p = { + 'download_id': download_id, + 'name': name, + 'stream_info': sd_hash + } + return p + + +def _make_context(platform): + # see https://segment.com/docs/spec/common/#context + # they say they'll ignore fields outside the spec, but evidently they don't + context = { + 'app': { + 'version': platform['lbrynet_version'], + 'build': platform['build'], + }, + # TODO: expand os info to give linux/osx specific info + 'os': { + 'name': platform['os_system'], + 'version': platform['os_release'] + }, + } + if 'desktop' in platform and 'distro' in platform: + context['os']['desktop'] = platform['desktop'] + context['os']['distro'] = platform['distro'] + return context + + +class AnalyticsManager: def __init__(self, conf: Config, installation_id: str, session_id: str): self.cookies = {} @@ -38,7 +76,7 @@ class Manager: self._write_key = utils.deobfuscate(ANALYTICS_TOKEN) self._enabled = conf.share_usage_data self._tracked_data = collections.defaultdict(list) - self.context = self._make_context(system_info.get_platform(), 'torba') + self.context = _make_context(system_info.get_platform()) self.installation_id = installation_id self.session_id = session_id self.task: asyncio.Task = None @@ -60,21 +98,10 @@ class Manager: if self.task is not None and not self.task.done(): self.task.cancel() - async def _post(self, endpoint, data): - # there is an issue with a timing condition with keep-alive - # that is best explained here: https://github.com/mikem23/keepalive-race - # - # If you make a request, wait just the right amount of time, - # then make another request, the requests module may opt to - # reuse the connection, but by the time the server gets it the - # timeout will have expired. - # - # by forcing the connection to close, we will disable the keep-alive. - - assert endpoint[0] == '/' + async def _post(self, data: typing.Dict): request_kwargs = { 'method': 'POST', - 'url': self.url + endpoint, + 'url': self.url + '/track', 'headers': {'Connection': 'Close'}, 'auth': aiohttp.BasicAuth(self._write_key, ''), 'json': data, @@ -84,40 +111,13 @@ class Manager: async with utils.aiohttp_request(**request_kwargs) as response: self.cookies.update(response.cookies) except Exception as e: - log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e) + log.exception('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e) - async def track(self, event): + async def track(self, event: typing.Dict): """Send a single tracking event""" if self._enabled: - log.debug('Sending track event: %s', event) - await self._post('/track', event) - - async def send_new_download_start(self, download_id, name, claim_dict): - await self._send_new_download_stats("start", download_id, name, claim_dict) - - async def send_new_download_success(self, download_id, name, claim_dict): - await self._send_new_download_stats("success", download_id, name, claim_dict) - - async def send_new_download_fail(self, download_id, name, claim_dict, e): - await self._send_new_download_stats("failure", download_id, name, claim_dict, { - 'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)), - 'message': str(e), - }) - - async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None): - await self.track({ - 'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track - 'event': NEW_DOWNLOAD_STAT, - 'properties': self._event_properties({ - 'download_id': download_id, - 'name': name, - 'sd_hash': None if not claim_dict else claim_dict.source_hash.decode(), - 'action': action, - 'error': e, - }), - 'context': self.context, - 'timestamp': utils.isonow(), - }) + log.info('Sending track event: %s', event) + await self._post(event) async def send_upnp_setup_success_fail(self, success, status): await self.track( @@ -136,19 +136,23 @@ class Manager: async def send_server_startup_error(self, message): await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) - async def send_download_started(self, id_, name, claim_dict=None): + async def send_download_started(self, download_id, name, sd_hash): await self.track( - self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) + self._event(DOWNLOAD_STARTED, _download_properties(download_id, name, sd_hash)) ) - async def send_download_errored(self, err, id_, name, claim_dict, report): - download_error_properties = self._download_error_properties(err, id_, name, claim_dict, - report) - await self.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) + async def send_download_finished(self, download_id, name, sd_hash): + await self.track(self._event(DOWNLOAD_FINISHED, _download_properties(download_id, name, sd_hash))) - async def send_download_finished(self, id_, name, report, claim_dict=None): - download_properties = self._download_properties(id_, name, claim_dict, report) - await self.track(self._event(DOWNLOAD_FINISHED, download_properties)) + async def send_download_errored(self, error: Exception, name: str): + await self.track(self._event(DOWNLOAD_ERRORED, { + 'download_id': binascii.hexlify(utils.generate_id()).decode(), + 'name': name, + 'stream_info': None, + 'error': type(error).__name__, + 'reason': str(error), + 'report': None + })) async def send_claim_action(self, action): await self.track(self._event(CLAIM_ACTION, {'action': action})) @@ -162,69 +166,11 @@ class Manager: async def _send_heartbeat(self): await self.track(self._event(HEARTBEAT)) - def _event(self, event, event_properties=None): + def _event(self, event, properties: typing.Optional[typing.Dict] = None): return { 'userId': 'lbry', 'event': event, - 'properties': self._event_properties(event_properties), + 'properties': _event_properties(self.installation_id, self.session_id, properties), 'context': self.context, 'timestamp': utils.isonow() } - - def _metric_event(self, metric_name, value): - return self._event(metric_name, {'value': value}) - - def _event_properties(self, event_properties=None): - properties = { - 'lbry_id': self.installation_id, - 'session_id': self.session_id, - } - properties.update(event_properties or {}) - return properties - - @staticmethod - def _download_properties(id_, name, claim_dict=None, report=None): - sd_hash = None if not claim_dict else claim_dict.source_hash.decode() - p = { - 'download_id': id_, - 'name': name, - 'stream_info': sd_hash - } - if report: - p['report'] = report - return p - - @staticmethod - def _download_error_properties(error, id_, name, claim_dict, report): - def error_name(err): - if not hasattr(type(err), "__name__"): - return str(type(err)) - return type(err).__name__ - return { - 'download_id': id_, - 'name': name, - 'stream_info': claim_dict.source_hash.decode(), - 'error': error_name(error), - 'reason': str(error), - 'report': report - } - - @staticmethod - def _make_context(platform, wallet): - # see https://segment.com/docs/spec/common/#context - # they say they'll ignore fields outside the spec, but evidently they don't - context = { - 'app': { - 'version': platform['lbrynet_version'], - 'build': platform['build'], - }, - # TODO: expand os info to give linux/osx specific info - 'os': { - 'name': platform['os_system'], - 'version': platform['os_release'] - }, - } - if 'desktop' in platform and 'distro' in platform: - context['os']['desktop'] = platform['desktop'] - context['os']['distro'] = platform['distro'] - return context diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 2ca0247ef..176c96deb 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -3,6 +3,7 @@ import asyncio import typing import logging import binascii +from lbrynet.utils import generate_id from lbrynet.extras.daemon.mime_types import guess_media_type from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.descriptor import StreamDescriptor @@ -35,8 +36,10 @@ class ManagedStream: self.stream_hash = descriptor.stream_hash self.stream_claim_info = claim self._status = status + self.fully_reflected = asyncio.Event(loop=self.loop) self.tx = None + self.download_id = binascii.hexlify(generate_id()).decode() @property def file_name(self) -> typing.Optional[str]: diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 6b8b76ef9..36dc151ba 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -17,6 +17,7 @@ if typing.TYPE_CHECKING: from lbrynet.conf import Config from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.dht.node import Node + from lbrynet.extras.daemon.analytics import AnalyticsManager from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim from lbrynet.extras.wallet import LbryWalletManager from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager @@ -54,13 +55,15 @@ comparison_operators = { class StreamManager: def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', - wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node']): + wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'], + analytics_manager: typing.Optional['AnalyticsManager'] = None): self.loop = loop self.config = config self.blob_manager = blob_manager self.wallet = wallet self.storage = storage self.node = node + self.analytics_manager = analytics_manager self.streams: typing.Set[ManagedStream] = set() self.starting_streams: typing.Dict[str, asyncio.Future] = {} self.resume_downloading_task: asyncio.Task = None @@ -277,6 +280,11 @@ class StreamManager: if stream.downloader and stream.running: await stream.downloader.stream_finished_event.wait() stream.update_status(ManagedStream.STATUS_FINISHED) + if self.analytics_manager: + await self.analytics_manager.send_download_finished( + stream.download_id, stream.claim_name, stream.sd_hash + ) + task = self.loop.create_task(_wait_for_stream_finished()) self.update_stream_finished_futs.append(task) task.add_done_callback( @@ -403,9 +411,9 @@ class StreamManager: streams.reverse() return streams - async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', - file_name: typing.Optional[str] = None, - timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: + async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', + file_name: typing.Optional[str] = None, + timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: timeout = timeout or self.config.download_timeout parsed_uri = parse_lbry_uri(uri) if parsed_uri.is_channel: @@ -455,6 +463,10 @@ class StreamManager: self.node, resolved, file_name, timeout, fee_amount, fee_address, False ) log.info("started new stream, deleting old one") + if self.analytics_manager: + await self.analytics_manager.send_download_started( + stream.download_id, parsed_uri.name, claim.source_hash.decode() + ) await self.delete_stream(existing[0]) return stream elif existing: @@ -467,6 +479,21 @@ class StreamManager: await self.start_stream(stream) return stream log.info("download stream from %s", uri) - return await self.download_stream_from_claim( - self.node, resolved, file_name, timeout, fee_amount, fee_address + stream = await self.download_stream_from_claim( + self.node, resolved, file_name, timeout, fee_amount, fee_address ) + if self.analytics_manager: + await self.analytics_manager.send_download_started( + stream.download_id, parsed_uri.name, claim.source_hash.decode() + ) + return stream + + async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', + file_name: typing.Optional[str] = None, + timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: + try: + return await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout) + except Exception as e: + if self.analytics_manager: + await self.analytics_manager.send_download_errored(e, uri) + raise e diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 2d564e760..bf966c0ea 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -3,10 +3,13 @@ import binascii from unittest import mock import asyncio import time +import typing from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager -from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.utils import generate_id +from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout from lbrynet.extras.wallet.manager import LbryWalletManager +from lbrynet.extras.daemon.analytics import AnalyticsManager from lbrynet.stream.stream_manager import StreamManager from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.dht.node import Node @@ -102,11 +105,22 @@ class TestStreamManager(BlobExchangeTestBase): self.sd_hash = descriptor.sd_hash self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, - self.client_storage, get_mock_node(self.server_from_client)) + self.client_storage, get_mock_node(self.server_from_client), + AnalyticsManager(self.client_config, + binascii.hexlify(generate_id()).decode(), + binascii.hexlify(generate_id()).decode())) self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) async def test_download_stop_resume_delete(self): await self.setup_stream_manager() + received = [] + expected_events = ['Download Started', 'Download Finished'] + + async def check_post(event): + received.append(event['event']) + + self.stream_manager.analytics_manager._post = check_post + self.assertSetEqual(self.stream_manager.streams, set()) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) stream_hash = stream.stream_hash @@ -147,6 +161,20 @@ class TestStreamManager(BlobExchangeTestBase): "select status from file where stream_hash=?", stream_hash ) self.assertEqual(stored_status, None) + self.assertListEqual(expected_events, received) + + async def _test_download_error(self, expected_error): + received = [] + + async def check_post(event): + self.assertEqual("Download Errored", event['event']) + received.append(event['properties']['error']) + + self.stream_manager.analytics_manager._post = check_post + + with self.assertRaises(expected_error): + await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + self.assertListEqual([expected_error.__name__], received) async def test_insufficient_funds(self): fee = { @@ -156,8 +184,7 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(10.0, fee) - with self.assertRaises(InsufficientFundsError): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self._test_download_error(InsufficientFundsError) async def test_fee_above_max_allowed(self): fee = { @@ -167,11 +194,32 @@ class TestStreamManager(BlobExchangeTestBase): 'version': '_0_0_1' } await self.setup_stream_manager(1000000.0, fee) - with self.assertRaises(KeyFeeAboveMaxAllowed): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self._test_download_error(KeyFeeAboveMaxAllowed) + + async def test_resolve_error(self): + await self.setup_stream_manager() + self.uri = "fake" + await self._test_download_error(ResolveError) + + async def test_download_timeout(self): + self.server.stop_server() + self.client_config.download_timeout = 1.0 + await self.setup_stream_manager() + await self._test_download_error(DownloadSDTimeout) async def test_download_then_recover_stream_on_startup(self, old_sort=False): + expected_analytics_events = [ + 'Download Started', + 'Download Finished' + ] + received_events = [] + + async def check_post(event): + received_events.append(event['event']) + await self.setup_stream_manager(old_sort=old_sort) + self.stream_manager.analytics_manager._post = check_post + self.assertSetEqual(self.stream_manager.streams, set()) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) await stream.downloader.stream_finished_event.wait() @@ -189,6 +237,7 @@ class TestStreamManager(BlobExchangeTestBase): sd_blob = self.client_blob_manager.get_blob(stream.sd_hash) self.assertTrue(sd_blob.file_exists) self.assertTrue(sd_blob.get_is_verified()) + self.assertListEqual(expected_analytics_events, received_events) def test_download_then_recover_old_sort_stream_on_startup(self): return self.test_download_then_recover_stream_on_startup(old_sort=True)