fix last outstanding test failures

This commit is contained in:
Jeffrey Picard 2022-04-11 00:41:50 +00:00
parent cd34564d78
commit 377ecd5f14
4 changed files with 77 additions and 30 deletions

View file

@ -57,7 +57,7 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
await self.connect() await self.connect()
first_connect = False first_connect = False
synchronized.set() synchronized.set()
log.info("connected to es notifier") log.warning("connected to es notifier: %d", self.port)
except Exception as e: except Exception as e:
if not isinstance(e, asyncio.CancelledError): if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port) log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
@ -75,13 +75,17 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
self._lost_connection.clear() self._lost_connection.clear()
def connection_lost(self, exc) -> None: def connection_lost(self, exc) -> None:
log.warning("lost connection to notifier port %d: %s", self.port, exc)
self.transport = None self.transport = None
self._lost_connection.set() self._lost_connection.set()
def data_received(self, data: bytes) -> None: def data_received(self, data: bytes) -> None:
log.warning("received data from notifier port %d: %s", self.port, data)
while data:
chunk, data = data[:40], data[40:]
try: try:
height, block_hash = struct.unpack(b'>Q32s', data) height, block_hash = struct.unpack(b'>Q32s', chunk)
except: except:
log.exception("failed to decode %s", (data or b'').hex()) log.exception("failed to decode %s", (chunk or b'').hex())
raise raise
self.notifications.put_nowait((height, block_hash)) self.notifications.put_nowait((height, block_hash))

View file

@ -38,8 +38,8 @@ class Env:
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None, database_query_timeout=None, db_max_open_files=64, elastic_notifier_port=None,
elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, go_hub_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None): peer_announce=None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -49,8 +49,8 @@ class Env:
self.host = host if host is not None else self.default('HOST', 'localhost') self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080) self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.go_hub_notifier_port = go_hub_notifier_port if go_hub_notifier_port is not None else self.integer('GO_HUB_NOTIFIER_PORT', 18080)
self.loop_policy = self.set_event_loop_policy( self.loop_policy = self.set_event_loop_policy(
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None) loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)

View file

