diff --git a/hub/herald/mempool.py b/hub/herald/mempool.py index 26e1ad7..67400d1 100644 --- a/hub/herald/mempool.py +++ b/hub/herald/mempool.py @@ -242,7 +242,7 @@ class HubMemPool: (self.session_manager.hsub_results[session.subscribe_headers_raw],)) ) if hashXes: - asyncio.create_task(session.send_history_notifications(*hashXes)) + asyncio.create_task(session.send_history_notifications(hashXes)) async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" diff --git a/hub/herald/session.py b/hub/herald/session.py index 79d3f59..4c6ce1d 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop from hub.common import LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification @@ -1181,33 +1181,39 @@ class LBRYElectrumX(asyncio.Protocol): status = sha256(history.encode()) return status.hex() - async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): + async def get_hashX_statuses(self, hashXes: typing.List[bytes]): + if self.env.index_address_status: + return await self.db.get_hashX_statuses(hashXes) + return [await self.get_hashX_status(hashX) for hashX in hashXes] + + async def send_history_notifications(self, hashXes: typing.List[bytes]): notifications = [] - for hashX in hashXes: + start = time.perf_counter() + statuses = await self.get_hashX_statuses(hashXes) + duration = time.perf_counter() - start + self.session_manager.address_history_metric.observe(duration) + start = time.perf_counter() + scripthash_notifications = 0 + address_notifications = 0 + for hashX, status in zip(hashXes, statuses): alias = self.hashX_subs[hashX] if len(alias) == 64: method = 'blockchain.scripthash.subscribe' + scripthash_notifications += 1 else: method = 'blockchain.address.subscribe' - start = time.perf_counter() - status = await self.get_hashX_status(hashX) - duration = time.perf_counter() - start - self.session_manager.address_history_metric.observe(duration) - notifications.append((method, (alias, status))) - if duration > 30: - self.logger.warning("slow history notification (%s) for '%s'", duration, alias) - - start = time.perf_counter() - self.session_manager.notifications_in_flight_metric.inc() - for method, args in notifications: - self.NOTIFICATION_COUNT.labels(method=method,).inc() + address_notifications += 1 + notifications.append(Notification(method, (alias, status))) + if scripthash_notifications: + self.NOTIFICATION_COUNT.labels(method='blockchain.scripthash.subscribe',).inc(scripthash_notifications) + if address_notifications: + self.NOTIFICATION_COUNT.labels(method='blockchain.address.subscribe', ).inc(address_notifications) + self.session_manager.notifications_in_flight_metric.inc(len(notifications)) try: - await self.send_notifications( - Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) - ) + await self.send_notifications(Batch(notifications)) self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start) finally: - self.session_manager.notifications_in_flight_metric.dec() + self.session_manager.notifications_in_flight_metric.dec(len(notifications)) # def get_metrics_or_placeholder_for_api(self, query_name): # """ Do not hold on to a reference to the metrics @@ -1470,12 +1476,13 @@ class LBRYElectrumX(asyncio.Protocol): address: the address to subscribe to""" if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') - results = [] + hashXes = [item async for item in asyncify_for_loop((self.address_to_hashX(address) for address in addresses), 100)] + statuses = await self.get_hashX_statuses(hashXes) + for hashX, alias in zip(hashXes, addresses): + self.hashX_subs[hashX] = alias + self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self)) self.session_manager.address_subscription_metric.inc(len(addresses)) - for address in addresses: - results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) - await asyncio.sleep(0) - return results + return statuses async def address_unsubscribe(self, address): """Unsubscribe an address.