mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
move event loop construction to daemon
This commit is contained in:
parent
ca8eae919f
commit
c2ecfaf239
2 changed files with 41 additions and 34 deletions
|
@ -30,6 +30,7 @@ import traceback
|
|||
import sys
|
||||
import threading
|
||||
from typing import Dict, Optional, Tuple
|
||||
import re
|
||||
|
||||
import jsonrpclib
|
||||
|
||||
|
@ -127,10 +128,12 @@ class Daemon(DaemonThread):
|
|||
if fd is None and listen_jsonrpc:
|
||||
fd, server = get_fd_or_server(config)
|
||||
if fd is None: raise Exception('failed to lock daemon; already running?')
|
||||
self.create_and_start_event_loop()
|
||||
if config.get('offline'):
|
||||
self.network = None
|
||||
else:
|
||||
self.network = Network(config)
|
||||
self.network._loop_thread = self._loop_thread
|
||||
self.fx = FxThread(config, self.network)
|
||||
if self.network:
|
||||
self.network.start([self.fx.run])
|
||||
|
@ -170,7 +173,7 @@ class Daemon(DaemonThread):
|
|||
return True
|
||||
|
||||
def run_daemon(self, config_options):
|
||||
asyncio.set_event_loop(self.network.asyncio_loop) # FIXME what if self.network is None?
|
||||
asyncio.set_event_loop(self.asyncio_loop)
|
||||
config = SimpleConfig(config_options)
|
||||
sub = config.get('subcommand')
|
||||
assert sub in [None, 'start', 'stop', 'status', 'load_wallet', 'close_wallet']
|
||||
|
@ -265,7 +268,7 @@ class Daemon(DaemonThread):
|
|||
wallet.stop_threads()
|
||||
|
||||
def run_cmdline(self, config_options):
|
||||
asyncio.set_event_loop(self.network.asyncio_loop) # FIXME what if self.network is None?
|
||||
asyncio.set_event_loop(self.asyncio_loop)
|
||||
password = config_options.get('password')
|
||||
new_password = config_options.get('new_password')
|
||||
config = SimpleConfig(config_options)
|
||||
|
@ -297,11 +300,15 @@ class Daemon(DaemonThread):
|
|||
def run(self):
|
||||
while self.is_running():
|
||||
self.server.handle_request() if self.server else time.sleep(0.1)
|
||||
# stop network/wallets
|
||||
for k, wallet in self.wallets.items():
|
||||
wallet.stop_threads()
|
||||
if self.network:
|
||||
self.print_error("shutting down network")
|
||||
self.network.stop()
|
||||
# stop event loop
|
||||
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
|
||||
self._loop_thread.join(timeout=1)
|
||||
self.on_stop()
|
||||
|
||||
def stop(self):
|
||||
|
@ -323,3 +330,22 @@ class Daemon(DaemonThread):
|
|||
except BaseException as e:
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
# app will exit now
|
||||
|
||||
def create_and_start_event_loop(self):
|
||||
def on_exception(loop, context):
|
||||
"""Suppress spurious messages it appears we cannot control."""
|
||||
SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
|
||||
'SSL error in data received')
|
||||
message = context.get('message')
|
||||
if message and SUPPRESS_MESSAGE_REGEX.match(message):
|
||||
return
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
self.asyncio_loop = asyncio.get_event_loop()
|
||||
self.asyncio_loop.set_exception_handler(on_exception)
|
||||
# self.asyncio_loop.set_debug(1)
|
||||
self._stop_loop = asyncio.Future()
|
||||
self._loop_thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
|
||||
args=(self._stop_loop,),
|
||||
name='EventLoop')
|
||||
self._loop_thread.start()
|
||||
|
|
|
@ -168,6 +168,10 @@ class Network(PrintError):
|
|||
def __init__(self, config: SimpleConfig=None):
|
||||
global INSTANCE
|
||||
INSTANCE = self
|
||||
|
||||
self.asyncio_loop = asyncio.get_event_loop()
|
||||
self._loop_thread = None # type: threading.Thread # set by caller; only used for sanity checks
|
||||
|
||||
if config is None:
|
||||
config = {} # Do not use mutables as default values!
|
||||
self.config = SimpleConfig(config) if isinstance(config, dict) else config # type: SimpleConfig
|
||||
|
@ -221,17 +225,8 @@ class Network(PrintError):
|
|||
self.server_queue = None
|
||||
self.proxy = None
|
||||
|
||||
self.asyncio_loop = asyncio.get_event_loop()
|
||||
self.asyncio_loop.set_exception_handler(self.on_event_loop_exception)
|
||||
#self.asyncio_loop.set_debug(1)
|
||||
self._run_forever = asyncio.Future()
|
||||
self._thread = threading.Thread(target=self.asyncio_loop.run_until_complete,
|
||||
args=(self._run_forever,),
|
||||
name='Network')
|
||||
self._thread.start()
|
||||
|
||||
def run_from_another_thread(self, coro):
|
||||
assert self._thread != threading.current_thread(), 'must not be called from network thread'
|
||||
assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
|
||||
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
|
||||
return fut.result()
|
||||
|
||||
|
@ -239,15 +234,6 @@ class Network(PrintError):
|
|||
def get_instance():
|
||||
return INSTANCE
|
||||
|
||||
def on_event_loop_exception(self, loop, context):
|
||||
"""Suppress spurious messages it appears we cannot control."""
|
||||
SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
|
||||
'SSL error in data received')
|
||||
message = context.get('message')
|
||||
if message and SUPPRESS_MESSAGE_REGEX.match(message):
|
||||
return
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
def with_recent_servers_lock(func):
|
||||
def func_wrapper(self, *args, **kwargs):
|
||||
with self.recent_servers_lock:
|
||||
|
@ -845,25 +831,20 @@ class Network(PrintError):
|
|||
await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2)
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
||||
self.print_error(f"exc during main_taskgroup cancellation: {repr(e)}")
|
||||
try:
|
||||
self.main_taskgroup = None
|
||||
self.interface = None # type: Interface
|
||||
self.interfaces = {} # type: Dict[str, Interface]
|
||||
self.connecting.clear()
|
||||
self.server_queue = None
|
||||
if not full_shutdown:
|
||||
self.trigger_callback('network_updated')
|
||||
finally:
|
||||
if full_shutdown:
|
||||
self._run_forever.set_result(1)
|
||||
self.main_taskgroup = None
|
||||
self.interface = None # type: Interface
|
||||
self.interfaces = {} # type: Dict[str, Interface]
|
||||
self.connecting.clear()
|
||||
self.server_queue = None
|
||||
if not full_shutdown:
|
||||
self.trigger_callback('network_updated')
|
||||
|
||||
def stop(self):
|
||||
assert self._thread != threading.current_thread(), 'must not be called from network thread'
|
||||
assert self._loop_thread != threading.current_thread(), 'must not be called from network thread'
|
||||
fut = asyncio.run_coroutine_threadsafe(self._stop(full_shutdown=True), self.asyncio_loop)
|
||||
try:
|
||||
fut.result(timeout=2)
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError): pass
|
||||
self._thread.join(timeout=1)
|
||||
|
||||
async def _ensure_there_is_a_main_interface(self):
|
||||
if self.is_connected():
|
||||
|
|
Loading…
Add table
Reference in a new issue