mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-29 16:31:29 +00:00
network: don't let _maintain_sessions die from CancelledError
as then the network would get paralysed and no one can fix it
This commit is contained in:
parent
b3ff173b45
commit
62e352a2a8
1 changed files with 19 additions and 10 deletions
|
@ -197,7 +197,7 @@ class Network(PrintError):
|
||||||
if not self.default_server:
|
if not self.default_server:
|
||||||
self.default_server = pick_random_server()
|
self.default_server = pick_random_server()
|
||||||
|
|
||||||
self.main_taskgroup = None
|
self.main_taskgroup = None # type: TaskGroup
|
||||||
|
|
||||||
# locks
|
# locks
|
||||||
self.restart_lock = asyncio.Lock()
|
self.restart_lock = asyncio.Lock()
|
||||||
|
@ -817,7 +817,7 @@ class Network(PrintError):
|
||||||
|
|
||||||
async def _start(self):
|
async def _start(self):
|
||||||
assert not self.main_taskgroup
|
assert not self.main_taskgroup
|
||||||
self.main_taskgroup = SilentTaskGroup()
|
self.main_taskgroup = main_taskgroup = SilentTaskGroup()
|
||||||
assert not self.interface and not self.interfaces
|
assert not self.interface and not self.interfaces
|
||||||
assert not self.connecting and not self.server_queue
|
assert not self.connecting and not self.server_queue
|
||||||
self.print_error('starting network')
|
self.print_error('starting network')
|
||||||
|
@ -831,7 +831,9 @@ class Network(PrintError):
|
||||||
async def main():
|
async def main():
|
||||||
try:
|
try:
|
||||||
await self._init_headers_file()
|
await self._init_headers_file()
|
||||||
async with self.main_taskgroup as group:
|
# note: if a task finishes with CancelledError, that
|
||||||
|
# will NOT raise, and the group will keep the other tasks running
|
||||||
|
async with main_taskgroup as group:
|
||||||
await group.spawn(self._maintain_sessions())
|
await group.spawn(self._maintain_sessions())
|
||||||
[await group.spawn(job) for job in self._jobs]
|
[await group.spawn(job) for job in self._jobs]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -852,7 +854,7 @@ class Network(PrintError):
|
||||||
await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2)
|
await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2)
|
||||||
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
||||||
self.print_error(f"exc during main_taskgroup cancellation: {repr(e)}")
|
self.print_error(f"exc during main_taskgroup cancellation: {repr(e)}")
|
||||||
self.main_taskgroup = None
|
self.main_taskgroup = None # type: TaskGroup
|
||||||
self.interface = None # type: Interface
|
self.interface = None # type: Interface
|
||||||
self.interfaces = {} # type: Dict[str, Interface]
|
self.interfaces = {} # type: Dict[str, Interface]
|
||||||
self.connecting.clear()
|
self.connecting.clear()
|
||||||
|
@ -884,13 +886,11 @@ class Network(PrintError):
|
||||||
await self.switch_to_interface(self.default_server)
|
await self.switch_to_interface(self.default_server)
|
||||||
|
|
||||||
async def _maintain_sessions(self):
|
async def _maintain_sessions(self):
|
||||||
while True:
|
async def launch_already_queued_up_new_interfaces():
|
||||||
# launch already queued up new interfaces
|
|
||||||
while self.server_queue.qsize() > 0:
|
while self.server_queue.qsize() > 0:
|
||||||
server = self.server_queue.get()
|
server = self.server_queue.get()
|
||||||
await self.main_taskgroup.spawn(self._run_new_interface(server))
|
await self.main_taskgroup.spawn(self._run_new_interface(server))
|
||||||
|
async def maybe_queue_new_interfaces_to_be_launched_later():
|
||||||
# maybe queue new interfaces to be launched later
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
||||||
self._start_random_interface()
|
self._start_random_interface()
|
||||||
|
@ -898,13 +898,22 @@ class Network(PrintError):
|
||||||
self.print_error('network: retrying connections')
|
self.print_error('network: retrying connections')
|
||||||
self.disconnected_servers = set([])
|
self.disconnected_servers = set([])
|
||||||
self.nodes_retry_time = now
|
self.nodes_retry_time = now
|
||||||
|
async def maintain_main_interface():
|
||||||
# main interface
|
|
||||||
await self._ensure_there_is_a_main_interface()
|
await self._ensure_there_is_a_main_interface()
|
||||||
if self.is_connected():
|
if self.is_connected():
|
||||||
if self.config.is_fee_estimates_update_required():
|
if self.config.is_fee_estimates_update_required():
|
||||||
await self.interface.group.spawn(self._request_fee_estimates, self.interface)
|
await self.interface.group.spawn(self._request_fee_estimates, self.interface)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await launch_already_queued_up_new_interfaces()
|
||||||
|
await maybe_queue_new_interfaces_to_be_launched_later()
|
||||||
|
await maintain_main_interface()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# suppress spurious cancellations
|
||||||
|
group = self.main_taskgroup
|
||||||
|
if not group or group._closed:
|
||||||
|
raise
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue