mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
lnworker: make add_peer async
This commit is contained in:
parent
8bb23ea2cd
commit
ff0aa90ddf
1 changed files with 16 additions and 11 deletions
|
@ -90,7 +90,10 @@ class LNWorker(PrintError):
|
||||||
def _add_peers_from_config(self):
|
def _add_peers_from_config(self):
|
||||||
peer_list = self.config.get('lightning_peers', [])
|
peer_list = self.config.get('lightning_peers', [])
|
||||||
for host, port, pubkey in peer_list:
|
for host, port, pubkey in peer_list:
|
||||||
self.add_peer(host, int(port), bfh(pubkey))
|
asyncio.run_coroutine_threadsafe(
|
||||||
|
self.add_peer(host, int(port), bfh(pubkey)),
|
||||||
|
self.network.asyncio_loop)
|
||||||
|
|
||||||
|
|
||||||
def suggest_peer(self):
|
def suggest_peer(self):
|
||||||
for node_id, peer in self.peers.items():
|
for node_id, peer in self.peers.items():
|
||||||
|
@ -105,20 +108,20 @@ class LNWorker(PrintError):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
return {x: y for (x, y) in self.channels.items() if y.node_id == node_id}
|
return {x: y for (x, y) in self.channels.items() if y.node_id == node_id}
|
||||||
|
|
||||||
def add_peer(self, host, port, node_id):
|
async def add_peer(self, host, port, node_id):
|
||||||
port = int(port)
|
port = int(port)
|
||||||
peer_addr = LNPeerAddr(host, port, node_id)
|
peer_addr = LNPeerAddr(host, port, node_id)
|
||||||
if node_id in self.peers:
|
if node_id in self.peers:
|
||||||
return
|
return
|
||||||
self._last_tried_peer[peer_addr] = time.time()
|
self._last_tried_peer[peer_addr] = time.time()
|
||||||
self.print_error("adding peer", peer_addr)
|
self.print_error("adding peer", peer_addr)
|
||||||
|
peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True))
|
||||||
async def _init_peer():
|
async def _init_peer():
|
||||||
reader, writer = await asyncio.open_connection(peer_addr.host, peer_addr.port)
|
reader, writer = await asyncio.open_connection(peer_addr.host, peer_addr.port)
|
||||||
transport = LNTransport(self.node_keypair.privkey, node_id, reader, writer)
|
transport = LNTransport(self.node_keypair.privkey, node_id, reader, writer)
|
||||||
peer.transport = transport
|
peer.transport = transport
|
||||||
await self.network.main_taskgroup.spawn(peer.main_loop())
|
await self.network.main_taskgroup.spawn(peer.main_loop())
|
||||||
asyncio.ensure_future(_init_peer())
|
asyncio.ensure_future(_init_peer())
|
||||||
peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True))
|
|
||||||
self.peers[node_id] = peer
|
self.peers[node_id] = peer
|
||||||
self.network.trigger_callback('ln_status')
|
self.network.trigger_callback('ln_status')
|
||||||
return peer
|
return peer
|
||||||
|
@ -240,7 +243,9 @@ class LNWorker(PrintError):
|
||||||
socket.getaddrinfo(host, int(port))
|
socket.getaddrinfo(host, int(port))
|
||||||
except socket.gaierror:
|
except socket.gaierror:
|
||||||
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
|
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
|
||||||
peer = self.add_peer(host, port, node_id)
|
peer_future = asyncio.run_coroutine_threadsafe(self.add_peer(host, port, node_id),
|
||||||
|
self.network.asyncio_loop)
|
||||||
|
peer = peer_future.result(timeout)
|
||||||
coro = self._open_channel_coroutine(peer, local_amt_sat, push_amt_sat, password)
|
coro = self._open_channel_coroutine(peer, local_amt_sat, push_amt_sat, password)
|
||||||
f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
||||||
chan = f.result(timeout)
|
chan = f.result(timeout)
|
||||||
|
@ -452,14 +457,14 @@ class LNWorker(PrintError):
|
||||||
self.print_error('got {} ln peers from dns seed'.format(len(peers)))
|
self.print_error('got {} ln peers from dns seed'.format(len(peers)))
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
def reestablish_peers_and_channels(self):
|
async def reestablish_peers_and_channels(self):
|
||||||
def reestablish_peer_for_given_channel():
|
async def reestablish_peer_for_given_channel():
|
||||||
# try last good address first
|
# try last good address first
|
||||||
peer = self.channel_db.get_last_good_address(chan.node_id)
|
peer = self.channel_db.get_last_good_address(chan.node_id)
|
||||||
if peer:
|
if peer:
|
||||||
last_tried = self._last_tried_peer.get(peer, 0)
|
last_tried = self._last_tried_peer.get(peer, 0)
|
||||||
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
|
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
|
||||||
self.add_peer(peer.host, peer.port, peer.pubkey)
|
await self.add_peer(peer.host, peer.port, peer.pubkey)
|
||||||
return
|
return
|
||||||
# try random address for node_id
|
# try random address for node_id
|
||||||
node_info = self.channel_db.nodes.get(chan.node_id, None)
|
node_info = self.channel_db.nodes.get(chan.node_id, None)
|
||||||
|
@ -470,7 +475,7 @@ class LNWorker(PrintError):
|
||||||
peer = LNPeerAddr(host, port, chan.node_id)
|
peer = LNPeerAddr(host, port, chan.node_id)
|
||||||
last_tried = self._last_tried_peer.get(peer, 0)
|
last_tried = self._last_tried_peer.get(peer, 0)
|
||||||
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
|
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
|
||||||
self.add_peer(host, port, chan.node_id)
|
await self.add_peer(host, port, chan.node_id)
|
||||||
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
channels = list(self.channels.values())
|
channels = list(self.channels.values())
|
||||||
|
@ -480,7 +485,7 @@ class LNWorker(PrintError):
|
||||||
continue
|
continue
|
||||||
peer = self.peers.get(chan.node_id, None)
|
peer = self.peers.get(chan.node_id, None)
|
||||||
if peer is None:
|
if peer is None:
|
||||||
reestablish_peer_for_given_channel()
|
await reestablish_peer_for_given_channel()
|
||||||
else:
|
else:
|
||||||
coro = peer.reestablish_channel(chan)
|
coro = peer.reestablish_channel(chan)
|
||||||
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
|
||||||
|
@ -491,11 +496,11 @@ class LNWorker(PrintError):
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self.reestablish_peers_and_channels()
|
await self.reestablish_peers_and_channels()
|
||||||
if len(self.peers) >= NUM_PEERS_TARGET:
|
if len(self.peers) >= NUM_PEERS_TARGET:
|
||||||
continue
|
continue
|
||||||
peers = self._get_next_peers_to_try()
|
peers = self._get_next_peers_to_try()
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
last_tried = self._last_tried_peer.get(peer, 0)
|
last_tried = self._last_tried_peer.get(peer, 0)
|
||||||
if last_tried + PEER_RETRY_INTERVAL < now:
|
if last_tried + PEER_RETRY_INTERVAL < now:
|
||||||
self.add_peer(peer.host, peer.port, peer.pubkey)
|
await self.add_peer(peer.host, peer.port, peer.pubkey)
|
||||||
|
|
Loading…
Add table
Reference in a new issue