mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
lnpeer: some exception handling clean up
main_loop should dump traces of unexpected exceptions to log. Coroutines/functions invoked inside main_loop should simply propagate it up the chain. Typical exceptions are handled in handle_disconnect without dumping the trace.
This commit is contained in:
parent
efc8948c00
commit
7e8be3d2e7
1 changed files with 17 additions and 10 deletions
|
@ -43,6 +43,7 @@ from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
|
||||||
from .lntransport import LNTransport, LNTransportBase
|
from .lntransport import LNTransport, LNTransportBase
|
||||||
from .lnmsg import encode_msg, decode_msg
|
from .lnmsg import encode_msg, decode_msg
|
||||||
from .lnverifier import verify_sig_for_channel_update
|
from .lnverifier import verify_sig_for_channel_update
|
||||||
|
from .interface import GracefulDisconnect
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .lnworker import LNWorker
|
from .lnworker import LNWorker
|
||||||
|
@ -172,7 +173,7 @@ class Peer(Logger):
|
||||||
if flag not in their_flags and get_ln_flag_pair_of_bit(flag) not in their_flags:
|
if flag not in their_flags and get_ln_flag_pair_of_bit(flag) not in their_flags:
|
||||||
# they don't have this feature we wanted :(
|
# they don't have this feature we wanted :(
|
||||||
if flag % 2 == 0: # even flags are compulsory
|
if flag % 2 == 0: # even flags are compulsory
|
||||||
raise LightningPeerConnectionClosed("remote does not have even flag {}"
|
raise GracefulDisconnect("remote does not have even flag {}"
|
||||||
.format(str(LnLocalFeatures(1 << flag))))
|
.format(str(LnLocalFeatures(1 << flag))))
|
||||||
self.localfeatures ^= 1 << flag # disable flag
|
self.localfeatures ^= 1 << flag # disable flag
|
||||||
if isinstance(self.transport, LNTransport):
|
if isinstance(self.transport, LNTransport):
|
||||||
|
@ -200,13 +201,16 @@ class Peer(Logger):
|
||||||
async def wrapper_func(self, *args, **kwargs):
|
async def wrapper_func(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
return await func(self, *args, **kwargs)
|
return await func(self, *args, **kwargs)
|
||||||
except Exception as e:
|
except GracefulDisconnect as e:
|
||||||
self.logger.info("Disconnecting: {}".format(repr(e)))
|
self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
|
||||||
|
except LightningPeerConnectionClosed as e:
|
||||||
|
self.logger.info(f"Disconnecting: {repr(e)}")
|
||||||
finally:
|
finally:
|
||||||
self.close_and_cleanup()
|
self.close_and_cleanup()
|
||||||
return wrapper_func
|
return wrapper_func
|
||||||
|
|
||||||
@ignore_exceptions # do not kill main_taskgroup
|
@ignore_exceptions # do not kill main_taskgroup
|
||||||
|
@log_exceptions
|
||||||
@handle_disconnect
|
@handle_disconnect
|
||||||
async def main_loop(self):
|
async def main_loop(self):
|
||||||
async with aiorpcx.TaskGroup() as group:
|
async with aiorpcx.TaskGroup() as group:
|
||||||
|
@ -214,7 +218,6 @@ class Peer(Logger):
|
||||||
await group.spawn(self.query_gossip())
|
await group.spawn(self.query_gossip())
|
||||||
await group.spawn(self.process_gossip())
|
await group.spawn(self.process_gossip())
|
||||||
|
|
||||||
@log_exceptions
|
|
||||||
async def process_gossip(self):
|
async def process_gossip(self):
|
||||||
# verify in peer's TaskGroup so that we fail the connection
|
# verify in peer's TaskGroup so that we fail the connection
|
||||||
while True:
|
while True:
|
||||||
|
@ -283,11 +286,16 @@ class Peer(Logger):
|
||||||
if not verify_sig_for_channel_update(payload, payload['start_node']):
|
if not verify_sig_for_channel_update(payload, payload['start_node']):
|
||||||
raise BaseException('verify error')
|
raise BaseException('verify error')
|
||||||
|
|
||||||
@log_exceptions
|
|
||||||
async def query_gossip(self):
|
async def query_gossip(self):
|
||||||
|
try:
|
||||||
await asyncio.wait_for(self.initialized.wait(), 10)
|
await asyncio.wait_for(self.initialized.wait(), 10)
|
||||||
|
except asyncio.TimeoutError as e:
|
||||||
|
raise GracefulDisconnect("initialize timed out") from e
|
||||||
if self.lnworker == self.lnworker.network.lngossip:
|
if self.lnworker == self.lnworker.network.lngossip:
|
||||||
|
try:
|
||||||
ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
|
ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
|
||||||
|
except asyncio.TimeoutError as e:
|
||||||
|
raise GracefulDisconnect("query_channel_range timed out") from e
|
||||||
self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
|
self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
|
||||||
self.lnworker.add_new_ids(ids)
|
self.lnworker.add_new_ids(ids)
|
||||||
while True:
|
while True:
|
||||||
|
@ -400,8 +408,7 @@ class Peer(Logger):
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self.initialize(), 10)
|
await asyncio.wait_for(self.initialize(), 10)
|
||||||
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
|
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
|
||||||
self.logger.info('initialize failed, disconnecting: {}'.format(repr(e)))
|
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
|
||||||
return
|
|
||||||
async for msg in self.transport.read_messages():
|
async for msg in self.transport.read_messages():
|
||||||
self.process_message(msg)
|
self.process_message(msg)
|
||||||
await asyncio.sleep(.01)
|
await asyncio.sleep(.01)
|
||||||
|
|
Loading…
Add table
Reference in a new issue