From 5728493abb342ee5d6bdf35000ac5b6010e0171f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 4 Aug 2019 19:09:40 -0300 Subject: [PATCH 01/13] refactor basenetwork so each session takes care of itself --- .../test_wallet_server_sessions.py | 1 - torba/torba/client/basenetwork.py | 178 ++++++++---------- 2 files changed, 76 insertions(+), 103 deletions(-) diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index b2e9c779d..451006d48 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -22,7 +22,6 @@ class TestSessionBloat(IntegrationTestCase): await self.conductor.start_spv() session = ClientSession(network=None, server=self.ledger.network.client.server, timeout=0.2) await session.create_connection() - session.ping_task.cancel() await session.send_request('server.banner', ()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) self.assertFalse(session.is_closing()) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 25c858dc4..7ba0fb215 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -1,9 +1,7 @@ import logging import asyncio -from asyncio import CancelledError +from typing import Dict from time import time -from typing import List -import socket from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -24,11 +22,20 @@ class ClientSession(BaseClientSession): self.bw_limit = self.framer.max_size = self.max_errors = 1 << 32 self.timeout = timeout self.max_seconds_idle = timeout * 2 - self.ping_task = None + self.latency = 1 << 32 + + @property + def available(self): + return not self.is_closing() and self._can_send.is_set() async def send_request(self, method, args=()): try: - return await asyncio.wait_for(super().send_request(method, args), timeout=self.timeout) + start = time() + result = await asyncio.wait_for( + super().send_request(method, args), timeout=self.timeout + ) + self.latency = time() - start + return result except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) raise e @@ -36,21 +43,29 @@ class ClientSession(BaseClientSession): self.abort() raise - async def ping_forever(self): + async def ensure_session(self): + # Handles reconnecting and maintaining a session alive # TODO: change to 'ping' on newer protocol (above 1.2) - while not self.is_closing(): - if (time() - self.last_send) > self.max_seconds_idle: - try: + retry_delay = 1.0 + while True: + try: + if self.is_closing(): + await self.create_connection(self.timeout) + await self.ensure_server_version() + if (time() - self.last_send) > self.max_seconds_idle: await self.send_request('server.banner') - except: - self.abort() - raise - await asyncio.sleep(self.max_seconds_idle//3) + retry_delay = 1.0 + except asyncio.TimeoutError: + log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) + retry_delay = max(60, retry_delay * 2) + await asyncio.sleep(retry_delay) + + def ensure_server_version(self, required='1.2'): + return self.send_request('server.version', [__version__, required]) async def create_connection(self, timeout=6): connector = Connector(lambda: self, *self.server) await asyncio.wait_for(connector.create_connection(), timeout=timeout) - self.ping_task = asyncio.create_task(self.ping_forever()) async def handle_request(self, request): controller = self.network.subscription_controllers[request.method] @@ -58,9 +73,8 @@ class ClientSession(BaseClientSession): def connection_lost(self, exc): super().connection_lost(exc) + self.latency = 1 << 32 self._on_disconnect_controller.add(True) - if self.ping_task: - self.ping_task.cancel() class BaseNetwork: @@ -92,26 +106,20 @@ class BaseNetwork: self.session_pool = SessionPool(network=self, timeout=connect_timeout) self.session_pool.start(self.config['default_servers']) self.on_header.listen(self._update_remote_height) - while True: + while self.running: try: - self.client = await self.pick_fastest_session() - if self.is_connected: - await self.ensure_server_version() - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server) - self._on_connected_controller.add(True) - await self.client.on_disconnected.first - except CancelledError: - self.running = False + self.client = await self.session_pool.wait_for_fastest_session() + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server) + self._on_connected_controller.add(True) + await self.client.on_disconnected.first + except asyncio.CancelledError: + await self.stop() + raise except asyncio.TimeoutError: - log.warning("Timed out while trying to find a server!") + pass except Exception: # pylint: disable=broad-except log.exception("Exception while trying to find a server!") - if not self.running: - return - elif self.client: - await self.client.close() - self.client.connection.cancel_pending_requests() async def stop(self): self.running = False @@ -124,25 +132,15 @@ class BaseNetwork: @property def is_connected(self): - return self.client is not None and not self.client.is_closing() + return self.session_pool.online - def rpc(self, list_or_method, args): + async def rpc(self, list_or_method, args): if self.is_connected: - return self.client.send_request(list_or_method, args) + await self.session_pool.wait_for_fastest_session() + return await self.session_pool.fastest_session.send_request(list_or_method, args) else: raise ConnectionError("Attempting to send rpc request when connection is not available.") - async def pick_fastest_session(self): - sessions = await self.session_pool.get_online_sessions() - done, pending = await asyncio.wait([ - self.probe_session(session) - for session in sessions if not session.is_closing() - ], return_when='FIRST_COMPLETED') - for task in pending: - task.cancel() - for session in done: - return await session - async def probe_session(self, session: ClientSession): await session.send_request('server.banner') return session @@ -150,9 +148,6 @@ class BaseNetwork: def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] - def ensure_server_version(self, required='1.2'): - return self.rpc('server.version', [__version__, required]) - def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction]) @@ -172,83 +167,62 @@ class BaseNetwork: return self.rpc('blockchain.block.headers', [height, count]) def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True]) + return self.client.send_request('blockchain.headers.subscribe', [True]) def subscribe_address(self, address): - return self.rpc('blockchain.address.subscribe', [address]) + return self.client.send_request('blockchain.address.subscribe', [address]) class SessionPool: def __init__(self, network: BaseNetwork, timeout: float): self.network = network - self.sessions: List[ClientSession] = [] - self._dead_servers: List[ClientSession] = [] + self.sessions: Dict[ClientSession, asyncio.Task] = dict() self.maintain_connections_task = None self.timeout = timeout - # triggered when the master server is out, to speed up reconnect - self._lost_master = asyncio.Event() @property def online(self): - for session in self.sessions: - if not session.is_closing(): - return True - return False + return any(not session.is_closing() for session in self.sessions) + + @property + def available_sessions(self): + return [session for session in self.sessions if session.available] + + @property + def fastest_session(self): + if not self.available_sessions: + return None + return min([(session.latency, session) for session in self.available_sessions])[1] def start(self, default_servers): - self.sessions = [ - ClientSession(network=self.network, server=server) - for server in default_servers - ] + for server in default_servers: + session = ClientSession(network=self.network, server=server) + self.sessions[session] = asyncio.create_task(session.ensure_session()) self.maintain_connections_task = asyncio.create_task(self.ensure_connections()) def stop(self): if self.maintain_connections_task: self.maintain_connections_task.cancel() - for session in self.sessions: + self.maintain_connections_task = None + for session, maintenance_task in self.sessions.items(): + maintenance_task.cancel() if not session.is_closing(): session.abort() - self.sessions, self._dead_servers, self.maintain_connections_task = [], [], None + self.sessions.clear() async def ensure_connections(self): while True: - await asyncio.gather(*[ - self.ensure_connection(session) - for session in self.sessions - ], return_exceptions=True) - try: - await asyncio.wait_for(self._lost_master.wait(), timeout=3) - except asyncio.TimeoutError: - pass - self._lost_master.clear() - if not self.sessions: - self.sessions.extend(self._dead_servers) - self._dead_servers = [] + log.info("Checking conns") + for session, task in list(self.sessions.items()): + if task.done(): + self.sessions[session] = asyncio.create_task(session.ensure_session()) + await asyncio.wait(self.sessions.items(), timeout=10) - async def ensure_connection(self, session): - self._dead_servers.append(session) - self.sessions.remove(session) - try: - if session.is_closing(): - await session.create_connection(self.timeout) - await asyncio.wait_for(session.send_request('server.banner'), timeout=self.timeout) - self.sessions.append(session) - self._dead_servers.remove(session) - except asyncio.TimeoutError: - log.warning("Timeout connecting to %s:%d", *session.server) - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except socket.gaierror: - log.warning("Could not resolve IP for %s", session.server[0]) - except Exception as err: # pylint: disable=broad-except - if 'Connect call failed' in str(err): - log.warning("Could not connect to %s:%d", *session.server) + async def wait_for_fastest_session(self): + while True: + fastest = self.fastest_session + if fastest: + return fastest else: - log.exception("Connecting to %s:%d raised an exception:", *session.server) - - async def get_online_sessions(self): - while not self.online: - self._lost_master.set() - await asyncio.sleep(0.5) - return self.sessions + await asyncio.sleep(0.5) From e78e2738ef7d97c15b233674210c91f2e16f725e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 6 Aug 2019 02:17:39 -0300 Subject: [PATCH 02/13] basenet refactor --- .../client_tests/integration/test_network.py | 21 ++++-- torba/torba/client/basenetwork.py | 73 +++++++++---------- 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index b5b4643f1..5be044cef 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -22,9 +22,11 @@ class ReconnectTests(IntegrationTestCase): async def test_connection_drop_still_receives_events_after_reconnected(self): address1 = await self.account.receiving.get_or_create_usable_address() + # disconnect and send a new tx, should reconnect and get it self.ledger.network.client.connection_lost(Exception()) + self.assertFalse(self.ledger.network.is_connected) sendtxid = await self.blockchain.send_to_address(address1, 1.1337) - await self.on_transaction_id(sendtxid) # mempool + await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool await self.blockchain.generate(1) await self.on_transaction_id(sendtxid) # confirmed @@ -45,8 +47,10 @@ class ReconnectTests(IntegrationTestCase): await self.ledger.network.get_transaction(sendtxid) async def test_timeout_then_reconnect(self): + # tests that it connects back after some failed attempts await self.conductor.spv_node.stop() self.assertFalse(self.ledger.network.is_connected) + await asyncio.sleep(0.2) # let it retry and fail once await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.ledger.network.on_connected.first self.assertTrue(self.ledger.network.is_connected) @@ -79,9 +83,9 @@ class ServerPickingTestCase(AsyncioTestCase): await self._make_bad_server(), ('localhost', 1), ('example.that.doesnt.resolve', 9000), - await self._make_fake_server(latency=1.2, port=1340), - await self._make_fake_server(latency=0.5, port=1337), - await self._make_fake_server(latency=0.7, port=1339), + await self._make_fake_server(latency=1.0, port=1340), + await self._make_fake_server(latency=0.1, port=1337), + await self._make_fake_server(latency=0.4, port=1339), ], 'connect_timeout': 3 }) @@ -89,9 +93,10 @@ class ServerPickingTestCase(AsyncioTestCase): network = BaseNetwork(ledger) self.addCleanup(network.stop) asyncio.ensure_future(network.start()) - await asyncio.wait_for(network.on_connected.first, timeout=3) + await asyncio.wait_for(network.on_connected.first, timeout=1) self.assertTrue(network.is_connected) self.assertEqual(network.client.server, ('127.0.0.1', 1337)) - # ensure we are connected to all of them - self.assertTrue(all([not session.is_closing() for session in network.session_pool.sessions])) - self.assertEqual(len(network.session_pool.sessions), 3) + self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions])) + # ensure we are connected to all of them after a while + await asyncio.sleep(1) + self.assertEqual(len(network.session_pool.available_sessions), 3) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 7ba0fb215..69e3eedb6 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -1,6 +1,7 @@ import logging import asyncio -from typing import Dict +from operator import itemgetter +from typing import Dict, Optional from time import time from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -26,7 +27,7 @@ class ClientSession(BaseClientSession): @property def available(self): - return not self.is_closing() and self._can_send.is_set() + return not self.is_closing() and self._can_send.is_set() and self.latency < 1 << 32 async def send_request(self, method, args=()): try: @@ -39,14 +40,11 @@ class ClientSession(BaseClientSession): except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) raise e - except asyncio.TimeoutError: - self.abort() - raise async def ensure_session(self): # Handles reconnecting and maintaining a session alive # TODO: change to 'ping' on newer protocol (above 1.2) - retry_delay = 1.0 + retry_delay = default_delay = 0.1 while True: try: if self.is_closing(): @@ -54,10 +52,11 @@ class ClientSession(BaseClientSession): await self.ensure_server_version() if (time() - self.last_send) > self.max_seconds_idle: await self.send_request('server.banner') - retry_delay = 1.0 - except asyncio.TimeoutError: + retry_delay = default_delay + except (asyncio.TimeoutError, OSError): + await self.close() + retry_delay = min(60, retry_delay * 2) log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) - retry_delay = max(60, retry_delay * 2) await asyncio.sleep(retry_delay) def ensure_server_version(self, required='1.2'): @@ -80,9 +79,10 @@ class ClientSession(BaseClientSession): class BaseNetwork: def __init__(self, ledger): + self.switch_event = asyncio.Event() self.config = ledger.config - self.client: ClientSession = None - self.session_pool: SessionPool = None + self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6)) + self.client: Optional[ClientSession] = None self.running = False self.remote_height: int = 0 @@ -102,8 +102,6 @@ class BaseNetwork: async def start(self): self.running = True - connect_timeout = self.config.get('connect_timeout', 6) - self.session_pool = SessionPool(network=self, timeout=connect_timeout) self.session_pool.start(self.config['default_servers']) self.on_header.listen(self._update_remote_height) while self.running: @@ -112,7 +110,9 @@ class BaseNetwork: self._update_remote_height((await self.subscribe_headers(),)) log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server) self._on_connected_controller.add(True) - await self.client.on_disconnected.first + self.client.on_disconnected.listen(lambda _: self.switch_event.set()) + await self.switch_event.wait() + self.switch_event.clear() except asyncio.CancelledError: await self.stop() raise @@ -132,12 +132,14 @@ class BaseNetwork: @property def is_connected(self): - return self.session_pool.online + return self.client and not self.client.is_closing() - async def rpc(self, list_or_method, args): + def rpc(self, list_or_method, args): if self.is_connected: - await self.session_pool.wait_for_fastest_session() - return await self.session_pool.fastest_session.send_request(list_or_method, args) + fastest = self.session_pool.fastest_session + if self.client != fastest: + self.switch_event.set() + return self.client.send_request(list_or_method, args) else: raise ConnectionError("Attempting to send rpc request when connection is not available.") @@ -178,7 +180,6 @@ class SessionPool: def __init__(self, network: BaseNetwork, timeout: float): self.network = network self.sessions: Dict[ClientSession, asyncio.Task] = dict() - self.maintain_connections_task = None self.timeout = timeout @property @@ -193,31 +194,27 @@ class SessionPool: def fastest_session(self): if not self.available_sessions: return None - return min([(session.latency, session) for session in self.available_sessions])[1] + return min([(session.latency, session) for session in self.available_sessions], key=itemgetter(0))[1] def start(self, default_servers): - for server in default_servers: - session = ClientSession(network=self.network, server=server) - self.sessions[session] = asyncio.create_task(session.ensure_session()) - self.maintain_connections_task = asyncio.create_task(self.ensure_connections()) + self.sessions = { + ClientSession(network=self.network, server=server): None for server in default_servers + } + self.ensure_connections() def stop(self): - if self.maintain_connections_task: - self.maintain_connections_task.cancel() - self.maintain_connections_task = None - for session, maintenance_task in self.sessions.items(): - maintenance_task.cancel() - if not session.is_closing(): - session.abort() + for session, task in self.sessions.items(): + task.cancel() + session.abort() self.sessions.clear() - async def ensure_connections(self): - while True: - log.info("Checking conns") - for session, task in list(self.sessions.items()): - if task.done(): - self.sessions[session] = asyncio.create_task(session.ensure_session()) - await asyncio.wait(self.sessions.items(), timeout=10) + def ensure_connections(self): + for session, task in list(self.sessions.items()): + if not task or task.done(): + task = asyncio.create_task(session.ensure_session()) + task.add_done_callback(lambda _: self.ensure_connections()) + self.sessions[session] = task + async def wait_for_fastest_session(self): while True: From 8fce374caeded4bfe749571b57ab0b907d5af997 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 6 Aug 2019 02:18:07 -0300 Subject: [PATCH 03/13] race condition where the request is sent and connection lost right after --- torba/torba/rpc/session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index c8b8c6945..049cd22ed 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -473,6 +473,8 @@ class RPCSession(SessionBase): async def send_request(self, method, args=()): """Send an RPC request over the network.""" + if self.is_closing(): + raise CancelledError() message, event = self.connection.send_request(Request(method, args)) await self._send_message(message) await event.wait() From 022d60ec2c0f1340d2c0cb13e20f530c3367cab7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 6 Aug 2019 02:54:11 -0300 Subject: [PATCH 04/13] both exceptions can be raised, depending on when the call happens --- lbry/tests/integration/test_wallet_server_sessions.py | 2 +- torba/tests/client_tests/integration/test_network.py | 2 +- torba/torba/client/basenetwork.py | 4 ++-- torba/torba/rpc/session.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 451006d48..5234b990c 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -26,7 +26,7 @@ class TestSessionBloat(IntegrationTestCase): self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) self.assertFalse(session.is_closing()) await asyncio.sleep(1.1) - with self.assertRaises(asyncio.TimeoutError): + with self.assertRaises((asyncio.TimeoutError, asyncio.CancelledError)): await session.send_request('server.banner', ()) self.assertTrue(session.is_closing()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index 5be044cef..fb901196a 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -35,7 +35,7 @@ class ReconnectTests(IntegrationTestCase): d = self.ledger.network.get_transaction(sendtxid) # what's that smoke on my ethernet cable? oh no! self.ledger.network.client.connection_lost(Exception()) - with self.assertRaises(asyncio.CancelledError): + with self.assertRaises((asyncio.TimeoutError, asyncio.CancelledError)): await d # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 69e3eedb6..92f163f88 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -169,10 +169,10 @@ class BaseNetwork: return self.rpc('blockchain.block.headers', [height, count]) def subscribe_headers(self): - return self.client.send_request('blockchain.headers.subscribe', [True]) + return self.rpc('blockchain.headers.subscribe', [True]) def subscribe_address(self, address): - return self.client.send_request('blockchain.address.subscribe', [address]) + return self.rpc('blockchain.address.subscribe', [address]) class SessionPool: diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index 049cd22ed..2de40b88c 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -474,7 +474,7 @@ class RPCSession(SessionBase): async def send_request(self, method, args=()): """Send an RPC request over the network.""" if self.is_closing(): - raise CancelledError() + raise asyncio.TimeoutError() message, event = self.connection.send_request(Request(method, args)) await self._send_message(message) await event.wait() From f04c5ee902383f3d14d704a6fc51c7ac5d789d07 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 6 Aug 2019 17:36:55 -0300 Subject: [PATCH 05/13] fix test reliability --- torba/tests/client_tests/integration/test_network.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index fb901196a..cc5aef955 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -43,6 +43,8 @@ class ReconnectTests(IntegrationTestCase): # * goes to pick some water outside... * time passes by and another donation comes in sendtxid = await self.blockchain.send_to_address(address1, 42) await self.blockchain.generate(1) + # (this is just so the test doesnt hang forever if it doesnt reconnect, also its not instant yet) + await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0) # omg, the burned cable still works! torba is fire proof! await self.ledger.network.get_transaction(sendtxid) From 822f53c88813a40467f332529cec81624c9eb795 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Aug 2019 01:56:28 -0300 Subject: [PATCH 06/13] remove unused things --- torba/torba/client/basenetwork.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 92f163f88..3ce706357 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -108,7 +108,7 @@ class BaseNetwork: try: self.client = await self.session_pool.wait_for_fastest_session() self._update_remote_height((await self.subscribe_headers(),)) - log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server) + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) self._on_connected_controller.add(True) self.client.on_disconnected.listen(lambda _: self.switch_event.set()) await self.switch_event.wait() @@ -118,8 +118,6 @@ class BaseNetwork: raise except asyncio.TimeoutError: pass - except Exception: # pylint: disable=broad-except - log.exception("Exception while trying to find a server!") async def stop(self): self.running = False @@ -143,10 +141,6 @@ class BaseNetwork: else: raise ConnectionError("Attempting to send rpc request when connection is not available.") - async def probe_session(self, session: ClientSession): - await session.send_request('server.banner') - return session - def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] @@ -215,7 +209,6 @@ class SessionPool: task.add_done_callback(lambda _: self.ensure_connections()) self.sessions[session] = task - async def wait_for_fastest_session(self): while True: fastest = self.fastest_session From d1f5b25418864ff8b0ec1ce0e5437863f660fb31 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Aug 2019 02:48:40 -0300 Subject: [PATCH 07/13] merge repeated subscription events --- .../unit/test_stream_controller.py | 19 +++++++++++++++++++ torba/torba/client/basenetwork.py | 4 ++-- torba/torba/stream.py | 8 ++++++-- 3 files changed, 27 insertions(+), 4 deletions(-) create mode 100644 torba/tests/client_tests/unit/test_stream_controller.py diff --git a/torba/tests/client_tests/unit/test_stream_controller.py b/torba/tests/client_tests/unit/test_stream_controller.py new file mode 100644 index 000000000..f82ab699f --- /dev/null +++ b/torba/tests/client_tests/unit/test_stream_controller.py @@ -0,0 +1,19 @@ +import unittest +from torba.stream import StreamController + +class StreamControllerTestCase(unittest.TestCase): + def test_non_unique_events(self): + events = [] + controller = StreamController() + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo", "yo"]) + + def test_unique_events(self): + events = [] + controller = StreamController(merge_repeated_events=True) + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo"]) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 3ce706357..a7fad05c0 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -89,10 +89,10 @@ class BaseNetwork: self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream - self._on_header_controller = StreamController() + self._on_header_controller = StreamController(merge_repeated_events=True) self.on_header = self._on_header_controller.stream - self._on_status_controller = StreamController() + self._on_status_controller = StreamController(merge_repeated_events=True) self.on_status = self._on_status_controller.stream self.subscription_controllers = { diff --git a/torba/torba/stream.py b/torba/torba/stream.py index 40589ade0..412a94525 100644 --- a/torba/torba/stream.py +++ b/torba/torba/stream.py @@ -45,10 +45,12 @@ class BroadcastSubscription: class StreamController: - def __init__(self): + def __init__(self, merge_repeated_events=False): self.stream = Stream(self) self._first_subscription = None self._last_subscription = None + self._last_event = None + self._merge_repeated = merge_repeated_events @property def has_listener(self): @@ -76,8 +78,10 @@ class StreamController: return f def add(self, event): + skip = self._merge_repeated and event == self._last_event + self._last_event = event return self._notify_and_ensure_future( - lambda subscription: subscription._add(event) + lambda subscription: None if skip else subscription._add(event) ) def add_error(self, exception): From cf924373f6bcef9ee0e9bde05eccd08694e6d609 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Aug 2019 11:27:25 -0300 Subject: [PATCH 08/13] make stream test an asyncio test --- torba/tests/client_tests/unit/test_stream_controller.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/torba/tests/client_tests/unit/test_stream_controller.py b/torba/tests/client_tests/unit/test_stream_controller.py index f82ab699f..70c20fd3a 100644 --- a/torba/tests/client_tests/unit/test_stream_controller.py +++ b/torba/tests/client_tests/unit/test_stream_controller.py @@ -1,7 +1,8 @@ -import unittest from torba.stream import StreamController +from torba.testcase import AsyncioTestCase -class StreamControllerTestCase(unittest.TestCase): + +class StreamControllerTestCase(AsyncioTestCase): def test_non_unique_events(self): events = [] controller = StreamController() From 4a749f6c3834c4d5344b7d36dd8d02aa8ff3f791 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Aug 2019 12:47:13 -0300 Subject: [PATCH 09/13] on timeout, restore latency to max --- torba/torba/client/basenetwork.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index a7fad05c0..fe166ba4f 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -40,6 +40,9 @@ class ClientSession(BaseClientSession): except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) raise e + except TimeoutError: + self.latency = 1 << 32 + raise async def ensure_session(self): # Handles reconnecting and maintaining a session alive @@ -50,7 +53,7 @@ class ClientSession(BaseClientSession): if self.is_closing(): await self.create_connection(self.timeout) await self.ensure_server_version() - if (time() - self.last_send) > self.max_seconds_idle: + if (time() - self.last_send) > self.max_seconds_idle or self.latency == 1 << 32: await self.send_request('server.banner') retry_delay = default_delay except (asyncio.TimeoutError, OSError): From 9ee2f30df47ac6d5dc87f6b1f04e3a245c3219dd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 8 Aug 2019 04:48:52 -0300 Subject: [PATCH 10/13] timeout instead of cancel + minor fixes --- torba/tests/client_tests/integration/test_network.py | 2 ++ torba/torba/client/basenetwork.py | 7 ++++--- torba/torba/rpc/jsonrpc.py | 7 ++++--- torba/torba/rpc/session.py | 2 +- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index cc5aef955..e4465dff9 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -29,6 +29,7 @@ class ReconnectTests(IntegrationTestCase): await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool await self.blockchain.generate(1) await self.on_transaction_id(sendtxid) # confirmed + self.assertLess(self.ledger.network.client.latency, 1) # latency properly set lower, we are fine await self.assertBalance(self.account, '1.1337') # is it real? are we rich!? let me see this tx... @@ -37,6 +38,7 @@ class ReconnectTests(IntegrationTestCase): self.ledger.network.client.connection_lost(Exception()) with self.assertRaises((asyncio.TimeoutError, asyncio.CancelledError)): await d + self.assertGreater(self.ledger.network.client.latency, 1000) # latency skyrockets as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): await self.ledger.network.get_transaction(sendtxid) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index fe166ba4f..176740a15 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -74,6 +74,7 @@ class ClientSession(BaseClientSession): controller.add(request.args) def connection_lost(self, exc): + log.debug("Connection lost: %s:%d", *self.server) super().connection_lost(exc) self.latency = 1 << 32 self._on_disconnect_controller.add(True) @@ -136,10 +137,10 @@ class BaseNetwork: return self.client and not self.client.is_closing() def rpc(self, list_or_method, args): + fastest = self.session_pool.fastest_session + if fastest is not None and self.client != fastest: + self.switch_event.set() if self.is_connected: - fastest = self.session_pool.fastest_session - if self.client != fastest: - self.switch_event.set() return self.client.send_request(list_or_method, args) else: raise ConnectionError("Attempting to send rpc request when connection is not available.") diff --git a/torba/torba/rpc/jsonrpc.py b/torba/torba/rpc/jsonrpc.py index 5e908cd02..282b20df0 100644 --- a/torba/torba/rpc/jsonrpc.py +++ b/torba/torba/rpc/jsonrpc.py @@ -745,9 +745,10 @@ class JSONRPCConnection(object): self._protocol = item return self.receive_message(message) - def cancel_pending_requests(self): - """Cancel all pending requests.""" - exception = CancelledError() + def time_out_pending_requests(self): + """Times out all pending requests.""" + # this used to be CancelledError, but thats confusing as in are we closing the whole sdk or failing? + exception = TimeoutError() for request, event in self._requests.values(): event.result = exception event.set() diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index 2de40b88c..4e0295cdb 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -456,7 +456,7 @@ class RPCSession(SessionBase): def connection_lost(self, exc): # Cancel pending requests and message processing - self.connection.cancel_pending_requests() + self.connection.time_out_pending_requests() super().connection_lost(exc) # External API From 4ead92cfbe258fe394f1d29548cda70b3421fa96 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 12 Aug 2019 13:32:20 -0300 Subject: [PATCH 11/13] fixes from review --- .../client_tests/integration/test_network.py | 5 ++-- torba/torba/client/basenetwork.py | 24 +++++++++++-------- torba/torba/rpc/jsonrpc.py | 3 ++- torba/torba/rpc/session.py | 2 +- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index e4465dff9..d4a9c5d4b 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -45,8 +45,9 @@ class ReconnectTests(IntegrationTestCase): # * goes to pick some water outside... * time passes by and another donation comes in sendtxid = await self.blockchain.send_to_address(address1, 42) await self.blockchain.generate(1) - # (this is just so the test doesnt hang forever if it doesnt reconnect, also its not instant yet) - await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0) + # (this is just so the test doesnt hang forever if it doesnt reconnect) + if not self.ledger.network.is_connected: + await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=1.0) # omg, the burned cable still works! torba is fire proof! await self.ledger.network.get_transaction(sendtxid) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 176740a15..f145907a5 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -2,7 +2,7 @@ import logging import asyncio from operator import itemgetter from typing import Dict, Optional -from time import time +from time import perf_counter as time from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) class ClientSession(BaseClientSession): - def __init__(self, *args, network, server, timeout=30, **kwargs): + def __init__(self, *args, network, server, timeout=30, on_connect_callback=None, **kwargs): self.network = network self.server = server super().__init__(*args, **kwargs) @@ -24,6 +24,7 @@ class ClientSession(BaseClientSession): self.timeout = timeout self.max_seconds_idle = timeout * 2 self.latency = 1 << 32 + self._on_connect_cb = on_connect_callback or (lambda: None) @property def available(self): @@ -53,6 +54,7 @@ class ClientSession(BaseClientSession): if self.is_closing(): await self.create_connection(self.timeout) await self.ensure_server_version() + self._on_connect_cb() if (time() - self.last_send) > self.max_seconds_idle or self.latency == 1 << 32: await self.send_request('server.banner') retry_delay = default_delay @@ -177,8 +179,9 @@ class SessionPool: def __init__(self, network: BaseNetwork, timeout: float): self.network = network - self.sessions: Dict[ClientSession, asyncio.Task] = dict() + self.sessions: Dict[ClientSession, Optional[asyncio.Task]] = dict() self.timeout = timeout + self.new_connection_event = asyncio.Event() @property def online(self): @@ -195,8 +198,11 @@ class SessionPool: return min([(session.latency, session) for session in self.available_sessions], key=itemgetter(0))[1] def start(self, default_servers): + callback = self.new_connection_event.set self.sessions = { - ClientSession(network=self.network, server=server): None for server in default_servers + ClientSession( + network=self.network, server=server, on_connect_callback=callback + ): None for server in default_servers } self.ensure_connections() @@ -214,9 +220,7 @@ class SessionPool: self.sessions[session] = task async def wait_for_fastest_session(self): - while True: - fastest = self.fastest_session - if fastest: - return fastest - else: - await asyncio.sleep(0.5) + while not self.fastest_session: + self.new_connection_event.clear() + await self.new_connection_event.wait() + return self.fastest_session diff --git a/torba/torba/rpc/jsonrpc.py b/torba/torba/rpc/jsonrpc.py index 282b20df0..4e5cca8ca 100644 --- a/torba/torba/rpc/jsonrpc.py +++ b/torba/torba/rpc/jsonrpc.py @@ -33,6 +33,7 @@ __all__ = ('JSONRPC', 'JSONRPCv1', 'JSONRPCv2', 'JSONRPCLoose', import itertools import json import typing +import asyncio from functools import partial from numbers import Number @@ -748,7 +749,7 @@ class JSONRPCConnection(object): def time_out_pending_requests(self): """Times out all pending requests.""" # this used to be CancelledError, but thats confusing as in are we closing the whole sdk or failing? - exception = TimeoutError() + exception = asyncio.TimeoutError() for request, event in self._requests.values(): event.result = exception event.set() diff --git a/torba/torba/rpc/session.py b/torba/torba/rpc/session.py index 4e0295cdb..e16b6bbb4 100644 --- a/torba/torba/rpc/session.py +++ b/torba/torba/rpc/session.py @@ -474,7 +474,7 @@ class RPCSession(SessionBase): async def send_request(self, method, args=()): """Send an RPC request over the network.""" if self.is_closing(): - raise asyncio.TimeoutError() + raise asyncio.TimeoutError("Trying to send request on a recently dropped connection.") message, event = self.connection.send_request(Request(method, args)) await self._send_message(message) await event.wait() From 011b7f090fb5defb8ef2ec4935c62cf0486020eb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 12 Aug 2019 14:00:53 -0300 Subject: [PATCH 12/13] add urgent reconnect when api is called to bypass retry delay --- torba/torba/client/basenetwork.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index f145907a5..752c2ed90 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -2,7 +2,7 @@ import logging import asyncio from operator import itemgetter from typing import Dict, Optional -from time import perf_counter as time +from time import time, perf_counter from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -25,6 +25,7 @@ class ClientSession(BaseClientSession): self.max_seconds_idle = timeout * 2 self.latency = 1 << 32 self._on_connect_cb = on_connect_callback or (lambda: None) + self.trigger_urgent_reconnect = asyncio.Event() @property def available(self): @@ -32,11 +33,11 @@ class ClientSession(BaseClientSession): async def send_request(self, method, args=()): try: - start = time() + start = perf_counter() result = await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) - self.latency = time() - start + self.latency = perf_counter() - start return result except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) @@ -62,7 +63,12 @@ class ClientSession(BaseClientSession): await self.close() retry_delay = min(60, retry_delay * 2) log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server) - await asyncio.sleep(retry_delay) + try: + await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay) + except asyncio.TimeoutError: + pass + finally: + self.trigger_urgent_reconnect.clear() def ensure_server_version(self, required='1.2'): return self.send_request('server.version', [__version__, required]) @@ -145,6 +151,7 @@ class BaseNetwork: if self.is_connected: return self.client.send_request(list_or_method, args) else: + self.session_pool.trigger_nodelay_connect() raise ConnectionError("Attempting to send rpc request when connection is not available.") def _update_remote_height(self, header_args): @@ -219,8 +226,15 @@ class SessionPool: task.add_done_callback(lambda _: self.ensure_connections()) self.sessions[session] = task + def trigger_nodelay_connect(self): + # used when other parts of the system sees we might have internet back + # bypasses the retry interval + for session in self.sessions: + session.trigger_urgent_reconnect.set() + async def wait_for_fastest_session(self): while not self.fastest_session: + self.trigger_nodelay_connect() self.new_connection_event.clear() await self.new_connection_event.wait() return self.fastest_session From f3a163b382ef7dba8c963ba9e0e241fbb78e3d41 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 12 Aug 2019 18:16:53 -0300 Subject: [PATCH 13/13] fix names and types --- .../integration/test_wallet_server_sessions.py | 2 +- .../client_tests/integration/test_network.py | 6 +++--- torba/torba/client/basenetwork.py | 16 +++++++++------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 5234b990c..451006d48 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -26,7 +26,7 @@ class TestSessionBloat(IntegrationTestCase): self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) self.assertFalse(session.is_closing()) await asyncio.sleep(1.1) - with self.assertRaises((asyncio.TimeoutError, asyncio.CancelledError)): + with self.assertRaises(asyncio.TimeoutError): await session.send_request('server.banner', ()) self.assertTrue(session.is_closing()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index d4a9c5d4b..8d0faed2a 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -29,16 +29,16 @@ class ReconnectTests(IntegrationTestCase): await asyncio.wait_for(self.on_transaction_id(sendtxid), 1.0) # mempool await self.blockchain.generate(1) await self.on_transaction_id(sendtxid) # confirmed - self.assertLess(self.ledger.network.client.latency, 1) # latency properly set lower, we are fine + self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine await self.assertBalance(self.account, '1.1337') # is it real? are we rich!? let me see this tx... d = self.ledger.network.get_transaction(sendtxid) # what's that smoke on my ethernet cable? oh no! self.ledger.network.client.connection_lost(Exception()) - with self.assertRaises((asyncio.TimeoutError, asyncio.CancelledError)): + with self.assertRaises(asyncio.TimeoutError): await d - self.assertGreater(self.ledger.network.client.latency, 1000) # latency skyrockets as it failed + self.assertIsNone(self.ledger.network.client.response_time) # response time unknown as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): await self.ledger.network.get_transaction(sendtxid) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 752c2ed90..39ed1fbdf 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -23,13 +23,13 @@ class ClientSession(BaseClientSession): self.bw_limit = self.framer.max_size = self.max_errors = 1 << 32 self.timeout = timeout self.max_seconds_idle = timeout * 2 - self.latency = 1 << 32 + self.response_time: Optional[float] = None self._on_connect_cb = on_connect_callback or (lambda: None) self.trigger_urgent_reconnect = asyncio.Event() @property def available(self): - return not self.is_closing() and self._can_send.is_set() and self.latency < 1 << 32 + return not self.is_closing() and self._can_send.is_set() and self.response_time is not None async def send_request(self, method, args=()): try: @@ -37,13 +37,13 @@ class ClientSession(BaseClientSession): result = await asyncio.wait_for( super().send_request(method, args), timeout=self.timeout ) - self.latency = perf_counter() - start + self.response_time = perf_counter() - start return result except RPCError as e: log.warning("Wallet server returned an error. Code: %s Message: %s", *e.args) raise e except TimeoutError: - self.latency = 1 << 32 + self.response_time = None raise async def ensure_session(self): @@ -56,7 +56,7 @@ class ClientSession(BaseClientSession): await self.create_connection(self.timeout) await self.ensure_server_version() self._on_connect_cb() - if (time() - self.last_send) > self.max_seconds_idle or self.latency == 1 << 32: + if (time() - self.last_send) > self.max_seconds_idle or self.response_time is None: await self.send_request('server.banner') retry_delay = default_delay except (asyncio.TimeoutError, OSError): @@ -84,7 +84,7 @@ class ClientSession(BaseClientSession): def connection_lost(self, exc): log.debug("Connection lost: %s:%d", *self.server) super().connection_lost(exc) - self.latency = 1 << 32 + self.response_time = None self._on_disconnect_controller.add(True) @@ -202,7 +202,9 @@ class SessionPool: def fastest_session(self): if not self.available_sessions: return None - return min([(session.latency, session) for session in self.available_sessions], key=itemgetter(0))[1] + return min( + [(session.response_time, session) for session in self.available_sessions], key=itemgetter(0) + )[1] def start(self, default_servers): callback = self.new_connection_event.set