mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-29 16:31:29 +00:00
reestablish channels in network callback
This commit is contained in:
parent
322acd93d9
commit
df960700c9
3 changed files with 18 additions and 13 deletions
|
@ -790,8 +790,6 @@ class Peer(PrintError):
|
||||||
msg = await self.read_message()
|
msg = await self.read_message()
|
||||||
self.process_message(msg)
|
self.process_message(msg)
|
||||||
self.initialized.set_result(True)
|
self.initialized.set_result(True)
|
||||||
# reestablish channels
|
|
||||||
[self.reestablish_channel(c) for c in self.channels.values() if self.lnworker.channel_state[c.channel_id] != "CLOSED"]
|
|
||||||
# loop
|
# loop
|
||||||
while True:
|
while True:
|
||||||
self.ping_if_required()
|
self.ping_if_required()
|
||||||
|
@ -930,7 +928,8 @@ class Peer(PrintError):
|
||||||
assert success, success
|
assert success, success
|
||||||
return chan._replace(remote_state=chan.remote_state._replace(ctn=0),local_state=chan.local_state._replace(ctn=0, current_commitment_signature=remote_sig))
|
return chan._replace(remote_state=chan.remote_state._replace(ctn=0),local_state=chan.local_state._replace(ctn=0, current_commitment_signature=remote_sig))
|
||||||
|
|
||||||
def reestablish_channel(self, chan):
|
async def reestablish_channel(self, chan):
|
||||||
|
await self.initialized
|
||||||
self.channel_state[chan.channel_id] = 'REESTABLISHING'
|
self.channel_state[chan.channel_id] = 'REESTABLISHING'
|
||||||
self.network.trigger_callback('channel', chan)
|
self.network.trigger_callback('channel', chan)
|
||||||
self.send_message(gen_msg("channel_reestablish",
|
self.send_message(gen_msg("channel_reestablish",
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
from .util import PrintError
|
from .util import PrintError
|
||||||
from .lnbase import Outpoint, funding_output_script
|
from .lnbase import funding_output_script
|
||||||
from .bitcoin import redeem_script_to_address
|
from .bitcoin import redeem_script_to_address
|
||||||
|
|
||||||
class LNWatcher(PrintError):
|
class LNWatcher(PrintError):
|
||||||
|
|
||||||
def __init__(self, network, channel_state):
|
def __init__(self, network, channel_state):
|
||||||
self.network = network
|
self.network = network
|
||||||
self.channel_state = channel_state
|
self.watched_channels = {}
|
||||||
self.channels ={}
|
|
||||||
|
|
||||||
def parse_response(self, response):
|
def parse_response(self, response):
|
||||||
if response.get('error'):
|
if response.get('error'):
|
||||||
|
@ -15,10 +14,10 @@ class LNWatcher(PrintError):
|
||||||
return None, None
|
return None, None
|
||||||
return response['params'], response['result']
|
return response['params'], response['result']
|
||||||
|
|
||||||
def watch_channel(self, chan):
|
def watch_channel(self, chan, callback):
|
||||||
script = funding_output_script(chan.local_config, chan.remote_config)
|
script = funding_output_script(chan.local_config, chan.remote_config)
|
||||||
funding_address = redeem_script_to_address('p2wsh', script)
|
funding_address = redeem_script_to_address('p2wsh', script)
|
||||||
self.channels[funding_address] = chan
|
self.watched_channels[funding_address] = chan, callback
|
||||||
self.network.subscribe_to_addresses([funding_address], self.on_address_status)
|
self.network.subscribe_to_addresses([funding_address], self.on_address_status)
|
||||||
|
|
||||||
def on_address_status(self, response):
|
def on_address_status(self, response):
|
||||||
|
@ -33,7 +32,5 @@ class LNWatcher(PrintError):
|
||||||
if not params:
|
if not params:
|
||||||
return
|
return
|
||||||
addr = params[0]
|
addr = params[0]
|
||||||
chan = self.channels[addr]
|
chan, callback = self.watched_channels[addr]
|
||||||
outpoints = [Outpoint(x["tx_hash"], x["tx_pos"]) for x in result]
|
callback(chan, result)
|
||||||
if chan.funding_outpoint not in outpoints:
|
|
||||||
self.channel_state[chan.channel_id] = "CLOSED"
|
|
||||||
|
|
|
@ -102,7 +102,7 @@ class LNWorker(PrintError):
|
||||||
self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
|
self.channel_state = {chan.channel_id: "DISCONNECTED" for chan in self.channels.values()}
|
||||||
self.lnwatcher = LNWatcher(network, self.channel_state)
|
self.lnwatcher = LNWatcher(network, self.channel_state)
|
||||||
for chan_id, chan in self.channels.items():
|
for chan_id, chan in self.channels.items():
|
||||||
self.lnwatcher.watch_channel(chan)
|
self.lnwatcher.watch_channel(chan, self.on_channel_utxos)
|
||||||
for host, port, pubkey in peer_list:
|
for host, port, pubkey in peer_list:
|
||||||
self.add_peer(host, int(port), pubkey)
|
self.add_peer(host, int(port), pubkey)
|
||||||
# wait until we see confirmations
|
# wait until we see confirmations
|
||||||
|
@ -153,6 +153,15 @@ class LNWorker(PrintError):
|
||||||
return chan
|
return chan
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def on_channel_utxos(self, chan, utxos):
|
||||||
|
outpoints = [Outpoint(x["tx_hash"], x["tx_pos"]) for x in utxos]
|
||||||
|
if chan.funding_outpoint not in outpoints:
|
||||||
|
self.channel_state[chan.channel_id] = "CLOSED"
|
||||||
|
elif self.channel_state[chan.channel_id] == 'DISCONNECTED':
|
||||||
|
peer = self.peers[chan.node_id]
|
||||||
|
coro = peer.reestablish_channel(chan)
|
||||||
|
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
||||||
|
|
||||||
def on_network_update(self, event, *args):
|
def on_network_update(self, event, *args):
|
||||||
for chan in self.channels.values():
|
for chan in self.channels.values():
|
||||||
peer = self.peers[chan.node_id]
|
peer = self.peers[chan.node_id]
|
||||||
|
|
Loading…
Add table
Reference in a new issue