#!/usr/bin/env python3 """ Lightning network interface for Electrum Derived from https://gist.github.com/AdamISZ/046d05c156aaeb56cc897f85eecb3eb8 """ from collections import OrderedDict, defaultdict import json import asyncio import os import time import hashlib import hmac from functools import partial from typing import List import cryptography.hazmat.primitives.ciphers.aead as AEAD import aiorpcx from .util import list_enabled_bits from . import bitcoin from . import ecc from .ecc import sig_string_from_r_and_s, get_r_and_s_from_sig_string from .crypto import sha256 from . import constants from .util import PrintError, bh2u, print_error, bfh, log_exceptions from .transaction import Transaction, TxOutput from .lnonion import new_onion_packet, OnionHopsDataSingle, OnionPerHop, decode_onion_error, ONION_FAILURE_CODE_MAP from .lnaddr import lndecode from .lnhtlc import HTLCStateMachine, RevokeAndAck from .lnutil import (Outpoint, ChannelConfig, LocalState, RemoteState, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore, funding_output_script, get_ecdh, get_per_commitment_secret_from_seed, secret_to_pubkey, LNPeerAddr, PaymentFailure, LnLocalFeatures, LOCAL, REMOTE, HTLCOwner, generate_keypair, LnKeyFamily, get_ln_flag_pair_of_bit) from .lnrouter import NotFoundChanAnnouncementForUpdate, RouteEdge def channel_id_from_funding_tx(funding_txid, funding_index): funding_txid_bytes = bytes.fromhex(funding_txid)[::-1] i = int.from_bytes(funding_txid_bytes, 'big') ^ funding_index return i.to_bytes(32, 'big'), funding_txid_bytes class LightningError(Exception): pass class LightningPeerConnectionClosed(LightningError): pass message_types = {} def handlesingle(x, ma): """ Evaluate a term of the simple language used to specify lightning message field lengths. If `x` is an integer, it is returned as is, otherwise it is treated as a variable and looked up in `ma`. It the value in `ma` was no integer, it is assumed big-endian bytes and decoded. Returns int """ try: x = int(x) except ValueError: x = ma[x] try: x = int(x) except ValueError: x = int.from_bytes(x, byteorder='big') return x def calcexp(exp, ma): """ Evaluate simple mathematical expression given in `exp` with variables assigned in the dict `ma` Returns int """ exp = str(exp) if "*" in exp: assert "+" not in exp result = 1 for term in exp.split("*"): result *= handlesingle(term, ma) return result return sum(handlesingle(x, ma) for x in exp.split("+")) def make_handler(k, v): """ Generate a message handler function (taking bytes) for message type `k` with specification `v` Check lib/lightning.json, `k` could be 'init', and `v` could be { type: 16, payload: { 'gflen': ..., ... }, ... } Returns function taking bytes """ def handler(data): nonlocal k, v ma = {} pos = 0 for fieldname in v["payload"]: poslenMap = v["payload"][fieldname] if "feature" in poslenMap and pos == len(data): continue #print(poslenMap["position"], ma) assert pos == calcexp(poslenMap["position"], ma) length = poslenMap["length"] length = calcexp(length, ma) ma[fieldname] = data[pos:pos+length] pos += length assert pos == len(data), (k, pos, len(data)) return k, ma return handler path = os.path.join(os.path.dirname(__file__), 'lightning.json') with open(path) as f: structured = json.loads(f.read(), object_pairs_hook=OrderedDict) for k in structured: v = structured[k] # these message types are skipped since their types collide # (for example with pong, which also uses type=19) # we don't need them yet if k in ["final_incorrect_cltv_expiry", "final_incorrect_htlc_amount"]: continue if len(v["payload"]) == 0: continue try: num = int(v["type"]) except ValueError: #print("skipping", k) continue byts = num.to_bytes(2, 'big') assert byts not in message_types, (byts, message_types[byts].__name__, k) names = [x.__name__ for x in message_types.values()] assert k + "_handler" not in names, (k, names) message_types[byts] = make_handler(k, v) message_types[byts].__name__ = k + "_handler" assert message_types[b"\x00\x10"].__name__ == "init_handler" def decode_msg(data): """ Decode Lightning message by reading the first two bytes to determine message type. Returns message type string and parsed message contents dict """ typ = data[:2] k, parsed = message_types[typ](data[2:]) return k, parsed def gen_msg(msg_type, **kwargs): """ Encode kwargs into a Lightning message (bytes) of the type given in the msg_type string """ typ = structured[msg_type] data = int(typ["type"]).to_bytes(2, 'big') lengths = {} for k in typ["payload"]: poslenMap = typ["payload"][k] if "feature" in poslenMap: continue leng = calcexp(poslenMap["length"], lengths) try: clone = dict(lengths) clone.update(kwargs) leng = calcexp(poslenMap["length"], clone) except KeyError: pass try: param = kwargs[k] except KeyError: param = 0 try: if not isinstance(param, bytes): assert isinstance(param, int), "field {} is neither bytes or int".format(k) param = param.to_bytes(leng, 'big') except ValueError: raise Exception("{} does not fit in {} bytes".format(k, leng)) lengths[k] = len(param) if lengths[k] != leng: raise Exception("field {} is {} bytes long, should be {} bytes long".format(k, lengths[k], leng)) data += param return data class HandshakeState(object): prologue = b"lightning" protocol_name = b"Noise_XK_secp256k1_ChaChaPoly_SHA256" handshake_version = b"\x00" def __init__(self, responder_pub): self.responder_pub = responder_pub self.h = sha256(self.protocol_name) self.ck = self.h self.update(self.prologue) self.update(self.responder_pub) def update(self, data): self.h = sha256(self.h + data) return self.h class HandshakeFailed(Exception): pass def get_nonce_bytes(n): """BOLT 8 requires the nonce to be 12 bytes, 4 bytes leading zeroes and 8 bytes little endian encoded 64 bit integer. """ return b"\x00"*4 + n.to_bytes(8, 'little') def aead_encrypt(k, nonce, associated_data, data): nonce_bytes = get_nonce_bytes(nonce) a = AEAD.ChaCha20Poly1305(k) return a.encrypt(nonce_bytes, data, associated_data) def aead_decrypt(k, nonce, associated_data, data): nonce_bytes = get_nonce_bytes(nonce) a = AEAD.ChaCha20Poly1305(k) #raises InvalidTag exception if it's not valid return a.decrypt(nonce_bytes, data, associated_data) def get_bolt8_hkdf(salt, ikm): """RFC5869 HKDF instantiated in the specific form used in Lightning BOLT 8: Extract and expand to 64 bytes using HMAC-SHA256, with info field set to a zero length string as per BOLT8 Return as two 32 byte fields. """ #Extract prk = hmac.new(salt, msg=ikm, digestmod=hashlib.sha256).digest() assert len(prk) == 32 #Expand info = b"" T0 = b"" T1 = hmac.new(prk, T0 + info + b"\x01", digestmod=hashlib.sha256).digest() T2 = hmac.new(prk, T1 + info + b"\x02", digestmod=hashlib.sha256).digest() assert len(T1 + T2) == 64 return T1, T2 def act1_initiator_message(hs, epriv, epub): hs.update(epub) ss = get_ecdh(epriv, hs.responder_pub) ck2, temp_k1 = get_bolt8_hkdf(hs.ck, ss) hs.ck = ck2 c = aead_encrypt(temp_k1, 0, hs.h, b"") #for next step if we do it hs.update(c) msg = hs.handshake_version + epub + c assert len(msg) == 50 return msg def privkey_to_pubkey(priv: bytes) -> bytes: return ecc.ECPrivkey(priv[:32]).get_public_key_bytes() def create_ephemeral_key() -> (bytes, bytes): privkey = ecc.ECPrivkey.generate_random_key() return privkey.get_secret_bytes(), privkey.get_public_key_bytes() class Peer(PrintError): def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False): self.host = host self.port = port self.pubkey = pubkey self.peer_addr = LNPeerAddr(host, port, pubkey) self.lnworker = lnworker self.privkey = lnworker.node_keypair.privkey self.network = lnworker.network self.lnwatcher = lnworker.network.lnwatcher self.channel_db = lnworker.network.channel_db self.read_buffer = b'' self.ping_time = 0 self.initialized = asyncio.Future() self.channel_accepted = defaultdict(asyncio.Queue) self.channel_reestablished = defaultdict(asyncio.Future) self.funding_signed = defaultdict(asyncio.Queue) self.funding_created = defaultdict(asyncio.Queue) self.revoke_and_ack = defaultdict(asyncio.Queue) self.commitment_signed = defaultdict(asyncio.Queue) self.announcement_signatures = defaultdict(asyncio.Queue) self.closing_signed = defaultdict(asyncio.Queue) self.payment_preimages = defaultdict(asyncio.Queue) self.localfeatures = LnLocalFeatures(0) if request_initial_sync: self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT self.invoices = lnworker.invoices self.attempted_route = {} @property def channels(self): return self.lnworker.channels_for_peer(self.pubkey) def diagnostic_name(self): return 'lnbase:' + str(self.host) def ping_if_required(self): if time.time() - self.ping_time > 120: self.send_message(gen_msg('ping', num_pong_bytes=4, byteslen=4)) self.ping_time = time.time() def send_message(self, msg): message_type, payload = decode_msg(msg) self.print_error("Sending '%s'"%message_type.upper()) l = len(msg).to_bytes(2, 'big') lc = aead_encrypt(self.sk, self.sn(), b'', l) c = aead_encrypt(self.sk, self.sn(), b'', msg) assert len(lc) == 18 assert len(c) == len(msg) + 16 self.writer.write(lc+c) async def read_message(self): rn_l, rk_l = self.rn() rn_m, rk_m = self.rn() while True: if len(self.read_buffer) >= 18: lc = self.read_buffer[:18] l = aead_decrypt(rk_l, rn_l, b'', lc) length = int.from_bytes(l, 'big') offset = 18 + length + 16 if len(self.read_buffer) >= offset: c = self.read_buffer[18:offset] self.read_buffer = self.read_buffer[offset:] msg = aead_decrypt(rk_m, rn_m, b'', c) return msg try: s = await self.reader.read(2**10) except: s = None if not s: raise LightningPeerConnectionClosed() self.read_buffer += s async def handshake(self): hs = HandshakeState(self.pubkey) # Get a new ephemeral key epriv, epub = create_ephemeral_key() msg = act1_initiator_message(hs, epriv, epub) # act 1 self.writer.write(msg) rspns = await self.reader.read(2**10) if len(rspns) != 50: raise HandshakeFailed("Lightning handshake act 1 response has bad length, are you sure this is the right pubkey? " + str(bh2u(self.pubkey))) hver, alice_epub, tag = rspns[0], rspns[1:34], rspns[34:] if bytes([hver]) != hs.handshake_version: raise HandshakeFailed("unexpected handshake version: {}".format(hver)) # act 2 hs.update(alice_epub) ss = get_ecdh(epriv, alice_epub) ck, temp_k2 = get_bolt8_hkdf(hs.ck, ss) hs.ck = ck p = aead_decrypt(temp_k2, 0, hs.h, tag) hs.update(tag) # act 3 my_pubkey = privkey_to_pubkey(self.privkey) c = aead_encrypt(temp_k2, 1, hs.h, my_pubkey) hs.update(c) ss = get_ecdh(self.privkey[:32], alice_epub) ck, temp_k3 = get_bolt8_hkdf(hs.ck, ss) hs.ck = ck t = aead_encrypt(temp_k3, 0, hs.h, b'') self.sk, self.rk = get_bolt8_hkdf(hs.ck, b'') msg = hs.handshake_version + c + t self.writer.write(msg) # init counters self._sn = 0 self._rn = 0 self.r_ck = ck self.s_ck = ck def rn(self): o = self._rn, self.rk self._rn += 1 if self._rn == 1000: self.r_ck, self.rk = get_bolt8_hkdf(self.r_ck, self.rk) self._rn = 0 return o def sn(self): o = self._sn self._sn += 1 if self._sn == 1000: self.s_ck, self.sk = get_bolt8_hkdf(self.s_ck, self.sk) self._sn = 0 return o def process_message(self, message): message_type, payload = decode_msg(message) try: f = getattr(self, 'on_' + message_type) except AttributeError: self.print_error("Received '%s'" % message_type.upper(), payload) return # raw message is needed to check signature if message_type=='node_announcement': payload['raw'] = message execution_result = f(payload) if asyncio.iscoroutinefunction(f): asyncio.ensure_future(execution_result) def on_error(self, payload): self.print_error("error", payload["data"].decode("ascii")) chan_id = payload.get("channel_id") for d in [ self.channel_accepted, self.channel_reestablished, self.funding_signed, self.funding_created, self.revoke_and_ack, self.commitment_signed, self.announcement_signatures, self.closing_signed ]: if chan_id in d: self.channel_accepted[chan_id].put_nowait({'error':payload['data']}) def on_ping(self, payload): l = int.from_bytes(payload['num_pong_bytes'], 'big') self.send_message(gen_msg('pong', byteslen=l)) def on_pong(self, payload): pass def on_accept_channel(self, payload): temp_chan_id = payload["temporary_channel_id"] if temp_chan_id not in self.channel_accepted: raise Exception("Got unknown accept_channel") self.channel_accepted[temp_chan_id].put_nowait(payload) def on_funding_signed(self, payload): channel_id = payload['channel_id'] if channel_id not in self.funding_signed: raise Exception("Got unknown funding_signed") self.funding_signed[channel_id].put_nowait(payload) def on_funding_created(self, payload): channel_id = payload['temporary_channel_id'] if channel_id not in self.funding_created: raise Exception("Got unknown funding_created") self.funding_created[channel_id].put_nowait(payload) def on_node_announcement(self, payload): self.channel_db.on_node_announcement(payload) self.network.trigger_callback('ln_status') def on_init(self, payload): # if they required some even flag we don't have, they will close themselves # but if we require an even flag they don't have, we close our_flags = set(list_enabled_bits(self.localfeatures)) their_flags = set(list_enabled_bits(int.from_bytes(payload['localfeatures'], byteorder="big"))) for flag in our_flags: if flag not in their_flags and get_ln_flag_pair_of_bit(flag) not in their_flags: # they don't have this feature we wanted :( if flag % 2 == 0: # even flags are compulsory raise LightningPeerConnectionClosed("remote does not have even flag {}" .format(str(LnLocalFeatures(1 << flag)))) self.localfeatures ^= 1 << flag # disable flag def on_channel_update(self, payload): try: self.channel_db.on_channel_update(payload) except NotFoundChanAnnouncementForUpdate: # If it's for a direct channel with this peer, save it in chan. # Note that this is prone to a race.. we might not have a short_channel_id # associated with the channel in some cases short_channel_id = payload['short_channel_id'] for chan in self.channels.values(): if chan.short_channel_id_predicted == short_channel_id: chan.pending_channel_update_message = payload def on_channel_announcement(self, payload): self.channel_db.on_channel_announcement(payload) def on_announcement_signatures(self, payload): channel_id = payload['channel_id'] chan = self.channels[payload['channel_id']] if chan.local_state.was_announced: h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) else: self.announcement_signatures[channel_id].put_nowait(payload) async def initialize(self): self.reader, self.writer = await asyncio.open_connection(self.host, self.port) await self.handshake() # send init self.send_message(gen_msg("init", gflen=0, lflen=1, localfeatures=self.localfeatures)) # read init msg = await self.read_message() self.process_message(msg) self.initialized.set_result(True) def handle_disconnect(func): async def wrapper_func(self, *args, **kwargs): try: return await func(self, *args, **kwargs) except LightningPeerConnectionClosed as e: self.print_error("disconnecting gracefully. {}".format(e)) finally: self.close_and_cleanup() self.lnworker.peers.pop(self.pubkey) return wrapper_func @aiosafe @handle_disconnect async def main_loop(self): try: await asyncio.wait_for(self.initialize(), 10) except (OSError, asyncio.TimeoutError, HandshakeFailed) as e: self.print_error('disconnecting due to: {}'.format(repr(e))) return self.channel_db.add_recent_peer(self.peer_addr) # loop while True: self.ping_if_required() msg = await self.read_message() self.process_message(msg) def close_and_cleanup(self): try: self.writer.close() except: pass for chan in self.channels.values(): chan.set_state('DISCONNECTED') self.network.trigger_callback('channel', chan) def make_local_config(self, funding_sat, push_msat, initiator: HTLCOwner): # key derivation channel_counter = self.lnworker.get_and_inc_counter_for_channel_keys() keypair_generator = lambda family: generate_keypair(self.lnworker.ln_keystore, family, channel_counter) if initiator == LOCAL: initial_msat = funding_sat * 1000 - push_msat else: initial_msat = push_msat local_config=ChannelConfig( payment_basepoint=keypair_generator(LnKeyFamily.PAYMENT_BASE), multisig_key=keypair_generator(LnKeyFamily.MULTISIG), htlc_basepoint=keypair_generator(LnKeyFamily.HTLC_BASE), delayed_basepoint=keypair_generator(LnKeyFamily.DELAY_BASE), revocation_basepoint=keypair_generator(LnKeyFamily.REVOCATION_BASE), to_self_delay=143, dust_limit_sat=546, max_htlc_value_in_flight_msat=0xffffffffffffffff, max_accepted_htlcs=5, initial_msat=initial_msat, ) per_commitment_secret_seed = keypair_generator(LnKeyFamily.REVOCATION_ROOT).privkey return local_config, per_commitment_secret_seed @log_exceptions async def channel_establishment_flow(self, password, funding_sat, push_msat, temp_channel_id): await self.initialized local_config, per_commitment_secret_seed = self.make_local_config(funding_sat, push_msat, LOCAL) # amounts local_feerate = self.current_feerate_per_kw() # for the first commitment transaction per_commitment_secret_first = get_per_commitment_secret_from_seed(per_commitment_secret_seed, RevocationStore.START_INDEX) per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big')) msg = gen_msg( "open_channel", temporary_channel_id=temp_channel_id, chain_hash=constants.net.rev_genesis_bytes(), funding_satoshis=funding_sat, push_msat=push_msat, dust_limit_satoshis=local_config.dust_limit_sat, feerate_per_kw=local_feerate, max_accepted_htlcs=local_config.max_accepted_htlcs, funding_pubkey=local_config.multisig_key.pubkey, revocation_basepoint=local_config.revocation_basepoint.pubkey, htlc_basepoint=local_config.htlc_basepoint.pubkey, payment_basepoint=local_config.payment_basepoint.pubkey, delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, first_per_commitment_point=per_commitment_point_first, to_self_delay=local_config.to_self_delay, max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, channel_flags=0x00, # not willing to announce channel channel_reserve_satoshis=546 ) self.send_message(msg) payload = await self.channel_accepted[temp_channel_id].get() if payload.get('error'): raise Exception(payload.get('error')) remote_per_commitment_point = payload['first_per_commitment_point'] remote_config=ChannelConfig( payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]), htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), to_self_delay=int.from_bytes(payload['to_self_delay'], byteorder='big'), dust_limit_sat=int.from_bytes(payload['dust_limit_satoshis'], byteorder='big'), max_htlc_value_in_flight_msat=int.from_bytes(payload['max_htlc_value_in_flight_msat'], 'big'), max_accepted_htlcs=int.from_bytes(payload["max_accepted_htlcs"], 'big'), initial_msat=push_msat ) funding_txn_minimum_depth = int.from_bytes(payload['minimum_depth'], 'big') assert remote_config.dust_limit_sat < 600 assert int.from_bytes(payload['htlc_minimum_msat'], 'big') < 600 * 1000 assert remote_config.max_htlc_value_in_flight_msat >= 198 * 1000 * 1000, remote_config.max_htlc_value_in_flight_msat self.print_error('remote delay', remote_config.to_self_delay) self.print_error('funding_txn_minimum_depth', funding_txn_minimum_depth) # create funding tx redeem_script = funding_output_script(local_config, remote_config) funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script) funding_output = TxOutput(bitcoin.TYPE_ADDRESS, funding_address, funding_sat) funding_tx = self.lnworker.wallet.mktx([funding_output], password, self.lnworker.config, 1000) funding_txid = funding_tx.txid() funding_index = funding_tx.outputs().index(funding_output) # compute amounts local_amount = funding_sat*1000 - push_msat remote_amount = push_msat # remote commitment transaction channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index) their_revocation_store = RevocationStore() chan = { "node_id": self.pubkey, "channel_id": channel_id, "short_channel_id": None, "funding_outpoint": Outpoint(funding_txid, funding_index), "local_config": local_config, "remote_config": remote_config, "remote_state": RemoteState( ctn = -1, next_per_commitment_point=remote_per_commitment_point, current_per_commitment_point=None, amount_msat=remote_amount, revocation_store=their_revocation_store, next_htlc_id = 0, feerate=local_feerate ), "local_state": LocalState( ctn = -1, per_commitment_secret_seed=per_commitment_secret_seed, amount_msat=local_amount, next_htlc_id = 0, funding_locked_received = False, was_announced = False, current_commitment_signature = None, current_htlc_signatures = None, feerate=local_feerate ), "constraints": ChannelConstraints(capacity=funding_sat, is_initiator=True, funding_txn_minimum_depth=funding_txn_minimum_depth), "remote_commitment_to_be_revoked": None, } m = HTLCStateMachine(chan) m.lnwatcher = self.lnwatcher m.sweep_address = self.lnworker.sweep_address sig_64, _ = m.sign_next_commitment() self.send_message(gen_msg("funding_created", temporary_channel_id=temp_channel_id, funding_txid=funding_txid_bytes, funding_output_index=funding_index, signature=sig_64)) payload = await self.funding_signed[channel_id].get() self.print_error('received funding_signed') remote_sig = payload['signature'] m.receive_new_commitment(remote_sig, []) # broadcast funding tx success, _txid = await self.network.broadcast_transaction(funding_tx) assert success, success m.remote_commitment_to_be_revoked = m.pending_remote_commitment m.remote_state = m.remote_state._replace(ctn=0) m.local_state = m.local_state._replace(ctn=0, current_commitment_signature=remote_sig) m.set_state('OPENING') return m async def on_open_channel(self, payload): # payload['channel_flags'] # payload['channel_reserve_satoshis'] if payload['chain_hash'] != constants.net.rev_genesis_bytes(): raise Exception('wrong chain_hash') funding_sat = int.from_bytes(payload['funding_satoshis'], 'big') push_msat = int.from_bytes(payload['push_msat'], 'big') remote_config = ChannelConfig( payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']), multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']), htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']), delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']), revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']), to_self_delay=int.from_bytes(payload['to_self_delay'], 'big'), dust_limit_sat=int.from_bytes(payload['dust_limit_satoshis'], 'big'), max_htlc_value_in_flight_msat=int.from_bytes(payload['max_htlc_value_in_flight_msat'], 'big'), max_accepted_htlcs=int.from_bytes(payload['max_accepted_htlcs'], 'big'), initial_msat=funding_sat * 1000 - push_msat, ) temp_chan_id = payload['temporary_channel_id'] local_config, per_commitment_secret_seed = self.make_local_config(funding_sat * 1000, push_msat, REMOTE) # for the first commitment transaction per_commitment_secret_first = get_per_commitment_secret_from_seed(per_commitment_secret_seed, RevocationStore.START_INDEX) per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big')) min_depth = 3 self.send_message(gen_msg('accept_channel', temporary_channel_id=temp_chan_id, dust_limit_satoshis=local_config.dust_limit_sat, max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, channel_reserve_satoshis=546, htlc_minimum_msat=1000, minimum_depth=min_depth, to_self_delay=local_config.to_self_delay, max_accepted_htlcs=local_config.max_accepted_htlcs, funding_pubkey=local_config.multisig_key.pubkey, revocation_basepoint=local_config.revocation_basepoint.pubkey, payment_basepoint=local_config.payment_basepoint.pubkey, delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, htlc_basepoint=local_config.htlc_basepoint.pubkey, first_per_commitment_point=per_commitment_point_first, )) funding_created = await self.funding_created[temp_chan_id].get() funding_idx = int.from_bytes(funding_created['funding_output_index'], 'big') funding_txid = bh2u(funding_created['funding_txid'][::-1]) channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx) their_revocation_store = RevocationStore() local_feerate = int.from_bytes(payload['feerate_per_kw'], 'big') chan = { "node_id": self.pubkey, "channel_id": channel_id, "short_channel_id": None, "funding_outpoint": Outpoint(funding_txid, funding_idx), "local_config": local_config, "remote_config": remote_config, "remote_state": RemoteState( ctn = -1, next_per_commitment_point=payload['first_per_commitment_point'], current_per_commitment_point=None, amount_msat=remote_config.initial_msat, revocation_store=their_revocation_store, next_htlc_id = 0, feerate=local_feerate ), "local_state": LocalState( ctn = -1, per_commitment_secret_seed=per_commitment_secret_seed, amount_msat=local_config.initial_msat, next_htlc_id = 0, funding_locked_received = False, was_announced = False, current_commitment_signature = None, current_htlc_signatures = None, feerate=local_feerate ), "constraints": ChannelConstraints(capacity=funding_sat, is_initiator=False, funding_txn_minimum_depth=min_depth), "remote_commitment_to_be_revoked": None, } m = HTLCStateMachine(chan) m.lnwatcher = self.lnwatcher m.sweep_address = self.lnworker.sweep_address remote_sig = funding_created['signature'] m.receive_new_commitment(remote_sig, []) sig_64, _ = m.sign_next_commitment() self.send_message(gen_msg('funding_signed', channel_id=channel_id, signature=sig_64, )) m.set_state('OPENING') m.remote_commitment_to_be_revoked = m.pending_remote_commitment m.remote_state = m.remote_state._replace(ctn=0) m.local_state = m.local_state._replace(ctn=0, current_commitment_signature=remote_sig) self.lnworker.save_channel(m) self.lnwatcher.watch_channel(m.get_funding_address(), m.funding_outpoint.to_str()) self.lnworker.on_channels_updated() while True: try: funding_tx = Transaction(await self.network.get_transaction(funding_txid)) except aiorpcx.jsonrpc.RPCError as e: print("sleeping", str(e)) await asyncio.sleep(1) else: break outp = funding_tx.outputs()[funding_idx] redeem_script = funding_output_script(remote_config, local_config) funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script) if outp != TxOutput(bitcoin.TYPE_ADDRESS, funding_address, funding_sat): m.set_state('DISCONNECTED') raise Exception('funding outpoint mismatch') @aiosafe async def reestablish_channel(self, chan): await self.initialized chan_id = chan.channel_id if chan.get_state() != 'DISCONNECTED': self.print_error('reestablish_channel was called but channel {} already in state {}' .format(chan_id, chan.get_state())) return chan.set_state('REESTABLISHING') self.network.trigger_callback('channel', chan) self.send_message(gen_msg("channel_reestablish", channel_id=chan_id, next_local_commitment_number=chan.local_state.ctn+1, next_remote_revocation_number=chan.remote_state.ctn )) await self.channel_reestablished[chan_id] chan.set_state('OPENING') if chan.local_state.funding_locked_received and chan.short_channel_id: self.mark_open(chan) self.network.trigger_callback('channel', chan) def on_channel_reestablish(self, payload): chan_id = payload["channel_id"] self.print_error("Received channel_reestablish", bh2u(chan_id)) chan = self.channels.get(chan_id) if not chan: print("Warning: received unknown channel_reestablish", bh2u(chan_id)) return def try_to_get_remote_to_force_close_with_their_latest(): self.print_error("trying to get remote to force close", bh2u(chan_id)) self.send_message(gen_msg("channel_reestablish", channel_id=chan_id, next_local_commitment_number=0, next_remote_revocation_number=0 )) channel_reestablish_msg = payload # compare remote ctns remote_ctn = int.from_bytes(channel_reestablish_msg["next_local_commitment_number"], 'big') if remote_ctn != chan.remote_state.ctn + 1: self.print_error("expected remote ctn {}, got {}".format(chan.remote_state.ctn + 1, remote_ctn)) # TODO iff their ctn is lower than ours, we should force close instead try_to_get_remote_to_force_close_with_their_latest() return # compare local ctns local_ctn = int.from_bytes(channel_reestablish_msg["next_remote_revocation_number"], 'big') if local_ctn != chan.local_state.ctn: self.print_error("expected local ctn {}, got {}".format(chan.local_state.ctn, local_ctn)) # TODO iff their ctn is lower than ours, we should force close instead try_to_get_remote_to_force_close_with_their_latest() return # compare per commitment points (needs data_protect option) their_pcp = channel_reestablish_msg.get("my_current_per_commitment_point", None) if their_pcp is not None: our_pcp = chan.remote_state.current_per_commitment_point if our_pcp is None: our_pcp = chan.remote_state.next_per_commitment_point if our_pcp != their_pcp: self.print_error("Remote PCP mismatch: {} {}".format(bh2u(our_pcp), bh2u(their_pcp))) # FIXME ...what now? try_to_get_remote_to_force_close_with_their_latest() return # checks done self.channel_reestablished[chan_id].set_result(True) def funding_locked(self, chan): channel_id = chan.channel_id per_commitment_secret_index = RevocationStore.START_INDEX - 1 per_commitment_point_second = secret_to_pubkey(int.from_bytes( get_per_commitment_secret_from_seed(chan.local_state.per_commitment_secret_seed, per_commitment_secret_index), 'big')) # note: if funding_locked was not yet received, we might send it multiple times self.send_message(gen_msg("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)) if chan.local_state.funding_locked_received: self.mark_open(chan) def on_funding_locked(self, payload): channel_id = payload['channel_id'] chan = self.channels.get(channel_id) if not chan: print(self.channels) raise Exception("Got unknown funding_locked", channel_id) if not chan.local_state.funding_locked_received: our_next_point = chan.remote_state.next_per_commitment_point their_next_point = payload["next_per_commitment_point"] new_remote_state = chan.remote_state._replace(next_per_commitment_point=their_next_point, current_per_commitment_point=our_next_point) new_local_state = chan.local_state._replace(funding_locked_received = True) chan.remote_state=new_remote_state chan.local_state=new_local_state self.lnworker.save_channel(chan) if chan.short_channel_id: self.mark_open(chan) def on_network_update(self, chan, funding_tx_depth): """ Only called when the channel is OPEN. Runs on the Network thread. """ if not chan.local_state.was_announced and funding_tx_depth >= 6: # don't announce our channels # FIXME should this be a field in chan.local_state maybe? return chan.local_state=chan.local_state._replace(was_announced=True) coro = self.handle_announcements(chan) self.lnworker.save_channel(chan) asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) @aiosafe async def handle_announcements(self, chan): h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan) announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get() remote_node_sig = announcement_signatures_msg["node_signature"] remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"] if not ecc.verify_signature(chan.remote_config.multisig_key.pubkey, remote_bitcoin_sig, h): raise Exception("bitcoin_sig invalid in announcement_signatures") if not ecc.verify_signature(self.pubkey, remote_node_sig, h): raise Exception("node_sig invalid in announcement_signatures") node_sigs = [local_node_sig, remote_node_sig] bitcoin_sigs = [local_bitcoin_sig, remote_bitcoin_sig] node_ids = [privkey_to_pubkey(self.privkey), self.pubkey] bitcoin_keys = [chan.local_config.multisig_key.pubkey, chan.remote_config.multisig_key.pubkey] if node_ids[0] > node_ids[1]: node_sigs.reverse() bitcoin_sigs.reverse() node_ids.reverse() bitcoin_keys.reverse() channel_announcement = gen_msg("channel_announcement", node_signatures_1=node_sigs[0], node_signatures_2=node_sigs[1], bitcoin_signature_1=bitcoin_sigs[0], bitcoin_signature_2=bitcoin_sigs[1], len=0, #features not set (defaults to zeros) chain_hash=constants.net.rev_genesis_bytes(), short_channel_id=chan.short_channel_id, node_id_1=node_ids[0], node_id_2=node_ids[1], bitcoin_key_1=bitcoin_keys[0], bitcoin_key_2=bitcoin_keys[1] ) self.send_message(channel_announcement) print("SENT CHANNEL ANNOUNCEMENT") def mark_open(self, chan): if chan.get_state() == "OPEN": return # NOTE: even closed channels will be temporarily marked "OPEN" assert chan.local_state.funding_locked_received chan.set_state("OPEN") self.network.trigger_callback('channel', chan) # add channel to database pubkey_ours = self.lnworker.node_keypair.pubkey pubkey_theirs = self.pubkey node_ids = [pubkey_theirs, pubkey_ours] bitcoin_keys = [chan.local_config.multisig_key.pubkey, chan.remote_config.multisig_key.pubkey] sorted_node_ids = list(sorted(node_ids)) if sorted_node_ids != node_ids: node_ids = sorted_node_ids bitcoin_keys.reverse() # note: we inject a channel announcement, and a channel update (for outgoing direction) # This is atm needed for # - finding routes # - the ChanAnn is needed so that we can anchor to it a future ChanUpd # that the remote sends, even if the channel was not announced # (from BOLT-07: "MAY create a channel_update to communicate the channel # parameters to the final node, even though the channel has not yet been announced") self.channel_db.on_channel_announcement({"short_channel_id": chan.short_channel_id, "node_id_1": node_ids[0], "node_id_2": node_ids[1], 'chain_hash': constants.net.rev_genesis_bytes(), 'len': b'\x00\x00', 'features': b'', 'bitcoin_key_1': bitcoin_keys[0], 'bitcoin_key_2': bitcoin_keys[1]}, trusted=True) # only inject outgoing direction: flags = b'\x00' if node_ids[0] == pubkey_ours else b'\x01' now = int(time.time()).to_bytes(4, byteorder="big") self.channel_db.on_channel_update({"short_channel_id": chan.short_channel_id, 'flags': flags, 'cltv_expiry_delta': b'\x90', 'htlc_minimum_msat': b'\x03\xe8', 'fee_base_msat': b'\x03\xe8', 'fee_proportional_millionths': b'\x01', 'chain_hash': constants.net.rev_genesis_bytes(), 'timestamp': now}, trusted=True) # peer may have sent us a channel update for the incoming direction previously # note: if we were offline when the 3rd conf happened, lnd will never send us this channel_update # see https://github.com/lightningnetwork/lnd/issues/1347 #self.send_message(gen_msg("query_short_channel_ids", chain_hash=constants.net.rev_genesis_bytes(), # len=9, encoded_short_ids=b'\x00'+chan.short_channel_id)) if hasattr(chan, 'pending_channel_update_message'): self.on_channel_update(chan.pending_channel_update_message) self.print_error("CHANNEL OPENING COMPLETED") def send_announcement_signatures(self, chan): bitcoin_keys = [chan.local_config.multisig_key.pubkey, chan.remote_config.multisig_key.pubkey] node_ids = [privkey_to_pubkey(self.privkey), self.pubkey] sorted_node_ids = list(sorted(node_ids)) if sorted_node_ids != node_ids: node_ids = sorted_node_ids bitcoin_keys.reverse() chan_ann = gen_msg("channel_announcement", len=0, #features not set (defaults to zeros) chain_hash=constants.net.rev_genesis_bytes(), short_channel_id=chan.short_channel_id, node_id_1=node_ids[0], node_id_2=node_ids[1], bitcoin_key_1=bitcoin_keys[0], bitcoin_key_2=bitcoin_keys[1] ) to_hash = chan_ann[256+2:] h = bitcoin.Hash(to_hash) bitcoin_signature = ecc.ECPrivkey(chan.local_config.multisig_key.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string) node_signature = ecc.ECPrivkey(self.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string) self.send_message(gen_msg("announcement_signatures", channel_id=chan.channel_id, short_channel_id=chan.short_channel_id, node_signature=node_signature, bitcoin_signature=bitcoin_signature )) return h, node_signature, bitcoin_signature @aiosafe async def on_update_fail_htlc(self, payload): channel_id = payload["channel_id"] chan = self.channels[channel_id] htlc_id = int.from_bytes(payload["id"], "big") key = (channel_id, htlc_id) try: route = self.attempted_route[key] except KeyError: # the remote might try to fail an htlc after we restarted... # attempted_route is not persisted, so we will get here then self.print_error("UPDATE_FAIL_HTLC. cannot decode! attempted route is MISSING. {}".format(key)) else: failure_msg, sender_idx = decode_onion_error(payload["reason"], [x.node_id for x in route], chan.onion_keys[htlc_id]) code = failure_msg.code code_name = ONION_FAILURE_CODE_MAP.get(code, 'unknown_error??') data = failure_msg.data self.print_error("UPDATE_FAIL_HTLC", code_name, code, data) try: short_chan_id = route[sender_idx + 1].short_channel_id except IndexError: self.print_error("payment destination reported error") else: # TODO this should depend on the error # also, we need finer blacklisting (directed edges; nodes) self.network.path_finder.blacklist.add(short_chan_id) self.print_error("HTLC failure with code {} ({})".format(code, code_name)) # process update_fail_htlc on channel chan = self.channels[channel_id] chan.receive_fail_htlc(htlc_id) await self.receive_commitment(chan) self.revoke(chan) self.send_commitment(chan) # htlc will be removed await self.receive_revoke(chan) self.lnworker.save_channel(chan) def send_commitment(self, chan): sig_64, htlc_sigs = chan.sign_next_commitment() self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))) return len(htlc_sigs) async def update_channel(self, chan, update): """ generic channel update flow """ self.send_message(update) self.send_commitment(chan) await self.receive_revoke(chan) await self.receive_commitment(chan) self.revoke(chan) async def pay(self, route: List[RouteEdge], chan, amount_msat, payment_hash, min_final_cltv_expiry): assert chan.get_state() == "OPEN", chan.get_state() assert amount_msat > 0, "amount_msat is not greater zero" height = self.network.get_local_height() hops_data = [] sum_of_deltas = sum(route_edge.cltv_expiry_delta for route_edge in route[1:]) total_fee = 0 final_cltv_expiry_without_deltas = (height + min_final_cltv_expiry) final_cltv_expiry_with_deltas = final_cltv_expiry_without_deltas + sum_of_deltas for idx, route_edge in enumerate(route[1:]): hops_data += [OnionHopsDataSingle(OnionPerHop(route_edge.short_channel_id, amount_msat.to_bytes(8, "big"), final_cltv_expiry_without_deltas.to_bytes(4, "big")))] total_fee += route_edge.fee_base_msat + ( amount_msat * route_edge.fee_proportional_millionths // 1000000 ) associated_data = payment_hash secret_key = os.urandom(32) hops_data += [OnionHopsDataSingle(OnionPerHop(b"\x00"*8, amount_msat.to_bytes(8, "big"), (final_cltv_expiry_without_deltas).to_bytes(4, "big")))] onion = new_onion_packet([x.node_id for x in route], secret_key, hops_data, associated_data) amount_msat += total_fee # FIXME this below will probably break with multiple HTLCs msat_local = chan.balance(LOCAL) - amount_msat msat_remote = chan.balance(REMOTE) + amount_msat htlc = {'amount_msat':amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':final_cltv_expiry_with_deltas} # FIXME if we raise here, this channel will not get blacklisted, and the payment can never succeed, # as we will just keep retrying this same path. using the current blacklisting is not a solution as # then no other payment can use this channel either. # we need finer blacklisting -- e.g. a blacklist for just this "payment session"? # or blacklist entries could store an msat value and also expire if len(chan.htlcs_in_local) + 1 > chan.remote_config.max_accepted_htlcs: raise PaymentFailure('too many HTLCs already in channel') if chan.htlcsum(chan.htlcs_in_local) + amount_msat > chan.remote_config.max_htlc_value_in_flight_msat: raise PaymentFailure('HTLC value sum would exceed max allowed: {} msat'.format(chan.remote_config.max_htlc_value_in_flight_msat)) if msat_local < 0: # FIXME what about channel_reserve_satoshis? will the remote fail the channel if we go below? test. raise PaymentFailure('not enough local balance') htlc_id = chan.add_htlc(htlc) chan.onion_keys[htlc_id] = secret_key update = gen_msg("update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()) self.attempted_route[(chan.channel_id, htlc_id)] = route await self.update_channel(chan, update) async def receive_revoke(self, m): revoke_and_ack_msg = await self.revoke_and_ack[m.channel_id].get() m.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"])) def revoke(self, m): rev, _ = m.revoke_current_commitment() self.lnworker.save_channel(m) self.send_message(gen_msg("revoke_and_ack", channel_id=m.channel_id, per_commitment_secret=rev.per_commitment_secret, next_per_commitment_point=rev.next_per_commitment_point)) async def receive_commitment(self, m, commitment_signed_msg=None): if commitment_signed_msg is None: commitment_signed_msg = await self.commitment_signed[m.channel_id].get() data = commitment_signed_msg["htlc_signature"] htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)] m.receive_new_commitment(commitment_signed_msg["signature"], htlc_sigs) return len(htlc_sigs) @aiosafe async def receive_commitment_revoke_ack(self, htlc, decoded, payment_preimage): chan = self.channels[htlc['channel_id']] channel_id = chan.channel_id expected_received_msat = int(decoded.amount * bitcoin.COIN * 1000) htlc_id = int.from_bytes(htlc["id"], 'big') assert htlc_id == chan.remote_state.next_htlc_id, (htlc_id, chan.remote_state.next_htlc_id) assert chan.get_state() == "OPEN" cltv_expiry = int.from_bytes(htlc["cltv_expiry"], 'big') # TODO verify sanity of their cltv expiry amount_msat = int.from_bytes(htlc["amount_msat"], 'big') assert amount_msat == expected_received_msat payment_hash = htlc["payment_hash"] htlc = {'amount_msat': amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry} chan.receive_htlc(htlc) assert (await self.receive_commitment(chan)) <= 1 self.revoke(chan) self.send_commitment(chan) await self.receive_revoke(chan) chan.settle_htlc(payment_preimage, htlc_id) fulfillment = gen_msg("update_fulfill_htlc", channel_id=channel_id, id=htlc_id, payment_preimage=payment_preimage) await self.update_channel(chan, fulfillment) self.lnworker.save_channel(chan) def on_commitment_signed(self, payload): self.print_error("commitment_signed", payload) channel_id = payload['channel_id'] chan = self.channels[channel_id] chan.local_state=chan.local_state._replace( current_commitment_signature=payload['signature'], current_htlc_signatures=payload['htlc_signature']) self.lnworker.save_channel(chan) self.commitment_signed[channel_id].put_nowait(payload) @aiosafe async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg): self.print_error("update_fulfill") chan = self.channels[update_fulfill_htlc_msg["channel_id"]] preimage = update_fulfill_htlc_msg["payment_preimage"] htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big") htlc = chan.lookup_htlc(chan.log[LOCAL], htlc_id) chan.receive_htlc_settle(preimage, htlc_id) await self.receive_commitment(chan) self.revoke(chan) self.send_commitment(chan) # htlc will be removed await self.receive_revoke(chan) self.lnworker.save_channel(chan) # used in lightning-integration self.payment_preimages[sha256(preimage)].put_nowait(preimage) def on_update_fail_malformed_htlc(self, payload): self.print_error("error", payload["data"].decode("ascii")) def on_update_add_htlc(self, payload): # no onion routing for the moment: we assume we are the end node self.print_error('on_update_add_htlc', payload) # check if this in our list of requests payment_hash = payload["payment_hash"] for k in self.invoices.keys(): preimage = bfh(k) if sha256(preimage) == payment_hash: req = self.invoices[k] decoded = lndecode(req, expected_hrp=constants.net.SEGWIT_HRP) coro = self.receive_commitment_revoke_ack(payload, decoded, preimage) asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) break else: assert False def on_revoke_and_ack(self, payload): print("got revoke_and_ack") channel_id = payload["channel_id"] self.revoke_and_ack[channel_id].put_nowait(payload) def on_update_fee(self, payload): channel_id = payload["channel_id"] self.channels[channel_id].receive_update_fee(int.from_bytes(payload["feerate_per_kw"], "big")) async def bitcoin_fee_update(self, chan): """ called when our fee estimates change """ if not chan.constraints.is_initiator: # TODO force close if initiator does not update_fee enough return feerate_per_kw = self.current_feerate_per_kw() chan_fee = chan.pending_feerate(REMOTE) self.print_error("current pending feerate", chan_fee) self.print_error("new feerate", feerate_per_kw) if feerate_per_kw < chan_fee / 2: self.print_error("FEES HAVE FALLEN") elif feerate_per_kw > chan_fee * 2: self.print_error("FEES HAVE RISEN") else: return chan.update_fee(feerate_per_kw) update = gen_msg("update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw) await self.update_channel(chan, update) def current_feerate_per_kw(self): from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE feerate_per_kvbyte = self.network.config.eta_target_to_fee(FEE_LN_ETA_TARGET) if feerate_per_kvbyte is None: feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE return max(253, feerate_per_kvbyte // 4) def on_closing_signed(self, payload): chan_id = payload["channel_id"] if chan_id not in self.closing_signed: raise Exception("Got unknown closing_signed") self.closing_signed[chan_id].put_nowait(payload) @aiosafe async def on_shutdown(self, payload): # length of scripts allowed in BOLT-02 if int.from_bytes(payload['len'], 'big') not in (3+20+2, 2+20+1, 2+20, 2+32): raise Exception('scriptpubkey length in received shutdown message invalid: ' + str(payload['len'])) chan = self.channels[payload['channel_id']] scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) self.send_message(gen_msg('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)) signature, fee = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey']) self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)) while chan.get_state() != 'CLOSED': try: closing_signed = await asyncio.wait_for(self.closing_signed[chan.channel_id].get(), 1) except asyncio.TimeoutError: pass else: fee = int.from_bytes(closing_signed['fee_satoshis'], 'big') signature, _ = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=fee) self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)) self.print_error('REMOTE PEER CLOSED CHANNEL')