mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-05 05:15:12 +00:00
lnworker: implement exponential backoff for retries
This commit is contained in:
parent
86b29603cb
commit
90cb032721
3 changed files with 49 additions and 30 deletions
|
@ -219,10 +219,7 @@ class Peer(Logger):
|
|||
if constants.net.rev_genesis_bytes() not in their_chains:
|
||||
raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
|
||||
# all checks passed
|
||||
if self.channel_db and isinstance(self.transport, LNTransport):
|
||||
self.channel_db.add_recent_peer(self.transport.peer_addr)
|
||||
for chan in self.channels.values():
|
||||
chan.add_or_update_peer_addr(self.transport.peer_addr)
|
||||
self.lnworker.on_peer_successfully_established(self)
|
||||
self._received_init = True
|
||||
self.maybe_set_initialized()
|
||||
|
||||
|
|
|
@ -155,6 +155,8 @@ class LNTransportBase:
|
|||
|
||||
|
||||
class LNResponderTransport(LNTransportBase):
|
||||
"""Transport initiated by remote party."""
|
||||
|
||||
def __init__(self, privkey: bytes, reader: StreamReader, writer: StreamWriter):
|
||||
LNTransportBase.__init__(self)
|
||||
self.reader = reader
|
||||
|
@ -211,7 +213,9 @@ class LNResponderTransport(LNTransportBase):
|
|||
self.init_counters(ck)
|
||||
return rs
|
||||
|
||||
|
||||
class LNTransport(LNTransportBase):
|
||||
"""Transport initiated by local party."""
|
||||
|
||||
def __init__(self, privkey: bytes, peer_addr: LNPeerAddr):
|
||||
LNTransportBase.__init__(self)
|
||||
|
|
|
@ -77,9 +77,11 @@ SAVED_PR_STATUS = [PR_PAID, PR_UNPAID, PR_INFLIGHT] # status that are persisted
|
|||
|
||||
|
||||
NUM_PEERS_TARGET = 4
|
||||
PEER_RETRY_INTERVAL = 600 # seconds
|
||||
PEER_RETRY_INTERVAL_FOR_CHANNELS = 30 # seconds
|
||||
GRAPH_DOWNLOAD_SECONDS = 600
|
||||
|
||||
MAX_RETRY_DELAY_FOR_PEERS = 3600 # sec
|
||||
INIT_RETRY_DELAY_FOR_PEERS = 600 # sec
|
||||
MAX_RETRY_DELAY_FOR_CHANNEL_PEERS = 300 # sec
|
||||
INIT_RETRY_DELAY_FOR_CHANNEL_PEERS = 4 # sec
|
||||
|
||||
FALLBACK_NODE_LIST_TESTNET = (
|
||||
LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')),
|
||||
|
@ -156,6 +158,8 @@ class LNWorker(Logger):
|
|||
self.features |= LnFeatures.VAR_ONION_OPT
|
||||
self.features |= LnFeatures.PAYMENT_SECRET_OPT
|
||||
|
||||
self._last_tried_peer = {} # type: Dict[LNPeerAddr, Tuple[float, int]] # LNPeerAddr -> (unix ts, num_attempts)
|
||||
|
||||
def channels_for_peer(self, node_id):
|
||||
return {}
|
||||
|
||||
|
@ -204,8 +208,7 @@ class LNWorker(Logger):
|
|||
continue
|
||||
peers = await self._get_next_peers_to_try()
|
||||
for peer in peers:
|
||||
last_tried = self._last_tried_peer.get(peer, 0)
|
||||
if last_tried + PEER_RETRY_INTERVAL < now:
|
||||
if self._can_retry_peer(peer, now=now):
|
||||
await self._add_peer(peer.host, peer.port, peer.pubkey)
|
||||
|
||||
async def _add_peer(self, host, port, node_id) -> Peer:
|
||||
|
@ -214,7 +217,8 @@ class LNWorker(Logger):
|
|||
port = int(port)
|
||||
peer_addr = LNPeerAddr(host, port, node_id)
|
||||
transport = LNTransport(self.node_keypair.privkey, peer_addr)
|
||||
self._last_tried_peer[peer_addr] = time.time()
|
||||
last_time, num_attempts = self._last_tried_peer.get(peer_addr, (0, 0))
|
||||
self._last_tried_peer[peer_addr] = time.time(), num_attempts + 1
|
||||
self.logger.info(f"adding peer {peer_addr}")
|
||||
peer = Peer(self, node_id, transport)
|
||||
await self.taskgroup.spawn(peer.main_loop())
|
||||
|
@ -233,7 +237,6 @@ class LNWorker(Logger):
|
|||
self.network = network
|
||||
self.config = network.config
|
||||
self.channel_db = self.network.channel_db
|
||||
self._last_tried_peer = {} # type: Dict[LNPeerAddr, float] # LNPeerAddr -> unix timestamp
|
||||
self._add_peers_from_config()
|
||||
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
||||
|
||||
|
@ -259,20 +262,43 @@ class LNWorker(Logger):
|
|||
#self.logger.info(f'is_good {peer.host}')
|
||||
return True
|
||||
|
||||
def on_peer_successfully_established(self, peer: Peer) -> None:
|
||||
if isinstance(peer.transport, LNTransport):
|
||||
peer_addr = peer.transport.peer_addr
|
||||
# reset connection attempt count
|
||||
self._last_tried_peer[peer_addr] = time.time(), 0
|
||||
# add into channel db
|
||||
if self.channel_db:
|
||||
self.channel_db.add_recent_peer(peer_addr)
|
||||
# save network address into channels we might have with peer
|
||||
for chan in peer.channels.values():
|
||||
chan.add_or_update_peer_addr(peer_addr)
|
||||
|
||||
def _can_retry_peer(self, peer: LNPeerAddr, *,
|
||||
now: float = None, for_channel: bool = False) -> bool:
|
||||
if now is None:
|
||||
now = time.time()
|
||||
last_time, num_attempts = self._last_tried_peer.get(peer, (0, 0))
|
||||
if for_channel:
|
||||
delay = min(MAX_RETRY_DELAY_FOR_CHANNEL_PEERS,
|
||||
INIT_RETRY_DELAY_FOR_CHANNEL_PEERS * 2 ** num_attempts)
|
||||
else:
|
||||
delay = min(MAX_RETRY_DELAY_FOR_PEERS,
|
||||
INIT_RETRY_DELAY_FOR_PEERS * 2 ** num_attempts)
|
||||
next_time = last_time + delay
|
||||
return next_time < now
|
||||
|
||||
async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
|
||||
now = time.time()
|
||||
await self.channel_db.data_loaded.wait()
|
||||
recent_peers = self.channel_db.get_recent_peers()
|
||||
# maintenance for last tried times
|
||||
# due to this, below we can just test membership in _last_tried_peer
|
||||
for peer in list(self._last_tried_peer):
|
||||
if now >= self._last_tried_peer[peer] + PEER_RETRY_INTERVAL:
|
||||
del self._last_tried_peer[peer]
|
||||
# first try from recent peers
|
||||
recent_peers = self.channel_db.get_recent_peers()
|
||||
for peer in recent_peers:
|
||||
if not peer:
|
||||
continue
|
||||
if peer.pubkey in self.peers:
|
||||
continue
|
||||
if peer in self._last_tried_peer:
|
||||
if not self._can_retry_peer(peer, now=now):
|
||||
continue
|
||||
if not self.is_good_peer(peer):
|
||||
continue
|
||||
|
@ -289,7 +315,7 @@ class LNWorker(Logger):
|
|||
peer = LNPeerAddr(host, port, node_id)
|
||||
except ValueError:
|
||||
continue
|
||||
if peer in self._last_tried_peer:
|
||||
if not self._can_retry_peer(peer, now=now):
|
||||
continue
|
||||
if not self.is_good_peer(peer):
|
||||
continue
|
||||
|
@ -304,7 +330,7 @@ class LNWorker(Logger):
|
|||
else:
|
||||
return [] # regtest??
|
||||
|
||||
fallback_list = [peer for peer in fallback_list if peer not in self._last_tried_peer]
|
||||
fallback_list = [peer for peer in fallback_list if self._can_retry_peer(peer, now=now)]
|
||||
if fallback_list:
|
||||
return [random.choice(fallback_list)]
|
||||
|
||||
|
@ -1269,18 +1295,10 @@ class LNWallet(LNWorker):
|
|||
peer_addresses.append(LNPeerAddr(host, port, chan.node_id))
|
||||
# will try addresses stored in channel storage
|
||||
peer_addresses += list(chan.get_peer_addresses())
|
||||
# Done gathering addresses.
|
||||
# Now select first one that has not failed recently.
|
||||
# Use long retry interval to check. This ensures each address we gathered gets a chance.
|
||||
for peer in peer_addresses:
|
||||
last_tried = self._last_tried_peer.get(peer, 0)
|
||||
if last_tried + PEER_RETRY_INTERVAL < now:
|
||||
await self._add_peer(peer.host, peer.port, peer.pubkey)
|
||||
return
|
||||
# Still here? That means all addresses failed ~recently.
|
||||
# Use short retry interval now.
|
||||
for peer in peer_addresses:
|
||||
last_tried = self._last_tried_peer.get(peer, 0)
|
||||
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
|
||||
if self._can_retry_peer(peer, for_channel=True, now=now):
|
||||
await self._add_peer(peer.host, peer.port, peer.pubkey)
|
||||
return
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue