diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 58d8a10fc..0589b3edb 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -206,7 +206,7 @@ class SessionBase(asyncio.Protocol): """ return self._address - def peer_address_str(self): + def peer_address_str(self, for_log=True): """Returns the peer's IP address and port as a human-readable string.""" if not self._address: @@ -483,11 +483,17 @@ class RPCSession(SessionBase): raise result return result - async def send_notification(self, method, args=()): + async def send_notification(self, method, args=()) -> bool: """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() - await self._send_message(message) + try: + await self._send_message(message) + return True + except asyncio.TimeoutError: + self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) + self.abort() + return False def send_batch(self, raise_errors=False): """Return a BatchRequest. Intended to be used like so: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 69a57a2eb..437b24f18 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -183,6 +183,7 @@ class BlockProcessor: self.state_lock = asyncio.Lock() self.search_cache = {} + self.history_cache = {} async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -213,6 +214,7 @@ class BlockProcessor: await self.run_in_thread_with_lock(self.advance_blocks, blocks) for cache in self.search_cache.values(): cache.clear() + self.history_cache.clear() await self._maybe_flush() processed_time = time.perf_counter() - start self.block_count_metric.set(self.height) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 186755b73..3e6bfb771 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -135,7 +135,7 @@ class SessionManager: "docker_tag": DOCKER_TAG, 'version': lbry.__version__, "min_version": util.version_string(VERSION.PROTOCOL_MIN), - "cpu_count": os.cpu_count() + "cpu_count": str(os.cpu_count()) }) session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, labelnames=("version",)) @@ -177,7 +177,7 @@ class SessionManager: self.cur_group = SessionGroup(0) self.txs_sent = 0 self.start_time = time.time() - self.history_cache = pylru.lrucache(256) + self.history_cache = self.bp.history_cache self.notified_height: typing.Optional[int] = None # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 @@ -608,26 +608,20 @@ class SessionManager: async def limited_history(self, hashX): """A caching layer.""" - hc = self.history_cache - if hashX not in hc: + if hashX not in self.history_cache: # History DoS limit. Each element of history is about 99 # bytes when encoded as JSON. This limits resource usage # on bloated history requests, and uses a smaller divisor # so large requests are logged before refusing them. limit = self.env.max_send // 97 - hc[hashX] = await self.db.limited_history(hashX, limit=limit) - return hc[hashX] + self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) + return self.history_cache[hashX] async def _notify_sessions(self, height, touched): """Notify sessions about height changes and touched addresses.""" height_changed = height != self.notified_height if height_changed: await self._refresh_hsub_results(height) - # Invalidate our history cache for touched hashXs - hc = self.history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] - if self.sessions: await asyncio.wait([ session.notify(touched, height_changed) for session in self.sessions @@ -927,13 +921,8 @@ class LBRYElectrumX(SessionBase): """ if height_changed and self.subscribe_headers: args = (await self.subscribe_headers_result(), ) - try: - await self.send_notification('blockchain.headers.subscribe', args) - except asyncio.TimeoutError: - self.logger.info("timeout sending headers notification to %s", self.peer_address_str(for_log=True)) - self.abort() + if not (await self.send_notification('blockchain.headers.subscribe', args)): return - touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): changed = {} @@ -960,14 +949,7 @@ class LBRYElectrumX(SessionBase): method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' - - try: - await self.send_notification(method, (alias, status)) - except asyncio.TimeoutError: - self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) - self.abort() - return - + asyncio.create_task(self.send_notification(method, (alias, status))) if changed: es = '' if len(changed) == 1 else 'es' self.logger.info(f'notified of {len(changed):,d} address{es}') @@ -1180,6 +1162,7 @@ class LBRYElectrumX(SessionBase): """ # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 + db_history = await self.session_mgr.limited_history(hashX) mempool = await self.mempool.transaction_summaries(hashX)