mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-03 02:35:20 +00:00
LNWorker: connect to multiple peers.
save exceptions in aiosafe. enable adding peer in GUI.
This commit is contained in:
parent
8276d023c2
commit
ae389ae5e2
4 changed files with 77 additions and 33 deletions
|
@ -87,7 +87,7 @@ class ChannelsList(MyTreeWidget):
|
||||||
push_amt_inp.setAmount(0)
|
push_amt_inp.setAmount(0)
|
||||||
h.addWidget(QLabel(_('Your Node ID')), 0, 0)
|
h.addWidget(QLabel(_('Your Node ID')), 0, 0)
|
||||||
h.addWidget(local_nodeid, 0, 1)
|
h.addWidget(local_nodeid, 0, 1)
|
||||||
h.addWidget(QLabel(_('Remote Node ID')), 1, 0)
|
h.addWidget(QLabel(_('Remote Node ID or connection string')), 1, 0)
|
||||||
h.addWidget(remote_nodeid, 1, 1)
|
h.addWidget(remote_nodeid, 1, 1)
|
||||||
h.addWidget(QLabel('Local amount'), 2, 0)
|
h.addWidget(QLabel('Local amount'), 2, 0)
|
||||||
h.addWidget(local_amt_inp, 2, 1)
|
h.addWidget(local_amt_inp, 2, 1)
|
||||||
|
@ -97,19 +97,43 @@ class ChannelsList(MyTreeWidget):
|
||||||
vbox.addLayout(Buttons(CancelButton(d), OkButton(d)))
|
vbox.addLayout(Buttons(CancelButton(d), OkButton(d)))
|
||||||
if not d.exec_():
|
if not d.exec_():
|
||||||
return
|
return
|
||||||
nodeid_hex = str(remote_nodeid.text())
|
|
||||||
local_amt = local_amt_inp.get_amount()
|
local_amt = local_amt_inp.get_amount()
|
||||||
push_amt = push_amt_inp.get_amount()
|
push_amt = push_amt_inp.get_amount()
|
||||||
|
connect_contents = str(remote_nodeid.text())
|
||||||
|
rest = None
|
||||||
|
try:
|
||||||
|
nodeid_hex, rest = connect_contents.split("@")
|
||||||
|
except ValueError:
|
||||||
|
nodeid_hex = connect_contents
|
||||||
try:
|
try:
|
||||||
node_id = bfh(nodeid_hex)
|
node_id = bfh(nodeid_hex)
|
||||||
|
assert len(node_id) == 33
|
||||||
except:
|
except:
|
||||||
self.parent.show_error(_('Invalid node ID'))
|
self.parent.show_error(_('Invalid node ID, must be 33 bytes and hexadecimal'))
|
||||||
return
|
return
|
||||||
if node_id not in self.parent.wallet.lnworker.peers and node_id not in self.parent.network.lightning_nodes:
|
peer = self.parent.wallet.lnworker.peers.get(node_id)
|
||||||
self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
|
|
||||||
return
|
if not peer:
|
||||||
assert local_amt >= 200000
|
known = node_id in self.parent.network.lightning_nodes
|
||||||
assert local_amt >= push_amt
|
if rest is not None:
|
||||||
|
try:
|
||||||
|
host, port = rest.split(":")
|
||||||
|
except ValueError:
|
||||||
|
self.parent.show_error(_('Connection strings must be in <node_pubkey>@<host>:<port> format'))
|
||||||
|
elif known:
|
||||||
|
node = self.network.lightning_nodes.get(node_id)
|
||||||
|
host, port = node['addresses'][0]
|
||||||
|
else:
|
||||||
|
self.parent.show_error(_('Unknown node:') + ' ' + nodeid_hex)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
int(port)
|
||||||
|
except:
|
||||||
|
self.parent.show_error(_('Port number must be decimal'))
|
||||||
|
return
|
||||||
|
|
||||||
|
self.parent.wallet.lnworker.add_peer(host, port, node_id)
|
||||||
|
|
||||||
self.main_window.protect(self.open_channel, (node_id, local_amt, push_amt))
|
self.main_window.protect(self.open_channel, (node_id, local_amt, push_amt))
|
||||||
|
|
||||||
def open_channel(self, *args, **kwargs):
|
def open_channel(self, *args, **kwargs):
|
||||||
|
|
|
@ -267,21 +267,24 @@ def create_ephemeral_key(privkey):
|
||||||
|
|
||||||
|
|
||||||
def aiosafe(f):
|
def aiosafe(f):
|
||||||
|
# save exception in object.
|
||||||
|
# f must be a method of a PrintError instance.
|
||||||
|
# aiosafe calls should not be nested
|
||||||
async def f2(*args, **kwargs):
|
async def f2(*args, **kwargs):
|
||||||
|
self = args[0]
|
||||||
try:
|
try:
|
||||||
return await f(*args, **kwargs)
|
return await f(*args, **kwargs)
|
||||||
except:
|
except BaseException as e:
|
||||||
# if the loop isn't stopped
|
self.print_msg("Exception in", f.__name__, ":", e.__class__.__name__, str(e))
|
||||||
# run_forever in network.py would not return,
|
self.exception = e
|
||||||
# the asyncioThread would not die,
|
|
||||||
# and we would block on shutdown
|
|
||||||
asyncio.get_event_loop().stop()
|
|
||||||
traceback.print_exc()
|
|
||||||
return f2
|
return f2
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Peer(PrintError):
|
class Peer(PrintError):
|
||||||
|
|
||||||
def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
|
def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
|
||||||
|
self.exception = None # set by aiosafe
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.pubkey = pubkey
|
self.pubkey = pubkey
|
||||||
|
@ -307,7 +310,7 @@ class Peer(PrintError):
|
||||||
self.attempted_route = {}
|
self.attempted_route = {}
|
||||||
|
|
||||||
def diagnostic_name(self):
|
def diagnostic_name(self):
|
||||||
return self.host
|
return 'lnbase:' + self.host
|
||||||
|
|
||||||
def ping_if_required(self):
|
def ping_if_required(self):
|
||||||
if time.time() - self.ping_time > 120:
|
if time.time() - self.ping_time > 120:
|
||||||
|
@ -455,7 +458,7 @@ class Peer(PrintError):
|
||||||
'alias': alias,
|
'alias': alias,
|
||||||
'addresses': addresses
|
'addresses': addresses
|
||||||
}
|
}
|
||||||
self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
|
#self.print_error('node announcement', binascii.hexlify(pubkey), alias, addresses)
|
||||||
self.network.trigger_callback('ln_status')
|
self.network.trigger_callback('ln_status')
|
||||||
|
|
||||||
def on_init(self, payload):
|
def on_init(self, payload):
|
||||||
|
@ -476,8 +479,7 @@ class Peer(PrintError):
|
||||||
else:
|
else:
|
||||||
self.announcement_signatures[channel_id].put_nowait(payload)
|
self.announcement_signatures[channel_id].put_nowait(payload)
|
||||||
|
|
||||||
@aiosafe
|
async def initialize(self):
|
||||||
async def main_loop(self):
|
|
||||||
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
|
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
|
||||||
await self.handshake()
|
await self.handshake()
|
||||||
# send init
|
# send init
|
||||||
|
@ -486,6 +488,10 @@ class Peer(PrintError):
|
||||||
msg = await self.read_message()
|
msg = await self.read_message()
|
||||||
self.process_message(msg)
|
self.process_message(msg)
|
||||||
self.initialized.set_result(True)
|
self.initialized.set_result(True)
|
||||||
|
|
||||||
|
@aiosafe
|
||||||
|
async def main_loop(self):
|
||||||
|
await asyncio.wait_for(self.initialize(), 5)
|
||||||
# loop
|
# loop
|
||||||
while True:
|
while True:
|
||||||
self.ping_if_required()
|
self.ping_if_required()
|
||||||
|
|
|
@ -70,7 +70,7 @@ class ChannelInfo(PrintError):
|
||||||
self.policy_node1 = ChannelInfoDirectedPolicy(msg_payload)
|
self.policy_node1 = ChannelInfoDirectedPolicy(msg_payload)
|
||||||
else:
|
else:
|
||||||
self.policy_node2 = ChannelInfoDirectedPolicy(msg_payload)
|
self.policy_node2 = ChannelInfoDirectedPolicy(msg_payload)
|
||||||
self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
|
#self.print_error('channel update', binascii.hexlify(self.channel_id).decode("ascii"), flags)
|
||||||
|
|
||||||
def get_policy_for_node(self, node_id):
|
def get_policy_for_node(self, node_id):
|
||||||
if node_id == self.node_id_1:
|
if node_id == self.node_id_1:
|
||||||
|
@ -112,7 +112,7 @@ class ChannelDB(PrintError):
|
||||||
|
|
||||||
def on_channel_announcement(self, msg_payload):
|
def on_channel_announcement(self, msg_payload):
|
||||||
short_channel_id = msg_payload['short_channel_id']
|
short_channel_id = msg_payload['short_channel_id']
|
||||||
self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
|
#self.print_error('channel announcement', binascii.hexlify(short_channel_id).decode("ascii"))
|
||||||
channel_info = ChannelInfo(msg_payload)
|
channel_info = ChannelInfo(msg_payload)
|
||||||
self._id_to_channel_info[short_channel_id] = channel_info
|
self._id_to_channel_info[short_channel_id] = channel_info
|
||||||
self._channels_for_node[channel_info.node_id_1].add(short_channel_id)
|
self._channels_for_node[channel_info.node_id_1].add(short_channel_id)
|
||||||
|
|
|
@ -5,12 +5,13 @@ import os
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
import threading
|
import threading
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import random
|
||||||
|
|
||||||
from . import constants
|
from . import constants
|
||||||
from .bitcoin import sha256, COIN
|
from .bitcoin import sha256, COIN
|
||||||
from .util import bh2u, bfh, PrintError
|
from .util import bh2u, bfh, PrintError
|
||||||
from .constants import set_testnet, set_simnet
|
from .constants import set_testnet, set_simnet
|
||||||
from .lnbase import Peer, privkey_to_pubkey
|
from .lnbase import Peer, privkey_to_pubkey, aiosafe
|
||||||
from .lnaddr import lnencode, LnAddr, lndecode
|
from .lnaddr import lnencode, LnAddr, lndecode
|
||||||
from .ecc import der_sig_from_sig_string
|
from .ecc import der_sig_from_sig_string
|
||||||
from .transaction import Transaction
|
from .transaction import Transaction
|
||||||
|
@ -39,15 +40,13 @@ class LNWorker(PrintError):
|
||||||
self.peers = {}
|
self.peers = {}
|
||||||
self.channels = {x.channel_id: x for x in map(HTLCStateMachine, wallet.storage.get("channels", []))}
|
self.channels = {x.channel_id: x for x in map(HTLCStateMachine, wallet.storage.get("channels", []))}
|
||||||
self.invoices = wallet.storage.get('lightning_invoices', {})
|
self.invoices = wallet.storage.get('lightning_invoices', {})
|
||||||
peer_list = network.config.get('lightning_peers', node_list)
|
|
||||||
self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
|
self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
|
||||||
for chan_id, chan in self.channels.items():
|
for chan_id, chan in self.channels.items():
|
||||||
self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos)
|
self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos)
|
||||||
for host, port, pubkey in peer_list:
|
|
||||||
self.add_peer(host, int(port), bfh(pubkey))
|
|
||||||
# wait until we see confirmations
|
# wait until we see confirmations
|
||||||
self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
|
self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
|
||||||
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
|
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
|
||||||
|
asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop())
|
||||||
|
|
||||||
def channels_for_peer(self, node_id):
|
def channels_for_peer(self, node_id):
|
||||||
assert type(node_id) is bytes
|
assert type(node_id) is bytes
|
||||||
|
@ -118,15 +117,9 @@ class LNWorker(PrintError):
|
||||||
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
|
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
|
||||||
peer.on_network_update(chan, conf)
|
peer.on_network_update(chan, conf)
|
||||||
|
|
||||||
async def _open_channel_coroutine(self, node_id, amount_sat, push_sat, password):
|
async def _open_channel_coroutine(self, node_id, local_amount_sat, push_sat, password):
|
||||||
if node_id not in self.peers:
|
|
||||||
node = self.network.lightning_nodes.get(node_id)
|
|
||||||
if node is None:
|
|
||||||
return "node not found, peers available are: " + str(self.network.lightning_nodes.keys())
|
|
||||||
host, port = node['addresses'][0]
|
|
||||||
self.add_peer(host, port, node_id)
|
|
||||||
peer = self.peers[node_id]
|
peer = self.peers[node_id]
|
||||||
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
|
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, local_amount_sat + push_sat, push_sat * 1000, temp_channel_id=os.urandom(32))
|
||||||
self.save_channel(openingchannel)
|
self.save_channel(openingchannel)
|
||||||
self.network.lnwatcher.watch_channel(openingchannel, self.on_channel_utxos)
|
self.network.lnwatcher.watch_channel(openingchannel, self.on_channel_utxos)
|
||||||
self.on_channels_updated()
|
self.on_channels_updated()
|
||||||
|
@ -192,3 +185,24 @@ class LNWorker(PrintError):
|
||||||
tx.add_signature_to_txin(0, none_idx, bh2u(remote_sig))
|
tx.add_signature_to_txin(0, none_idx, bh2u(remote_sig))
|
||||||
assert tx.is_complete()
|
assert tx.is_complete()
|
||||||
return self.network.broadcast_transaction(tx)
|
return self.network.broadcast_transaction(tx)
|
||||||
|
|
||||||
|
@aiosafe
|
||||||
|
async def main_loop(self):
|
||||||
|
peer_list = self.config.get('lightning_peers', node_list)
|
||||||
|
for host, port, pubkey in peer_list:
|
||||||
|
self.add_peer(host, int(port), bfh(pubkey))
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
for k, peer in list(self.peers.items()):
|
||||||
|
if peer.exception:
|
||||||
|
self.print_error("removing peer", peer.host)
|
||||||
|
self.peers.pop(k)
|
||||||
|
if len(self.peers) > 3:
|
||||||
|
continue
|
||||||
|
node_id = random.choice(list(self.network.lightning_nodes.keys()))
|
||||||
|
node = self.network.lightning_nodes.get(node_id)
|
||||||
|
addresses = node.get('addresses')
|
||||||
|
if addresses:
|
||||||
|
host, port = addresses[0]
|
||||||
|
self.print_error("trying node", bh2u(node_id))
|
||||||
|
self.add_peer(host, port, node_id)
|
||||||
|
|
Loading…
Add table
Reference in a new issue