@ -28,9 +28,17 @@ class HubServerService(BlockchainReaderService):
self.es_notification_client = ElasticNotifierClientProtocol( self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
) )
self.go_hub_notifications = asyncio.Queue()
self.go_hub_notifications_client = ElasticNotifierClientProtocol(
self.go_hub_notifications, '127.0.0.1', self.env.go_hub_notifier_port
)
self.es_synchronized = asyncio.Event()
self.go_hub_synchronized = asyncio.Event()
self.synchronized = asyncio.Event() self.synchronized = asyncio.Event()
self._es_height = None self._es_height = None
self._es_block_hash = None self._es_block_hash = None
self._go_hub_height = None
self._go_hub_block_hash = None
def clear_caches(self): def clear_caches(self):
self.session_manager.clear_caches() self.session_manager.clear_caches()
@ -61,7 +69,9 @@ class HubServerService(BlockchainReaderService):
await self.mempool.on_block(touched, height) await self.mempool.on_block(touched, height)
self.log.info("reader advanced to %i", height) self.log.info("reader advanced to %i", height)
if self._es_height == self.db.db_height: if self._es_height == self.db.db_height:
self.synchronized.set() self.es_synchronized.set()
if self._go_hub_height == self.db.db_height:
self.go_hub_synchronized.set()
if self.mempool_notifications: if self.mempool_notifications:
await self.mempool.on_mempool( await self.mempool.on_mempool(
set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height
@ -69,21 +79,63 @@ class HubServerService(BlockchainReaderService):
self.mempool_notifications.clear() self.mempool_notifications.clear()
self.notifications_to_send.clear() self.notifications_to_send.clear()
async def receive_es_notifications(self, synchronized: asyncio.Event): async def receive_es_notifications(self, synchronized: asyncio.Event): ####
synchronized.set() synchronized.set()
try: try:
while True: while True:
self._es_height, self._es_block_hash = await self.es_notifications.get() self._es_height, self._es_block_hash = await self.es_notifications.get()
self.clear_search_cache() self.clear_search_cache()
if self.last_state and self._es_block_hash == self.last_state.tip: if self.last_state and self._es_block_hash == self.last_state.tip:
self.synchronized.set() self.es_synchronized.set()
self.log.info("es and reader are in sync at block %i", self.last_state.height) self.log.info("es and reader are in sync at block %i", self.last_state.height)
else: else:
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height, self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height) self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("es notification error: %s", e)
raise e
finally: finally:
self.es_notification_client.close() self.es_notification_client.close()
async def receive_go_hub_notifications(self, synchronized: asyncio.Event): ####
synchronized.set()
try:
while True:
self._go_hub_height, self._go_hub_block_hash = await self.go_hub_notifications.get()
if self.last_state and self._go_hub_block_hash == self.last_state.tip:
self.go_hub_synchronized.set()
self.log.info("go hub and reader are in sync at block %i", self.last_state.height)
else:
self.log.info("go hub and reader are not yet in sync (block %s vs %s)", self._go_hub_height,
self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("go hub notification error: %s", e)
raise e
finally:
self.go_hub_notifications_client.close()
async def wait_for_sync(self, synchronized: asyncio.Event):
synchronized.set()
try:
while True:
es_sync = asyncio.create_task(self.es_synchronized.wait())
go_hub_sync = asyncio.create_task(self.go_hub_synchronized.wait())
await asyncio.wait([es_sync, go_hub_sync], return_when=asyncio.FIRST_COMPLETED)
if not es_sync.done():
es_sync.cancel()
if not go_hub_sync.done():
go_hub_sync.cancel()
if self._go_hub_height == self._es_height == self.db.db_height:
self.synchronized.set()
# self.log.info("reader, hub, and es are in sync at block %i", self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("wait_for_sync error: %s", e)
raise e
async def start_status_server(self): async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port): if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start( await self.status_server.start(
@ -93,13 +145,16 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self): def _iter_start_tasks(self):
yield self.start_status_server() yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.mempool.send_notifications_forever) yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.go_hub_notifications_client.maintain_connection)
yield self.start_cancellable(self.refresh_blocks_forever) yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait() yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height) self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus() yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications) yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.receive_go_hub_notifications)
yield self.start_cancellable(self.wait_for_sync)
yield self.session_manager.search_index.start() yield self.session_manager.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool) yield self.start_cancellable(self.session_manager.serve, self.mempool)

View file

