From 2353dbcb0098789dd9532cbb2eddb7958d081be2 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 26 Apr 2017 14:15:38 -0400 Subject: [PATCH] refactor analytics --- lbrynet/analytics.py | 241 ++++++++++++++++++ lbrynet/analytics/__init__.py | 6 - lbrynet/analytics/api.py | 81 ------ lbrynet/analytics/constants.py | 4 - lbrynet/analytics/events.py | 128 ---------- lbrynet/analytics/logging_handler.py | 14 - lbrynet/analytics/manager.py | 116 --------- lbrynet/analytics/track.py | 24 -- lbrynet/conf.py | 16 +- lbrynet/core/log_support.py | 8 +- lbrynet/core/server/BlobRequestHandler.py | 15 +- lbrynet/lbrynet_daemon/Daemon.py | 11 +- lbrynet/lbrynet_daemon/DaemonControl.py | 1 - tests/functional/test_misc.py | 7 +- tests/unit/analytics/test_events.py | 39 --- tests/unit/analytics/test_track.py | 8 +- .../core/server/test_BlobRequestHandler.py | 4 +- 17 files changed, 270 insertions(+), 453 deletions(-) create mode 100644 lbrynet/analytics.py delete mode 100644 lbrynet/analytics/__init__.py delete mode 100644 lbrynet/analytics/api.py delete mode 100644 lbrynet/analytics/constants.py delete mode 100644 lbrynet/analytics/events.py delete mode 100644 lbrynet/analytics/logging_handler.py delete mode 100644 lbrynet/analytics/manager.py delete mode 100644 lbrynet/analytics/track.py delete mode 100644 tests/unit/analytics/test_events.py diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py new file mode 100644 index 000000000..8286282a0 --- /dev/null +++ b/lbrynet/analytics.py @@ -0,0 +1,241 @@ +import collections +import logging +from twisted.internet import defer, task +from requests import auth +from txrequests import Session + +from lbrynet import conf +from lbrynet.core import looping_call_manager, utils, system_info + +# Things We Track +SERVER_STARTUP = 'Server Startup' +SERVER_STARTUP_SUCCESS = 'Server Startup Success' +SERVER_STARTUP_ERROR = 'Server Startup Error' +DOWNLOAD_STARTED = 'Download Started' +DOWNLOAD_ERRORED = 'Download Errored' +DOWNLOAD_FINISHED = 'Download Finished' +HEARTBEAT = 'Heartbeat' + +BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' +BLOB_BYTES_AVAILABLE = 'Blob Bytes Available' + +log = logging.getLogger(__name__) + + +class Manager(object): + def __init__(self, analytics_api, context=None, installation_id=None, session_id=None): + self.analytics_api = analytics_api + self._tracked_data = collections.defaultdict(list) + self.looping_call_manager = self._setup_looping_calls() + self.context = context or self._make_context( + system_info.get_platform(), conf.settings['wallet']) + self.installation_id = installation_id or conf.settings.installation_id + self.session_id = session_id or conf.settings.get_session_id() + self.is_started = False + + @classmethod + def new_instance(cls, api=None): + if api is None: + api = Api.new_instance() + return cls(api) + + # Things We Track + + def send_server_startup(self): + self.analytics_api.track(self._event(SERVER_STARTUP)) + + def send_server_startup_success(self): + self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS)) + + def send_server_startup_error(self, message): + self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) + + def send_download_started(self, id_, name, stream_info=None): + self.analytics_api.track( + self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, stream_info)) + ) + + def send_download_errored(self, id_, name, stream_info=None): + self.analytics_api.track( + self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, stream_info)) + ) + + def send_download_finished(self, id_, name, stream_info=None): + self.analytics_api.track( + self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, stream_info)) + ) + + def _send_heartbeat(self): + self.analytics_api.track(self._event(HEARTBEAT)) + + def _update_tracked_metrics(self): + should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED) + if should_send: + self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value)) + + # Setup / Shutdown + + def start(self): + if not self.is_started: + for name, _, interval in self._get_looping_calls(): + self.looping_call_manager.start(name, interval) + self.is_started = True + + def shutdown(self): + self.looping_call_manager.shutdown() + + def register_repeating_metric(self, event_name, value_generator, frequency=300): + lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator) + self.looping_call_manager.register_looping_call(event_name, lcall) + lcall.start(frequency) + + def _get_looping_calls(self): + return [ + ('send_heartbeat', self._send_heartbeat, 60), + ('update_tracked_metrics', self._update_tracked_metrics, 300), + ] + + def _setup_looping_calls(self): + call_manager = looping_call_manager.LoopingCallManager() + for name, fn, _ in self._get_looping_calls(): + call_manager.register_looping_call(name, task.LoopingCall(fn)) + return call_manager + + def _send_repeating_metric(self, event_name, value_generator): + result = value_generator() + self._if_deferred(result, self._send_repeating_metric_value, event_name) + + def _send_repeating_metric_value(self, result, event_name): + should_send, value = result + if should_send: + self.analytics_api.track(self._metric_event(event_name, value)) + + def add_observation(self, metric, value): + self._tracked_data[metric].append(value) + + def summarize_and_reset(self, metric, op=sum): + """Apply `op` on the current values for `metric`. + + This operation also resets the metric. + + Returns: + a tuple (should_send, value) + """ + try: + values = self._tracked_data.pop(metric) + return True, op(values) + except KeyError: + return False, None + + def _event(self, event, event_properties=None): + return { + 'userId': 'lbry', + 'event': event, + 'properties': self._event_properties(event_properties), + 'context': self.context, + 'timestamp': utils.isonow() + } + + def _metric_event(self, metric_name, value): + return self._event(metric_name, {'value': value}) + + def _event_properties(self, event_properties=None): + properties = { + 'lbry_id': self.installation_id, + 'session_id': self.session_id, + } + properties.update(event_properties or {}) + return properties + + @staticmethod + def _download_properties(id_, name, stream_info=None): + sd_hash = None + if stream_info: + try: + sd_hash = stream_info['stream']['source']['source'] + except (KeyError, TypeError, ValueError): + log.debug('Failed to get sd_hash from %s', stream_info, exc_info=True) + return { + 'download_id': id_, + 'name': name, + 'stream_info': sd_hash + } + + @staticmethod + def _make_context(platform, wallet): + return { + 'app': { + 'name': 'lbrynet', + 'version': platform['lbrynet_version'], + 'python_version': platform['python_version'], + 'build': platform['build'], + 'wallet': { + 'name': wallet, + 'version': platform['lbryum_version'] if wallet == conf.LBRYUM_WALLET else None + }, + }, + # TODO: expand os info to give linux/osx specific info + 'os': { + 'name': platform['os_system'], + 'version': platform['os_release'] + }, + 'library': { + 'name': 'lbrynet-analytics', + 'version': '1.0.0' + }, + } + + @staticmethod + def _if_deferred(maybe_deferred, callback, *args, **kwargs): + if isinstance(maybe_deferred, defer.Deferred): + maybe_deferred.addCallback(callback, *args, **kwargs) + else: + callback(maybe_deferred, *args, **kwargs) + + +class Api(object): + def __init__(self, session, url, write_key): + self.session = session + self.url = url + self._write_key = write_key + + def _post(self, endpoint, data): + # there is an issue with a timing condition with keep-alive + # that is best explained here: https://github.com/mikem23/keepalive-race + # + # If you make a request, wait just the right amount of time, + # then make another request, the requests module may opt to + # reuse the connection, but by the time the server gets it the + # timeout will have expired. + # + # by forcing the connection to close, we will disable the keep-alive. + assert endpoint[0] == '/' + headers = {"Connection": "close"} + return self.session.post( + self.url + endpoint, + json=data, + auth=auth.HTTPBasicAuth(self._write_key, ''), + headers=headers + ) + + def track(self, event): + """Send a single tracking event""" + + def _log_error(failure): + log.warning('Failed to send track event. %s', failure.getTraceback()) + + log.debug('Sending track event: %s', event) + d = self._post('/track', event) + d.addErrback(_log_error) + return d + + @classmethod + def new_instance(cls, session=None): + """Initialize an instance using values from the configuration""" + if not session: + session = Session() + return cls( + session, + conf.settings['ANALYTICS_ENDPOINT'], + utils.deobfuscate(conf.settings['ANALYTICS_TOKEN']) + ) diff --git a/lbrynet/analytics/__init__.py b/lbrynet/analytics/__init__.py deleted file mode 100644 index 1aafe01b4..000000000 --- a/lbrynet/analytics/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from constants import * -from events import * -from api import Api -from track import Track -from manager import Manager -from logging_handler import Handler diff --git a/lbrynet/analytics/api.py b/lbrynet/analytics/api.py deleted file mode 100644 index ec3d70f12..000000000 --- a/lbrynet/analytics/api.py +++ /dev/null @@ -1,81 +0,0 @@ -import functools -import json -import logging - -from requests import auth -from txrequests import Session - -from lbrynet import conf -from lbrynet.analytics import utils - -log = logging.getLogger(__name__) - - -def log_response(fn): - def _log_error(failure): - log.warning('Failed to send an analytics event. %s', failure.getTraceback()) - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - d = fn(*args, **kwargs) - d.addErrback(_log_error) - return d - - return wrapper - - -class Api(object): - def __init__(self, session, url, write_key): - self.session = session - self.url = url - self.write_key = write_key - - def post(self, endpoint, data): - # there is an issue with a timing condition with keep-alive - # that is best explained here: https://github.com/mikem23/keepalive-race - # - # If you make a request, wait just the right amount of time, - # then make another request, the requests module may opt to - # reuse the connection, but by the time the server gets it the - # timeout will have expired. - # - # by forcing the connection to close, we will disable the keep-alive. - assert endpoint[0] == '/' - headers = {"Connection": "close"} - return self.session.post( - self.url + endpoint, json=data, auth=self.auth, headers=headers) - - @property - def auth(self): - return auth.HTTPBasicAuth(self.write_key, '') - - @log_response - def batch(self, events): - """Send multiple events in one request. - - Each event needs to have its type specified. - """ - data = json.dumps({ - 'batch': events, - 'sentAt': utils.now(), - }) - log.debug('sending %s events', len(events)) - log.debug('Data: %s', data) - return self.post('/batch', data) - - @log_response - def track(self, event): - """Send a single tracking event""" - log.debug('Sending track event: %s', event) - return self.post('/track', event) - - @classmethod - def new_instance(cls, session=None): - """Initialize an instance using values from the configuration""" - if not session: - session = Session() - return cls( - session, - conf.settings['ANALYTICS_ENDPOINT'], - utils.deobfuscate(conf.settings['ANALYTICS_TOKEN']) - ) diff --git a/lbrynet/analytics/constants.py b/lbrynet/analytics/constants.py deleted file mode 100644 index 280b51a72..000000000 --- a/lbrynet/analytics/constants.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Constants for metrics""" - -BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' -BLOB_BYTES_AVAILABLE = 'Blob Bytes Available' diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py deleted file mode 100644 index 14cdabcf0..000000000 --- a/lbrynet/analytics/events.py +++ /dev/null @@ -1,128 +0,0 @@ -import logging - -from lbrynet.core import utils -from lbrynet.conf import LBRYUM_WALLET - -log = logging.getLogger(__name__) - - -def get_sd_hash(stream_info): - if not stream_info: - return None - try: - return stream_info['stream']['source']['source'] - except (KeyError, TypeError, ValueError): - log.debug('Failed to get sd_hash from %s', stream_info, exc_info=True) - return None - - -class Events(object): - def __init__(self, context, installation_id, session_id): - """Contains all of the analytics events that can be sent - - Attributes: - context: usually the output of `make_context` - installation_id: id unique to this installation. Can be anything, but - generally should be base58 encoded. - session_id: id for tracking events during this session. Can be - anything, but generally should be base58 encoded. - """ - self.context = context - self.installation_id = installation_id - self.session_id = session_id - - def update_context(self, context): - self.context = context - - def server_startup(self): - return self._event('Server Startup') - - def server_startup_success(self): - return self._event('Server Startup Success') - - def server_startup_error(self, message): - return self._event('Server Startup Error', { - 'message': message, - }) - - def heartbeat(self): - return self._event('Heartbeat') - - def download_started(self, *args, **kwargs): - properties = download_properties(*args, **kwargs) - return self._event('Download Started', properties) - - def download_errored(self, *args, **kwargs): - properties = download_properties(*args, **kwargs) - return self._event('Download Errored', properties) - - def download_finished(self, *args, **kwargs): - properties = download_properties(*args, **kwargs) - return self._event('Download Finished', properties) - - def error(self, log_record): - """Record when a log message of ERROR or higher was emitted""" - properties = { - 'message': log_record.message, - 'module': log_record.module, - 'lineno': log_record.lineno, - 'name': log_record.name, - 'traceback': log_record.exc_text, - } - return self._event('Error', properties) - - def metric_observed(self, metric_name, value): - properties = { - 'value': value, - } - return self._event(metric_name, properties) - - def _event(self, event, event_properties=None): - return { - 'userId': 'lbry', - 'event': event, - 'properties': self._properties(event_properties), - 'context': self.context, - 'timestamp': utils.isonow() - } - - def _properties(self, event_properties=None): - event_properties = event_properties or {} - properties = { - 'lbry_id': self.installation_id, - 'session_id': self.session_id, - } - properties.update(event_properties) - return properties - - -def make_context(platform, wallet): - return { - 'app': { - 'name': 'lbrynet', - 'version': platform['lbrynet_version'], - 'python_version': platform['python_version'], - 'build': platform['build'], - 'wallet': { - 'name': wallet, - 'version': platform['lbryum_version'] if wallet == LBRYUM_WALLET else None - }, - }, - # TODO: expand os info to give linux/osx specific info - 'os': { - 'name': platform['os_system'], - 'version': platform['os_release'] - }, - 'library': { - 'name': 'lbrynet-analytics', - 'version': '1.0.0' - }, - } - - -def download_properties(id_, name, stream_info=None): - return { - 'download_id': id_, - 'name': name, - 'stream_info': get_sd_hash(stream_info) - } diff --git a/lbrynet/analytics/logging_handler.py b/lbrynet/analytics/logging_handler.py deleted file mode 100644 index e7bdb4ac9..000000000 --- a/lbrynet/analytics/logging_handler.py +++ /dev/null @@ -1,14 +0,0 @@ -import logging - - -class Handler(logging.Handler): - """A logging handler that reports errors to the analytics manager""" - def __init__(self, manager, level=logging.ERROR): - self.manager = manager - logging.Handler.__init__(self, level) - - def emit(self, record): - # We need to call format to ensure that record.message and - # record.exc_text attributes are populated - self.format(record) - self.manager.send_error(record) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py deleted file mode 100644 index c8dbc9d9e..000000000 --- a/lbrynet/analytics/manager.py +++ /dev/null @@ -1,116 +0,0 @@ -from twisted.internet import defer -from twisted.internet import task - -from lbrynet import conf -from lbrynet.core import looping_call_manager -from lbrynet.core.system_info import get_platform - -import constants -from api import Api -from events import Events, make_context -from track import Track - - -class Manager(object): - def __init__(self, analytics_api, events_generator, track): - self.analytics_api = analytics_api - self.events_generator = events_generator - self.track = track - self.looping_call_manager = self.setup_looping_calls() - self.is_started = False - - @classmethod - def new_instance(cls, api=None, events=None): - if api is None: - api = Api.new_instance() - if events is None: - events = Events( - make_context(get_platform(), conf.settings['wallet']), - conf.settings.installation_id, - conf.settings.get_session_id(), - ) - return cls(api, events, Track()) - - def update_events_generator(self, events_generator): - self.events_generator = events_generator - - def _get_looping_calls(self): - return [ - ('send_heartbeat', self._send_heartbeat, 60), - ('update_tracked_metrics', self._update_tracked_metrics, 300), - ] - - def setup_looping_calls(self): - call_manager = looping_call_manager.LoopingCallManager() - for name, fn, _ in self._get_looping_calls(): - call_manager.register_looping_call(name, task.LoopingCall(fn)) - return call_manager - - def start(self): - if not self.is_started: - for name, _, interval in self._get_looping_calls(): - self.looping_call_manager.start(name, interval) - self.is_started = True - - def shutdown(self): - self.looping_call_manager.shutdown() - - def send_server_startup(self): - event = self.events_generator.server_startup() - self.analytics_api.track(event) - - def send_server_startup_success(self): - event = self.events_generator.server_startup_success() - self.analytics_api.track(event) - - def send_server_startup_error(self, message): - event = self.events_generator.server_startup_error(message) - self.analytics_api.track(event) - - def send_download_started(self, id_, name, stream_info=None): - event = self.events_generator.download_started(id_, name, stream_info) - self.analytics_api.track(event) - - def send_download_errored(self, id_, name, stream_info=None): - event = self.events_generator.download_errored(id_, name, stream_info) - self.analytics_api.track(event) - - def send_download_finished(self, id_, name, stream_info=None): - event = self.events_generator.download_finished(id_, name, stream_info) - self.analytics_api.track(event) - - def send_error(self, message): - event = self.events_generator.error(message) - self.analytics_api.track(event) - - def register_repeating_metric(self, event_name, value_generator, frequency=300): - lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator) - self.looping_call_manager.register_looping_call(event_name, lcall) - lcall.start(frequency) - - def _send_heartbeat(self): - heartbeat = self.events_generator.heartbeat() - self.analytics_api.track(heartbeat) - - def _update_tracked_metrics(self): - should_send, value = self.track.summarize_and_reset(constants.BLOB_BYTES_UPLOADED) - if should_send: - event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value) - self.analytics_api.track(event) - - def _send_repeating_metric(self, event_name, value_generator): - result = value_generator() - if_deferred(result, self._send_repeating_metric_value, event_name) - - def _send_repeating_metric_value(self, result, event_name): - should_send, value = result - if should_send: - event = self.events_generator.metric_observed(event_name, value) - self.analytics_api.track(event) - - -def if_deferred(maybe_deferred, callback, *args, **kwargs): - if isinstance(maybe_deferred, defer.Deferred): - maybe_deferred.addCallback(callback, *args, **kwargs) - else: - callback(maybe_deferred, *args, **kwargs) diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py deleted file mode 100644 index 7643ebce9..000000000 --- a/lbrynet/analytics/track.py +++ /dev/null @@ -1,24 +0,0 @@ -import collections - - -class Track(object): - """Track and summarize observations of metrics.""" - def __init__(self): - self.data = collections.defaultdict(list) - - def add_observation(self, metric, value): - self.data[metric].append(value) - - def summarize_and_reset(self, metric, op=sum): - """Apply `op` on the current values for `metric`. - - This operation also resets the metric. - - Returns: - a tuple (should_send, value) - """ - try: - values = self.data.pop(metric) - return True, op(values) - except KeyError: - return False, None diff --git a/lbrynet/conf.py b/lbrynet/conf.py index d194f459e..f5654e65a 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -79,7 +79,7 @@ class Env(envparse.Env): my_schema = { self._convert_key(key): self._convert_value(value) for key, value in schema.items() - } + } envparse.Env.__init__(self, **my_schema) def __call__(self, key, *args, **kwargs): @@ -104,6 +104,7 @@ class Env(envparse.Env): return new_value return value + TYPE_DEFAULT = 'default' TYPE_PERSISTED = 'persisted' TYPE_ENV = 'env' @@ -183,7 +184,7 @@ ADJUSTABLE_SETTINGS = { # # TODO: writing json on the cmd line is a pain, come up with a nicer # parser for this data structure. (maybe MAX_KEY_FEE': USD:25 - 'max_key_fee': (json.loads, {'currency':'USD', 'amount': 25.0, 'address':''}), + 'max_key_fee': (json.loads, {'currency': 'USD', 'amount': 25.0, 'address': ''}), 'max_search_results': (int, 25), 'max_upload': (float, 0.0), @@ -221,11 +222,11 @@ class Config(object): self._adjustable_defaults = adjustable_defaults self._data = { - TYPE_DEFAULT: {}, # defaults + TYPE_DEFAULT: {}, # defaults TYPE_PERSISTED: {}, # stored settings from daemon_settings.yml (or from a db, etc) - TYPE_ENV: {}, # settings from environment variables - TYPE_CLI: {}, # command-line arguments - TYPE_RUNTIME: {}, # set during runtime (using self.set(), etc) + TYPE_ENV: {}, # settings from environment variables + TYPE_CLI: {}, # command-line arguments + TYPE_RUNTIME: {}, # set during runtime (using self.set(), etc) } # the order in which a piece of data is searched for. earlier types override later types @@ -359,7 +360,7 @@ class Config(object): return { key: val for key, val in self.get_current_settings_dict().iteritems() if key in self._adjustable_defaults - } + } def save_conf_file_settings(self): path = self.get_conf_filename() @@ -472,4 +473,3 @@ def initialize_settings(load_conf_file=True): settings.installation_id = settings.get_installation_id() if load_conf_file: settings.load_conf_file_settings() - diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 4c0ec5da0..c06a36d10 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -9,7 +9,7 @@ import traceback from txrequests import Session import twisted.python.log -from lbrynet import __version__ as lbrynet_version, analytics, build_type, conf +from lbrynet import __version__ as lbrynet_version, build_type, conf from lbrynet.core import utils #### @@ -134,12 +134,6 @@ def configure_file_handler(file_name, **kwargs): return handler -def configure_analytics_handler(analytics_manager): - handler = analytics.Handler(analytics_manager) - handler.name = 'analytics' - return configure_handler(handler, logging.getLogger(), logging.ERROR) - - def get_loggly_url(token=None, version=None): token = token or utils.deobfuscate(conf.settings['LOGGLY_TOKEN']) version = version or lbrynet_version diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 4cce9fb59..b95b3ca84 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -5,8 +5,8 @@ from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements -from lbrynet.core.Offer import Offer from lbrynet import analytics +from lbrynet.core.Offer import Offer from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender log = logging.getLogger(__name__) @@ -15,17 +15,17 @@ log = logging.getLogger(__name__) class BlobRequestHandlerFactory(object): implements(IQueryHandlerFactory) - def __init__(self, blob_manager, wallet, payment_rate_manager, track): + def __init__(self, blob_manager, wallet, payment_rate_manager, analytics_manager): self.blob_manager = blob_manager self.wallet = wallet self.payment_rate_manager = payment_rate_manager - self.track = track + self.analytics_manager = analytics_manager ######### IQueryHandlerFactory ######### def build_query_handler(self): q_h = BlobRequestHandler( - self.blob_manager, self.wallet, self.payment_rate_manager, self.track) + self.blob_manager, self.wallet, self.payment_rate_manager, self.analytics_manager) return q_h def get_primary_query_identifier(self): @@ -41,12 +41,12 @@ class BlobRequestHandler(object): BLOB_QUERY = 'requested_blob' AVAILABILITY_QUERY = 'requested_blobs' - def __init__(self, blob_manager, wallet, payment_rate_manager, track): + def __init__(self, blob_manager, wallet, payment_rate_manager, analytics_manager): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] - self.track = track + self.analytics_manager = analytics_manager self.peer = None self.blob_data_payment_rate = None self.read_handle = None @@ -197,7 +197,8 @@ class BlobRequestHandler(object): uploaded = len(data) self.blob_bytes_uploaded += uploaded self.peer.update_stats('blob_bytes_uploaded', uploaded) - self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded) + if self.analytics_manager is not None: + self.analytics_manager.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded) return data def start_transfer(): diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index c1fb7f0a8..ee04565ed 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -390,7 +390,7 @@ class Daemon(AuthJSONRPCServer): self.session.blob_manager, self.session.wallet, self.session.payment_rate_manager, - self.analytics_manager.track + self.analytics_manager ), self.session.wallet.get_wallet_info_query_handler_factory(), ] @@ -632,7 +632,7 @@ class Daemon(AuthJSONRPCServer): def eb(): if not finished_d.called: finished_d.errback(Exception("Blob (%s) download timed out" % - blob_hash[:SHORT_ID_LEN])) + blob_hash[:SHORT_ID_LEN])) if not blob_hash: raise Exception("Nothing to download") @@ -833,7 +833,7 @@ class Daemon(AuthJSONRPCServer): """ try: claim_response = yield self.session.wallet.resolve_uri(uri) - #TODO: fix me, this is a hack + # TODO: fix me, this is a hack except Exception: claim_response = None @@ -1275,7 +1275,6 @@ class Daemon(AuthJSONRPCServer): return self._render_response(float( self.session.wallet.get_address_balance(address, include_unconfirmed))) - def jsonrpc_stop(self): """ DEPRECATED. Use `daemon_stop` instead. @@ -1602,7 +1601,7 @@ class Daemon(AuthJSONRPCServer): else: msg = ( "File was already being downloaded" if status == 'start' - else "File was already stopped" + else "File was already stopped" ) response = yield self._render_response(msg) defer.returnValue(response) @@ -2182,7 +2181,6 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda address: self._render_response(address)) return d - @AuthJSONRPCServer.auth_required def jsonrpc_wallet_unused_address(self): """ @@ -2204,7 +2202,6 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda address: self._render_response(address)) return d - @AuthJSONRPCServer.auth_required def jsonrpc_send_amount_to_address(self, amount, address): """ diff --git a/lbrynet/lbrynet_daemon/DaemonControl.py b/lbrynet/lbrynet_daemon/DaemonControl.py index 4dbd24411..067590e7a 100644 --- a/lbrynet/lbrynet_daemon/DaemonControl.py +++ b/lbrynet/lbrynet_daemon/DaemonControl.py @@ -120,7 +120,6 @@ def start_server_and_listen(launchui, use_auth, analytics_manager, max_tries=5): analytics_manager: to send analytics """ analytics_manager.send_server_startup() - log_support.configure_analytics_handler(analytics_manager) tries = 1 while tries < max_tries: log.info('Making attempt %s / %s to startup', tries, max_tries) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index d046f6e5d..f38d9a951 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -10,7 +10,6 @@ import unittest from Crypto import Random from Crypto.Hash import MD5 from lbrynet import conf -from lbrynet import analytics from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager @@ -136,7 +135,7 @@ class LbryUploader(object): 2: BlobRequestHandlerFactory( session.blob_manager, session.wallet, session.payment_rate_manager, - analytics.Track()), + None), 3: session.wallet.get_wallet_info_query_handler_factory(), } server_factory = ServerProtocolFactory(session.rate_limiter, @@ -256,7 +255,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, 2: BlobRequestHandlerFactory( session.blob_manager, session.wallet, session.payment_rate_manager, - analytics.Track()), + None), 3: session.wallet.get_wallet_info_query_handler_factory(), } @@ -348,7 +347,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero 1: BlobAvailabilityHandlerFactory(session.blob_manager), 2: BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager, - analytics.Track()), + None), 3: session.wallet.get_wallet_info_query_handler_factory(), } diff --git a/tests/unit/analytics/test_events.py b/tests/unit/analytics/test_events.py deleted file mode 100644 index 06b9bcd92..000000000 --- a/tests/unit/analytics/test_events.py +++ /dev/null @@ -1,39 +0,0 @@ -from lbrynet.analytics import events - -from twisted.trial import unittest - -from tests import util - - -class EventsTest(unittest.TestCase): - def setUp(self): - util.resetTime(self) - self.event_generator = events.Events('any valid json datatype', 'lbry123', 'session456') - - def test_heartbeat(self): - result = self.event_generator.heartbeat() - desired_result = { - 'context': 'any valid json datatype', - 'event': 'Heartbeat', - 'properties': {'lbry_id': 'lbry123', 'session_id': 'session456'}, - 'timestamp': '2016-01-01T00:00:00Z', - 'userId': 'lbry' - } - self.assertEqual(desired_result, result) - - def test_download_started(self): - result = self.event_generator.download_started('1', 'great gatsby') - desired_result = { - 'context': 'any valid json datatype', - 'event': 'Download Started', - 'properties': { - 'lbry_id': 'lbry123', - 'session_id': 'session456', - 'name': 'great gatsby', - 'stream_info': None, - 'download_id': '1' - }, - 'timestamp': '2016-01-01T00:00:00Z', - 'userId': 'lbry' - } - self.assertEqual(desired_result, result) diff --git a/tests/unit/analytics/test_track.py b/tests/unit/analytics/test_track.py index 531ec56a5..1688ab6bf 100644 --- a/tests/unit/analytics/test_track.py +++ b/tests/unit/analytics/test_track.py @@ -4,13 +4,13 @@ from twisted.trial import unittest class TrackTest(unittest.TestCase): - def test_empty_summarize_is_None(self): - track = analytics.Track() + def test_empty_summarize_is_none(self): + track = analytics.Manager(None, 'x', 'y', 'z') _, result = track.summarize_and_reset('a') self.assertEqual(None, result) def test_can_get_sum_of_metric(self): - track = analytics.Track() + track = analytics.Manager(None, 'x', 'y', 'z') track.add_observation('b', 1) track.add_observation('b', 2) @@ -18,7 +18,7 @@ class TrackTest(unittest.TestCase): self.assertEqual(3, result) def test_summarize_resets_metric(self): - track = analytics.Track() + track = analytics.Manager(None, 'x', 'y', 'z') track.add_observation('metric', 1) track.add_observation('metric', 2) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index e238efe87..af2197d0b 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -5,7 +5,6 @@ from twisted.internet import defer from twisted.test import proto_helpers from twisted.trial import unittest -from lbrynet import analytics from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager @@ -120,8 +119,7 @@ class TestBlobRequestHandlerSender(unittest.TestCase): # TODO: also check that the expected payment values are set consumer = proto_helpers.StringTransport() test_file = StringIO.StringIO('test') - track = analytics.Track() - handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track) + handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None) handler.peer = mock.create_autospec(Peer.Peer) handler.currently_uploading = mock.Mock() handler.read_handle = test_file