mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-02 18:25:21 +00:00
network: make best_effort_reliable smarter and a bit more lenient
related: #5815
This commit is contained in:
parent
dfdc1e1d25
commit
68dad21fb4
2 changed files with 14 additions and 13 deletions
|
@ -374,7 +374,7 @@ class Interface(Logger):
|
||||||
self.logger.info(f'disconnecting due to: {repr(e)}')
|
self.logger.info(f'disconnecting due to: {repr(e)}')
|
||||||
return
|
return
|
||||||
|
|
||||||
def mark_ready(self):
|
def _mark_ready(self) -> None:
|
||||||
if self.ready.cancelled():
|
if self.ready.cancelled():
|
||||||
raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
|
raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled')
|
||||||
if self.ready.done():
|
if self.ready.done():
|
||||||
|
@ -512,7 +512,7 @@ class Interface(Logger):
|
||||||
self.tip = height
|
self.tip = height
|
||||||
if self.tip < constants.net.max_checkpoint():
|
if self.tip < constants.net.max_checkpoint():
|
||||||
raise GracefulDisconnect('server tip below max checkpoint')
|
raise GracefulDisconnect('server tip below max checkpoint')
|
||||||
self.mark_ready()
|
self._mark_ready()
|
||||||
await self._process_header_at_tip()
|
await self._process_header_at_tip()
|
||||||
self.network.trigger_callback('network_updated')
|
self.network.trigger_callback('network_updated')
|
||||||
await self.network.switch_unwanted_fork_interface()
|
await self.network.switch_unwanted_fork_interface()
|
||||||
|
|
|
@ -290,6 +290,7 @@ class Network(Logger):
|
||||||
self.nodes_retry_time = time.time()
|
self.nodes_retry_time = time.time()
|
||||||
# the main server we are currently communicating with
|
# the main server we are currently communicating with
|
||||||
self.interface = None # type: Interface
|
self.interface = None # type: Interface
|
||||||
|
self.default_server_changed_event = asyncio.Event()
|
||||||
# set of servers we have an ongoing connection with
|
# set of servers we have an ongoing connection with
|
||||||
self.interfaces = {} # type: Dict[str, Interface]
|
self.interfaces = {} # type: Dict[str, Interface]
|
||||||
self.auto_connect = self.config.get('auto_connect', True)
|
self.auto_connect = self.config.get('auto_connect', True)
|
||||||
|
@ -730,10 +731,13 @@ class Network(Logger):
|
||||||
i = self.interfaces[server]
|
i = self.interfaces[server]
|
||||||
if old_interface != i:
|
if old_interface != i:
|
||||||
self.logger.info(f"switching to {server}")
|
self.logger.info(f"switching to {server}")
|
||||||
|
assert i.ready.done(), "interface we are switching to is not ready yet"
|
||||||
blockchain_updated = i.blockchain != self.blockchain()
|
blockchain_updated = i.blockchain != self.blockchain()
|
||||||
self.interface = i
|
self.interface = i
|
||||||
await i.group.spawn(self._request_server_info(i))
|
await i.group.spawn(self._request_server_info(i))
|
||||||
self.trigger_callback('default_server_changed')
|
self.trigger_callback('default_server_changed')
|
||||||
|
self.default_server_changed_event.set()
|
||||||
|
self.default_server_changed_event.clear()
|
||||||
self._set_status('connected')
|
self._set_status('connected')
|
||||||
self.trigger_callback('network_updated')
|
self.trigger_callback('network_updated')
|
||||||
if blockchain_updated: self.trigger_callback('blockchain_updated')
|
if blockchain_updated: self.trigger_callback('blockchain_updated')
|
||||||
|
@ -840,30 +844,27 @@ class Network(Logger):
|
||||||
b.update_size()
|
b.update_size()
|
||||||
|
|
||||||
def best_effort_reliable(func):
|
def best_effort_reliable(func):
|
||||||
async def make_reliable_wrapper(self, *args, **kwargs):
|
async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
iface = self.interface
|
iface = self.interface
|
||||||
# retry until there is a main interface
|
# retry until there is a main interface
|
||||||
if not iface:
|
if not iface:
|
||||||
await asyncio.sleep(0.1)
|
try:
|
||||||
continue # try again
|
await asyncio.wait_for(self.default_server_changed_event.wait(), 1)
|
||||||
# wait for it to be usable
|
except asyncio.TimeoutError:
|
||||||
iface_ready = iface.ready
|
pass
|
||||||
iface_disconnected = iface.got_disconnected
|
|
||||||
await asyncio.wait([iface_ready, iface_disconnected], return_when=asyncio.FIRST_COMPLETED)
|
|
||||||
if not iface_ready.done() or iface_ready.cancelled():
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
continue # try again
|
continue # try again
|
||||||
|
assert iface.ready.done(), "interface not ready yet"
|
||||||
# try actual request
|
# try actual request
|
||||||
success_fut = asyncio.ensure_future(func(self, *args, **kwargs))
|
success_fut = asyncio.ensure_future(func(self, *args, **kwargs))
|
||||||
await asyncio.wait([success_fut, iface_disconnected], return_when=asyncio.FIRST_COMPLETED)
|
await asyncio.wait([success_fut, iface.got_disconnected], return_when=asyncio.FIRST_COMPLETED)
|
||||||
if success_fut.done() and not success_fut.cancelled():
|
if success_fut.done() and not success_fut.cancelled():
|
||||||
if success_fut.exception():
|
if success_fut.exception():
|
||||||
try:
|
try:
|
||||||
raise success_fut.exception()
|
raise success_fut.exception()
|
||||||
except RequestTimedOut:
|
except RequestTimedOut:
|
||||||
await iface.close()
|
await iface.close()
|
||||||
await iface_disconnected
|
await iface.got_disconnected
|
||||||
continue # try again
|
continue # try again
|
||||||
return success_fut.result()
|
return success_fut.result()
|
||||||
# otherwise; try again
|
# otherwise; try again
|
||||||
|
|
Loading…
Add table
Reference in a new issue