mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
lnpeer: restore "temp save orphan channel updates" functionality
needed to handle race where remote might send chan_upd too soon (before we save the short channel id for the channel after it reaches funding locked)
This commit is contained in:
parent
ba431495db
commit
d229bb4e4d
2 changed files with 45 additions and 21 deletions
|
@ -188,6 +188,15 @@ class Address(NamedTuple):
|
||||||
port: int
|
port: int
|
||||||
last_connected_date: int
|
last_connected_date: int
|
||||||
|
|
||||||
|
|
||||||
|
class CategorizedChannelUpdates(NamedTuple):
|
||||||
|
orphaned: List # no channel announcement for channel update
|
||||||
|
expired: List # update older than two weeks
|
||||||
|
deprecated: List # update older than database entry
|
||||||
|
good: List # good updates
|
||||||
|
to_delete: List # database entries to delete
|
||||||
|
|
||||||
|
|
||||||
create_channel_info = """
|
create_channel_info = """
|
||||||
CREATE TABLE IF NOT EXISTS channel_info (
|
CREATE TABLE IF NOT EXISTS channel_info (
|
||||||
short_channel_id VARCHAR(64),
|
short_channel_id VARCHAR(64),
|
||||||
|
@ -241,7 +250,7 @@ class ChannelDB(SqlDB):
|
||||||
self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict]
|
self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict]
|
||||||
self.ca_verifier = LNChannelVerifier(network, self)
|
self.ca_verifier = LNChannelVerifier(network, self)
|
||||||
# initialized in load_data
|
# initialized in load_data
|
||||||
self._channels = {}
|
self._channels = {} # type: Dict[bytes, ChannelInfo]
|
||||||
self._policies = {}
|
self._policies = {}
|
||||||
self._nodes = {}
|
self._nodes = {}
|
||||||
self._addresses = defaultdict(set)
|
self._addresses = defaultdict(set)
|
||||||
|
@ -320,12 +329,12 @@ class ChannelDB(SqlDB):
|
||||||
if old_policy.channel_flags != new_policy.channel_flags:
|
if old_policy.channel_flags != new_policy.channel_flags:
|
||||||
self.logger.info(f'channel_flags: {old_policy.channel_flags} -> {new_policy.channel_flags}')
|
self.logger.info(f'channel_flags: {old_policy.channel_flags} -> {new_policy.channel_flags}')
|
||||||
|
|
||||||
def add_channel_updates(self, payloads, max_age=None, verify=True):
|
def add_channel_updates(self, payloads, max_age=None, verify=True) -> CategorizedChannelUpdates:
|
||||||
orphaned = [] # no channel announcement for channel update
|
orphaned = []
|
||||||
expired = [] # update older than two weeks
|
expired = []
|
||||||
deprecated = [] # update older than database entry
|
deprecated = []
|
||||||
good = [] # good updates
|
good = []
|
||||||
to_delete = [] # database entries to delete
|
to_delete = []
|
||||||
# filter orphaned and expired first
|
# filter orphaned and expired first
|
||||||
known = []
|
known = []
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
|
@ -333,11 +342,11 @@ class ChannelDB(SqlDB):
|
||||||
short_channel_id = payload['short_channel_id']
|
short_channel_id = payload['short_channel_id']
|
||||||
timestamp = int.from_bytes(payload['timestamp'], "big")
|
timestamp = int.from_bytes(payload['timestamp'], "big")
|
||||||
if max_age and now - timestamp > max_age:
|
if max_age and now - timestamp > max_age:
|
||||||
expired.append(short_channel_id)
|
expired.append(payload)
|
||||||
continue
|
continue
|
||||||
channel_info = self._channels.get(short_channel_id)
|
channel_info = self._channels.get(short_channel_id)
|
||||||
if not channel_info:
|
if not channel_info:
|
||||||
orphaned.append(short_channel_id)
|
orphaned.append(payload)
|
||||||
continue
|
continue
|
||||||
flags = int.from_bytes(payload['channel_flags'], 'big')
|
flags = int.from_bytes(payload['channel_flags'], 'big')
|
||||||
direction = flags & FLAG_DIRECTION
|
direction = flags & FLAG_DIRECTION
|
||||||
|
@ -352,7 +361,7 @@ class ChannelDB(SqlDB):
|
||||||
key = (start_node, short_channel_id)
|
key = (start_node, short_channel_id)
|
||||||
old_policy = self._policies.get(key)
|
old_policy = self._policies.get(key)
|
||||||
if old_policy and timestamp <= old_policy.timestamp:
|
if old_policy and timestamp <= old_policy.timestamp:
|
||||||
deprecated.append(short_channel_id)
|
deprecated.append(payload)
|
||||||
continue
|
continue
|
||||||
good.append(payload)
|
good.append(payload)
|
||||||
if verify:
|
if verify:
|
||||||
|
@ -362,11 +371,17 @@ class ChannelDB(SqlDB):
|
||||||
self.save_policy(policy)
|
self.save_policy(policy)
|
||||||
#
|
#
|
||||||
self.update_counts()
|
self.update_counts()
|
||||||
return orphaned, expired, deprecated, good, to_delete
|
return CategorizedChannelUpdates(
|
||||||
|
orphaned=orphaned,
|
||||||
|
expired=expired,
|
||||||
|
deprecated=deprecated,
|
||||||
|
good=good,
|
||||||
|
to_delete=to_delete,
|
||||||
|
)
|
||||||
|
|
||||||
def add_channel_update(self, payload):
|
def add_channel_update(self, payload):
|
||||||
orphaned, expired, deprecated, good, to_delete = self.add_channel_updates([payload], verify=False)
|
categorized_chan_upds = self.add_channel_updates([payload], verify=False)
|
||||||
assert len(good) == 1
|
assert len(categorized_chan_upds.good) == 1
|
||||||
|
|
||||||
def create_database(self):
|
def create_database(self):
|
||||||
c = self.conn.cursor()
|
c = self.conn.cursor()
|
||||||
|
|
|
@ -266,13 +266,22 @@ class Peer(Logger):
|
||||||
self.channel_db.add_node_announcement(node_anns_chunk)
|
self.channel_db.add_node_announcement(node_anns_chunk)
|
||||||
# channel updates
|
# channel updates
|
||||||
for chan_upds_chunk in chunks(chan_upds, 1000):
|
for chan_upds_chunk in chunks(chan_upds, 1000):
|
||||||
orphaned, expired, deprecated, good, to_delete = self.channel_db.add_channel_updates(
|
categorized_chan_upds = self.channel_db.add_channel_updates(
|
||||||
chan_upds_chunk, max_age=self.network.lngossip.max_age)
|
chan_upds_chunk, max_age=self.network.lngossip.max_age)
|
||||||
|
orphaned = categorized_chan_upds.orphaned
|
||||||
if orphaned:
|
if orphaned:
|
||||||
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
||||||
await self.network.lngossip.add_new_ids(orphaned)
|
await self.network.lngossip.add_new_ids(orphaned)
|
||||||
if good:
|
# Save (some bounded number of) orphan channel updates for later
|
||||||
self.logger.debug(f'on_channel_update: {len(good)}/{len(chan_upds_chunk)}')
|
# as it might be for our own direct channel with this peer
|
||||||
|
# (and we might not yet know the short channel id for that)
|
||||||
|
for chan_upd_payload in orphaned:
|
||||||
|
short_channel_id = chan_upd_payload['short_channel_id']
|
||||||
|
self.orphan_channel_updates[short_channel_id] = chan_upd_payload
|
||||||
|
while len(self.orphan_channel_updates) > 25:
|
||||||
|
self.orphan_channel_updates.popitem(last=False)
|
||||||
|
if categorized_chan_upds.good:
|
||||||
|
self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}')
|
||||||
# refresh gui
|
# refresh gui
|
||||||
if chan_anns or node_anns or chan_upds:
|
if chan_anns or node_anns or chan_upds:
|
||||||
self.network.lngossip.refresh_gui()
|
self.network.lngossip.refresh_gui()
|
||||||
|
@ -1074,18 +1083,18 @@ class Peer(Logger):
|
||||||
channel_update = (258).to_bytes(length=2, byteorder="big") + data[offset+2: offset+2+channel_update_len]
|
channel_update = (258).to_bytes(length=2, byteorder="big") + data[offset+2: offset+2+channel_update_len]
|
||||||
message_type, payload = decode_msg(channel_update)
|
message_type, payload = decode_msg(channel_update)
|
||||||
payload['raw'] = channel_update
|
payload['raw'] = channel_update
|
||||||
orphaned, expired, deprecated, good, to_delete = self.channel_db.add_channel_updates([payload])
|
categorized_chan_upds = self.channel_db.add_channel_updates([payload])
|
||||||
blacklist = False
|
blacklist = False
|
||||||
if good:
|
if categorized_chan_upds.good:
|
||||||
self.logger.info("applied channel update on our db")
|
self.logger.info("applied channel update on our db")
|
||||||
elif orphaned:
|
elif categorized_chan_upds.orphaned:
|
||||||
# maybe it is a private channel (and data in invoice was outdated)
|
# maybe it is a private channel (and data in invoice was outdated)
|
||||||
self.logger.info("maybe channel update is for private channel?")
|
self.logger.info("maybe channel update is for private channel?")
|
||||||
start_node_id = route[sender_idx].node_id
|
start_node_id = route[sender_idx].node_id
|
||||||
self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
|
self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
|
||||||
elif expired:
|
elif categorized_chan_upds.expired:
|
||||||
blacklist = True
|
blacklist = True
|
||||||
elif deprecated:
|
elif categorized_chan_upds.deprecated:
|
||||||
self.logger.info(f'channel update is not more recent.')
|
self.logger.info(f'channel update is not more recent.')
|
||||||
blacklist = True
|
blacklist = True
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Add table
Reference in a new issue