mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-29 16:31:29 +00:00
add lnchannel.can_send_ctx_updates. just drop illegal updates for now
This commit is contained in:
parent
9d1fa4cc99
commit
e54c69b861
4 changed files with 56 additions and 17 deletions
|
@ -151,6 +151,7 @@ class Channel(Logger):
|
||||||
self._outgoing_channel_update = None # type: Optional[bytes]
|
self._outgoing_channel_update = None # type: Optional[bytes]
|
||||||
self._chan_ann_without_sigs = None # type: Optional[bytes]
|
self._chan_ann_without_sigs = None # type: Optional[bytes]
|
||||||
self.revocation_store = RevocationStore(state["revocation_store"])
|
self.revocation_store = RevocationStore(state["revocation_store"])
|
||||||
|
self._can_send_ctx_updates = True # type: bool
|
||||||
|
|
||||||
def get_id_for_log(self) -> str:
|
def get_id_for_log(self) -> str:
|
||||||
scid = self.short_channel_id
|
scid = self.short_channel_id
|
||||||
|
@ -287,7 +288,7 @@ class Channel(Logger):
|
||||||
out[rhash] = (self.channel_id, htlc, direction)
|
out[rhash] = (self.channel_id, htlc, direction)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
def open_with_first_pcp(self, remote_pcp, remote_sig):
|
def open_with_first_pcp(self, remote_pcp: bytes, remote_sig: bytes) -> None:
|
||||||
with self.db_lock:
|
with self.db_lock:
|
||||||
self.config[REMOTE].current_per_commitment_point = remote_pcp
|
self.config[REMOTE].current_per_commitment_point = remote_pcp
|
||||||
self.config[REMOTE].next_per_commitment_point = None
|
self.config[REMOTE].next_per_commitment_point = None
|
||||||
|
@ -321,6 +322,19 @@ class Channel(Logger):
|
||||||
# the closing txid has been saved
|
# the closing txid has been saved
|
||||||
return self.get_state() >= channel_states.CLOSED
|
return self.get_state() >= channel_states.CLOSED
|
||||||
|
|
||||||
|
def set_can_send_ctx_updates(self, b: bool) -> None:
|
||||||
|
self._can_send_ctx_updates = b
|
||||||
|
|
||||||
|
def can_send_ctx_updates(self) -> bool:
|
||||||
|
"""Whether we can send update_fee, update_*_htlc changes to the remote."""
|
||||||
|
if not self.is_open():
|
||||||
|
return False
|
||||||
|
if self.peer_state != peer_states.GOOD:
|
||||||
|
return False
|
||||||
|
if not self._can_send_ctx_updates:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
def save_funding_height(self, txid, height, timestamp):
|
def save_funding_height(self, txid, height, timestamp):
|
||||||
self.storage['funding_height'] = txid, height, timestamp
|
self.storage['funding_height'] = txid, height, timestamp
|
||||||
|
|
||||||
|
@ -345,6 +359,8 @@ class Channel(Logger):
|
||||||
raise PaymentFailure('Channel closed')
|
raise PaymentFailure('Channel closed')
|
||||||
if self.get_state() != channel_states.OPEN:
|
if self.get_state() != channel_states.OPEN:
|
||||||
raise PaymentFailure('Channel not open', self.get_state())
|
raise PaymentFailure('Channel not open', self.get_state())
|
||||||
|
if not self.can_send_ctx_updates():
|
||||||
|
raise PaymentFailure('Channel cannot send ctx updates')
|
||||||
if self.available_to_spend(LOCAL) < amount_msat:
|
if self.available_to_spend(LOCAL) < amount_msat:
|
||||||
raise PaymentFailure(f'Not enough local balance. Have: {self.available_to_spend(LOCAL)}, Need: {amount_msat}')
|
raise PaymentFailure(f'Not enough local balance. Have: {self.available_to_spend(LOCAL)}, Need: {amount_msat}')
|
||||||
if len(self.hm.htlcs(LOCAL)) + 1 > self.config[REMOTE].max_accepted_htlcs:
|
if len(self.hm.htlcs(LOCAL)) + 1 > self.config[REMOTE].max_accepted_htlcs:
|
||||||
|
@ -377,6 +393,7 @@ class Channel(Logger):
|
||||||
|
|
||||||
This docstring is from LND.
|
This docstring is from LND.
|
||||||
"""
|
"""
|
||||||
|
assert self.can_send_ctx_updates(), f"cannot update channel. {self.get_state()!r} {self.peer_state!r}"
|
||||||
if isinstance(htlc, dict): # legacy conversion # FIXME remove
|
if isinstance(htlc, dict): # legacy conversion # FIXME remove
|
||||||
htlc = UpdateAddHtlc(**htlc)
|
htlc = UpdateAddHtlc(**htlc)
|
||||||
assert isinstance(htlc, UpdateAddHtlc)
|
assert isinstance(htlc, UpdateAddHtlc)
|
||||||
|
@ -704,6 +721,7 @@ class Channel(Logger):
|
||||||
SettleHTLC attempts to settle an existing outstanding received HTLC.
|
SettleHTLC attempts to settle an existing outstanding received HTLC.
|
||||||
"""
|
"""
|
||||||
self.logger.info("settle_htlc")
|
self.logger.info("settle_htlc")
|
||||||
|
assert self.can_send_ctx_updates(), f"cannot update channel. {self.get_state()!r} {self.peer_state!r}"
|
||||||
log = self.hm.log[REMOTE]
|
log = self.hm.log[REMOTE]
|
||||||
htlc = log['adds'][htlc_id]
|
htlc = log['adds'][htlc_id]
|
||||||
assert htlc.payment_hash == sha256(preimage)
|
assert htlc.payment_hash == sha256(preimage)
|
||||||
|
@ -733,6 +751,7 @@ class Channel(Logger):
|
||||||
|
|
||||||
def fail_htlc(self, htlc_id):
|
def fail_htlc(self, htlc_id):
|
||||||
self.logger.info("fail_htlc")
|
self.logger.info("fail_htlc")
|
||||||
|
assert self.can_send_ctx_updates(), f"cannot update channel. {self.get_state()!r} {self.peer_state!r}"
|
||||||
with self.db_lock:
|
with self.db_lock:
|
||||||
self.hm.send_fail(htlc_id)
|
self.hm.send_fail(htlc_id)
|
||||||
|
|
||||||
|
@ -753,6 +772,7 @@ class Channel(Logger):
|
||||||
raise Exception(f"Cannot update_fee: wrong initiator. us: {from_us}")
|
raise Exception(f"Cannot update_fee: wrong initiator. us: {from_us}")
|
||||||
with self.db_lock:
|
with self.db_lock:
|
||||||
if from_us:
|
if from_us:
|
||||||
|
assert self.can_send_ctx_updates(), f"cannot update channel. {self.get_state()!r} {self.peer_state!r}"
|
||||||
self.hm.send_update_fee(feerate)
|
self.hm.send_update_fee(feerate)
|
||||||
else:
|
else:
|
||||||
self.hm.recv_update_fee(feerate)
|
self.hm.recv_update_fee(feerate)
|
||||||
|
|
|
@ -712,8 +712,8 @@ class Peer(Logger):
|
||||||
chan_id = chan.channel_id
|
chan_id = chan.channel_id
|
||||||
assert channel_states.PREOPENING < chan.get_state() < channel_states.CLOSED
|
assert channel_states.PREOPENING < chan.get_state() < channel_states.CLOSED
|
||||||
if chan.peer_state != peer_states.DISCONNECTED:
|
if chan.peer_state != peer_states.DISCONNECTED:
|
||||||
self.logger.info('reestablish_channel was called but channel {} already in state {}'
|
self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} '
|
||||||
.format(chan_id, chan.get_state()))
|
f'already in peer_state {chan.peer_state}')
|
||||||
return
|
return
|
||||||
chan.peer_state = peer_states.REESTABLISHING
|
chan.peer_state = peer_states.REESTABLISHING
|
||||||
self.network.trigger_callback('channel', chan)
|
self.network.trigger_callback('channel', chan)
|
||||||
|
@ -890,7 +890,6 @@ class Peer(Logger):
|
||||||
if not chan:
|
if not chan:
|
||||||
raise Exception("Got unknown funding_locked", channel_id)
|
raise Exception("Got unknown funding_locked", channel_id)
|
||||||
if not chan.config[LOCAL].funding_locked_received:
|
if not chan.config[LOCAL].funding_locked_received:
|
||||||
our_next_point = chan.config[REMOTE].next_per_commitment_point
|
|
||||||
their_next_point = payload["next_per_commitment_point"]
|
their_next_point = payload["next_per_commitment_point"]
|
||||||
chan.config[REMOTE].next_per_commitment_point = their_next_point
|
chan.config[REMOTE].next_per_commitment_point = their_next_point
|
||||||
chan.config[LOCAL].funding_locked_received = True
|
chan.config[LOCAL].funding_locked_received = True
|
||||||
|
@ -1041,6 +1040,9 @@ class Peer(Logger):
|
||||||
raise PaymentFailure('Channel not open')
|
raise PaymentFailure('Channel not open')
|
||||||
assert amount_msat > 0, "amount_msat is not greater zero"
|
assert amount_msat > 0, "amount_msat is not greater zero"
|
||||||
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
|
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
|
||||||
|
# TODO also wait for channel reestablish to finish. (combine timeout with waiting for init?)
|
||||||
|
if not chan.can_send_ctx_updates():
|
||||||
|
raise PaymentFailure("Channel cannot send updates")
|
||||||
# create onion packet
|
# create onion packet
|
||||||
final_cltv = self.network.get_local_height() + min_final_cltv_expiry
|
final_cltv = self.network.get_local_height() + min_final_cltv_expiry
|
||||||
hops_data, amount_msat, cltv = calc_hops_data_for_payment(route, amount_msat, final_cltv)
|
hops_data, amount_msat, cltv = calc_hops_data_for_payment(route, amount_msat, final_cltv)
|
||||||
|
@ -1051,7 +1053,7 @@ class Peer(Logger):
|
||||||
htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
|
htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
|
||||||
htlc = chan.add_htlc(htlc)
|
htlc = chan.add_htlc(htlc)
|
||||||
remote_ctn = chan.get_latest_ctn(REMOTE)
|
remote_ctn = chan.get_latest_ctn(REMOTE)
|
||||||
chan.onion_keys[htlc.htlc_id] = secret_key
|
chan.set_onion_key(htlc.htlc_id, secret_key)
|
||||||
self.logger.info(f"starting payment. len(route)={len(route)}. route: {route}. htlc: {htlc}")
|
self.logger.info(f"starting payment. len(route)={len(route)}. route: {route}. htlc: {htlc}")
|
||||||
self.send_message("update_add_htlc",
|
self.send_message("update_add_htlc",
|
||||||
channel_id=chan.channel_id,
|
channel_id=chan.channel_id,
|
||||||
|
@ -1136,6 +1138,9 @@ class Peer(Logger):
|
||||||
timestamp=int(time.time()),
|
timestamp=int(time.time()),
|
||||||
htlc_id=htlc_id)
|
htlc_id=htlc_id)
|
||||||
htlc = chan.receive_htlc(htlc)
|
htlc = chan.receive_htlc(htlc)
|
||||||
|
# TODO: fulfilling/failing/forwarding of htlcs should be robust to going offline.
|
||||||
|
# instead of storing state implicitly in coroutines, we could decouple it from receiving the htlc.
|
||||||
|
# maybe persist the required details, and have a long-running task that makes these decisions.
|
||||||
local_ctn = chan.get_latest_ctn(LOCAL)
|
local_ctn = chan.get_latest_ctn(LOCAL)
|
||||||
remote_ctn = chan.get_latest_ctn(REMOTE)
|
remote_ctn = chan.get_latest_ctn(REMOTE)
|
||||||
if processed_onion.are_we_final:
|
if processed_onion.are_we_final:
|
||||||
|
@ -1179,8 +1184,9 @@ class Peer(Logger):
|
||||||
return
|
return
|
||||||
outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
|
outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
|
||||||
outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
|
outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
|
||||||
if next_chan.get_state() != channel_states.OPEN:
|
if not next_chan.can_send_ctx_updates():
|
||||||
self.logger.info(f"cannot forward htlc. next_chan not OPEN: {next_chan_scid} in state {next_chan.get_state()}")
|
self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
|
||||||
|
f"chan state {next_chan.get_state()}, peer state: {next_chan.peer_state}")
|
||||||
reason = OnionRoutingFailureMessage(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE,
|
reason = OnionRoutingFailureMessage(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE,
|
||||||
data=outgoing_chan_upd_len+outgoing_chan_upd)
|
data=outgoing_chan_upd_len+outgoing_chan_upd)
|
||||||
await self.fail_htlc(chan, htlc.htlc_id, onion_packet, reason)
|
await self.fail_htlc(chan, htlc.htlc_id, onion_packet, reason)
|
||||||
|
@ -1277,6 +1283,10 @@ class Peer(Logger):
|
||||||
|
|
||||||
async def _fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
|
async def _fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
|
||||||
self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
|
self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
|
||||||
|
if not chan.can_send_ctx_updates():
|
||||||
|
self.logger.info(f"dropping chan update (fulfill htlc {htlc_id}) for {chan.short_channel_id}. "
|
||||||
|
f"cannot send updates")
|
||||||
|
return
|
||||||
chan.settle_htlc(preimage, htlc_id)
|
chan.settle_htlc(preimage, htlc_id)
|
||||||
payment_hash = sha256(preimage)
|
payment_hash = sha256(preimage)
|
||||||
self.lnworker.payment_received(payment_hash)
|
self.lnworker.payment_received(payment_hash)
|
||||||
|
@ -1290,6 +1300,10 @@ class Peer(Logger):
|
||||||
async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket,
|
async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket,
|
||||||
reason: OnionRoutingFailureMessage):
|
reason: OnionRoutingFailureMessage):
|
||||||
self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}. reason: {reason}")
|
self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}. reason: {reason}")
|
||||||
|
if not chan.can_send_ctx_updates():
|
||||||
|
self.logger.info(f"dropping chan update (fail htlc {htlc_id}) for {chan.short_channel_id}. "
|
||||||
|
f"cannot send updates")
|
||||||
|
return
|
||||||
chan.fail_htlc(htlc_id)
|
chan.fail_htlc(htlc_id)
|
||||||
remote_ctn = chan.get_latest_ctn(REMOTE)
|
remote_ctn = chan.get_latest_ctn(REMOTE)
|
||||||
error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey)
|
error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey)
|
||||||
|
@ -1323,6 +1337,8 @@ class Peer(Logger):
|
||||||
"""
|
"""
|
||||||
called when our fee estimates change
|
called when our fee estimates change
|
||||||
"""
|
"""
|
||||||
|
if not chan.can_send_ctx_updates():
|
||||||
|
return
|
||||||
if not chan.constraints.is_initiator:
|
if not chan.constraints.is_initiator:
|
||||||
# TODO force close if initiator does not update_fee enough
|
# TODO force close if initiator does not update_fee enough
|
||||||
return
|
return
|
||||||
|
@ -1372,7 +1388,7 @@ class Peer(Logger):
|
||||||
async def send_shutdown(self, chan: Channel):
|
async def send_shutdown(self, chan: Channel):
|
||||||
scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
|
scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
|
||||||
# wait until no more pending updates (bolt2)
|
# wait until no more pending updates (bolt2)
|
||||||
# TODO: stop sending updates during that time
|
chan.set_can_send_ctx_updates(False)
|
||||||
ctn = chan.get_latest_ctn(REMOTE)
|
ctn = chan.get_latest_ctn(REMOTE)
|
||||||
if chan.has_pending_changes(REMOTE):
|
if chan.has_pending_changes(REMOTE):
|
||||||
await self.await_remote(chan, ctn)
|
await self.await_remote(chan, ctn)
|
||||||
|
|
|
@ -157,13 +157,12 @@ def create_test_channels(feerate=6000, local=None, remote=None):
|
||||||
alice_second = lnutil.secret_to_pubkey(int.from_bytes(lnutil.get_per_commitment_secret_from_seed(alice_seed, lnutil.RevocationStore.START_INDEX - 1), "big"))
|
alice_second = lnutil.secret_to_pubkey(int.from_bytes(lnutil.get_per_commitment_secret_from_seed(alice_seed, lnutil.RevocationStore.START_INDEX - 1), "big"))
|
||||||
bob_second = lnutil.secret_to_pubkey(int.from_bytes(lnutil.get_per_commitment_secret_from_seed(bob_seed, lnutil.RevocationStore.START_INDEX - 1), "big"))
|
bob_second = lnutil.secret_to_pubkey(int.from_bytes(lnutil.get_per_commitment_secret_from_seed(bob_seed, lnutil.RevocationStore.START_INDEX - 1), "big"))
|
||||||
|
|
||||||
alice.config[REMOTE].next_per_commitment_point = bob_second
|
alice.open_with_first_pcp(bob_first, sig_from_bob)
|
||||||
alice.config[REMOTE].current_per_commitment_point = bob_first
|
bob.open_with_first_pcp(alice_first, sig_from_alice)
|
||||||
bob.config[REMOTE].next_per_commitment_point = alice_second
|
|
||||||
bob.config[REMOTE].current_per_commitment_point = alice_first
|
|
||||||
|
|
||||||
alice.hm.channel_open_finished()
|
# from funding_locked:
|
||||||
bob.hm.channel_open_finished()
|
alice.config[REMOTE].next_per_commitment_point = bob_second
|
||||||
|
bob.config[REMOTE].next_per_commitment_point = alice_second
|
||||||
|
|
||||||
# TODO: sweep_address in lnchannel.py should use static_remotekey
|
# TODO: sweep_address in lnchannel.py should use static_remotekey
|
||||||
alice.sweep_address = bitcoin.pubkey_to_address('p2wpkh', alice.config[LOCAL].payment_basepoint.pubkey.hex())
|
alice.sweep_address = bitcoin.pubkey_to_address('p2wpkh', alice.config[LOCAL].payment_basepoint.pubkey.hex())
|
||||||
|
|
|
@ -225,6 +225,8 @@ class TestPeer(ElectrumTestCase):
|
||||||
def test_reestablish(self):
|
def test_reestablish(self):
|
||||||
alice_channel, bob_channel = create_test_channels()
|
alice_channel, bob_channel = create_test_channels()
|
||||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
|
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
|
||||||
|
for chan in (alice_channel, bob_channel):
|
||||||
|
chan.peer_state = peer_states.DISCONNECTED
|
||||||
async def reestablish():
|
async def reestablish():
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
p1.reestablish_channel(alice_channel),
|
p1.reestablish_channel(alice_channel),
|
||||||
|
@ -254,6 +256,8 @@ class TestPeer(ElectrumTestCase):
|
||||||
run(f())
|
run(f())
|
||||||
|
|
||||||
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel_0, bob_channel)
|
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel_0, bob_channel)
|
||||||
|
for chan in (alice_channel_0, bob_channel):
|
||||||
|
chan.peer_state = peer_states.DISCONNECTED
|
||||||
async def reestablish():
|
async def reestablish():
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
p1.reestablish_channel(alice_channel_0),
|
p1.reestablish_channel(alice_channel_0),
|
||||||
|
|
Loading…
Add table
Reference in a new issue