diff --git a/electrum/blockchain.py b/electrum/blockchain.py index c4c557826..e0ab5d162 100644 --- a/electrum/blockchain.py +++ b/electrum/blockchain.py @@ -308,7 +308,7 @@ class Blockchain(util.PrintError): elif height == 0: return constants.net.GENESIS elif height < len(self.checkpoints) * 2016: - assert (height+1) % 2016 == 0, height + assert (height+1) % 2016 == 0, (height, len(self.checkpoints), (height+1) % 2016) index = height // 2016 h, t = self.checkpoints[index] return h diff --git a/electrum/interface.py b/electrum/interface.py index 1db430fe7..23119333b 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -87,7 +87,7 @@ class Interface(PrintError): return False return True - @util.aiosafe + @aiosafe async def run(self): if self.protocol != 's': await self.open_session(None, exit_early=False) @@ -183,11 +183,15 @@ class Interface(PrintError): self.tip = subscription_res['height'] self.mark_ready() self.session = session + copy_header_queue = asyncio.Queue() + conniface = Conn(self.server, session, lambda idx, tip: self.network.request_chunk(idx, tip, session)) + block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue, conniface)) while True: try: new_header = await asyncio.wait_for(header_queue.get(), 300) self.tip_header = new_header self.tip = new_header['block_height'] + await copy_header_queue.put(new_header) except concurrent.futures.TimeoutError: await asyncio.wait_for(session.send_request('server.ping'), 5) @@ -197,6 +201,212 @@ class Interface(PrintError): def close(self): self.fut.cancel() + @aiosafe + async def run_fetch_blocks(self, sub_reply, replies, conniface): + async with self.network.bhi_lock: + bhi = BlockHeaderInterface(conniface, sub_reply['height'], self.blockchain.height()+1, self) + await replies.put(blockchain.deserialize_header(bfh(sub_reply['hex']), sub_reply['height'])) + self.print_error("checking if catched up {}-1 > {}, tip {}".format(sub_reply['height'], self.blockchain.height(), bhi.tip)) + if sub_reply['height']-1 > self.blockchain.height(): + last_status = await bhi.sync_until() + assert last_status == 'catchup', last_status + assert self.blockchain.height()+1 == bhi.height, (self.blockchain.height(), bhi.height) + + while True: + self.network.notify('updated') + item = await replies.get() + async with self.network.bhi_lock: + if self.blockchain.height() >= bhi.height and self.blockchain.check_header(item): + # another interface amended the blockchain + self.print_error("SKIPPING HEADER", bhi.height) + continue + if bhi.tip < bhi.height: + bhi.height = bhi.tip + await bhi.step(item) + bhi.tip = max(bhi.height, bhi.tip) + +class BlockHeaderInterface(PrintError): + def __init__(self, conn, tip, height, iface): + self.tip = tip + self.height = height + self.conn = conn + self.iface = iface + + def diagnostic_name(self): + return self.conn.server + + async def sync_until(self, next_height=None): + if next_height is None: + next_height = self.tip + last = None + while last is None or self.height < next_height: + if next_height > self.height + 10: + could_connect, num_headers = await self.conn.request_chunk(self.height, next_height) + self.tip = max(self.height + num_headers, self.tip) + if not could_connect: + last = await self.step() + self.tip = max(self.height, self.tip) + continue + self.height = (self.height // 2016 * 2016) + num_headers + if self.height > next_height: + assert False, (self.height, self.tip) + last = 'catchup' + else: + last = await self.step() + self.tip = max(self.height, self.tip) + return last + + async def step(self, header=None): + assert self.height != 0 + if header is None: + header = await self.conn.get_block_header(self.height, 'catchup') + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + + bad_header = None + if not can_connect: + self.print_error("can't connect", self.height) + #backward + bad = self.height + bad_header = header + self.height -= 1 + checkp = False + if self.height <= self.iface.network.max_checkpoint(): + self.height = self.iface.network.max_checkpoint() + 1 + checkp = True + + header = await self.conn.get_block_header(self.height, 'backward') + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + if checkp: + assert can_connect or chain, (can_connect, chain) + while not chain and not can_connect: + bad = self.height + bad_header = header + delta = self.tip - self.height + next_height = self.tip - 2 * delta + checkp = False + if next_height <= self.iface.network.max_checkpoint(): + next_height = self.iface.network.max_checkpoint() + 1 + checkp = True + self.height = next_height + + header = await self.conn.get_block_header(self.height, 'backward') + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + if checkp: + assert can_connect or chain, (can_connect, chain) + self.print_error("exiting backward mode at", self.height) + if can_connect: + self.print_error("could connect", self.height) + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + + if type(can_connect) is bool: + # mock + self.height += 1 + if self.height > self.tip: + assert False + return 'catchup' + self.iface.blockchain = can_connect + self.height += 1 + self.iface.blockchain.save_header(header) + return 'catchup' + + if not chain: + raise Exception("not chain") # line 931 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py + + # binary + if type(chain) in [int, bool]: + pass # mock + else: + self.iface.blockchain = chain + good = self.height + self.height = (bad + good) // 2 + header = await self.conn.get_block_header(self.height, 'binary') + while True: + self.print_error("binary step") + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: + assert bad != self.height, (bad, self.height) + good = self.height + self.iface.blockchain = self.iface.blockchain if type(chain) in [bool, int] else chain + else: + bad = self.height + assert good != self.height + bad_header = header + if bad != good + 1: + self.height = (bad + good) // 2 + header = await self.conn.get_block_header(self.height, 'binary') + continue + mock = bad_header and 'mock' in bad_header and bad_header['mock']['connect'](self) + real = not mock and self.iface.blockchain.can_connect(bad_header, check_height=False) + if not real and not mock: + raise Exception('unexpected bad header during binary' + str(bad_header)) # line 948 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py + branch = blockchain.blockchains.get(bad) + if branch is not None: + ismocking = False + if type(branch) is dict: + ismocking = True + # FIXME: it does not seem sufficient to check that the branch + # contains the bad_header. what if self.blockchain doesn't? + # the chains shouldn't be joined then. observe the incorrect + # joining on regtest with a server that has a fork of height + # one. the problem is observed only if forking is not during + # electrum runtime + if ismocking and branch['check'](bad_header) or not ismocking and branch.check_header(bad_header): + self.print_error('joining chain', bad) + self.height += 1 + return 'join' + else: + if ismocking and branch['parent']['check'](header) or not ismocking and branch.parent().check_header(header): + self.print_error('reorg', bad, self.tip) + self.iface.blockchain = branch.parent() if not ismocking else branch['parent'] + self.height = bad + header = await self.conn.get_block_header(self.height, 'binary') + else: + if ismocking: + self.height = bad + 1 + self.print_error("TODO replace blockchain") + return 'conflict' + self.print_error('forkpoint conflicts with existing fork', branch.path()) + branch.write(b'', 0) + branch.save_header(bad_header) + self.iface.blockchain = branch + self.height = bad + 1 + return 'conflict' + else: + bh = self.iface.blockchain.height() + if bh > good: + forkfun = self.iface.blockchain.fork + if 'mock' in bad_header: + chain = bad_header['mock']['check'](bad_header) + forkfun = bad_header['mock']['fork'] if 'fork' in bad_header['mock'] else forkfun + else: + chain = self.iface.blockchain.check_header(bad_header) + if not chain: + b = forkfun(bad_header) + assert bad not in blockchain.blockchains, (bad, list(blockchain.blockchains.keys())) + blockchain.blockchains[bad] = b + self.iface.blockchain = b + self.height = b.forkpoint + 1 + assert b.forkpoint == bad + return 'fork' + else: + assert bh == good + if bh < self.tip: + self.print_error("catching up from %d"% (bh + 1)) + self.height = bh + 1 + return 'no_fork' + +class Conn: + def __init__(self, server, session, get_chunk): + self.server = server + self.session = session # type: aiorpcx.ClientSession + self.request_chunk = get_chunk + async def get_block_header(self, height, assert_mode): + res = await asyncio.wait_for(self.session.send_request('blockchain.block.header', [height]), 1) + return blockchain.deserialize_header(bytes.fromhex(res), height) + + def check_cert(host, cert): try: b = pem.dePem(cert, 'CERTIFICATE') diff --git a/electrum/network.py b/electrum/network.py index 1d5055f69..92378d9fa 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -37,7 +37,7 @@ import dns import dns.resolver from . import util -from .util import PrintError, print_error, bfh +from .util import PrintError, print_error, aiosafe, bfh from .bitcoin import COIN from . import constants from . import blockchain @@ -533,7 +533,12 @@ class Network(PrintError): if self.server_is_lagging() and self.auto_connect: # switch to one that has the correct header (not height) header = self.blockchain().read_header(self.get_local_height()) - filtered = list(map(lambda x: x[0], filter(lambda x: x[1].tip_header == header, self.interfaces.items()))) + def filt(x): + a = x[1].tip_header + b = header + assert type(a) is type(b) + return a == b + filtered = list(map(lambda x: x[0], filter(filt, self.interfaces.items()))) if filtered: choice = random.choice(filtered) self.switch_to_interface(choice) @@ -668,7 +673,7 @@ class Network(PrintError): if b.catch_up == server: b.catch_up = None - @util.aiosafe + @aiosafe async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) @@ -677,8 +682,8 @@ class Network(PrintError): try: await asyncio.wait_for(interface.ready, 5) except BaseException as e: - import traceback - traceback.print_exc() + #import traceback + #traceback.print_exc() self.print_error(interface.server, "couldn't launch because", str(e), str(type(e))) self.connection_down(interface.server) return @@ -713,6 +718,29 @@ class Network(PrintError): except concurrent.futures.CancelledError: pass + async def get_merkle_for_transaction(self, tx_hash, tx_height): + print("getting merkle for transaction", tx_hash, tx_height) + return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height]) + + def broadcast_transaction(self, tx): + fut = asyncio.run_coroutine_threadsafe(self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)]), self.asyncio_loop) + return True, fut.result(1) + + async def request_chunk(self, height, tip, session=None): + if session is None: session = self.interface.session + index = height // 2016 + size = 2016 + if tip is not None and height + 2016 >= tip: + size = tip - height + #if index * 2016 < height: + # size = height - index * 2016 + res = await session.send_request('blockchain.block.headers', [index * 2016, size]) + conn = self.blockchain().connect_chunk(index, res['hex']) + if not conn: + return conn, 0 + self.blockchain().save_chunk(index, bfh(res['hex'])) + return conn, res['count'] + @with_interface_lock def blockchain(self): if self.interface and self.interface.blockchain is not None: diff --git a/electrum/tests/test_interface.py b/electrum/tests/test_interface.py deleted file mode 100644 index 402588ca8..000000000 --- a/electrum/tests/test_interface.py +++ /dev/null @@ -1,28 +0,0 @@ -import unittest - -from electrum import interface - -from . import SequentialTestCase - - -class TestInterface(SequentialTestCase): - - def test_match_host_name(self): - self.assertTrue(interface._match_hostname('asd.fgh.com', 'asd.fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', 'asd.zxc.com')) - self.assertTrue(interface._match_hostname('asd.fgh.com', '*.fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', '*fgh.com')) - self.assertFalse(interface._match_hostname('asd.fgh.com', '*.zxc.com')) - - def test_check_host_name(self): - i = interface.TcpConnection(server=':1:', queue=None, config_path=None) - - self.assertFalse(i.check_host_name(None, None)) - self.assertFalse(i.check_host_name( - peercert={'subjectAltName': []}, name='')) - self.assertTrue(i.check_host_name( - peercert={'subjectAltName': [('DNS', 'foo.bar.com')]}, - name='foo.bar.com')) - self.assertTrue(i.check_host_name( - peercert={'subject': [('commonName', 'foo.bar.com')]}, - name='foo.bar.com')) diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py new file mode 100644 index 000000000..8181951de --- /dev/null +++ b/electrum/tests/test_network.py @@ -0,0 +1,118 @@ +import asyncio +import tempfile +import unittest + +from electrum.constants import set_regtest +from electrum.simple_config import SimpleConfig +from electrum import blockchain +from electrum.interface import BlockHeaderInterface + +class MockConnection: + def __init__(self): + self.q = asyncio.Queue() + self.server = 'mock-server' + async def get_block_header(self, height, assert_mode): + assert self.q.qsize() > 0, (height, assert_mode) + item = await self.q.get() + print("step with height", height, item) + assert item['block_height'] == height, (item['block_height'], height) + assert assert_mode in item['mock'], (assert_mode, item) + return item + +class TestNetwork(unittest.TestCase): + def setUp(self): + self.config = SimpleConfig({'electrum_path': tempfile.mkdtemp(prefix="test_network")}) + + def blockchain_iface_pair(self, forkpoint=2002): + b = blockchain.Blockchain(self.config, forkpoint, None) + class FakeNetwork: + max_checkpoint = lambda: 0 + class FakeIface: + blockchain = b + network = FakeNetwork + return FakeIface + + def test_new_fork(self): + blockchain.blockchains = {} + conn = MockConnection() + conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(block_header_iface): + return block_header_iface.height == 6 + conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1,'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1,'check':lambda x: True, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair()) + self.assertEqual('fork', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8))) + self.assertEqual(conn.q.qsize(), 0) + + def test_new_can_connect_during_backward(self): + blockchain.blockchains = {} + conn = MockConnection() + conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(block_header_iface): + return block_header_iface.height == 2 + conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + conn.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair()) + self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=5))) + self.assertEqual(conn.q.qsize(), 0) + + def mock_fork(self, bad_header): + return blockchain.Blockchain(self.config, bad_header['block_height'], None) + + def test_new_chain_false_during_binary(self): + blockchain.blockchains = {} + conn = MockConnection() + conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + mock_connect = lambda bhi: bhi.height == 3 + conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect}}) + conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}}) + conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}}) + conn.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: True, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: True, 'connect': lambda x: True}}) + ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair(1000)) + self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7))) + self.assertEqual(conn.q.qsize(), 0) + + def test_new_join(self): + blockchain.blockchains = {7: {'check': lambda bad_header: True}} + conn = MockConnection() + conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda x: x.height == 6}}) + conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair()) + self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7))) + self.assertEqual(conn.q.qsize(), 0) + + def test_new_reorg(self): + times = 0 + def check(header): + nonlocal times + self.assertEqual(header['block_height'], 7) + times += 1 + return times != 1 + blockchain.blockchains = {7: {'check': check, 'parent': {'check': lambda x: True}}} + conn = MockConnection() + conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda x: x.height == 6}}) + conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}}) + conn.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = BlockHeaderInterface(conn, 12, 8, self.blockchain_iface_pair()) + self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8))) + self.assertEqual(conn.q.qsize(), 0) + self.assertEqual(times, 2) + +if __name__=="__main__": + set_regtest() + unittest.main() diff --git a/electrum/verifier.py b/electrum/verifier.py index f64e7fa47..61642364e 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -70,8 +70,7 @@ class SPV(ThreadJob): if header is None: index = tx_height // 2016 if index < len(blockchain.checkpoints): - # FIXME disabled until async block header download has been merged - pass #await self.network.request_chunk(tx_height, None) + await self.network.request_chunk(tx_height, None) elif (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots): self.print_error('requested merkle', tx_hash)