mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
async block headers: avoid duplicate tip fields, handle electrumx server skipping blocks
This commit is contained in:
parent
4d95452ae7
commit
9c363db440
2 changed files with 27 additions and 28 deletions
|
@ -204,30 +204,26 @@ class Interface(PrintError):
|
||||||
@aiosafe
|
@aiosafe
|
||||||
async def run_fetch_blocks(self, sub_reply, replies, conniface):
|
async def run_fetch_blocks(self, sub_reply, replies, conniface):
|
||||||
async with self.network.bhi_lock:
|
async with self.network.bhi_lock:
|
||||||
bhi = BlockHeaderInterface(conniface, sub_reply['height'], self.blockchain.height()+1, self)
|
bhi = BlockHeaderInterface(conniface, self.blockchain.height()+1, self)
|
||||||
await replies.put(blockchain.deserialize_header(bfh(sub_reply['hex']), sub_reply['height']))
|
await replies.put(blockchain.deserialize_header(bfh(sub_reply['hex']), sub_reply['height']))
|
||||||
self.print_error("checking if catched up {}-1 > {}, tip {}".format(sub_reply['height'], self.blockchain.height(), bhi.tip))
|
|
||||||
if sub_reply['height']-1 > self.blockchain.height():
|
|
||||||
last_status = await bhi.sync_until()
|
|
||||||
assert last_status == 'catchup', last_status
|
|
||||||
assert self.blockchain.height()+1 == bhi.height, (self.blockchain.height(), bhi.height)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
self.network.notify('updated')
|
self.network.notify('updated')
|
||||||
item = await replies.get()
|
item = await replies.get()
|
||||||
async with self.network.bhi_lock:
|
async with self.network.bhi_lock:
|
||||||
|
if self.blockchain.height()-1 < item['block_height']:
|
||||||
|
await bhi.sync_until()
|
||||||
if self.blockchain.height() >= bhi.height and self.blockchain.check_header(item):
|
if self.blockchain.height() >= bhi.height and self.blockchain.check_header(item):
|
||||||
# another interface amended the blockchain
|
# another interface amended the blockchain
|
||||||
self.print_error("SKIPPING HEADER", bhi.height)
|
self.print_error("SKIPPING HEADER", bhi.height)
|
||||||
continue
|
continue
|
||||||
if bhi.tip < bhi.height:
|
if self.tip < bhi.height:
|
||||||
bhi.height = bhi.tip
|
bhi.height = self.tip
|
||||||
await bhi.step(item)
|
await bhi.step(item)
|
||||||
bhi.tip = max(bhi.height, bhi.tip)
|
self.tip = max(bhi.height, self.tip)
|
||||||
|
|
||||||
class BlockHeaderInterface(PrintError):
|
class BlockHeaderInterface(PrintError):
|
||||||
def __init__(self, conn, tip, height, iface):
|
def __init__(self, conn, height, iface):
|
||||||
self.tip = tip
|
|
||||||
self.height = height
|
self.height = height
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
self.iface = iface
|
self.iface = iface
|
||||||
|
@ -237,31 +233,33 @@ class BlockHeaderInterface(PrintError):
|
||||||
|
|
||||||
async def sync_until(self, next_height=None):
|
async def sync_until(self, next_height=None):
|
||||||
if next_height is None:
|
if next_height is None:
|
||||||
next_height = self.tip
|
next_height = self.iface.tip
|
||||||
last = None
|
last = None
|
||||||
while last is None or self.height < next_height:
|
while last is None or self.height < next_height:
|
||||||
if next_height > self.height + 10:
|
if next_height > self.height + 10:
|
||||||
could_connect, num_headers = await self.conn.request_chunk(self.height, next_height)
|
could_connect, num_headers = await self.conn.request_chunk(self.height, next_height)
|
||||||
self.tip = max(self.height + num_headers, self.tip)
|
self.iface.tip = max(self.height + num_headers, self.iface.tip)
|
||||||
if not could_connect:
|
if not could_connect:
|
||||||
if self.height <= self.iface.network.max_checkpoint():
|
if self.height <= self.iface.network.max_checkpoint():
|
||||||
raise Exception('server chain conflicts with checkpoints or genesis')
|
raise Exception('server chain conflicts with checkpoints or genesis')
|
||||||
last = await self.step()
|
last = await self.step()
|
||||||
self.tip = max(self.height, self.tip)
|
self.iface.tip = max(self.height, self.iface.tip)
|
||||||
continue
|
continue
|
||||||
self.height = (self.height // 2016 * 2016) + num_headers
|
self.height = (self.height // 2016 * 2016) + num_headers
|
||||||
if self.height > next_height:
|
if self.height > next_height:
|
||||||
assert False, (self.height, self.tip)
|
assert False, (self.height, self.iface.tip)
|
||||||
last = 'catchup'
|
last = 'catchup'
|
||||||
else:
|
else:
|
||||||
last = await self.step()
|
last = await self.step()
|
||||||
self.tip = max(self.height, self.tip)
|
self.iface.tip = max(self.height, self.iface.tip)
|
||||||
return last
|
return last
|
||||||
|
|
||||||
async def step(self, header=None):
|
async def step(self, header=None):
|
||||||
assert self.height != 0
|
assert self.height != 0
|
||||||
if header is None:
|
if header is None:
|
||||||
header = await self.conn.get_block_header(self.height, 'catchup')
|
header = await self.conn.get_block_header(self.height, 'catchup')
|
||||||
|
chain = self.iface.blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)
|
||||||
|
if chain: return 'catchup'
|
||||||
can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self)
|
can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self)
|
||||||
|
|
||||||
bad_header = None
|
bad_header = None
|
||||||
|
@ -284,8 +282,8 @@ class BlockHeaderInterface(PrintError):
|
||||||
while not chain and not can_connect:
|
while not chain and not can_connect:
|
||||||
bad = self.height
|
bad = self.height
|
||||||
bad_header = header
|
bad_header = header
|
||||||
delta = self.tip - self.height
|
delta = self.iface.tip - self.height
|
||||||
next_height = self.tip - 2 * delta
|
next_height = self.iface.tip - 2 * delta
|
||||||
checkp = False
|
checkp = False
|
||||||
if next_height <= self.iface.network.max_checkpoint():
|
if next_height <= self.iface.network.max_checkpoint():
|
||||||
next_height = self.iface.network.max_checkpoint() + 1
|
next_height = self.iface.network.max_checkpoint() + 1
|
||||||
|
@ -305,7 +303,7 @@ class BlockHeaderInterface(PrintError):
|
||||||
if type(can_connect) is bool:
|
if type(can_connect) is bool:
|
||||||
# mock
|
# mock
|
||||||
self.height += 1
|
self.height += 1
|
||||||
if self.height > self.tip:
|
if self.height > self.iface.tip:
|
||||||
assert False
|
assert False
|
||||||
return 'catchup'
|
return 'catchup'
|
||||||
self.iface.blockchain = can_connect
|
self.iface.blockchain = can_connect
|
||||||
|
@ -360,7 +358,7 @@ class BlockHeaderInterface(PrintError):
|
||||||
return 'join'
|
return 'join'
|
||||||
else:
|
else:
|
||||||
if ismocking and branch['parent']['check'](header) or not ismocking and branch.parent().check_header(header):
|
if ismocking and branch['parent']['check'](header) or not ismocking and branch.parent().check_header(header):
|
||||||
self.print_error('reorg', bad, self.tip)
|
self.print_error('reorg', bad, self.iface.tip)
|
||||||
self.iface.blockchain = branch.parent() if not ismocking else branch['parent']
|
self.iface.blockchain = branch.parent() if not ismocking else branch['parent']
|
||||||
self.height = bad
|
self.height = bad
|
||||||
header = await self.conn.get_block_header(self.height, 'binary')
|
header = await self.conn.get_block_header(self.height, 'binary')
|
||||||
|
@ -394,7 +392,7 @@ class BlockHeaderInterface(PrintError):
|
||||||
return 'fork'
|
return 'fork'
|
||||||
else:
|
else:
|
||||||
assert bh == good
|
assert bh == good
|
||||||
if bh < self.tip:
|
if bh < self.iface.tip:
|
||||||
self.print_error("catching up from %d"% (bh + 1))
|
self.print_error("catching up from %d"% (bh + 1))
|
||||||
self.height = bh + 1
|
self.height = bh + 1
|
||||||
return 'no_fork'
|
return 'no_fork'
|
||||||
|
|
|
@ -30,6 +30,7 @@ class TestNetwork(unittest.TestCase):
|
||||||
class FakeIface:
|
class FakeIface:
|
||||||
blockchain = b
|
blockchain = b
|
||||||
network = FakeNetwork
|
network = FakeNetwork
|
||||||
|
tip = 12
|
||||||
return FakeIface
|
return FakeIface
|
||||||
|
|
||||||
def test_new_fork(self):
|
def test_new_fork(self):
|
||||||
|
@ -43,7 +44,7 @@ class TestNetwork(unittest.TestCase):
|
||||||
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
|
||||||
ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair())
|
ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair())
|
||||||
self.assertEqual('fork', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8)))
|
self.assertEqual('fork', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8)))
|
||||||
self.assertEqual(conn.q.qsize(), 0)
|
self.assertEqual(conn.q.qsize(), 0)
|
||||||
|
|
||||||
|
@ -57,7 +58,7 @@ class TestNetwork(unittest.TestCase):
|
||||||
conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}})
|
conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}})
|
||||||
conn.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
||||||
ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair())
|
ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair())
|
||||||
self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=5)))
|
self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=5)))
|
||||||
self.assertEqual(conn.q.qsize(), 0)
|
self.assertEqual(conn.q.qsize(), 0)
|
||||||
|
|
||||||
|
@ -73,9 +74,9 @@ class TestNetwork(unittest.TestCase):
|
||||||
conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}})
|
conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}})
|
||||||
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}})
|
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}})
|
||||||
conn.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
||||||
ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair(1000))
|
ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair(1000))
|
||||||
self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7)))
|
self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7)))
|
||||||
self.assertEqual(conn.q.qsize(), 0)
|
self.assertEqual(conn.q.qsize(), 0)
|
||||||
|
|
||||||
|
@ -88,7 +89,7 @@ class TestNetwork(unittest.TestCase):
|
||||||
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}})
|
conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}})
|
||||||
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}})
|
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}})
|
||||||
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}})
|
||||||
ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair())
|
ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair())
|
||||||
self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7)))
|
self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7)))
|
||||||
self.assertEqual(conn.q.qsize(), 0)
|
self.assertEqual(conn.q.qsize(), 0)
|
||||||
|
|
||||||
|
@ -108,7 +109,7 @@ class TestNetwork(unittest.TestCase):
|
||||||
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}})
|
conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}})
|
||||||
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}})
|
||||||
conn.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
conn.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}})
|
||||||
ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair())
|
ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair())
|
||||||
self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8)))
|
self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8)))
|
||||||
self.assertEqual(conn.q.qsize(), 0)
|
self.assertEqual(conn.q.qsize(), 0)
|
||||||
self.assertEqual(times, 2)
|
self.assertEqual(times, 2)
|
||||||
|
|
Loading…
Add table
Reference in a new issue