mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-31 01:11:35 +00:00
persist recent peers. implement dns seed bootstrapping.
dns seeds are currently disabled though, as they always seem to return mainnet nodes.
This commit is contained in:
parent
bc06ded4b9
commit
c02cc9bb3b
7 changed files with 194 additions and 34 deletions
|
@ -84,6 +84,11 @@ class BitcoinMainnet(AbstractNet):
|
|||
}
|
||||
XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS)
|
||||
BIP44_COIN_TYPE = 0
|
||||
LN_REALM_BYTE = 0
|
||||
LN_DNS_SEEDS = [
|
||||
'nodes.lightning.directory.',
|
||||
'lseed.bitcoinstats.com.',
|
||||
]
|
||||
|
||||
|
||||
class BitcoinTestnet(AbstractNet):
|
||||
|
@ -115,6 +120,11 @@ class BitcoinTestnet(AbstractNet):
|
|||
}
|
||||
XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS)
|
||||
BIP44_COIN_TYPE = 1
|
||||
LN_REALM_BYTE = 1
|
||||
LN_DNS_SEEDS = [
|
||||
'test.nodes.lightning.directory.',
|
||||
'lseed.bitcoinstats.com.',
|
||||
]
|
||||
|
||||
|
||||
class BitcoinRegtest(BitcoinTestnet):
|
||||
|
@ -123,6 +133,7 @@ class BitcoinRegtest(BitcoinTestnet):
|
|||
GENESIS = "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206"
|
||||
DEFAULT_SERVERS = read_json('servers_regtest.json', {})
|
||||
CHECKPOINTS = []
|
||||
LN_DNS_SEEDS = []
|
||||
|
||||
|
||||
class BitcoinSimnet(BitcoinTestnet):
|
||||
|
@ -134,6 +145,7 @@ class BitcoinSimnet(BitcoinTestnet):
|
|||
GENESIS = "683e86bd5c6d110d91b94b97137ba6bfe02dbbdb8e3dff722a669b5d69d77af6"
|
||||
DEFAULT_SERVERS = read_json('servers_regtest.json', {})
|
||||
CHECKPOINTS = []
|
||||
LN_DNS_SEEDS = []
|
||||
|
||||
|
||||
# don't import net directly, import the module instead (so that net is singleton)
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
from collections import namedtuple, defaultdict, OrderedDict, defaultdict
|
||||
from .lnutil import Outpoint, ChannelConfig, LocalState, RemoteState, Keypair, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore
|
||||
from .lnutil import sign_and_get_sig_string, funding_output_script, get_ecdh, get_per_commitment_secret_from_seed
|
||||
from .lnutil import secret_to_pubkey
|
||||
from .lnutil import secret_to_pubkey, LNPeerAddr
|
||||
from .bitcoin import COIN
|
||||
|
||||
from ecdsa.util import sigdecode_der, sigencode_string_canonize, sigdecode_string
|
||||
|
@ -439,7 +439,6 @@ class Peer(PrintError):
|
|||
|
||||
def on_channel_announcement(self, payload):
|
||||
self.channel_db.on_channel_announcement(payload)
|
||||
self.network.trigger_callback('ln_status')
|
||||
|
||||
def on_announcement_signatures(self, payload):
|
||||
channel_id = payload['channel_id']
|
||||
|
@ -462,6 +461,7 @@ class Peer(PrintError):
|
|||
@aiosafe
|
||||
async def main_loop(self):
|
||||
await asyncio.wait_for(self.initialize(), 5)
|
||||
self.channel_db.add_recent_peer(LNPeerAddr(self.host, self.port, self.pubkey))
|
||||
# loop
|
||||
while True:
|
||||
self.ping_if_required()
|
||||
|
|
|
@ -38,7 +38,7 @@ from .storage import JsonDB
|
|||
from .lnchanannverifier import LNChanAnnVerifier, verify_sig_for_channel_update
|
||||
from .crypto import Hash
|
||||
from . import ecc
|
||||
from .lnutil import LN_GLOBAL_FEATURE_BITS
|
||||
from .lnutil import LN_GLOBAL_FEATURE_BITS, LNPeerAddr
|
||||
|
||||
|
||||
class UnknownEvenFeatureBits(Exception): pass
|
||||
|
@ -256,16 +256,19 @@ class NodeInfo(PrintError):
|
|||
|
||||
class ChannelDB(JsonDB):
|
||||
|
||||
NUM_MAX_RECENT_PEERS = 20
|
||||
|
||||
def __init__(self, network):
|
||||
self.network = network
|
||||
|
||||
path = os.path.join(get_headers_dir(network.config), 'channel_db')
|
||||
JsonDB.__init__(self, path)
|
||||
|
||||
self.lock = threading.Lock()
|
||||
self.lock = threading.RLock()
|
||||
self._id_to_channel_info = {}
|
||||
self._channels_for_node = defaultdict(set) # node -> set(short_channel_id)
|
||||
self.nodes = {} # node_id -> NodeInfo
|
||||
self._recent_peers = []
|
||||
|
||||
self.ca_verifier = LNChanAnnVerifier(network, self)
|
||||
self.network.add_jobs([self.ca_verifier])
|
||||
|
@ -289,6 +292,11 @@ class ChannelDB(JsonDB):
|
|||
node_info = NodeInfo.from_json(node_info_d)
|
||||
node_id = bfh(node_id)
|
||||
self.nodes[node_id] = node_info
|
||||
# recent peers
|
||||
recent_peers = self.get('recent_peers', {})
|
||||
for host, port, pubkey in recent_peers:
|
||||
peer = LNPeerAddr(str(host), int(port), bfh(pubkey))
|
||||
self._recent_peers.append(peer)
|
||||
|
||||
def save_data(self):
|
||||
with self.lock:
|
||||
|
@ -302,6 +310,12 @@ class ChannelDB(JsonDB):
|
|||
for node_id, node_info in self.nodes.items():
|
||||
node_infos[bh2u(node_id)] = node_info
|
||||
self.put('node_infos', node_infos)
|
||||
# recent peers
|
||||
recent_peers = []
|
||||
for peer in self._recent_peers:
|
||||
recent_peers.append(
|
||||
[str(peer.host), int(peer.port), bh2u(peer.pubkey)])
|
||||
self.put('recent_peers', recent_peers)
|
||||
self.write()
|
||||
|
||||
def __len__(self):
|
||||
|
@ -320,12 +334,26 @@ class ChannelDB(JsonDB):
|
|||
self._id_to_channel_info[short_channel_id] = channel_info
|
||||
self._channels_for_node[channel_info.node_id_1].add(short_channel_id)
|
||||
self._channels_for_node[channel_info.node_id_2].add(short_channel_id)
|
||||
self.network.trigger_callback('ln_status')
|
||||
|
||||
def get_recent_peers(self):
|
||||
with self.lock:
|
||||
return list(self._recent_peers)
|
||||
|
||||
def add_recent_peer(self, peer: LNPeerAddr):
|
||||
with self.lock:
|
||||
# list is ordered
|
||||
if peer in self._recent_peers:
|
||||
self._recent_peers.remove(peer)
|
||||
self._recent_peers.insert(0, peer)
|
||||
self._recent_peers = self._recent_peers[:self.NUM_MAX_RECENT_PEERS]
|
||||
|
||||
def on_channel_announcement(self, msg_payload, trusted=False):
|
||||
short_channel_id = msg_payload['short_channel_id']
|
||||
if short_channel_id in self._id_to_channel_info:
|
||||
return
|
||||
if constants.net.rev_genesis_bytes() != msg_payload['chain_hash']:
|
||||
#self.print_error("ChanAnn has unexpected chain_hash {}".format(bh2u(msg_payload['chain_hash'])))
|
||||
return
|
||||
try:
|
||||
channel_info = ChannelInfo(msg_payload)
|
||||
|
@ -365,6 +393,10 @@ class ChannelDB(JsonDB):
|
|||
new_node_info = NodeInfo(msg_payload)
|
||||
except UnknownEvenFeatureBits:
|
||||
return
|
||||
# TODO if this message is for a new node, and if we have no associated
|
||||
# channels for this node, we should ignore the message and return here,
|
||||
# to mitigate DOS. but race condition: the channels we have for this
|
||||
# node, might be under verification in self.ca_verifier, what then?
|
||||
if old_node_info and old_node_info.timestamp >= new_node_info.timestamp:
|
||||
return # ignore
|
||||
self.nodes[pubkey] = new_node_info
|
||||
|
|
|
@ -7,6 +7,7 @@ from .ecc import CURVE_ORDER, sig_string_from_der_sig, ECPubkey, string_to_numbe
|
|||
from . import ecc, bitcoin, crypto, transaction
|
||||
from .transaction import opcodes
|
||||
from .bitcoin import push_script
|
||||
from . import segwit_addr
|
||||
|
||||
HTLC_TIMEOUT_WEIGHT = 663
|
||||
HTLC_SUCCESS_WEIGHT = 703
|
||||
|
@ -396,3 +397,20 @@ LN_LOCAL_FEATURE_BITS_INV = inv_dict(LN_LOCAL_FEATURE_BITS)
|
|||
LN_GLOBAL_FEATURE_BITS = {}
|
||||
LN_GLOBAL_FEATURE_BITS_INV = inv_dict(LN_GLOBAL_FEATURE_BITS)
|
||||
|
||||
|
||||
class LNPeerAddr(namedtuple('LNPeerAddr', ['host', 'port', 'pubkey'])):
|
||||
__slots__ = ()
|
||||
|
||||
def __str__(self):
|
||||
return '{}@{}:{}'.format(bh2u(self.pubkey), self.host, self.port)
|
||||
|
||||
|
||||
def get_compressed_pubkey_from_bech32(bech32_pubkey: str) -> bytes:
|
||||
hrp, data_5bits = segwit_addr.bech32_decode(bech32_pubkey)
|
||||
if hrp != 'ln':
|
||||
raise Exception('unexpected hrp: {}'.format(hrp))
|
||||
data_8bits = segwit_addr.convertbits(data_5bits, 5, 8, False)
|
||||
# pad with zeroes
|
||||
COMPRESSED_PUBKEY_LENGTH = 33
|
||||
data_8bits = data_8bits + ((COMPRESSED_PUBKEY_LENGTH - len(data_8bits)) * [0])
|
||||
return bytes(data_8bits)
|
||||
|
|
|
@ -1,35 +1,39 @@
|
|||
import json
|
||||
import binascii
|
||||
import asyncio
|
||||
import os
|
||||
from decimal import Decimal
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
import random
|
||||
import time
|
||||
from typing import Optional, Sequence
|
||||
|
||||
import dns.resolver
|
||||
import dns.exception
|
||||
|
||||
from . import constants
|
||||
from .bitcoin import sha256, COIN
|
||||
from .util import bh2u, bfh, PrintError, InvoiceError
|
||||
from .constants import set_testnet, set_simnet
|
||||
from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv
|
||||
from .lnbase import Peer, privkey_to_pubkey, aiosafe
|
||||
from .lnaddr import lnencode, LnAddr, lndecode
|
||||
from .ecc import der_sig_from_sig_string
|
||||
from .transaction import Transaction
|
||||
from .lnhtlc import HTLCStateMachine
|
||||
from .lnutil import Outpoint, calc_short_channel_id
|
||||
from .lnutil import Outpoint, calc_short_channel_id, LNPeerAddr, get_compressed_pubkey_from_bech32
|
||||
from .lnwatcher import LNChanCloseHandler
|
||||
from .i18n import _
|
||||
|
||||
# hardcoded nodes
|
||||
node_list = [
|
||||
('ecdsa.net', '9735', '038370f0e7a03eded3e1d41dc081084a87f0afa1c5b22090b4f3abb391eb15d8ff'),
|
||||
]
|
||||
|
||||
NUM_PEERS_TARGET = 4
|
||||
PEER_RETRY_INTERVAL = 600 # seconds
|
||||
|
||||
FALLBACK_NODE_LIST = (
|
||||
LNPeerAddr('ecdsa.net', 9735, bfh('038370f0e7a03eded3e1d41dc081084a87f0afa1c5b22090b4f3abb391eb15d8ff')),
|
||||
)
|
||||
|
||||
|
||||
class LNWorker(PrintError):
|
||||
|
||||
def __init__(self, wallet, network):
|
||||
self.wallet = wallet
|
||||
self.network = network
|
||||
self.channel_db = self.network.channel_db
|
||||
pk = wallet.storage.get('lightning_privkey')
|
||||
if pk is None:
|
||||
pk = bh2u(os.urandom(32))
|
||||
|
@ -43,17 +47,21 @@ class LNWorker(PrintError):
|
|||
self.invoices = wallet.storage.get('lightning_invoices', {})
|
||||
for chan_id, chan in self.channels.items():
|
||||
self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos)
|
||||
self._last_tried_peer = {} # LNPeerAddr -> unix timestamp
|
||||
# TODO peers that we have channels with should also be added now
|
||||
# but we don't store their IP/port yet.. also what if it changes?
|
||||
# need to listen for node_announcements and save the new IP/port
|
||||
peer_list = self.config.get('lightning_peers', node_list)
|
||||
for host, port, pubkey in peer_list:
|
||||
self.add_peer(host, int(port), bfh(pubkey))
|
||||
self._add_peers_from_config()
|
||||
# wait until we see confirmations
|
||||
self.network.register_callback(self.on_network_update, ['updated', 'verified', 'fee_histogram']) # thread safe
|
||||
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
|
||||
self.network.futures.append(asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop()))
|
||||
|
||||
def _add_peers_from_config(self):
|
||||
peer_list = self.config.get('lightning_peers', [])
|
||||
for host, port, pubkey in peer_list:
|
||||
self.add_peer(host, int(port), bfh(pubkey))
|
||||
|
||||
def suggest_peer(self):
|
||||
for node_id, peer in self.peers.items():
|
||||
if len(peer.channels) > 0:
|
||||
|
@ -67,7 +75,8 @@ class LNWorker(PrintError):
|
|||
return {x: y for (x, y) in self.channels.items() if y.node_id == node_id}
|
||||
|
||||
def add_peer(self, host, port, node_id):
|
||||
peer = Peer(self, host, int(port), node_id, request_initial_sync=self.config.get("request_initial_sync", True))
|
||||
port = int(port)
|
||||
peer = Peer(self, host, port, node_id, request_initial_sync=self.config.get("request_initial_sync", True))
|
||||
self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
|
||||
self.peers[node_id] = peer
|
||||
self.network.trigger_callback('ln_status')
|
||||
|
@ -218,6 +227,80 @@ class LNWorker(PrintError):
|
|||
assert tx.is_complete()
|
||||
return self.network.broadcast_transaction(tx)
|
||||
|
||||
def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
|
||||
now = time.time()
|
||||
recent_peers = self.channel_db.get_recent_peers()
|
||||
# maintenance for last tried times
|
||||
for peer in list(self._last_tried_peer):
|
||||
if now >= self._last_tried_peer[peer] + PEER_RETRY_INTERVAL:
|
||||
del self._last_tried_peer[peer]
|
||||
# first try from recent peers
|
||||
for peer in recent_peers:
|
||||
if peer in self.peers:
|
||||
continue
|
||||
if peer in self._last_tried_peer:
|
||||
# due to maintenance above, this means we tried recently
|
||||
continue
|
||||
return [peer]
|
||||
# try random peer from graph
|
||||
all_nodes = self.channel_db.nodes
|
||||
if all_nodes:
|
||||
self.print_error('trying to get ln peers from channel db')
|
||||
node_ids = list(all_nodes)
|
||||
max_tries = min(200, len(all_nodes))
|
||||
for i in range(max_tries):
|
||||
node_id = random.choice(node_ids)
|
||||
node = all_nodes.get(node_id)
|
||||
if node is None: continue
|
||||
addresses = node.addresses
|
||||
if not addresses: continue
|
||||
host, port = addresses[0]
|
||||
peer = LNPeerAddr(host, port, node_id)
|
||||
if peer in self._last_tried_peer:
|
||||
continue
|
||||
self.print_error('taking random ln peer from our channel db')
|
||||
return [peer]
|
||||
|
||||
# TODO remove this. For some reason the dns seeds seem to ignore the realm byte
|
||||
# and only return mainnet nodes. so for the time being dns seeding is disabled:
|
||||
if constants.net in (constants.BitcoinTestnet, ):
|
||||
return [random.choice(FALLBACK_NODE_LIST)]
|
||||
else:
|
||||
return []
|
||||
|
||||
# try peers from dns seed.
|
||||
# return several peers to reduce the number of dns queries.
|
||||
if not constants.net.LN_DNS_SEEDS:
|
||||
return []
|
||||
dns_seed = random.choice(constants.net.LN_DNS_SEEDS)
|
||||
self.print_error('asking dns seed "{}" for ln peers'.format(dns_seed))
|
||||
try:
|
||||
# note: this might block for several seconds
|
||||
# this will include bech32-encoded-pubkeys and ports
|
||||
srv_answers = resolve_dns_srv('r{}.{}'.format(
|
||||
constants.net.LN_REALM_BYTE, dns_seed))
|
||||
except dns.exception.DNSException as e:
|
||||
return []
|
||||
random.shuffle(srv_answers)
|
||||
num_peers = 2 * NUM_PEERS_TARGET
|
||||
srv_answers = srv_answers[:num_peers]
|
||||
# we now have pubkeys and ports but host is still needed
|
||||
peers = []
|
||||
for srv_ans in srv_answers:
|
||||
try:
|
||||
# note: this might block for several seconds
|
||||
answers = dns.resolver.query(srv_ans['host'])
|
||||
except dns.exception.DNSException:
|
||||
continue
|
||||
else:
|
||||
ln_host = str(answers[0])
|
||||
port = int(srv_ans['port'])
|
||||
bech32_pubkey = srv_ans['host'].split('.')[0]
|
||||
pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey)
|
||||
peers.append(LNPeerAddr(ln_host, port, pubkey))
|
||||
self.print_error('got {} ln peers from dns seed'.format(len(peers)))
|
||||
return peers
|
||||
|
||||
@aiosafe
|
||||
async def main_loop(self):
|
||||
while True:
|
||||
|
@ -226,15 +309,10 @@ class LNWorker(PrintError):
|
|||
if peer.exception:
|
||||
self.print_error("removing peer", peer.host)
|
||||
self.peers.pop(k)
|
||||
if len(self.peers) > 3:
|
||||
if len(self.peers) >= NUM_PEERS_TARGET:
|
||||
continue
|
||||
if not self.network.channel_db.nodes:
|
||||
continue
|
||||
all_nodes = self.network.channel_db.nodes
|
||||
node_id = random.choice(list(all_nodes))
|
||||
node = all_nodes.get(node_id)
|
||||
addresses = node.addresses
|
||||
if addresses:
|
||||
host, port = addresses[0]
|
||||
self.print_error("trying node", bh2u(node_id))
|
||||
self.add_peer(host, port, node_id)
|
||||
peers = self._get_next_peers_to_try()
|
||||
for peer in peers:
|
||||
self._last_tried_peer[peer] = time.time()
|
||||
self.print_error("trying node", peer)
|
||||
self.add_peer(peer.host, peer.port, peer.pubkey)
|
||||
|
|
|
@ -2,9 +2,10 @@ import unittest
|
|||
import json
|
||||
from electrum import bitcoin
|
||||
from electrum.lnutil import (RevocationStore, get_per_commitment_secret_from_seed, make_offered_htlc,
|
||||
make_received_htlc, make_commitment, make_htlc_tx_witness, make_htlc_tx_output,
|
||||
make_htlc_tx_inputs, secret_to_pubkey, derive_blinded_pubkey, derive_privkey,
|
||||
derive_pubkey, make_htlc_tx, extract_ctn_from_tx, UnableToDeriveSecret)
|
||||
make_received_htlc, make_commitment, make_htlc_tx_witness, make_htlc_tx_output,
|
||||
make_htlc_tx_inputs, secret_to_pubkey, derive_blinded_pubkey, derive_privkey,
|
||||
derive_pubkey, make_htlc_tx, extract_ctn_from_tx, UnableToDeriveSecret,
|
||||
get_compressed_pubkey_from_bech32)
|
||||
from electrum.util import bh2u, bfh
|
||||
from electrum.transaction import Transaction
|
||||
|
||||
|
@ -675,3 +676,7 @@ class TestLNUtil(unittest.TestCase):
|
|||
index_of_pubkey = pubkeys.index(bh2u(remote_pubkey))
|
||||
tx._inputs[0]["signatures"][index_of_pubkey] = remote_signature + "01"
|
||||
tx.raw = None
|
||||
|
||||
def test_get_compressed_pubkey_from_bech32(self):
|
||||
self.assertEqual(b'\x03\x84\xef\x87\xd9d\xa2\xaaa7=\xff\xb8\xfe=t8[}>;\n\x13\xa8e\x8eo:\xf5Mi\xb5H',
|
||||
get_compressed_pubkey_from_bech32('ln1qwzwlp7evj325cfh8hlm3l3awsu9klf78v9p82r93ehn4a2ddx65s66awg5'))
|
||||
|
|
|
@ -46,6 +46,7 @@ import aiohttp
|
|||
from aiohttp_socks import SocksConnector, SocksVer
|
||||
from aiorpcx import TaskGroup
|
||||
import certifi
|
||||
import dns.resolver
|
||||
|
||||
from .i18n import _
|
||||
from .logging import get_logger, Logger
|
||||
|
@ -1174,3 +1175,17 @@ def list_enabled_bits(x: int) -> Sequence[int]:
|
|||
binary = bin(x)[2:]
|
||||
rev_bin = reversed(binary)
|
||||
return tuple(i for i, b in enumerate(rev_bin) if b == '1')
|
||||
|
||||
|
||||
def resolve_dns_srv(host: str):
|
||||
srv_records = dns.resolver.query(host, 'SRV')
|
||||
# priority: prefer lower
|
||||
# weight: tie breaker; prefer higher
|
||||
srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight))
|
||||
|
||||
def dict_from_srv_record(srv):
|
||||
return {
|
||||
'host': str(srv.target),
|
||||
'port': srv.port,
|
||||
}
|
||||
return [dict_from_srv_record(srv) for srv in srv_records]
|
||||
|
|
Loading…
Add table
Reference in a new issue