mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
channel_db: add verbose option to add_channel_update
This commit is contained in:
parent
64733a39dc
commit
2d0ef78a11
2 changed files with 55 additions and 50 deletions
|
@ -32,6 +32,7 @@ import binascii
|
||||||
import base64
|
import base64
|
||||||
import asyncio
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
|
from enum import IntEnum
|
||||||
|
|
||||||
|
|
||||||
from .sql_db import SqlDB, sql
|
from .sql_db import SqlDB, sql
|
||||||
|
@ -196,13 +197,17 @@ class NodeInfo(NamedTuple):
|
||||||
return addresses
|
return addresses
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateStatus(IntEnum):
|
||||||
|
ORPHANED = 0
|
||||||
|
EXPIRED = 1
|
||||||
|
DEPRECATED = 2
|
||||||
|
GOOD = 3
|
||||||
|
|
||||||
class CategorizedChannelUpdates(NamedTuple):
|
class CategorizedChannelUpdates(NamedTuple):
|
||||||
orphaned: List # no channel announcement for channel update
|
orphaned: List # no channel announcement for channel update
|
||||||
expired: List # update older than two weeks
|
expired: List # update older than two weeks
|
||||||
deprecated: List # update older than database entry
|
deprecated: List # update older than database entry
|
||||||
good: List # good updates
|
good: List # good updates
|
||||||
to_delete: List # database entries to delete
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
create_channel_info = """
|
create_channel_info = """
|
||||||
|
@ -374,62 +379,61 @@ class ChannelDB(SqlDB):
|
||||||
if old_policy.message_flags != new_policy.message_flags:
|
if old_policy.message_flags != new_policy.message_flags:
|
||||||
self.logger.info(f'message_flags: {old_policy.message_flags} -> {new_policy.message_flags}')
|
self.logger.info(f'message_flags: {old_policy.message_flags} -> {new_policy.message_flags}')
|
||||||
|
|
||||||
def add_channel_updates(self, payloads, max_age=None, verify=True) -> CategorizedChannelUpdates:
|
def add_channel_update(self, payload, max_age=None, verify=False, verbose=True):
|
||||||
|
now = int(time.time())
|
||||||
|
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
||||||
|
timestamp = payload['timestamp']
|
||||||
|
if max_age and now - timestamp > max_age:
|
||||||
|
return UpdateStatus.EXPIRED
|
||||||
|
channel_info = self._channels.get(short_channel_id)
|
||||||
|
if not channel_info:
|
||||||
|
return UpdateStatus.ORPHANED
|
||||||
|
flags = int.from_bytes(payload['channel_flags'], 'big')
|
||||||
|
direction = flags & FLAG_DIRECTION
|
||||||
|
start_node = channel_info.node1_id if direction == 0 else channel_info.node2_id
|
||||||
|
payload['start_node'] = start_node
|
||||||
|
# compare updates to existing database entries
|
||||||
|
timestamp = payload['timestamp']
|
||||||
|
start_node = payload['start_node']
|
||||||
|
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
||||||
|
key = (start_node, short_channel_id)
|
||||||
|
old_policy = self._policies.get(key)
|
||||||
|
if old_policy and timestamp <= old_policy.timestamp:
|
||||||
|
return UpdateStatus.DEPRECATED
|
||||||
|
if verify:
|
||||||
|
self.verify_channel_update(payload)
|
||||||
|
policy = Policy.from_msg(payload)
|
||||||
|
with self.lock:
|
||||||
|
self._policies[key] = policy
|
||||||
|
self._update_num_policies_for_chan(short_channel_id)
|
||||||
|
if 'raw' in payload:
|
||||||
|
self._db_save_policy(policy.key, payload['raw'])
|
||||||
|
if old_policy and verbose:
|
||||||
|
self.print_change(old_policy, policy)
|
||||||
|
return UpdateStatus.GOOD
|
||||||
|
|
||||||
|
def add_channel_updates(self, payloads, max_age=None) -> CategorizedChannelUpdates:
|
||||||
orphaned = []
|
orphaned = []
|
||||||
expired = []
|
expired = []
|
||||||
deprecated = []
|
deprecated = []
|
||||||
good = []
|
good = []
|
||||||
to_delete = []
|
|
||||||
# filter orphaned and expired first
|
|
||||||
known = []
|
|
||||||
now = int(time.time())
|
|
||||||
for payload in payloads:
|
for payload in payloads:
|
||||||
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
r = self.add_channel_update(payload, max_age=max_age, verbose=False)
|
||||||
timestamp = payload['timestamp']
|
if r == UpdateStatus.ORPHANED:
|
||||||
if max_age and now - timestamp > max_age:
|
|
||||||
expired.append(payload)
|
|
||||||
continue
|
|
||||||
channel_info = self._channels.get(short_channel_id)
|
|
||||||
if not channel_info:
|
|
||||||
orphaned.append(payload)
|
orphaned.append(payload)
|
||||||
continue
|
elif r == UpdateStatus.EXPIRED:
|
||||||
flags = int.from_bytes(payload['channel_flags'], 'big')
|
expired.append(payload)
|
||||||
direction = flags & FLAG_DIRECTION
|
elif r == UpdateStatus.DEPRECATED:
|
||||||
start_node = channel_info.node1_id if direction == 0 else channel_info.node2_id
|
|
||||||
payload['start_node'] = start_node
|
|
||||||
known.append(payload)
|
|
||||||
# compare updates to existing database entries
|
|
||||||
for payload in known:
|
|
||||||
timestamp = payload['timestamp']
|
|
||||||
start_node = payload['start_node']
|
|
||||||
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
|
||||||
key = (start_node, short_channel_id)
|
|
||||||
old_policy = self._policies.get(key)
|
|
||||||
if old_policy and timestamp <= old_policy.timestamp:
|
|
||||||
deprecated.append(payload)
|
deprecated.append(payload)
|
||||||
continue
|
elif r == UpdateStatus.GOOD:
|
||||||
good.append(payload)
|
good.append(payload)
|
||||||
if verify:
|
|
||||||
self.verify_channel_update(payload)
|
|
||||||
policy = Policy.from_msg(payload)
|
|
||||||
with self.lock:
|
|
||||||
self._policies[key] = policy
|
|
||||||
self._update_num_policies_for_chan(short_channel_id)
|
|
||||||
if 'raw' in payload:
|
|
||||||
self._db_save_policy(policy.key, payload['raw'])
|
|
||||||
#
|
|
||||||
self.update_counts()
|
self.update_counts()
|
||||||
return CategorizedChannelUpdates(
|
return CategorizedChannelUpdates(
|
||||||
orphaned=orphaned,
|
orphaned=orphaned,
|
||||||
expired=expired,
|
expired=expired,
|
||||||
deprecated=deprecated,
|
deprecated=deprecated,
|
||||||
good=good,
|
good=good)
|
||||||
to_delete=to_delete,
|
|
||||||
)
|
|
||||||
|
|
||||||
def add_channel_update(self, payload):
|
|
||||||
# called from tests
|
|
||||||
self.add_channel_updates([payload], verify=False)
|
|
||||||
|
|
||||||
def create_database(self):
|
def create_database(self):
|
||||||
c = self.conn.cursor()
|
c = self.conn.cursor()
|
||||||
|
|
|
@ -67,6 +67,7 @@ from .lnwatcher import LNWalletWatcher
|
||||||
from .crypto import pw_encode_bytes, pw_decode_bytes, PW_HASH_VERSION_LATEST
|
from .crypto import pw_encode_bytes, pw_decode_bytes, PW_HASH_VERSION_LATEST
|
||||||
from .lnutil import ChannelBackupStorage
|
from .lnutil import ChannelBackupStorage
|
||||||
from .lnchannel import ChannelBackup
|
from .lnchannel import ChannelBackup
|
||||||
|
from .channel_db import UpdateStatus
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .network import Network
|
from .network import Network
|
||||||
|
@ -930,20 +931,20 @@ class LNWallet(LNWorker):
|
||||||
if payload['chain_hash'] != constants.net.rev_genesis_bytes():
|
if payload['chain_hash'] != constants.net.rev_genesis_bytes():
|
||||||
self.logger.info(f'could not decode channel_update for failed htlc: {channel_update_as_received.hex()}')
|
self.logger.info(f'could not decode channel_update for failed htlc: {channel_update_as_received.hex()}')
|
||||||
return True
|
return True
|
||||||
categorized_chan_upds = self.channel_db.add_channel_updates([payload])
|
r = self.channel_db.add_channel_update(payload)
|
||||||
blacklist = False
|
blacklist = False
|
||||||
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
||||||
if categorized_chan_upds.good:
|
if r == UpdateStatus.GOOD:
|
||||||
self.logger.info(f"applied channel update to {short_channel_id}")
|
self.logger.info(f"applied channel update to {short_channel_id}")
|
||||||
peer.maybe_save_remote_update(payload)
|
peer.maybe_save_remote_update(payload)
|
||||||
elif categorized_chan_upds.orphaned:
|
elif r == UpdateStatus.ORPHANED:
|
||||||
# maybe it is a private channel (and data in invoice was outdated)
|
# maybe it is a private channel (and data in invoice was outdated)
|
||||||
self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?")
|
self.logger.info(f"Could not find {short_channel_id}. maybe update is for private channel?")
|
||||||
start_node_id = route[sender_idx].node_id
|
start_node_id = route[sender_idx].node_id
|
||||||
self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
|
self.channel_db.add_channel_update_for_private_channel(payload, start_node_id)
|
||||||
elif categorized_chan_upds.expired:
|
elif r == UpdateStatus.EXPIRED:
|
||||||
blacklist = True
|
blacklist = True
|
||||||
elif categorized_chan_upds.deprecated:
|
elif r == UpdateStatus.DEPRECATED:
|
||||||
self.logger.info(f'channel update is not more recent.')
|
self.logger.info(f'channel update is not more recent.')
|
||||||
blacklist = True
|
blacklist = True
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Add table
Reference in a new issue