lnworker: add own taskgroup (run in daemon.taskgroup)

This commit is contained in:
SomberNight 2020-02-27 18:50:03 +01:00
parent 0bf09d14a0
commit c8260249b0
No known key found for this signature in database
GPG key ID: B33B5F232C6271E9
2 changed files with 22 additions and 12 deletions

View file

@ -238,7 +238,7 @@ class Peer(Logger):
self.close_and_cleanup() self.close_and_cleanup()
return wrapper_func return wrapper_func
@ignore_exceptions # do not kill main_taskgroup @ignore_exceptions # do not kill outer taskgroup
@log_exceptions @log_exceptions
@handle_disconnect @handle_disconnect
async def main_loop(self): async def main_loop(self):

View file

@ -32,7 +32,7 @@ from .transaction import Transaction
from .crypto import sha256 from .crypto import sha256
from .bip32 import BIP32Node from .bip32 import BIP32Node
from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
from .util import ignore_exceptions, make_aiohttp_session from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup
from .util import timestamp_to_datetime from .util import timestamp_to_datetime
from .util import MyEncoder from .util import MyEncoder
from .logging import Logger from .logging import Logger
@ -126,6 +126,7 @@ class LNWorker(Logger):
Logger.__init__(self) Logger.__init__(self)
self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY) self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
self.peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer self.peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer
self.taskgroup = SilentTaskGroup()
# set some feature flags as baseline for both LNWallet and LNGossip # set some feature flags as baseline for both LNWallet and LNGossip
# note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it # note that e.g. DATA_LOSS_PROTECT is needed for LNGossip as many peers require it
self.localfeatures = LnLocalFeatures(0) self.localfeatures = LnLocalFeatures(0)
@ -136,6 +137,7 @@ class LNWorker(Logger):
return {} return {}
async def maybe_listen(self): async def maybe_listen(self):
# FIXME: only one LNWorker can listen at a time (single port)
listen_addr = self.config.get('lightning_listen') listen_addr = self.config.get('lightning_listen')
if listen_addr: if listen_addr:
addr, port = listen_addr.rsplit(':', 2) addr, port = listen_addr.rsplit(':', 2)
@ -151,11 +153,21 @@ class LNWorker(Logger):
return return
peer = Peer(self, node_id, transport) peer = Peer(self, node_id, transport)
self.peers[node_id] = peer self.peers[node_id] = peer
await self.network.main_taskgroup.spawn(peer.main_loop()) await self.taskgroup.spawn(peer.main_loop())
await asyncio.start_server(cb, addr, int(port)) try:
await asyncio.start_server(cb, addr, int(port))
except OSError as e:
self.logger.error(f"cannot listen for lightning p2p. error: {e!r}")
@log_exceptions @ignore_exceptions # don't kill outer taskgroup
async def main_loop(self): async def main_loop(self):
try:
async with self.taskgroup as group:
await group.spawn(self._maintain_connectivity())
except Exception as e:
self.logger.exception("taskgroup died.")
async def _maintain_connectivity(self):
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
now = time.time() now = time.time()
@ -176,7 +188,7 @@ class LNWorker(Logger):
self._last_tried_peer[peer_addr] = time.time() self._last_tried_peer[peer_addr] = time.time()
self.logger.info(f"adding peer {peer_addr}") self.logger.info(f"adding peer {peer_addr}")
peer = Peer(self, node_id, transport) peer = Peer(self, node_id, transport)
await self.network.main_taskgroup.spawn(peer.main_loop()) await self.taskgroup.spawn(peer.main_loop())
self.peers[node_id] = peer self.peers[node_id] = peer
return peer return peer
@ -328,7 +340,7 @@ class LNGossip(LNWorker):
def start_network(self, network: 'Network'): def start_network(self, network: 'Network'):
assert network assert network
super().start_network(network) super().start_network(network)
asyncio.run_coroutine_threadsafe(network.daemon.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop) asyncio.run_coroutine_threadsafe(self.taskgroup.spawn(self.maintain_db()), self.network.asyncio_loop)
async def maintain_db(self): async def maintain_db(self):
await self.channel_db.load_data() await self.channel_db.load_data()
@ -429,7 +441,6 @@ class LNWallet(LNWorker):
self.lnwatcher = LNWalletWatcher(self, network) self.lnwatcher = LNWalletWatcher(self, network)
self.lnwatcher.start_network(network) self.lnwatcher.start_network(network)
self.network = network self.network = network
daemon = network.daemon
for chan_id, chan in self.channels.items(): for chan_id, chan in self.channels.items():
self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address()) self.lnwatcher.add_channel(chan.funding_outpoint.to_str(), chan.get_funding_address())
@ -441,8 +452,8 @@ class LNWallet(LNWorker):
self.sync_with_local_watchtower(), self.sync_with_local_watchtower(),
self.sync_with_remote_watchtower(), self.sync_with_remote_watchtower(),
]: ]:
# FIXME: exceptions in those coroutines will cancel daemon.taskgroup tg_coro = self.taskgroup.spawn(coro)
asyncio.run_coroutine_threadsafe(daemon.taskgroup.spawn(coro), self.network.asyncio_loop) asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
def peer_closed(self, peer): def peer_closed(self, peer):
for chan in self.channels_for_peer(peer.pubkey).values(): for chan in self.channels_for_peer(peer.pubkey).values():
@ -1285,8 +1296,7 @@ class LNWallet(LNWorker):
if peer: if peer:
await peer.group.spawn(peer.reestablish_channel(chan)) await peer.group.spawn(peer.reestablish_channel(chan))
else: else:
await self.network.main_taskgroup.spawn( await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
self.reestablish_peer_for_given_channel(chan))
def current_feerate_per_kw(self): def current_feerate_per_kw(self):
from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED from .simple_config import FEE_LN_ETA_TARGET, FEERATE_FALLBACK_STATIC_FEE, FEERATE_REGTEST_HARDCODED