mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-31 17:31:36 +00:00
ln: begin handling htlc failures
This commit is contained in:
parent
6d8cae11dd
commit
e9fec66eb4
4 changed files with 59 additions and 4 deletions
|
@ -11,6 +11,7 @@ import traceback
|
||||||
import json
|
import json
|
||||||
from collections import OrderedDict, defaultdict
|
from collections import OrderedDict, defaultdict
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from concurrent.futures import FIRST_COMPLETED
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import binascii
|
import binascii
|
||||||
|
@ -598,11 +599,13 @@ class Peer(PrintError):
|
||||||
self.update_fulfill_htlc = defaultdict(asyncio.Queue)
|
self.update_fulfill_htlc = defaultdict(asyncio.Queue)
|
||||||
self.commitment_signed = defaultdict(asyncio.Queue)
|
self.commitment_signed = defaultdict(asyncio.Queue)
|
||||||
self.announcement_signatures = defaultdict(asyncio.Queue)
|
self.announcement_signatures = defaultdict(asyncio.Queue)
|
||||||
|
self.update_fail_htlc = defaultdict(asyncio.Queue)
|
||||||
self.is_funding_six_deep = defaultdict(lambda: False)
|
self.is_funding_six_deep = defaultdict(lambda: False)
|
||||||
self.localfeatures = (0x08 if request_initial_sync else 0)
|
self.localfeatures = (0x08 if request_initial_sync else 0)
|
||||||
self.nodes = {}
|
self.nodes = {}
|
||||||
self.channels = lnworker.channels
|
self.channels = lnworker.channels
|
||||||
self.invoices = lnworker.invoices
|
self.invoices = lnworker.invoices
|
||||||
|
self.attempted_route = {}
|
||||||
|
|
||||||
def diagnostic_name(self):
|
def diagnostic_name(self):
|
||||||
return self.host
|
return self.host
|
||||||
|
@ -1067,7 +1070,30 @@ class Peer(PrintError):
|
||||||
return h, node_signature, bitcoin_signature
|
return h, node_signature, bitcoin_signature
|
||||||
|
|
||||||
def on_update_fail_htlc(self, payload):
|
def on_update_fail_htlc(self, payload):
|
||||||
print("UPDATE_FAIL_HTLC", decode_onion_error(payload["reason"], self.node_keys, self.secret_key))
|
channel_id = payload["channel_id"]
|
||||||
|
htlc_id = int.from_bytes(payload["id"], "big")
|
||||||
|
key = (channel_id, htlc_id)
|
||||||
|
route = self.attempted_route[key]
|
||||||
|
failure_msg, sender_idx = decode_onion_error(payload["reason"], [x.node_id for x in route], self.secret_key)
|
||||||
|
code = failure_msg.code
|
||||||
|
data = failure_msg.data
|
||||||
|
codes = []
|
||||||
|
if code & 0x8000:
|
||||||
|
codes += ["BADONION"]
|
||||||
|
if code & 0x4000:
|
||||||
|
codes += ["PERM"]
|
||||||
|
if code & 0x2000:
|
||||||
|
codes += ["NODE"]
|
||||||
|
if code & 0x1000:
|
||||||
|
codes += ["UPDATE"]
|
||||||
|
print("UPDATE_FAIL_HTLC", codes, code, data)
|
||||||
|
try:
|
||||||
|
short_chan_id = route[sender_idx + 1].short_channel_id
|
||||||
|
except IndexError:
|
||||||
|
print("payment destination reported error")
|
||||||
|
|
||||||
|
self.lnworker.path_finder.blacklist.add(short_chan_id)
|
||||||
|
self.update_fail_htlc[payload["channel_id"]].put_nowait("HTLC failure with code {} (categories {})".format(code, codes))
|
||||||
|
|
||||||
@aiosafe
|
@aiosafe
|
||||||
async def pay(self, path, chan, amount_msat, payment_hash, pubkey_in_invoice, min_final_cltv_expiry):
|
async def pay(self, path, chan, amount_msat, payment_hash, pubkey_in_invoice, min_final_cltv_expiry):
|
||||||
|
@ -1085,9 +1111,8 @@ class Peer(PrintError):
|
||||||
total_fee += route_edge.channel_policy.fee_base_msat + ( amount_msat * route_edge.channel_policy.fee_proportional_millionths // 1000000 )
|
total_fee += route_edge.channel_policy.fee_base_msat + ( amount_msat * route_edge.channel_policy.fee_proportional_millionths // 1000000 )
|
||||||
associated_data = payment_hash
|
associated_data = payment_hash
|
||||||
self.secret_key = os.urandom(32)
|
self.secret_key = os.urandom(32)
|
||||||
self.node_keys = [x.node_id for x in route]
|
|
||||||
hops_data += [OnionHopsDataSingle(OnionPerHop(b"\x00"*8, amount_msat.to_bytes(8, "big"), (final_cltv_expiry_without_deltas).to_bytes(4, "big")))]
|
hops_data += [OnionHopsDataSingle(OnionPerHop(b"\x00"*8, amount_msat.to_bytes(8, "big"), (final_cltv_expiry_without_deltas).to_bytes(4, "big")))]
|
||||||
onion = new_onion_packet(self.node_keys, self.secret_key, hops_data, associated_data)
|
onion = new_onion_packet([x.node_id for x in route], self.secret_key, hops_data, associated_data)
|
||||||
msat_local = chan.local_state.amount_msat - (amount_msat + total_fee)
|
msat_local = chan.local_state.amount_msat - (amount_msat + total_fee)
|
||||||
msat_remote = chan.remote_state.amount_msat + (amount_msat + total_fee)
|
msat_remote = chan.remote_state.amount_msat + (amount_msat + total_fee)
|
||||||
htlc = UpdateAddHtlc(amount_msat, payment_hash, final_cltv_expiry_with_deltas, total_fee)
|
htlc = UpdateAddHtlc(amount_msat, payment_hash, final_cltv_expiry_with_deltas, total_fee)
|
||||||
|
@ -1097,6 +1122,7 @@ class Peer(PrintError):
|
||||||
|
|
||||||
m = HTLCStateMachine(chan)
|
m = HTLCStateMachine(chan)
|
||||||
m.add_htlc(htlc)
|
m.add_htlc(htlc)
|
||||||
|
self.attempted_route[(chan.channel_id, htlc.htlc_id)] = route
|
||||||
|
|
||||||
sig_64, htlc_sigs = m.sign_next_commitment()
|
sig_64, htlc_sigs = m.sign_next_commitment()
|
||||||
htlc_sig = htlc_sigs[0]
|
htlc_sig = htlc_sigs[0]
|
||||||
|
@ -1107,7 +1133,27 @@ class Peer(PrintError):
|
||||||
|
|
||||||
self.revoke(m)
|
self.revoke(m)
|
||||||
|
|
||||||
update_fulfill_htlc_msg = await self.update_fulfill_htlc[chan.channel_id].get()
|
fulfill_coro = asyncio.ensure_future(self.update_fulfill_htlc[chan.channel_id].get())
|
||||||
|
failure_coro = asyncio.ensure_future(self.update_fail_htlc[chan.channel_id].get())
|
||||||
|
|
||||||
|
done, pending = await asyncio.wait([fulfill_coro, failure_coro], return_when=FIRST_COMPLETED)
|
||||||
|
if failure_coro.done():
|
||||||
|
m.fail_htlc(htlc)
|
||||||
|
# TODO receive their commitment here
|
||||||
|
|
||||||
|
# TODO find out why the following block fails with "not enough htlc signatures"
|
||||||
|
sig_64, htlc_sigs = m.sign_next_commitment()
|
||||||
|
assert len(htlc_sigs) == 0
|
||||||
|
self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
|
||||||
|
|
||||||
|
await self.receive_revoke(m)
|
||||||
|
self.revoke(m)
|
||||||
|
fulfill_coro.cancel()
|
||||||
|
return failure_coro.result()
|
||||||
|
if fulfill_coro.done():
|
||||||
|
failure_coro.cancel()
|
||||||
|
update_fulfill_htlc_msg = fulfill_coro.result()
|
||||||
|
|
||||||
m.receive_htlc_settle(update_fulfill_htlc_msg["payment_preimage"], int.from_bytes(update_fulfill_htlc_msg["id"], "big"))
|
m.receive_htlc_settle(update_fulfill_htlc_msg["payment_preimage"], int.from_bytes(update_fulfill_htlc_msg["id"], "big"))
|
||||||
|
|
||||||
while (await self.commitment_signed[chan.channel_id].get())["htlc_signature"] != b"":
|
while (await self.commitment_signed[chan.channel_id].get())["htlc_signature"] != b"":
|
||||||
|
|
|
@ -394,3 +394,8 @@ class HTLCStateMachine(PrintError):
|
||||||
assert htlc.payment_hash == sha256(preimage)
|
assert htlc.payment_hash == sha256(preimage)
|
||||||
assert len([x.htlc_id == htlc_index for x in self.local_update_log]) == 1
|
assert len([x.htlc_id == htlc_index for x in self.local_update_log]) == 1
|
||||||
self.remote_update_log.append(SettleHtlc(htlc_index))
|
self.remote_update_log.append(SettleHtlc(htlc_index))
|
||||||
|
|
||||||
|
def fail_htlc(self, htlc):
|
||||||
|
# TODO
|
||||||
|
self.local_update_log = []
|
||||||
|
self.remote_update_log = []
|
||||||
|
|
|
@ -152,6 +152,7 @@ class LNPathFinder(PrintError):
|
||||||
|
|
||||||
def __init__(self, channel_db):
|
def __init__(self, channel_db):
|
||||||
self.channel_db = channel_db
|
self.channel_db = channel_db
|
||||||
|
self.blacklist = set()
|
||||||
|
|
||||||
def _edge_cost(self, short_channel_id: bytes, start_node: bytes, payment_amt_msat: int,
|
def _edge_cost(self, short_channel_id: bytes, start_node: bytes, payment_amt_msat: int,
|
||||||
ignore_cltv=False) -> float:
|
ignore_cltv=False) -> float:
|
||||||
|
@ -211,6 +212,7 @@ class LNPathFinder(PrintError):
|
||||||
# so there are duplicates in the queue, that we discard now:
|
# so there are duplicates in the queue, that we discard now:
|
||||||
continue
|
continue
|
||||||
for edge_channel_id in self.channel_db.get_channels_for_node(cur_node):
|
for edge_channel_id in self.channel_db.get_channels_for_node(cur_node):
|
||||||
|
if edge_channel_id in self.blacklist: continue
|
||||||
channel_info = self.channel_db.get_channel_info(edge_channel_id)
|
channel_info = self.channel_db.get_channel_info(edge_channel_id)
|
||||||
node1, node2 = channel_info.node_id_1, channel_info.node_id_2
|
node1, node2 = channel_info.node_id_1, channel_info.node_id_2
|
||||||
neighbour = node2 if node1 == cur_node else node1
|
neighbour = node2 if node1 == cur_node else node1
|
||||||
|
|
|
@ -193,6 +193,8 @@ class LNWorker(PrintError):
|
||||||
for chan in self.channels.values():
|
for chan in self.channels.values():
|
||||||
if chan.short_channel_id == short_channel_id:
|
if chan.short_channel_id == short_channel_id:
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
raise Exception("ChannelDB returned path with short_channel_id that is not in channel list")
|
||||||
coro = peer.pay(path, chan, amount_msat, payment_hash, invoice_pubkey, addr.min_final_cltv_expiry)
|
coro = peer.pay(path, chan, amount_msat, payment_hash, invoice_pubkey, addr.min_final_cltv_expiry)
|
||||||
return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue