mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
verify node and channel announcements before entering sql lock
This commit is contained in:
parent
fbafc77f01
commit
e68f318b12
3 changed files with 49 additions and 35 deletions
|
@ -69,6 +69,11 @@ class Peer(Logger):
|
||||||
self.channel_db = lnworker.network.channel_db
|
self.channel_db = lnworker.network.channel_db
|
||||||
self.ping_time = 0
|
self.ping_time = 0
|
||||||
self.reply_channel_range = asyncio.Queue()
|
self.reply_channel_range = asyncio.Queue()
|
||||||
|
# gossip message queues
|
||||||
|
self.channel_announcements = asyncio.Queue()
|
||||||
|
self.channel_updates = asyncio.Queue()
|
||||||
|
self.node_announcements = asyncio.Queue()
|
||||||
|
# channel messsage queues
|
||||||
self.shutdown_received = defaultdict(asyncio.Future)
|
self.shutdown_received = defaultdict(asyncio.Future)
|
||||||
self.channel_accepted = defaultdict(asyncio.Queue)
|
self.channel_accepted = defaultdict(asyncio.Queue)
|
||||||
self.channel_reestablished = defaultdict(asyncio.Future)
|
self.channel_reestablished = defaultdict(asyncio.Future)
|
||||||
|
@ -77,6 +82,7 @@ class Peer(Logger):
|
||||||
self.announcement_signatures = defaultdict(asyncio.Queue)
|
self.announcement_signatures = defaultdict(asyncio.Queue)
|
||||||
self.closing_signed = defaultdict(asyncio.Queue)
|
self.closing_signed = defaultdict(asyncio.Queue)
|
||||||
self.payment_preimages = defaultdict(asyncio.Queue)
|
self.payment_preimages = defaultdict(asyncio.Queue)
|
||||||
|
#
|
||||||
self.attempted_route = {}
|
self.attempted_route = {}
|
||||||
self.orphan_channel_updates = OrderedDict()
|
self.orphan_channel_updates = OrderedDict()
|
||||||
self.sent_commitment_for_ctn_last = defaultdict(lambda: None) # type: Dict[Channel, Optional[int]]
|
self.sent_commitment_for_ctn_last = defaultdict(lambda: None) # type: Dict[Channel, Optional[int]]
|
||||||
|
@ -115,7 +121,7 @@ class Peer(Logger):
|
||||||
#self.logger.info("Received '%s'" % message_type.upper(), payload)
|
#self.logger.info("Received '%s'" % message_type.upper(), payload)
|
||||||
return
|
return
|
||||||
# raw message is needed to check signature
|
# raw message is needed to check signature
|
||||||
if message_type in ['node_announcement', 'channel_update']:
|
if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
|
||||||
payload['raw'] = message
|
payload['raw'] = message
|
||||||
execution_result = f(payload)
|
execution_result = f(payload)
|
||||||
if asyncio.iscoroutinefunction(f):
|
if asyncio.iscoroutinefunction(f):
|
||||||
|
@ -175,13 +181,13 @@ class Peer(Logger):
|
||||||
self.initialized.set()
|
self.initialized.set()
|
||||||
|
|
||||||
def on_node_announcement(self, payload):
|
def on_node_announcement(self, payload):
|
||||||
self.channel_db.node_anns.append(payload)
|
self.node_announcements.put_nowait(payload)
|
||||||
|
|
||||||
def on_channel_update(self, payload):
|
|
||||||
self.channel_db.chan_upds.append(payload)
|
|
||||||
|
|
||||||
def on_channel_announcement(self, payload):
|
def on_channel_announcement(self, payload):
|
||||||
self.channel_db.chan_anns.append(payload)
|
self.channel_announcements.put_nowait(payload)
|
||||||
|
|
||||||
|
def on_channel_update(self, payload):
|
||||||
|
self.channel_updates.put_nowait(payload)
|
||||||
|
|
||||||
def on_announcement_signatures(self, payload):
|
def on_announcement_signatures(self, payload):
|
||||||
channel_id = payload['channel_id']
|
channel_id = payload['channel_id']
|
||||||
|
@ -207,10 +213,39 @@ class Peer(Logger):
|
||||||
async with aiorpcx.TaskGroup() as group:
|
async with aiorpcx.TaskGroup() as group:
|
||||||
await group.spawn(self._message_loop())
|
await group.spawn(self._message_loop())
|
||||||
await group.spawn(self._run_gossip())
|
await group.spawn(self._run_gossip())
|
||||||
|
await group.spawn(self.verify_node_announcements())
|
||||||
|
await group.spawn(self.verify_channel_announcements())
|
||||||
|
await group.spawn(self.verify_channel_updates())
|
||||||
|
|
||||||
|
async def verify_node_announcements(self):
|
||||||
|
while True:
|
||||||
|
payload = await self.node_announcements.get()
|
||||||
|
pubkey = payload['node_id']
|
||||||
|
signature = payload['signature']
|
||||||
|
h = sha256d(payload['raw'][66:])
|
||||||
|
if not ecc.verify_signature(pubkey, signature, h):
|
||||||
|
raise Exception('signature failed')
|
||||||
|
self.channel_db.node_anns.append(payload)
|
||||||
|
|
||||||
|
async def verify_channel_announcements(self):
|
||||||
|
while True:
|
||||||
|
payload = await self.channel_announcements.get()
|
||||||
|
h = sha256d(payload['raw'][2+256:])
|
||||||
|
pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
|
||||||
|
sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
|
||||||
|
for pubkey, sig in zip(pubkeys, sigs):
|
||||||
|
if not ecc.verify_signature(pubkey, sig, h):
|
||||||
|
raise Exception('signature failed')
|
||||||
|
self.channel_db.chan_anns.append(payload)
|
||||||
|
|
||||||
|
async def verify_channel_updates(self):
|
||||||
|
while True:
|
||||||
|
payload = await self.channel_updates.get()
|
||||||
|
self.channel_db.chan_upds.append(payload)
|
||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _run_gossip(self):
|
async def _run_gossip(self):
|
||||||
await asyncio.wait_for(self.initialized.wait(), 5)
|
await asyncio.wait_for(self.initialized.wait(), 10)
|
||||||
if self.lnworker == self.lnworker.network.lngossip:
|
if self.lnworker == self.lnworker.network.lngossip:
|
||||||
ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
|
ids, complete = await asyncio.wait_for(self.get_channel_range(), 10)
|
||||||
self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
|
self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
|
||||||
|
|
|
@ -228,15 +228,15 @@ class ChannelDB(SqlDB):
|
||||||
self.chan_upds = []
|
self.chan_upds = []
|
||||||
|
|
||||||
def process_gossip(self):
|
def process_gossip(self):
|
||||||
if self.node_anns:
|
|
||||||
self.on_node_announcement(self.node_anns)
|
|
||||||
self.node_anns = []
|
|
||||||
if self.chan_anns:
|
if self.chan_anns:
|
||||||
self.on_channel_announcement(self.chan_anns)
|
self.on_channel_announcement(self.chan_anns)
|
||||||
self.chan_anns = []
|
self.chan_anns = []
|
||||||
if self.chan_upds:
|
if self.chan_upds:
|
||||||
self.on_channel_update(self.chan_upds)
|
self.on_channel_update(self.chan_upds)
|
||||||
self.chan_upds = []
|
self.chan_upds = []
|
||||||
|
if self.node_anns:
|
||||||
|
self.on_node_announcement(self.node_anns)
|
||||||
|
self.node_anns = []
|
||||||
|
|
||||||
@sql
|
@sql
|
||||||
def update_counts(self):
|
def update_counts(self):
|
||||||
|
@ -333,7 +333,7 @@ class ChannelDB(SqlDB):
|
||||||
self.DBSession.commit()
|
self.DBSession.commit()
|
||||||
|
|
||||||
@sql
|
@sql
|
||||||
#@profiler
|
@profiler
|
||||||
def on_channel_announcement(self, msg_payloads, trusted=True):
|
def on_channel_announcement(self, msg_payloads, trusted=True):
|
||||||
if type(msg_payloads) is dict:
|
if type(msg_payloads) is dict:
|
||||||
msg_payloads = [msg_payloads]
|
msg_payloads = [msg_payloads]
|
||||||
|
@ -370,7 +370,7 @@ class ChannelDB(SqlDB):
|
||||||
return r.max_timestamp or 0
|
return r.max_timestamp or 0
|
||||||
|
|
||||||
@sql
|
@sql
|
||||||
#@profiler
|
@profiler
|
||||||
def on_channel_update(self, msg_payloads, trusted=False):
|
def on_channel_update(self, msg_payloads, trusted=False):
|
||||||
if type(msg_payloads) is dict:
|
if type(msg_payloads) is dict:
|
||||||
msg_payloads = [msg_payloads]
|
msg_payloads = [msg_payloads]
|
||||||
|
@ -414,7 +414,7 @@ class ChannelDB(SqlDB):
|
||||||
self._update_counts()
|
self._update_counts()
|
||||||
|
|
||||||
@sql
|
@sql
|
||||||
#@profiler
|
@profiler
|
||||||
def on_node_announcement(self, msg_payloads):
|
def on_node_announcement(self, msg_payloads):
|
||||||
if type(msg_payloads) is dict:
|
if type(msg_payloads) is dict:
|
||||||
msg_payloads = [msg_payloads]
|
msg_payloads = [msg_payloads]
|
||||||
|
@ -422,16 +422,10 @@ class ChannelDB(SqlDB):
|
||||||
new_nodes = {}
|
new_nodes = {}
|
||||||
new_addresses = {}
|
new_addresses = {}
|
||||||
for msg_payload in msg_payloads:
|
for msg_payload in msg_payloads:
|
||||||
pubkey = msg_payload['node_id']
|
|
||||||
signature = msg_payload['signature']
|
|
||||||
h = sha256d(msg_payload['raw'][66:])
|
|
||||||
if not ecc.verify_signature(pubkey, signature, h):
|
|
||||||
continue
|
|
||||||
try:
|
try:
|
||||||
node_info, node_addresses = NodeInfo.from_msg(msg_payload)
|
node_info, node_addresses = NodeInfo.from_msg(msg_payload)
|
||||||
except UnknownEvenFeatureBits:
|
except UnknownEvenFeatureBits:
|
||||||
continue
|
continue
|
||||||
#self.logger.info(f'received node announcement from {datetime.fromtimestamp(node_info.timestamp).ctime()}')
|
|
||||||
node_id = node_info.node_id
|
node_id = node_info.node_id
|
||||||
# Ignore node if it has no associated channel (DoS protection)
|
# Ignore node if it has no associated channel (DoS protection)
|
||||||
expr = or_(ChannelInfo.node1_id==node_id, ChannelInfo.node2_id==node_id)
|
expr = or_(ChannelInfo.node1_id==node_id, ChannelInfo.node2_id==node_id)
|
||||||
|
@ -447,7 +441,7 @@ class ChannelDB(SqlDB):
|
||||||
new_nodes[node_id] = node_info
|
new_nodes[node_id] = node_info
|
||||||
for addr in node_addresses:
|
for addr in node_addresses:
|
||||||
new_addresses[(addr.node_id,addr.host,addr.port)] = addr
|
new_addresses[(addr.node_id,addr.host,addr.port)] = addr
|
||||||
#self.logger.info("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
|
self.logger.info("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads)))
|
||||||
for node_info in new_nodes.values():
|
for node_info in new_nodes.values():
|
||||||
self.DBSession.add(node_info)
|
self.DBSession.add(node_info)
|
||||||
for new_addr in new_addresses.values():
|
for new_addr in new_addresses.values():
|
||||||
|
|
|
@ -71,8 +71,6 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||||
return
|
return
|
||||||
if short_channel_id in self.blacklist:
|
if short_channel_id in self.blacklist:
|
||||||
return
|
return
|
||||||
if not verify_sigs_for_channel_announcement(msg_payload):
|
|
||||||
return
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.unverified_channel_info[short_channel_id] = msg_payload
|
self.unverified_channel_info[short_channel_id] = msg_payload
|
||||||
|
|
||||||
|
@ -180,19 +178,6 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||||
self.unverified_channel_info.pop(short_channel_id, None)
|
self.unverified_channel_info.pop(short_channel_id, None)
|
||||||
|
|
||||||
|
|
||||||
def verify_sigs_for_channel_announcement(msg_bytes: bytes) -> bool:
|
|
||||||
msg_type, chan_ann = decode_msg(msg_bytes)
|
|
||||||
assert msg_type == 'channel_announcement'
|
|
||||||
pre_hash = msg_bytes[2+256:]
|
|
||||||
h = sha256d(pre_hash)
|
|
||||||
pubkeys = [chan_ann['node_id_1'], chan_ann['node_id_2'], chan_ann['bitcoin_key_1'], chan_ann['bitcoin_key_2']]
|
|
||||||
sigs = [chan_ann['node_signature_1'], chan_ann['node_signature_2'], chan_ann['bitcoin_signature_1'], chan_ann['bitcoin_signature_2']]
|
|
||||||
for pubkey, sig in zip(pubkeys, sigs):
|
|
||||||
if not ecc.verify_signature(pubkey, sig, h):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def verify_sig_for_channel_update(chan_upd: dict, node_id: bytes) -> bool:
|
def verify_sig_for_channel_update(chan_upd: dict, node_id: bytes) -> bool:
|
||||||
msg_bytes = chan_upd['raw']
|
msg_bytes = chan_upd['raw']
|
||||||
pre_hash = msg_bytes[2+64:]
|
pre_hash = msg_bytes[2+64:]
|
||||||
|
|
Loading…
Add table
Reference in a new issue