mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
make LNWatcher inherit AddressSynchronizer
This commit is contained in:
parent
78896897cb
commit
88c6eeb966
2 changed files with 23 additions and 27 deletions
|
@ -35,20 +35,18 @@ class TxMinedDepth(IntEnum):
|
||||||
FREE = auto()
|
FREE = auto()
|
||||||
|
|
||||||
|
|
||||||
class LNWatcher(PrintError):
|
class LNWatcher(AddressSynchronizer):
|
||||||
# TODO if verifier gets an incorrect merkle proof, that tx will never verify!!
|
# TODO if verifier gets an incorrect merkle proof, that tx will never verify!!
|
||||||
# similarly, what if server ignores request for merkle proof?
|
# similarly, what if server ignores request for merkle proof?
|
||||||
# maybe we should disconnect from server in these cases
|
# maybe we should disconnect from server in these cases
|
||||||
verbosity_filter = 'W'
|
verbosity_filter = 'W'
|
||||||
|
|
||||||
def __init__(self, network: 'Network'):
|
def __init__(self, network: 'Network'):
|
||||||
self.network = network
|
|
||||||
self.config = network.config
|
|
||||||
path = os.path.join(network.config.path, "watcher_db")
|
path = os.path.join(network.config.path, "watcher_db")
|
||||||
storage = WalletStorage(path)
|
storage = WalletStorage(path)
|
||||||
self.addr_sync = AddressSynchronizer(storage)
|
AddressSynchronizer.__init__(self, storage)
|
||||||
self.addr_sync.diagnostic_name = lambda: 'LnWatcherAS'
|
self.config = network.config
|
||||||
self.addr_sync.start_network(network)
|
self.start_network(network)
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
self.watched_addresses = set()
|
self.watched_addresses = set()
|
||||||
self.channel_info = storage.get('channel_info', {}) # access with 'lock'
|
self.channel_info = storage.get('channel_info', {}) # access with 'lock'
|
||||||
|
@ -99,7 +97,7 @@ class LNWatcher(PrintError):
|
||||||
def write_to_disk(self):
|
def write_to_disk(self):
|
||||||
# FIXME: json => every update takes linear instead of constant disk write
|
# FIXME: json => every update takes linear instead of constant disk write
|
||||||
with self.lock:
|
with self.lock:
|
||||||
storage = self.addr_sync.storage
|
storage = self.storage
|
||||||
storage.put('channel_info', self.channel_info)
|
storage.put('channel_info', self.channel_info)
|
||||||
# self.sweepstore
|
# self.sweepstore
|
||||||
sweepstore = {}
|
sweepstore = {}
|
||||||
|
@ -121,13 +119,12 @@ class LNWatcher(PrintError):
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def on_network_update(self, event, *args):
|
async def on_network_update(self, event, *args):
|
||||||
if event in ('verified', 'wallet_updated'):
|
if event in ('verified', 'wallet_updated'):
|
||||||
wallet = args[0]
|
if args[0] != self:
|
||||||
if wallet != self.addr_sync:
|
|
||||||
return
|
return
|
||||||
if not self.addr_sync.synchronizer:
|
if not self.synchronizer:
|
||||||
self.print_error("synchronizer not set yet")
|
self.print_error("synchronizer not set yet")
|
||||||
return
|
return
|
||||||
if not self.addr_sync.synchronizer.is_up_to_date():
|
if not self.synchronizer.is_up_to_date():
|
||||||
return
|
return
|
||||||
with self.lock:
|
with self.lock:
|
||||||
channel_info_items = list(self.channel_info.items())
|
channel_info_items = list(self.channel_info.items())
|
||||||
|
@ -137,21 +134,21 @@ class LNWatcher(PrintError):
|
||||||
def watch_address(self, addr):
|
def watch_address(self, addr):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.watched_addresses.add(addr)
|
self.watched_addresses.add(addr)
|
||||||
self.addr_sync.add_address(addr)
|
self.add_address(addr)
|
||||||
|
|
||||||
async def check_onchain_situation(self, funding_outpoint):
|
async def check_onchain_situation(self, funding_outpoint):
|
||||||
txid, index = funding_outpoint.split(':')
|
txid, index = funding_outpoint.split(':')
|
||||||
ctx_candidate_txid = self.addr_sync.spent_outpoints[txid].get(int(index))
|
ctx_candidate_txid = self.spent_outpoints[txid].get(int(index))
|
||||||
is_spent = ctx_candidate_txid is not None
|
is_spent = ctx_candidate_txid is not None
|
||||||
self.network.trigger_callback('channel_txo', funding_outpoint, is_spent)
|
self.network.trigger_callback('channel_txo', funding_outpoint, is_spent)
|
||||||
if not is_spent:
|
if not is_spent:
|
||||||
return
|
return
|
||||||
ctx_candidate = self.addr_sync.transactions.get(ctx_candidate_txid)
|
ctx_candidate = self.transactions.get(ctx_candidate_txid)
|
||||||
if ctx_candidate is None:
|
if ctx_candidate is None:
|
||||||
return
|
return
|
||||||
#self.print_error("funding outpoint {} is spent by {}"
|
#self.print_error("funding outpoint {} is spent by {}"
|
||||||
# .format(funding_outpoint, ctx_candidate_txid))
|
# .format(funding_outpoint, ctx_candidate_txid))
|
||||||
conf = self.addr_sync.get_tx_height(ctx_candidate_txid).conf
|
conf = self.get_tx_height(ctx_candidate_txid).conf
|
||||||
# only care about confirmed and verified ctxs. TODO is this necessary?
|
# only care about confirmed and verified ctxs. TODO is this necessary?
|
||||||
if conf == 0:
|
if conf == 0:
|
||||||
return
|
return
|
||||||
|
@ -189,12 +186,12 @@ class LNWatcher(PrintError):
|
||||||
local_height = self.network.get_local_height()
|
local_height = self.network.get_local_height()
|
||||||
self.print_error(funding_outpoint, 'iterating over encumbered txs')
|
self.print_error(funding_outpoint, 'iterating over encumbered txs')
|
||||||
for e_tx in list(encumbered_sweep_txns):
|
for e_tx in list(encumbered_sweep_txns):
|
||||||
conflicts = self.addr_sync.get_conflicting_transactions(e_tx.tx.txid(), e_tx.tx, include_self=True)
|
conflicts = self.get_conflicting_transactions(e_tx.tx.txid(), e_tx.tx, include_self=True)
|
||||||
conflict_mined_depth = self.get_deepest_tx_mined_depth_for_txids(conflicts)
|
conflict_mined_depth = self.get_deepest_tx_mined_depth_for_txids(conflicts)
|
||||||
if conflict_mined_depth != TxMinedDepth.DEEP:
|
if conflict_mined_depth != TxMinedDepth.DEEP:
|
||||||
keep_watching_this = True
|
keep_watching_this = True
|
||||||
if conflict_mined_depth == TxMinedDepth.FREE:
|
if conflict_mined_depth == TxMinedDepth.FREE:
|
||||||
tx_height = self.addr_sync.get_tx_height(prev_txid).height
|
tx_height = self.get_tx_height(prev_txid).height
|
||||||
if tx_height == TX_HEIGHT_LOCAL:
|
if tx_height == TX_HEIGHT_LOCAL:
|
||||||
continue
|
continue
|
||||||
num_conf = local_height - tx_height + 1
|
num_conf = local_height - tx_height + 1
|
||||||
|
@ -222,7 +219,7 @@ class LNWatcher(PrintError):
|
||||||
return keep_watching_this
|
return keep_watching_this
|
||||||
|
|
||||||
async def broadcast_or_log(self, funding_outpoint, e_tx):
|
async def broadcast_or_log(self, funding_outpoint, e_tx):
|
||||||
height = self.addr_sync.get_tx_height(e_tx.tx.txid()).height
|
height = self.get_tx_height(e_tx.tx.txid()).height
|
||||||
if height != TX_HEIGHT_LOCAL:
|
if height != TX_HEIGHT_LOCAL:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
|
@ -245,7 +242,7 @@ class LNWatcher(PrintError):
|
||||||
def get_tx_mined_depth(self, txid: str):
|
def get_tx_mined_depth(self, txid: str):
|
||||||
if not txid:
|
if not txid:
|
||||||
return TxMinedStatus.FREE
|
return TxMinedStatus.FREE
|
||||||
tx_mined_depth = self.addr_sync.get_tx_height(txid)
|
tx_mined_depth = self.get_tx_height(txid)
|
||||||
height, conf = tx_mined_depth.height, tx_mined_depth.conf
|
height, conf = tx_mined_depth.height, tx_mined_depth.conf
|
||||||
if conf > 100:
|
if conf > 100:
|
||||||
return TxMinedDepth.DEEP
|
return TxMinedDepth.DEEP
|
||||||
|
|
|
@ -243,10 +243,10 @@ class LNWorker(PrintError):
|
||||||
Returns tuple (mined_deep_enough, num_confirmations).
|
Returns tuple (mined_deep_enough, num_confirmations).
|
||||||
"""
|
"""
|
||||||
assert chan.get_state() in ["OPEN", "OPENING"]
|
assert chan.get_state() in ["OPEN", "OPENING"]
|
||||||
addr_sync = self.network.lnwatcher.addr_sync
|
lnwatcher = self.network.lnwatcher
|
||||||
conf = addr_sync.get_tx_height(chan.funding_outpoint.txid).conf
|
conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
||||||
if conf > 0:
|
if conf > 0:
|
||||||
block_height, tx_pos = addr_sync.get_txpos(chan.funding_outpoint.txid)
|
block_height, tx_pos = lnwatcher.get_txpos(chan.funding_outpoint.txid)
|
||||||
assert tx_pos >= 0
|
assert tx_pos >= 0
|
||||||
chan.short_channel_id_predicted = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
chan.short_channel_id_predicted = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
|
||||||
if conf >= chan.constraints.funding_txn_minimum_depth > 0:
|
if conf >= chan.constraints.funding_txn_minimum_depth > 0:
|
||||||
|
@ -279,10 +279,9 @@ class LNWorker(PrintError):
|
||||||
# since short_channel_id could be changed while saving.
|
# since short_channel_id could be changed while saving.
|
||||||
with self.lock:
|
with self.lock:
|
||||||
channels = list(self.channels.values())
|
channels = list(self.channels.values())
|
||||||
addr_sync = self.network.lnwatcher.addr_sync
|
lnwatcher = self.network.lnwatcher
|
||||||
if event in ('verified', 'wallet_updated'):
|
if event in ('verified', 'wallet_updated'):
|
||||||
wallet = args[0]
|
if args[0] != lnwatcher:
|
||||||
if wallet != addr_sync:
|
|
||||||
return
|
return
|
||||||
for chan in channels:
|
for chan in channels:
|
||||||
if chan.get_state() == "OPENING":
|
if chan.get_state() == "OPENING":
|
||||||
|
@ -300,11 +299,11 @@ class LNWorker(PrintError):
|
||||||
return
|
return
|
||||||
if event == 'fee':
|
if event == 'fee':
|
||||||
await peer.bitcoin_fee_update(chan)
|
await peer.bitcoin_fee_update(chan)
|
||||||
conf = addr_sync.get_tx_height(chan.funding_outpoint.txid).conf
|
conf = lnwatcher.get_tx_height(chan.funding_outpoint.txid).conf
|
||||||
peer.on_network_update(chan, conf)
|
peer.on_network_update(chan, conf)
|
||||||
elif chan.get_state() == 'FORCE_CLOSING':
|
elif chan.get_state() == 'FORCE_CLOSING':
|
||||||
txid = chan.force_close_tx().txid()
|
txid = chan.force_close_tx().txid()
|
||||||
height = addr_sync.get_tx_height(txid).height
|
height = lnwatcher.get_tx_height(txid).height
|
||||||
self.print_error("force closing tx", txid, "height", height)
|
self.print_error("force closing tx", txid, "height", height)
|
||||||
if height == TX_HEIGHT_LOCAL:
|
if height == TX_HEIGHT_LOCAL:
|
||||||
self.print_error('REBROADCASTING CLOSING TX')
|
self.print_error('REBROADCASTING CLOSING TX')
|
||||||
|
|
Loading…
Add table
Reference in a new issue