diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 56203d671..d0e8dff1f 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -139,14 +139,14 @@ class AddressSynchronizer(PrintError): if self.network is not None: self.verifier = SPV(self.network, self) self.synchronizer = Synchronizer(self, network) - network.add_jobs([self.verifier, self.synchronizer]) + #network.add_jobs([self.verifier, self.synchronizer]) else: self.verifier = None self.synchronizer = None def stop_threads(self): if self.network: - self.network.remove_jobs([self.synchronizer, self.verifier]) + #self.network.remove_jobs([self.synchronizer, self.verifier]) self.synchronizer.release() self.synchronizer = None self.verifier = None diff --git a/electrum/daemon.py b/electrum/daemon.py index ccdce4a0a..72d6ece8b 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -128,8 +128,8 @@ class Daemon(DaemonThread): self.network = Network(config) self.network.start() self.fx = FxThread(config, self.network) - if self.network: - self.network.add_jobs([self.fx]) + #if self.network: + # self.network.add_jobs([self.fx]) self.gui = None self.wallets = {} # Setup JSONRPC server diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 4d2eb241a..b62dd3e9b 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -712,7 +712,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if not self.wallet: return - if self.network is None or not self.network.is_running(): + if self.network is None: text = _("Offline") icon = QIcon(":icons/status_disconnected.png") diff --git a/electrum/network.py b/electrum/network.py index 5793ad621..4a7215ced 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -39,20 +39,42 @@ import dns.resolver import socks from . import util -from .util import print_error +from .util import print_error, PrintError from . import bitcoin from .bitcoin import COIN from . import constants -from .interface import Connection, Interface from . import blockchain from .version import ELECTRUM_VERSION, PROTOCOL_VERSION from .i18n import _ from .blockchain import InvalidHeader +import aiorpcx, asyncio, ssl +import concurrent.futures NODES_RETRY_INTERVAL = 60 SERVER_RETRY_INTERVAL = 10 +class Interface(PrintError): + @util.aiosafe + async def run(self): + self.host, self.port, self.protocol = self.server.split(':') + async with aiorpcx.ClientSession(self.host, self.port) as session: + ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) + print(ver) + + def __init__(self, server): + self.exception = None + self.server = server + self.fut = asyncio.get_event_loop().create_task(self.run()) + + def has_timed_out(self): + return self.fut.done() + + def queue_request(self, method, params, msg_id): + pass + + def close(self): + self.fut.cancel() def parse_servers(result): """ parse servers list into dict format""" @@ -162,7 +184,7 @@ def serialize_server(host, port, protocol): return str(':'.join([host, port, protocol])) -class Network(util.DaemonThread): +class Network(PrintError): """The Network class manages a set of connections to remote electrum servers, each connected socket is handled by an Interface() object. Connections are initiated by a Connection() thread which stops once @@ -179,7 +201,6 @@ class Network(util.DaemonThread): def __init__(self, config=None): if config is None: config = {} # Do not use mutables as default values! - util.DaemonThread.__init__(self) self.config = SimpleConfig(config) if isinstance(config, dict) else config self.num_server = 10 if not self.config.get('oneserver') else 0 self.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock @@ -244,6 +265,7 @@ class Network(util.DaemonThread): self.socket_queue = queue.Queue() self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) + self.asyncio_loop = asyncio.get_event_loop() def with_interface_lock(func): def func_wrapper(self, *args, **kwargs): @@ -424,7 +446,7 @@ class Network(util.DaemonThread): self.print_error("connecting to %s as new interface" % server) self.set_status('connecting') self.connecting.add(server) - Connection(server, self.socket_queue, self.config.path) + self.socket_queue.put(server) def start_random_interface(self): with self.interface_lock: @@ -781,10 +803,10 @@ class Network(util.DaemonThread): if b.catch_up == server: b.catch_up = None - def new_interface(self, server, socket): + def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, socket) + interface = Interface(server) interface.blockchain = None interface.tip_header = None interface.tip = 0 @@ -804,12 +826,12 @@ class Network(util.DaemonThread): '''Socket maintenance.''' # Responses to connection attempts? while not self.socket_queue.empty(): - server, socket = self.socket_queue.get() + server = self.socket_queue.get() if server in self.connecting: self.connecting.remove(server) if socket: - self.new_interface(server, socket) + self.new_interface(server) else: self.connection_down(server) @@ -1078,16 +1100,15 @@ class Network(util.DaemonThread): with b.lock: b.update_size() - def run(self): + def _run(self): self.init_headers_file() - while self.is_running(): - self.maintain_sockets() - self.wait_on_sockets() - self.maintain_requests() - self.run_jobs() # Synchronizer and Verifier - self.process_pending_sends() - self.stop_network() - self.on_stop() + these = [self.maintain_sessions()] + these = [self.asyncio_loop.create_task(x) for x in these] + self.gat = asyncio.gather(*these) + try: + self.asyncio_loop.run_until_complete(self.gat) + except concurrent.futures.CancelledError: + pass def on_notify_header(self, interface, header_dict): try: @@ -1321,3 +1342,31 @@ class Network(util.DaemonThread): @classmethod def max_checkpoint(cls): return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1) + + def start(self): + self.fut = threading.Thread(target=self._run) + self.fut.start() + + def stop(self): + async def stop(): + self.gat.cancel() + asyncio.run_coroutine_threadsafe(stop(), self.asyncio_loop) + + def join(self): + return self.fut.join(1) + + async def maintain_sessions(self): + while True: + while self.socket_queue.qsize() > 0: + server = self.socket_queue.get() + self.new_interface(server) + remove = [] + for k, i in self.interfaces.items(): + if i.has_timed_out(): + remove.append(k) + for k in remove: + self.connection_down(k) + for i in range(self.num_server - len(self.interfaces)): + self.start_random_interface() + self.notify('updated') + await asyncio.sleep(1) diff --git a/electrum/util.py b/electrum/util.py index 7963c1a2b..4a3629cd4 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -925,6 +925,19 @@ def make_dir(path, allow_symlink=True): os.mkdir(path) os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) +def aiosafe(f): + # save exception in object. + # f must be a method of a PrintError instance. + # aiosafe calls should not be nested + async def f2(*args, **kwargs): + self = args[0] + try: + return await f(*args, **kwargs) + except BaseException as e: + self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e)) + traceback.print_exc(file=sys.stderr) + self.exception = e + return f2 TxMinedStatus = NamedTuple("TxMinedStatus", [("height", int), ("conf", int),