mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
network: fix some threading issues
This commit is contained in:
parent
1294608571
commit
3be5b4b00f
2 changed files with 18 additions and 15 deletions
|
@ -369,8 +369,10 @@ class Interface(PrintError):
|
||||||
await self.session.send_request('server.ping')
|
await self.session.send_request('server.ping')
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.fut.cancel()
|
async def job():
|
||||||
asyncio.get_event_loop().create_task(self.group.cancel_remaining())
|
self.fut.cancel()
|
||||||
|
await self.group.cancel_remaining()
|
||||||
|
asyncio.run_coroutine_threadsafe(job(), self.network.asyncio_loop)
|
||||||
|
|
||||||
async def run_fetch_blocks(self):
|
async def run_fetch_blocks(self):
|
||||||
header_queue = asyncio.Queue()
|
header_queue = asyncio.Queue()
|
||||||
|
|
|
@ -260,11 +260,11 @@ class Network(PrintError):
|
||||||
with self.callback_lock:
|
with self.callback_lock:
|
||||||
callbacks = self.callbacks[event][:]
|
callbacks = self.callbacks[event][:]
|
||||||
for callback in callbacks:
|
for callback in callbacks:
|
||||||
|
# FIXME: if callback throws, we will lose the traceback
|
||||||
if asyncio.iscoroutinefunction(callback):
|
if asyncio.iscoroutinefunction(callback):
|
||||||
# FIXME: if callback throws, we will lose the traceback
|
|
||||||
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
|
||||||
else:
|
else:
|
||||||
callback(event, *args)
|
self.asyncio_loop.call_soon_threadsafe(callback, event, *args)
|
||||||
|
|
||||||
def read_recent_servers(self):
|
def read_recent_servers(self):
|
||||||
if not self.config.path:
|
if not self.config.path:
|
||||||
|
@ -425,7 +425,7 @@ class Network(PrintError):
|
||||||
|
|
||||||
def start_random_interface(self):
|
def start_random_interface(self):
|
||||||
with self.interface_lock:
|
with self.interface_lock:
|
||||||
exclude_set = self.disconnected_servers.union(set(self.interfaces))
|
exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting
|
||||||
server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
|
server = pick_random_server(self.get_servers(), self.protocol, exclude_set)
|
||||||
if server:
|
if server:
|
||||||
self.start_interface(server)
|
self.start_interface(server)
|
||||||
|
@ -602,8 +602,8 @@ class Network(PrintError):
|
||||||
self.start_interface(old_server)
|
self.start_interface(old_server)
|
||||||
|
|
||||||
self.interface = i
|
self.interface = i
|
||||||
asyncio.get_event_loop().create_task(
|
asyncio.run_coroutine_threadsafe(
|
||||||
i.group.spawn(self.request_server_info(i)))
|
i.group.spawn(self.request_server_info(i)), self.asyncio_loop)
|
||||||
self.trigger_callback('default_server_changed')
|
self.trigger_callback('default_server_changed')
|
||||||
self.set_status('connected')
|
self.set_status('connected')
|
||||||
self.trigger_callback('network_updated')
|
self.trigger_callback('network_updated')
|
||||||
|
@ -647,21 +647,22 @@ class Network(PrintError):
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
#import traceback
|
#import traceback
|
||||||
#traceback.print_exc()
|
#traceback.print_exc()
|
||||||
self.print_error(interface.server, "couldn't launch because", str(e), str(type(e)))
|
self.print_error(server, "couldn't launch because", str(e), str(type(e)))
|
||||||
# note: connection_down will not call interface.close() as
|
# note: connection_down will not call interface.close() as
|
||||||
# interface is not yet in self.interfaces. OTOH, calling
|
# interface is not yet in self.interfaces. OTOH, calling
|
||||||
# interface.close() here will sometimes raise deep inside the
|
# interface.close() here will sometimes raise deep inside the
|
||||||
# asyncio internal select.select... instead, interface will close
|
# asyncio internal select.select... instead, interface will close
|
||||||
# itself when it detects the cancellation of interface.ready;
|
# itself when it detects the cancellation of interface.ready;
|
||||||
# however this might take several seconds...
|
# however this might take several seconds...
|
||||||
self.connection_down(interface.server)
|
self.connection_down(server)
|
||||||
return
|
return
|
||||||
|
else:
|
||||||
|
with self.interface_lock:
|
||||||
|
self.interfaces[server] = interface
|
||||||
finally:
|
finally:
|
||||||
try: self.connecting.remove(server)
|
with self.interface_lock:
|
||||||
except KeyError: pass
|
try: self.connecting.remove(server)
|
||||||
|
except KeyError: pass
|
||||||
with self.interface_lock:
|
|
||||||
self.interfaces[server] = interface
|
|
||||||
|
|
||||||
if server == self.default_server:
|
if server == self.default_server:
|
||||||
self.switch_to_interface(server)
|
self.switch_to_interface(server)
|
||||||
|
@ -819,6 +820,6 @@ class Network(PrintError):
|
||||||
self.switch_to_interface(self.default_server)
|
self.switch_to_interface(self.default_server)
|
||||||
else:
|
else:
|
||||||
if self.config.is_fee_estimates_update_required():
|
if self.config.is_fee_estimates_update_required():
|
||||||
await self.interface.group.spawn(self.request_fee_estimates(self.interface))
|
await self.interface.group.spawn(self.request_fee_estimates, self.interface)
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
Loading…
Add table
Reference in a new issue