From 9d44bbdb48a7c23037c04a6636a45912842a1285 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jun 2020 09:25:41 -0400 Subject: [PATCH] don't block the notification loop on sending the notifications --- lbry/wallet/rpc/session.py | 12 +++++++++--- lbry/wallet/server/session.py | 17 +++-------------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 58d8a10fc..0589b3edb 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -206,7 +206,7 @@ class SessionBase(asyncio.Protocol): """ return self._address - def peer_address_str(self): + def peer_address_str(self, for_log=True): """Returns the peer's IP address and port as a human-readable string.""" if not self._address: @@ -483,11 +483,17 @@ class RPCSession(SessionBase): raise result return result - async def send_notification(self, method, args=()): + async def send_notification(self, method, args=()) -> bool: """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() - await self._send_message(message) + try: + await self._send_message(message) + return True + except asyncio.TimeoutError: + self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) + self.abort() + return False def send_batch(self, raise_errors=False): """Return a BatchRequest. Intended to be used like so: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 55820fdda..3e6bfb771 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -921,13 +921,8 @@ class LBRYElectrumX(SessionBase): """ if height_changed and self.subscribe_headers: args = (await self.subscribe_headers_result(), ) - try: - await self.send_notification('blockchain.headers.subscribe', args) - except asyncio.TimeoutError: - self.logger.info("timeout sending headers notification to %s", self.peer_address_str(for_log=True)) - self.abort() + if not (await self.send_notification('blockchain.headers.subscribe', args)): return - touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): changed = {} @@ -954,14 +949,7 @@ class LBRYElectrumX(SessionBase): method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' - - try: - await self.send_notification(method, (alias, status)) - except asyncio.TimeoutError: - self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) - self.abort() - return - + asyncio.create_task(self.send_notification(method, (alias, status))) if changed: es = '' if len(changed) == 1 else 'es' self.logger.info(f'notified of {len(changed):,d} address{es}') @@ -1174,6 +1162,7 @@ class LBRYElectrumX(SessionBase): """ # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 + db_history = await self.session_mgr.limited_history(hashX) mempool = await self.mempool.transaction_summaries(hashX)