mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
Use one LNWatcher instance per wallet
This commit is contained in:
parent
4d76e84218
commit
2be68ac4d2
3 changed files with 44 additions and 48 deletions
|
@ -67,7 +67,6 @@ class Peer(Logger):
|
||||||
self.localfeatures = self.lnworker.localfeatures
|
self.localfeatures = self.lnworker.localfeatures
|
||||||
self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
|
self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
|
||||||
self.network = lnworker.network
|
self.network = lnworker.network
|
||||||
self.lnwatcher = lnworker.network.lnwatcher
|
|
||||||
self.channel_db = lnworker.network.channel_db
|
self.channel_db = lnworker.network.channel_db
|
||||||
self.ping_time = 0
|
self.ping_time = 0
|
||||||
self.reply_channel_range = asyncio.Queue()
|
self.reply_channel_range = asyncio.Queue()
|
||||||
|
@ -550,7 +549,6 @@ class Peer(Logger):
|
||||||
chan = Channel(chan_dict,
|
chan = Channel(chan_dict,
|
||||||
sweep_address=self.lnworker.sweep_address,
|
sweep_address=self.lnworker.sweep_address,
|
||||||
lnworker=self.lnworker)
|
lnworker=self.lnworker)
|
||||||
chan.lnwatcher = self.lnwatcher
|
|
||||||
sig_64, _ = chan.sign_next_commitment()
|
sig_64, _ = chan.sign_next_commitment()
|
||||||
self.send_message("funding_created",
|
self.send_message("funding_created",
|
||||||
temporary_channel_id=temp_channel_id,
|
temporary_channel_id=temp_channel_id,
|
||||||
|
@ -638,7 +636,6 @@ class Peer(Logger):
|
||||||
chan = Channel(chan_dict,
|
chan = Channel(chan_dict,
|
||||||
sweep_address=self.lnworker.sweep_address,
|
sweep_address=self.lnworker.sweep_address,
|
||||||
lnworker=self.lnworker)
|
lnworker=self.lnworker)
|
||||||
chan.lnwatcher = self.lnwatcher
|
|
||||||
remote_sig = funding_created['signature']
|
remote_sig = funding_created['signature']
|
||||||
chan.receive_new_commitment(remote_sig, [])
|
chan.receive_new_commitment(remote_sig, [])
|
||||||
sig_64, _ = chan.sign_next_commitment()
|
sig_64, _ = chan.sign_next_commitment()
|
||||||
|
@ -648,7 +645,7 @@ class Peer(Logger):
|
||||||
)
|
)
|
||||||
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
|
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
|
||||||
self.lnworker.save_channel(chan)
|
self.lnworker.save_channel(chan)
|
||||||
await self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
|
self.lnworker.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
|
||||||
self.lnworker.on_channels_updated()
|
self.lnworker.on_channels_updated()
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -1277,7 +1274,7 @@ class Peer(Logger):
|
||||||
outpoint = chan.funding_outpoint.to_str()
|
outpoint = chan.funding_outpoint.to_str()
|
||||||
sweeptxs = create_sweeptxs_for_watchtower(chan, ctx, per_commitment_secret, chan.sweep_address)
|
sweeptxs = create_sweeptxs_for_watchtower(chan, ctx, per_commitment_secret, chan.sweep_address)
|
||||||
for tx in sweeptxs:
|
for tx in sweeptxs:
|
||||||
await self.lnwatcher.add_sweep_tx(outpoint, tx.prevout(0), str(tx))
|
await self.lnworker.lnwatcher.add_sweep_tx(outpoint, tx.prevout(0), str(tx))
|
||||||
|
|
||||||
def on_update_fee(self, payload):
|
def on_update_fee(self, payload):
|
||||||
channel_id = payload["channel_id"]
|
channel_id = payload["channel_id"]
|
||||||
|
|
|
@ -14,6 +14,7 @@ from typing import NamedTuple, Dict
|
||||||
import jsonrpclib
|
import jsonrpclib
|
||||||
|
|
||||||
from .sql_db import SqlDB, sql
|
from .sql_db import SqlDB, sql
|
||||||
|
from .json_db import JsonDB
|
||||||
from .util import bh2u, bfh, log_exceptions, ignore_exceptions
|
from .util import bh2u, bfh, log_exceptions, ignore_exceptions
|
||||||
from . import wallet
|
from . import wallet
|
||||||
from .storage import WalletStorage
|
from .storage import WalletStorage
|
||||||
|
@ -142,16 +143,19 @@ class LNWatcher(AddressSynchronizer):
|
||||||
verbosity_filter = 'W'
|
verbosity_filter = 'W'
|
||||||
|
|
||||||
def __init__(self, network: 'Network'):
|
def __init__(self, network: 'Network'):
|
||||||
path = os.path.join(network.config.path, "watchtower_wallet")
|
AddressSynchronizer.__init__(self, JsonDB({}, manual_upgrades=False))
|
||||||
storage = WalletStorage(path)
|
|
||||||
AddressSynchronizer.__init__(self, storage)
|
|
||||||
self.config = network.config
|
self.config = network.config
|
||||||
self.start_network(network)
|
self.start_network(network)
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network)
|
self.sweepstore = None
|
||||||
|
self.channels = {}
|
||||||
|
if self.config.get('sweepstore', False):
|
||||||
|
self.sweepstore = SweepStore(os.path.join(network.config.path, "watchtower_db"), network)
|
||||||
|
self.watchtower = None
|
||||||
|
if self.config.get('watchtower_url'):
|
||||||
|
self.set_remote_watchtower()
|
||||||
self.network.register_callback(self.on_network_update,
|
self.network.register_callback(self.on_network_update,
|
||||||
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated'])
|
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated'])
|
||||||
self.set_remote_watchtower()
|
|
||||||
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
|
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
|
||||||
# and a queue for seeing which txs are being published
|
# and a queue for seeing which txs are being published
|
||||||
self.tx_progress = {} # type: Dict[str, ListenerItem]
|
self.tx_progress = {} # type: Dict[str, ListenerItem]
|
||||||
|
@ -167,14 +171,18 @@ class LNWatcher(AddressSynchronizer):
|
||||||
self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None
|
self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None
|
||||||
except:
|
except:
|
||||||
self.watchtower = None
|
self.watchtower = None
|
||||||
self.watchtower_queue = asyncio.Queue()
|
self.watchtower_queue = asyncio.Queue()
|
||||||
|
|
||||||
def get_num_tx(self, outpoint):
|
def get_num_tx(self, outpoint):
|
||||||
|
if not self.sweepstore:
|
||||||
|
return 0
|
||||||
async def f():
|
async def f():
|
||||||
return await self.sweepstore.get_num_tx(outpoint)
|
return await self.sweepstore.get_num_tx(outpoint)
|
||||||
return self.network.run_from_another_thread(f())
|
return self.network.run_from_another_thread(f())
|
||||||
|
|
||||||
def list_sweep_tx(self):
|
def list_sweep_tx(self):
|
||||||
|
if not self.sweepstore:
|
||||||
|
return []
|
||||||
async def f():
|
async def f():
|
||||||
return await self.sweepstore.list_sweep_tx()
|
return await self.sweepstore.list_sweep_tx()
|
||||||
return self.network.run_from_another_thread(f())
|
return self.network.run_from_another_thread(f())
|
||||||
|
@ -182,35 +190,25 @@ class LNWatcher(AddressSynchronizer):
|
||||||
@ignore_exceptions
|
@ignore_exceptions
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def watchtower_task(self):
|
async def watchtower_task(self):
|
||||||
|
if not self.watchtower:
|
||||||
|
return
|
||||||
self.logger.info('watchtower task started')
|
self.logger.info('watchtower task started')
|
||||||
# initial check
|
|
||||||
for address, outpoint in await self.sweepstore.list_channel_info():
|
|
||||||
await self.watchtower_queue.put(outpoint)
|
|
||||||
while True:
|
while True:
|
||||||
outpoint = await self.watchtower_queue.get()
|
outpoint, prevout, tx = await self.watchtower_queue.get()
|
||||||
if self.watchtower is None:
|
|
||||||
continue
|
|
||||||
# synchronize with remote
|
|
||||||
try:
|
try:
|
||||||
local_n = await self.sweepstore.get_num_tx(outpoint)
|
self.watchtower.add_sweep_tx(outpoint, prevout, tx)
|
||||||
n = self.watchtower.get_num_tx(outpoint)
|
self.logger.info("transaction sent to watchtower")
|
||||||
if n == 0:
|
|
||||||
address = await self.sweepstore.get_address(outpoint)
|
|
||||||
self.watchtower.add_channel(outpoint, address)
|
|
||||||
self.logger.info("sending %d transactions to watchtower"%(local_n - n))
|
|
||||||
for index in range(n, local_n):
|
|
||||||
prevout, tx = await self.sweepstore.get_tx_by_index(outpoint, index)
|
|
||||||
self.watchtower.add_sweep_tx(outpoint, prevout, tx)
|
|
||||||
except ConnectionRefusedError:
|
except ConnectionRefusedError:
|
||||||
self.logger.info('could not reach watchtower, will retry in 5s')
|
self.logger.info('could not reach watchtower, will retry in 5s')
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
await self.watchtower_queue.put(outpoint)
|
await self.watchtower_queue.put((outpoint, prevout, tx))
|
||||||
|
|
||||||
async def add_channel(self, outpoint, address):
|
def add_channel(self, outpoint, address):
|
||||||
self.add_address(address)
|
self.add_address(address)
|
||||||
with self.lock:
|
self.channels[address] = outpoint
|
||||||
if not await self.sweepstore.has_channel(outpoint):
|
#if self.sweepstore:
|
||||||
await self.sweepstore.add_channel(outpoint, address)
|
# if not await self.sweepstore.has_channel(outpoint):
|
||||||
|
# await self.sweepstore.add_channel(outpoint, address)
|
||||||
|
|
||||||
async def unwatch_channel(self, address, funding_outpoint):
|
async def unwatch_channel(self, address, funding_outpoint):
|
||||||
self.logger.info(f'unwatching {funding_outpoint}')
|
self.logger.info(f'unwatching {funding_outpoint}')
|
||||||
|
@ -229,7 +227,7 @@ class LNWatcher(AddressSynchronizer):
|
||||||
return
|
return
|
||||||
if not self.up_to_date:
|
if not self.up_to_date:
|
||||||
return
|
return
|
||||||
for address, outpoint in await self.sweepstore.list_channel_info():
|
for address, outpoint in self.channels.items():
|
||||||
await self.check_onchain_situation(address, outpoint)
|
await self.check_onchain_situation(address, outpoint)
|
||||||
|
|
||||||
async def check_onchain_situation(self, address, funding_outpoint):
|
async def check_onchain_situation(self, address, funding_outpoint):
|
||||||
|
@ -306,9 +304,10 @@ class LNWatcher(AddressSynchronizer):
|
||||||
return txid
|
return txid
|
||||||
|
|
||||||
async def add_sweep_tx(self, funding_outpoint: str, prevout: str, tx: str):
|
async def add_sweep_tx(self, funding_outpoint: str, prevout: str, tx: str):
|
||||||
await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx)
|
if self.sweepstore:
|
||||||
|
await self.sweepstore.add_sweep_tx(funding_outpoint, prevout, tx)
|
||||||
if self.watchtower:
|
if self.watchtower:
|
||||||
self.watchtower_queue.put_nowait(funding_outpoint)
|
self.watchtower_queue.put_nowait(funding_outpoint, prevout, tx)
|
||||||
|
|
||||||
def get_tx_mined_depth(self, txid: str):
|
def get_tx_mined_depth(self, txid: str):
|
||||||
if not txid:
|
if not txid:
|
||||||
|
|
|
@ -47,6 +47,7 @@ from .lnrouter import RouteEdge, is_route_sane_to_use
|
||||||
from .address_synchronizer import TX_HEIGHT_LOCAL
|
from .address_synchronizer import TX_HEIGHT_LOCAL
|
||||||
from . import lnsweep
|
from . import lnsweep
|
||||||
from .lnsweep import create_sweeptxs_for_their_ctx, create_sweeptxs_for_our_ctx
|
from .lnsweep import create_sweeptxs_for_their_ctx, create_sweeptxs_for_our_ctx
|
||||||
|
from .lnwatcher import LNWatcher
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .network import Network
|
from .network import Network
|
||||||
|
@ -317,19 +318,20 @@ class LNWallet(LNWorker):
|
||||||
self.pending_payments = defaultdict(asyncio.Future)
|
self.pending_payments = defaultdict(asyncio.Future)
|
||||||
|
|
||||||
def start_network(self, network: 'Network'):
|
def start_network(self, network: 'Network'):
|
||||||
|
self.lnwatcher = LNWatcher(network)
|
||||||
self.network = network
|
self.network = network
|
||||||
self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee']) # thread safe
|
self.network.register_callback(self.on_network_update, ['wallet_updated', 'network_updated', 'verified', 'fee']) # thread safe
|
||||||
self.network.register_callback(self.on_channel_open, ['channel_open'])
|
self.network.register_callback(self.on_channel_open, ['channel_open'])
|
||||||
self.network.register_callback(self.on_channel_closed, ['channel_closed'])
|
self.network.register_callback(self.on_channel_closed, ['channel_closed'])
|
||||||
|
|
||||||
for chan_id, chan in self.channels.items():
|
for chan_id, chan in self.channels.items():
|
||||||
self.network.lnwatcher.add_address(chan.get_funding_address())
|
self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
|
||||||
|
|
||||||
super().start_network(network)
|
super().start_network(network)
|
||||||
for coro in [
|
for coro in [
|
||||||
self.maybe_listen(),
|
self.maybe_listen(),
|
||||||
self.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified
|
self.on_network_update('network_updated'), # shortcut (don't block) if funding tx locked and verified
|
||||||
self.network.lnwatcher.on_network_update('network_updated'), # ping watcher to check our channels
|
self.lnwatcher.on_network_update('network_updated'), # ping watcher to check our channels
|
||||||
self.reestablish_peers_and_channels()
|
self.reestablish_peers_and_channels()
|
||||||
]:
|
]:
|
||||||
asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(coro), self.network.asyncio_loop)
|
||||||
|
@ -468,13 +470,13 @@ class LNWallet(LNWorker):
|
||||||
if it's also deep enough, also save to disk.
|
if it's also deep enough, also save to disk.
|
||||||
Returns tuple (mined_deep_enough, num_confirmations).
|
Returns tuple (mined_deep_enough, num_confirmations).
|
||||||
"""
|
"""
|
||||||
lnwatcher = self.network.lnwatcher
|
conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
||||||
conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
|
||||||
if conf > 0:
|
if conf > 0:
|
||||||
block_height, tx_pos = lnwatcher.get_txpos(chan.funding_outpoint.txid)
|
block_height, tx_pos = self.lnwatcher.get_txpos(chan.funding_outpoint.txid)
|
||||||
assert tx_pos >= 0
|
assert tx_pos >= 0
|
||||||
chan.short_channel_id_predicted = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
chan.short_channel_id_predicted = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
||||||
if conf >= chan.constraints.funding_txn_minimum_depth > 0:
|
if conf >= chan.constraints.funding_txn_minimum_depth > 0:
|
||||||
|
self.logger.info(f"save_short_channel_id")
|
||||||
chan.short_channel_id = chan.short_channel_id_predicted
|
chan.short_channel_id = chan.short_channel_id_predicted
|
||||||
self.save_channel(chan)
|
self.save_channel(chan)
|
||||||
self.on_channels_updated()
|
self.on_channels_updated()
|
||||||
|
@ -492,7 +494,7 @@ class LNWallet(LNWorker):
|
||||||
chan = self.channel_by_txo(funding_outpoint)
|
chan = self.channel_by_txo(funding_outpoint)
|
||||||
if not chan:
|
if not chan:
|
||||||
return
|
return
|
||||||
#self.logger.debug(f'on_channel_open {funding_outpoint}')
|
self.logger.debug(f'on_channel_open {funding_outpoint}')
|
||||||
self.channel_timestamps[bh2u(chan.channel_id)] = funding_txid, funding_height.height, funding_height.timestamp, None, None, None
|
self.channel_timestamps[bh2u(chan.channel_id)] = funding_txid, funding_height.height, funding_height.timestamp, None, None, None
|
||||||
self.storage.put('lightning_channel_timestamps', self.channel_timestamps)
|
self.storage.put('lightning_channel_timestamps', self.channel_timestamps)
|
||||||
chan.set_funding_txo_spentness(False)
|
chan.set_funding_txo_spentness(False)
|
||||||
|
@ -550,7 +552,7 @@ class LNWallet(LNWorker):
|
||||||
.format(name, local_height, cltv_expiry, prevout))
|
.format(name, local_height, cltv_expiry, prevout))
|
||||||
broadcast = False
|
broadcast = False
|
||||||
if csv_delay:
|
if csv_delay:
|
||||||
prev_height = self.network.lnwatcher.get_tx_height(prev_txid)
|
prev_height = self.lnwatcher.get_tx_height(prev_txid)
|
||||||
remaining = csv_delay - prev_height.conf
|
remaining = csv_delay - prev_height.conf
|
||||||
if remaining > 0:
|
if remaining > 0:
|
||||||
self.logger.info('waiting for {}: CSV ({} >= {}), prevout: {}'
|
self.logger.info('waiting for {}: CSV ({} >= {}), prevout: {}'
|
||||||
|
@ -593,9 +595,8 @@ class LNWallet(LNWorker):
|
||||||
# since short_channel_id could be changed while saving.
|
# since short_channel_id could be changed while saving.
|
||||||
with self.lock:
|
with self.lock:
|
||||||
channels = list(self.channels.values())
|
channels = list(self.channels.values())
|
||||||
lnwatcher = self.network.lnwatcher
|
|
||||||
if event in ('verified', 'wallet_updated'):
|
if event in ('verified', 'wallet_updated'):
|
||||||
if args[0] != lnwatcher:
|
if args[0] != self.lnwatcher:
|
||||||
return
|
return
|
||||||
for chan in channels:
|
for chan in channels:
|
||||||
if chan.is_closed():
|
if chan.is_closed():
|
||||||
|
@ -615,11 +616,11 @@ class LNWallet(LNWorker):
|
||||||
return
|
return
|
||||||
if event == 'fee':
|
if event == 'fee':
|
||||||
await peer.bitcoin_fee_update(chan)
|
await peer.bitcoin_fee_update(chan)
|
||||||
conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
conf = self.lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
||||||
peer.on_network_update(chan, conf)
|
peer.on_network_update(chan, conf)
|
||||||
elif chan.force_closed and chan.get_state() != 'CLOSED':
|
elif chan.force_closed and chan.get_state() != 'CLOSED':
|
||||||
txid = chan.force_close_tx().txid()
|
txid = chan.force_close_tx().txid()
|
||||||
height = lnwatcher.get_tx_height(txid).height
|
height = self.lnwatcher.get_tx_height(txid).height
|
||||||
self.logger.info(f"force closing tx {txid}, height {height}")
|
self.logger.info(f"force closing tx {txid}, height {height}")
|
||||||
if height == TX_HEIGHT_LOCAL:
|
if height == TX_HEIGHT_LOCAL:
|
||||||
self.logger.info('REBROADCASTING CLOSING TX')
|
self.logger.info('REBROADCASTING CLOSING TX')
|
||||||
|
@ -635,8 +636,7 @@ class LNWallet(LNWorker):
|
||||||
push_msat=push_sat * 1000,
|
push_msat=push_sat * 1000,
|
||||||
temp_channel_id=os.urandom(32))
|
temp_channel_id=os.urandom(32))
|
||||||
self.save_channel(chan)
|
self.save_channel(chan)
|
||||||
self.network.lnwatcher.add_address(chan.get_funding_address())
|
self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
|
||||||
await self.network.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
|
|
||||||
self.on_channels_updated()
|
self.on_channels_updated()
|
||||||
return chan
|
return chan
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue