mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
move on_funding_locked to lnworker
This commit is contained in:
parent
0552c61b66
commit
b71f020fc9
3 changed files with 60 additions and 106 deletions
|
@ -763,6 +763,23 @@ class Commands:
|
||||||
# for the python console
|
# for the python console
|
||||||
return sorted(known_commands.keys())
|
return sorted(known_commands.keys())
|
||||||
|
|
||||||
|
# lightning network commands
|
||||||
|
@command('wpn')
|
||||||
|
def open_channel(self, node_id, amount, push_msat=0, password=None):
|
||||||
|
self.wallet.lnworker.open_channel(node_id, amount, push_msat, password)
|
||||||
|
|
||||||
|
@command('wn')
|
||||||
|
def reestablish_channel(self):
|
||||||
|
self.wallet.lnworker.reestablish_channel()
|
||||||
|
|
||||||
|
@command('wn')
|
||||||
|
def lnpay():
|
||||||
|
self.wallet.lnworker.pay()
|
||||||
|
|
||||||
|
@command('wn')
|
||||||
|
def lnreceive():
|
||||||
|
self.wallet.lnworker.get_paid()
|
||||||
|
|
||||||
def eval_bool(x: str) -> bool:
|
def eval_bool(x: str) -> bool:
|
||||||
if x == 'false': return False
|
if x == 'false': return False
|
||||||
if x == 'true': return True
|
if x == 'true': return True
|
||||||
|
@ -820,6 +837,7 @@ command_options = {
|
||||||
'timeout': (None, "Timeout in seconds"),
|
'timeout': (None, "Timeout in seconds"),
|
||||||
'force': (None, "Create new address beyond gap limit, if no more addresses are available."),
|
'force': (None, "Create new address beyond gap limit, if no more addresses are available."),
|
||||||
'pending': (None, "Show only pending requests."),
|
'pending': (None, "Show only pending requests."),
|
||||||
|
'push_msat': (None, 'push millisatoshis'),
|
||||||
'expired': (None, "Show only expired requests."),
|
'expired': (None, "Show only expired requests."),
|
||||||
'paid': (None, "Show only paid requests."),
|
'paid': (None, "Show only paid requests."),
|
||||||
'show_addresses': (None, "Show input and output addresses"),
|
'show_addresses': (None, "Show input and output addresses"),
|
||||||
|
|
|
@ -984,34 +984,7 @@ class Peer(PrintError):
|
||||||
raise Exception("Remote PCP mismatch")
|
raise Exception("Remote PCP mismatch")
|
||||||
return chan
|
return chan
|
||||||
|
|
||||||
|
async def on_funding_locked(self):
|
||||||
async def wait_for_funding_locked(self, chan, wallet):
|
|
||||||
channel_id = chan.channel_id
|
|
||||||
|
|
||||||
def on_network_update(event, *args):
|
|
||||||
conf = wallet.get_tx_height(chan.funding_outpoint.txid)[1]
|
|
||||||
if conf >= chan.constraints.funding_txn_minimum_depth:
|
|
||||||
async def set_local_funding_locked_result():
|
|
||||||
try:
|
|
||||||
self.local_funding_locked[channel_id].set_result(short_channel_id)
|
|
||||||
except (asyncio.InvalidStateError, KeyError) as e:
|
|
||||||
# FIXME race condition if updates come in quickly, set_result might be called multiple times
|
|
||||||
# or self.local_funding_locked[channel_id] might be deleted already
|
|
||||||
self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
|
|
||||||
block_height, tx_pos = wallet.get_txpos(chan.funding_outpoint.txid)
|
|
||||||
if tx_pos == -1:
|
|
||||||
self.print_error('funding tx is not yet SPV verified.. but there are '
|
|
||||||
'already enough confirmations (currently {})'.format(conf))
|
|
||||||
return
|
|
||||||
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
|
||||||
asyncio.run_coroutine_threadsafe(set_local_funding_locked_result(), asyncio.get_event_loop())
|
|
||||||
self.network.unregister_callback(on_network_update)
|
|
||||||
|
|
||||||
# wait until we see confirmations
|
|
||||||
self.network.register_callback(on_network_update, ['updated', 'verified']) # thread safe
|
|
||||||
|
|
||||||
on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
short_channel_id = await self.local_funding_locked[channel_id]
|
short_channel_id = await self.local_funding_locked[channel_id]
|
||||||
finally:
|
finally:
|
||||||
|
|
119
lib/lnworker.py
119
lib/lnworker.py
|
@ -97,6 +97,9 @@ class LNWorker:
|
||||||
peer_list = network.config.get('lightning_peers', node_list)
|
peer_list = network.config.get('lightning_peers', node_list)
|
||||||
for host, port, pubkey in peer_list:
|
for host, port, pubkey in peer_list:
|
||||||
self.add_peer(host, port, pubkey)
|
self.add_peer(host, port, pubkey)
|
||||||
|
# wait until we see confirmations
|
||||||
|
self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
|
||||||
|
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
|
||||||
|
|
||||||
def add_peer(self, host, port, pubkey):
|
def add_peer(self, host, port, pubkey):
|
||||||
peer = Peer(host, int(port), binascii.unhexlify(pubkey), self.privkey, self.network)
|
peer = Peer(host, int(port), binascii.unhexlify(pubkey), self.privkey, self.network)
|
||||||
|
@ -108,12 +111,47 @@ class LNWorker:
|
||||||
self.wallet.storage.put("channels", dumped)
|
self.wallet.storage.put("channels", dumped)
|
||||||
self.wallet.storage.write()
|
self.wallet.storage.write()
|
||||||
|
|
||||||
|
def on_network_update(self, event, *args):
|
||||||
|
for chan in self.channels:
|
||||||
|
peer = self.peers[chan.node_id]
|
||||||
|
conf = wallet.get_tx_height(chan.funding_outpoint.txid)[1]
|
||||||
|
if conf >= chan.constraints.funding_txn_minimum_depth:
|
||||||
|
block_height, tx_pos = wallet.get_txpos(chan.funding_outpoint.txid)
|
||||||
|
if tx_pos == -1:
|
||||||
|
self.print_error('funding tx is not yet SPV verified.. but there are '
|
||||||
|
'already enough confirmations (currently {})'.format(conf))
|
||||||
|
return
|
||||||
|
asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, txpos), asyncio.get_event_loop())
|
||||||
|
|
||||||
|
async def set_local_funding_locked_result(self, peer, chan, block_height, txpos):
|
||||||
|
channel_id = chan.channel_id
|
||||||
|
try:
|
||||||
|
peer.local_funding_locked[channel_id].set_result(short_channel_id)
|
||||||
|
except (asyncio.InvalidStateError, KeyError) as e:
|
||||||
|
# FIXME race condition if updates come in quickly, set_result might be called multiple times
|
||||||
|
# or self.local_funding_locked[channel_id] might be deleted already
|
||||||
|
self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
|
||||||
|
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
||||||
|
openchannel = await peer.on_funding_locked(openingchannel, self.wallet)
|
||||||
|
self.save_channel(openchannel)
|
||||||
|
|
||||||
@aiosafe
|
@aiosafe
|
||||||
async def open_channel(self, peer, amount, push_msat, password):
|
async def _open_channel_coroutine(self, node_id, amount, push_msat, password):
|
||||||
|
peer = self.peers[node_id]
|
||||||
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
|
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
|
||||||
self.save_channel(openingchannel)
|
self.save_channel(openingchannel)
|
||||||
openchannel = await peer.wait_for_funding_locked(openingchannel, self.wallet)
|
|
||||||
self.save_channel(openchannel)
|
def open_channel(self, node_id, local_amt, push_amt, emit_function, pw):
|
||||||
|
coro = self._open_channel_coroutine(node_id, local_amt, push_amt, None if pw == "" else pw)
|
||||||
|
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
||||||
|
|
||||||
|
#chan = fut.result()
|
||||||
|
# https://api.lightning.community/#listchannels
|
||||||
|
#std_chan = {"chan_id": chan.channel_id}
|
||||||
|
#emit_function({"channels": [std_chan]})
|
||||||
|
|
||||||
|
def list_channels(self):
|
||||||
|
return self.channels
|
||||||
|
|
||||||
@aiosafe
|
@aiosafe
|
||||||
async def reestablish_channel(self):
|
async def reestablish_channel(self):
|
||||||
|
@ -144,20 +182,6 @@ class LNWorker:
|
||||||
openchannel = await peer.receive_commitment_revoke_ack(openchannel, expected_received_msat, payment_preimage)
|
openchannel = await peer.receive_commitment_revoke_ack(openchannel, expected_received_msat, payment_preimage)
|
||||||
self.save_channel(openchannel)
|
self.save_channel(openchannel)
|
||||||
|
|
||||||
def open_channel_from_other_thread(self, node_id, local_amt, push_amt, emit_function, pw):
|
|
||||||
# TODO this could race on peers
|
|
||||||
peer = self.peers.get(node_id)
|
|
||||||
if peer is None:
|
|
||||||
if len(self.peers) != 1:
|
|
||||||
print("Peer not found, and peer list is empty or has multiple peers.")
|
|
||||||
return
|
|
||||||
peer = next(iter(self.peers.values()))
|
|
||||||
coro = self.open_channel(peer, local_amt, push_amt, None if pw == "" else pw)
|
|
||||||
fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
|
||||||
chan = fut.result()
|
|
||||||
# https://api.lightning.community/#listchannels
|
|
||||||
std_chan = {"chan_id": chan.channel_id}
|
|
||||||
emit_function({"channels": [std_chan]})
|
|
||||||
|
|
||||||
def subscribe_payment_received_from_other_thread(self, emit_function):
|
def subscribe_payment_received_from_other_thread(self, emit_function):
|
||||||
pass
|
pass
|
||||||
|
@ -178,64 +202,3 @@ class LNWorker:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
if len(sys.argv) > 3:
|
|
||||||
host, port, pubkey = sys.argv[3:6]
|
|
||||||
else:
|
|
||||||
host, port, pubkey = node_list[0]
|
|
||||||
pubkey = binascii.unhexlify(pubkey)
|
|
||||||
port = int(port)
|
|
||||||
if not any(x in sys.argv[1] for x in ["new_channel", "reestablish_channel"]):
|
|
||||||
raise Exception("first argument must contain new_channel or reestablish_channel")
|
|
||||||
if sys.argv[2] not in ["simnet", "testnet"]:
|
|
||||||
raise Exception("second argument must be simnet or testnet")
|
|
||||||
if sys.argv[2] == "simnet":
|
|
||||||
set_simnet()
|
|
||||||
config = SimpleConfig({'lnbase':True, 'simnet':True})
|
|
||||||
else:
|
|
||||||
set_testnet()
|
|
||||||
config = SimpleConfig({'lnbase':True, 'testnet':True})
|
|
||||||
# start network
|
|
||||||
config.set_key('lightning_peers', [])
|
|
||||||
network = Network(config)
|
|
||||||
network.start()
|
|
||||||
asyncio.set_event_loop(network.asyncio_loop)
|
|
||||||
# wallet
|
|
||||||
storage = WalletStorage(config.get_wallet_path())
|
|
||||||
wallet = Wallet(storage)
|
|
||||||
wallet.start_threads(network)
|
|
||||||
# start peer
|
|
||||||
if "new_channel" in sys.argv[1]:
|
|
||||||
privkey = sha256(os.urandom(32))
|
|
||||||
wallet.storage.put("channels_privkey", bh2u(privkey))
|
|
||||||
wallet.storage.write()
|
|
||||||
elif "reestablish_channel" in sys.argv[1]:
|
|
||||||
privkey = wallet.storage.get("channels_privkey", None)
|
|
||||||
assert privkey is not None
|
|
||||||
privkey = bfh(privkey)
|
|
||||||
peer = Peer(host, port, pubkey, privkey, network, request_initial_sync=True)
|
|
||||||
network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), network.asyncio_loop))
|
|
||||||
|
|
||||||
# run blocking test
|
|
||||||
async def async_test():
|
|
||||||
if "new_channel" in sys.argv[1]:
|
|
||||||
await wallet.lnworker.open_channel()
|
|
||||||
elif "reestablish_channel" in sys.argv[1]:
|
|
||||||
await wallet.lnworker.reestablish_channel()
|
|
||||||
if "pay" in sys.argv[1]:
|
|
||||||
await lnworker.pay()
|
|
||||||
elif "get_paid" in sys.argv[1]:
|
|
||||||
await lnworker.get_paid()
|
|
||||||
fut = asyncio.run_coroutine_threadsafe(async_test(), network.asyncio_loop)
|
|
||||||
while not fut.done():
|
|
||||||
time.sleep(1)
|
|
||||||
try:
|
|
||||||
if fut.exception():
|
|
||||||
raise fut.exception()
|
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
else:
|
|
||||||
print("result", fut.result())
|
|
||||||
finally:
|
|
||||||
network.stop()
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue