mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-05 05:15:12 +00:00
Use a Future for lnpeer.initialized, so that exceptions are passed to the GUI
This commit is contained in:
parent
56cb45df70
commit
bda23ef73d
2 changed files with 23 additions and 15 deletions
|
@ -68,7 +68,7 @@ class Peer(Logger):
|
|||
def __init__(self, lnworker: Union['LNGossip', 'LNWallet'], pubkey:bytes, transport: LNTransportBase):
|
||||
self._sent_init = False # type: bool
|
||||
self._received_init = False # type: bool
|
||||
self.initialized = asyncio.Event()
|
||||
self.initialized = asyncio.Future()
|
||||
self.querying = asyncio.Event()
|
||||
self.transport = transport
|
||||
self.pubkey = pubkey # remote pubkey
|
||||
|
@ -100,7 +100,7 @@ class Peer(Logger):
|
|||
def send_message(self, message_name: str, **kwargs):
|
||||
assert type(message_name) is str
|
||||
self.logger.debug(f"Sending {message_name.upper()}")
|
||||
if message_name.upper() != "INIT" and not self.initialized.is_set():
|
||||
if message_name.upper() != "INIT" and not self.is_initialized():
|
||||
raise Exception("tried to send message before we are initialized")
|
||||
raw_msg = encode_msg(message_name, **kwargs)
|
||||
self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
|
||||
|
@ -117,11 +117,21 @@ class Peer(Logger):
|
|||
# saving now, to ensure replaying updates works (in case of channel reestablishment)
|
||||
self.lnworker.save_channel(chan)
|
||||
|
||||
def maybe_set_initialized(self):
|
||||
if self.initialized.done():
|
||||
return
|
||||
if self._sent_init and self._received_init:
|
||||
self.initialized.set_result(True)
|
||||
|
||||
def is_initialized(self):
|
||||
return self.initialized.done() and self.initialized.result() == True
|
||||
|
||||
async def initialize(self):
|
||||
if isinstance(self.transport, LNTransport):
|
||||
await self.transport.handshake()
|
||||
self.send_message("init", gflen=0, lflen=2, localfeatures=self.localfeatures)
|
||||
self._sent_init = True
|
||||
self.maybe_set_initialized()
|
||||
|
||||
@property
|
||||
def channels(self) -> Dict[bytes, Channel]:
|
||||
|
@ -191,12 +201,12 @@ class Peer(Logger):
|
|||
try:
|
||||
self.localfeatures = ln_compare_features(self.localfeatures, their_localfeatures)
|
||||
except ValueError as e:
|
||||
self.initialized.set_exception(e)
|
||||
raise GracefulDisconnect(f"remote does not support {str(e)}")
|
||||
if isinstance(self.transport, LNTransport):
|
||||
self.channel_db.add_recent_peer(self.transport.peer_addr)
|
||||
self._received_init = True
|
||||
if self._sent_init and self._received_init:
|
||||
self.initialized.set()
|
||||
self.maybe_set_initialized()
|
||||
|
||||
def on_node_announcement(self, payload):
|
||||
self.gossip_queue.put_nowait(('node_announcement', payload))
|
||||
|
@ -312,7 +322,7 @@ class Peer(Logger):
|
|||
|
||||
async def query_gossip(self):
|
||||
try:
|
||||
await asyncio.wait_for(self.initialized.wait(), LN_P2P_NETWORK_TIMEOUT)
|
||||
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
|
||||
except asyncio.TimeoutError as e:
|
||||
raise GracefulDisconnect("initialize timed out") from e
|
||||
if self.lnworker == self.lnworker.network.lngossip:
|
||||
|
@ -433,8 +443,6 @@ class Peer(Logger):
|
|||
await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
|
||||
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
|
||||
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
|
||||
if self._sent_init and self._received_init:
|
||||
self.initialized.set()
|
||||
async for msg in self.transport.read_messages():
|
||||
self.process_message(msg)
|
||||
await asyncio.sleep(.01)
|
||||
|
@ -497,7 +505,7 @@ class Peer(Logger):
|
|||
@log_exceptions
|
||||
async def channel_establishment_flow(self, password: Optional[str], funding_tx: 'PartialTransaction', funding_sat: int,
|
||||
push_msat: int, temp_channel_id: bytes) -> Tuple[Channel, 'PartialTransaction']:
|
||||
await asyncio.wait_for(self.initialized.wait(), LN_P2P_NETWORK_TIMEOUT)
|
||||
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
|
||||
feerate = self.lnworker.current_feerate_per_kw()
|
||||
local_config = self.make_local_config(funding_sat, push_msat, LOCAL)
|
||||
# for the first commitment transaction
|
||||
|
@ -716,7 +724,7 @@ class Peer(Logger):
|
|||
|
||||
@log_exceptions
|
||||
async def reestablish_channel(self, chan: Channel):
|
||||
await self.initialized.wait()
|
||||
await self.initialized
|
||||
chan_id = chan.channel_id
|
||||
if chan.peer_state != peer_states.DISCONNECTED:
|
||||
self.logger.info('reestablish_channel was called but channel {} already in state {}'
|
||||
|
@ -1040,7 +1048,7 @@ class Peer(Logger):
|
|||
if chan.get_state() != channel_states.OPEN:
|
||||
raise PaymentFailure('Channel not open')
|
||||
assert amount_msat > 0, "amount_msat is not greater zero"
|
||||
await asyncio.wait_for(self.initialized.wait(), LN_P2P_NETWORK_TIMEOUT)
|
||||
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
|
||||
# create onion packet
|
||||
final_cltv = self.network.get_local_height() + min_final_cltv_expiry
|
||||
hops_data, amount_msat, cltv = calc_hops_data_for_payment(route, amount_msat, final_cltv)
|
||||
|
|
|
@ -181,7 +181,7 @@ class LNWorker(Logger):
|
|||
return peer
|
||||
|
||||
def num_peers(self):
|
||||
return sum([p.initialized.is_set() for p in self.peers.values()])
|
||||
return sum([p.is_initialized() for p in self.peers.values()])
|
||||
|
||||
def start_network(self, network: 'Network'):
|
||||
assert network
|
||||
|
@ -588,7 +588,7 @@ class LNWallet(LNWorker):
|
|||
def suggest_peer(self):
|
||||
r = []
|
||||
for node_id, peer in self.peers.items():
|
||||
if not peer.initialized.is_set():
|
||||
if not peer.is_initialized():
|
||||
continue
|
||||
if not all([chan.is_closed() for chan in peer.channels.values()]):
|
||||
continue
|
||||
|
@ -666,7 +666,7 @@ class LNWallet(LNWorker):
|
|||
|
||||
if chan.get_state() == channel_states.FUNDED:
|
||||
peer = self.peers.get(chan.node_id)
|
||||
if peer and peer.initialized.is_set():
|
||||
if peer and peer.is_initialized():
|
||||
peer.send_funding_locked(chan)
|
||||
|
||||
elif chan.get_state() == channel_states.OPEN:
|
||||
|
@ -741,8 +741,8 @@ class LNWallet(LNWorker):
|
|||
funding_sat: int, push_sat: int,
|
||||
password: Optional[str]) -> Tuple[Channel, PartialTransaction]:
|
||||
peer = await self.add_peer(connect_str)
|
||||
# peer might just have been connected to
|
||||
await asyncio.wait_for(peer.initialized.wait(), LN_P2P_NETWORK_TIMEOUT)
|
||||
# will raise if init fails
|
||||
await asyncio.wait_for(peer.initialized, LN_P2P_NETWORK_TIMEOUT)
|
||||
chan, funding_tx = await peer.channel_establishment_flow(
|
||||
password,
|
||||
funding_tx=funding_tx,
|
||||
|
|
Loading…
Add table
Reference in a new issue