From 1cc6b7658cae96d9331e0733da38754d9dbf3ec0 Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 27 Sep 2016 23:56:08 -0400 Subject: [PATCH] get uploads working -add error catching in exchange rate manager -add free data on first request with default negotiation strategy --- lbrynet/core/Strategy.py | 26 +++++++---- lbrynet/core/server/BlobRequestHandler.py | 45 ++++++++----------- lbrynet/lbrynet_daemon/LBRYDaemon.py | 8 +--- .../lbrynet_daemon/LBRYExchangeRateManager.py | 22 +++++++-- 4 files changed, 57 insertions(+), 44 deletions(-) diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index b4d52fc20..ead935dc4 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -18,9 +18,10 @@ class BasicAvailabilityWeightedStrategy(object): until the rate is accepted or a threshold is reached """ - def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005): + def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0): self._acceleration = acceleration # rate of how quickly to ramp offer self._deceleration = deceleration + self._min_rate = min_rate self._max_rate = max_rate self._count_up = {} self._count_down = {} @@ -31,13 +32,17 @@ class BasicAvailabilityWeightedStrategy(object): def respond_to_offer(self, offer, peer, blobs): request_count = self._count_up.get(peer, 0) rates = [self._calculate_price(blob) for blob in blobs] - rate = self._discount(sum(rates) / max(len(blobs), 1), request_count) - log.info("Target rate: %s", rate) + rate = sum(rates) / max(len(rates), 1) + discounted = self._discount(rate, request_count) + price = self._bounded_price(discounted) + log.info("Price target: %f, final: %f", discounted, price) self._inc_up_count(peer) - if offer.accepted: + if offer.rate == 0.0 and request_count == 0: + # give blobs away for free by default on the first request + offer.accept() return offer - elif offer.rate >= rate: + elif offer.rate >= price: log.info("Accept: %f", offer.rate) offer.accept() return offer @@ -57,9 +62,14 @@ class BasicAvailabilityWeightedStrategy(object): rates = [self._calculate_price(blob) for blob in blobs] mean_rate = sum(rates) / max(len(blobs), 1) with_premium = self._premium(mean_rate, request_count) - offer = Offer(with_premium) + price = self._bounded_price(with_premium) + offer = Offer(price) return offer + def _bounded_price(self, price): + price_for_return = min(self._max_rate, max(price, self._min_rate)) + return price_for_return + def _inc_up_count(self, peer): turn = self._count_up.get(peer, 0) + 1 self._count_up.update({peer: turn}) @@ -72,7 +82,7 @@ class BasicAvailabilityWeightedStrategy(object): return self.model.calculate_price(blob) def _premium(self, rate, turn): - return min(rate * (self._acceleration ** turn), self._max_rate) + return rate * (self._acceleration ** turn) def _discount(self, rate, turn): - return min(rate * (self._deceleration ** turn), self._max_rate) \ No newline at end of file + return rate * (self._deceleration ** turn) \ No newline at end of file diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index a594b9c42..a76d3428d 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -16,8 +16,7 @@ log = logging.getLogger(__name__) class BlobRequestHandlerFactory(object): implements(IQueryHandlerFactory) - def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager): - self.blob_tracker = blob_tracker + def __init__(self, blob_manager, wallet, payment_rate_manager): self.blob_manager = blob_manager self.wallet = wallet self.payment_rate_manager = payment_rate_manager @@ -25,7 +24,7 @@ class BlobRequestHandlerFactory(object): ######### IQueryHandlerFactory ######### def build_query_handler(self): - q_h = BlobRequestHandler(self.blob_manager, self.blob_tracker, self.wallet, self.payment_rate_manager) + q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) return q_h def get_primary_query_identifier(self): @@ -38,9 +37,8 @@ class BlobRequestHandlerFactory(object): class BlobRequestHandler(object): implements(IQueryHandler, IBlobSender) - def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager): + def __init__(self, blob_manager, wallet, payment_rate_manager): self.blob_manager = blob_manager - self.blob_tracker = blob_tracker self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.query_identifiers = ['blob_data_payment_rate', 'requested_blob', 'requested_blobs'] @@ -50,7 +48,6 @@ class BlobRequestHandler(object): self.currently_uploading = None self.file_sender = None self.blob_bytes_uploaded = 0 - self.strategy = get_default_strategy(self.blob_tracker) self._blobs_requested = [] ######### IQueryHandler ######### @@ -73,8 +70,7 @@ class BlobRequestHandler(object): if self.query_identifiers[1] in queries: incoming = queries[self.query_identifiers[1]] - log.info("Request download: %s", str(incoming)) - response.addCallback(lambda r: self._reply_to_send_request({}, incoming)) + response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) return response @@ -94,10 +90,6 @@ class BlobRequestHandler(object): ######### internal ######### - def _add_to_response(self, response, to_add): - - return response - def _reply_to_availability(self, request, blobs): d = self._get_available_blobs(blobs) @@ -111,25 +103,26 @@ class BlobRequestHandler(object): def open_blob_for_reading(self, blob, response): response_fields = {} + d = defer.succeed(None) if blob.is_validated(): read_handle = blob.open_for_reading() if read_handle is not None: self.currently_uploading = blob self.read_handle = read_handle - log.info("Sending %s to client", str(blob)) + log.debug("Sending %s to client", str(blob)) response_fields['blob_hash'] = blob.blob_hash response_fields['length'] = blob.length response['incoming_blob'] = response_fields - log.info(response) - return response, blob + d.addCallback(lambda _: self.record_transaction(blob)) + d.addCallback(lambda _: response) + return d log.warning("We can not send %s", str(blob)) response['error'] = "BLOB_UNAVAILABLE" - return response, blob - - def record_transaction(self, response, blob, rate): - d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, rate) d.addCallback(lambda _: response) - log.info(response) + return d + + def record_transaction(self, blob): + d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, self.blob_data_payment_rate) return d def _reply_to_send_request(self, response, incoming): @@ -142,18 +135,18 @@ class BlobRequestHandler(object): response['error'] = "RATE_UNSET" return defer.succeed(response) else: + log.debug("Requested blob: %s", str(incoming)) d = self.blob_manager.get_blob(incoming, True) d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) - d.addCallback(lambda (r, blob): self.record_transaction(r, blob, rate)) return d def reply_to_offer(self, offer, request): blobs = request.get("available_blobs", []) log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) - reply = self.strategy.respond_to_offer(offer, self.peer, blobs) - if reply.accepted: - self.blob_data_payment_rate = reply.rate - r = Negotiate.make_dict_from_offer(reply) + accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) + if accepted: + self.blob_data_payment_rate = offer.rate + r = Negotiate.make_dict_from_offer(offer) request.update(r) return request @@ -178,7 +171,7 @@ class BlobRequestHandler(object): def start_transfer(): self.file_sender = FileSender() - log.info("Starting the file upload") + log.debug("Starting the file upload") assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes) return d diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index e0d64b783..24967b5c6 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -673,7 +673,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): # TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY. def _start_file(f): d = self.lbry_file_manager.toggle_lbry_file_running(f) - d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash)) return defer.succeed("Started LBRY file") def _get_and_start_file(name): @@ -775,15 +774,13 @@ class LBRYDaemon(jsonrpc.JSONRPC): # CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, # self._server_payment_rate_manager), # BlobAvailabilityHandlerFactory(self.session.blob_manager), - BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet, - self.session.payment_rate_manager), + BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.session.payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): self.blob_request_payment_rate_manager = self.session.payment_rate_manager - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet, - self.blob_request_payment_rate_manager)) d1 = self.settings.get_server_data_payment_rate() d1.addCallback(get_blob_request_handler_factory) @@ -2552,7 +2549,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = self._render_response(SEARCH_SERVERS, OK_CODE) return d - def jsonrpc_get_mean_availability(self): """ Get mean blob availability diff --git a/lbrynet/lbrynet_daemon/LBRYExchangeRateManager.py b/lbrynet/lbrynet_daemon/LBRYExchangeRateManager.py index e896c8b0f..17006e574 100644 --- a/lbrynet/lbrynet_daemon/LBRYExchangeRateManager.py +++ b/lbrynet/lbrynet_daemon/LBRYExchangeRateManager.py @@ -37,24 +37,34 @@ class MarketFeed(object): self._updater = LoopingCall(self._update_price) def _make_request(self): - r = requests.get(self.url, self.params) - return r.text + try: + r = requests.get(self.url, self.params) + return defer.succeed(r.text) + except Exception as err: + log.error(err) + return defer.fail(err) def _handle_response(self, response): return NotImplementedError def _subtract_fee(self, from_amount): + # increase amount to account for market fees return defer.succeed(from_amount / (1.0 - self.fee)) def _save_price(self, price): log.debug("Saving price update %f for %s" % (price, self.market)) self.rate = ExchangeRate(self.market, price, int(time.time())) + def _log_error(self, err): + log.error(err) + log.warning("There was a problem updating %s exchange rate information from %s", self.market, self.name) + def _update_price(self): - d = defer.succeed(self._make_request()) + d = self._make_request() d.addCallback(self._handle_response) d.addCallback(self._subtract_fee) d.addCallback(self._save_price) + d.addErrback(self._log_error) def start(self): if not self._updater.running: @@ -94,7 +104,11 @@ class GoogleBTCFeed(MarketFeed): ) def _make_request(self): - return googlefinance.getQuotes('CURRENCY:USDBTC')[0] + try: + r = googlefinance.getQuotes('CURRENCY:USDBTC')[0] + return defer.succeed(r) + except Exception as err: + return defer.fail(err) def _handle_response(self, response): return float(response['LastTradePrice'])