@ -415,8 +415,6 @@ class ResolveCommand(BaseResolveTestCase):
uri = 'lbry://@abc/on-channel-claim' uri = 'lbry://@abc/on-channel-claim'
# now, claim something on this channel (it will update the invalid claim, but we save and forcefully restore) # now, claim something on this channel (it will update the invalid claim, but we save and forcefully restore)
valid_claim = await self.stream_create('on-channel-claim', '0.00000001', channel_id=channel['claim_id']) valid_claim = await self.stream_create('on-channel-claim', '0.00000001', channel_id=channel['claim_id'])
print("sleeeeeeeeeping.....")
time.sleep(1) # FIXME: don't wait for the claim to be saved
# resolves normally # resolves normally
response = await self.resolve(uri) response = await self.resolve(uri)
self.assertTrue(response['is_channel_signature_valid']) self.assertTrue(response['is_channel_signature_valid'])
@ -1536,6 +1534,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
async def test_reorg(self): async def test_reorg(self):
await asyncio.wait_for(self.on_header(206), 3.0)
self.assertEqual(self.ledger.headers.height, 206) self.assertEqual(self.ledger.headers.height, 206)
channel_name = '@abc' channel_name = '@abc'
@ -1587,8 +1586,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
) )
await self.stream_abandon(stream_id) await self.stream_abandon(stream_id)
print("Sleeeeeeeping to wait for abandoned stream to be removed")
time.sleep(1)
self.assertNotIn('error', await self.resolve(channel_name)) self.assertNotIn('error', await self.resolve(channel_name))
self.assertIn('error', await self.resolve(stream_name)) self.assertIn('error', await self.resolve(stream_name))
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex()) self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
@ -1596,16 +1593,12 @@ class ResolveAfterReorg(BaseResolveTestCase):
# TODO: check @abc/foo too # TODO: check @abc/foo too
await self.reorg(206) await self.reorg(206)
print("Sleeeeeeeping to wait for reorg")
time.sleep(1)
self.assertNotIn('error', await self.resolve(channel_name)) self.assertNotIn('error', await self.resolve(channel_name))
self.assertIn('error', await self.resolve(stream_name)) self.assertIn('error', await self.resolve(stream_name))
self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex()) self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
await self.assertNoClaimForName(stream_name) await self.assertNoClaimForName(stream_name)
await self.channel_abandon(channel_id) await self.channel_abandon(channel_id)
print("Sleeeeeeeping to wait for abandoned channel to be removed")
time.sleep(1)
self.assertIn('error', await self.resolve(channel_name)) self.assertIn('error', await self.resolve(channel_name))
self.assertIn('error', await self.resolve(stream_name)) self.assertIn('error', await self.resolve(stream_name))
await self.reorg(206) await self.reorg(206)
@ -1637,6 +1630,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.assertBlockHash(208) await self.assertBlockHash(208)
claim = await self.resolve('hovercraft') claim = await self.resolve('hovercraft')
self.assertEqual(claim['txid'], broadcast_tx.id) self.assertEqual(claim['txid'], broadcast_tx.id)
self.assertEqual(claim['height'], 208) self.assertEqual(claim['height'], 208)
@ -1644,6 +1638,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
invalidated_block_hash = (await self.ledger.headers.hash(208)).decode() invalidated_block_hash = (await self.ledger.headers.hash(208)).decode()
block_207 = await self.blockchain.get_block(invalidated_block_hash) block_207 = await self.blockchain.get_block(invalidated_block_hash)
self.assertIn(claim['txid'], block_207['tx']) self.assertIn(claim['txid'], block_207['tx'])
await asyncio.wait_for(self.on_header(208), 3.0)
self.assertEqual(208, claim['height']) self.assertEqual(208, claim['height'])
# reorg the last block dropping our claim tx # reorg the last block dropping our claim tx
@ -1653,8 +1648,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0) await asyncio.wait_for(self.on_header(209), 3.0)
# FIXME: add endpoint in golang to get height and wait for that to hit the right height
time.sleep(1)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await self.assertBlockHash(209) await self.assertBlockHash(209)
@ -1684,8 +1677,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 3.0) await asyncio.wait_for(self.on_header(210), 3.0)
# FIXME: add endpoint in golang to get height and wait for that to hit the right height
time.sleep(1)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')
@ -1719,6 +1710,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertEqual(self.ledger.headers.height, 208) self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await asyncio.wait_for(self.on_header(208), 30.0)
claim = await self.resolve('hovercraft') claim = await self.resolve('hovercraft')
self.assertEqual(claim['txid'], broadcast_tx.id) self.assertEqual(claim['txid'], broadcast_tx.id)
self.assertEqual(claim['height'], 208) self.assertEqual(claim['height'], 208)
@ -1736,8 +1728,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 30.0) await asyncio.wait_for(self.on_header(209), 30.0)
# FIXME: add endpoint in golang to get height and wait for that to hit the right height
time.sleep(1)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await self.assertBlockHash(209) await self.assertBlockHash(209)
@ -1767,8 +1757,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0) await asyncio.wait_for(self.on_header(210), 1.0)
# FIXME: add endpoint in golang to get height and wait for that to hit the right height
time.sleep(1)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')