From 75e9123eaf338a1eaa5491c081a8522471106734 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 21 May 2022 15:25:07 -0400 Subject: [PATCH] update the history cache in place instead of clearing/rebuilding --- hub/db/db.py | 4 +- hub/herald/service.py | 3 ++ hub/herald/session.py | 102 +++++++++++++++++++++++++++++------------- 3 files changed, 75 insertions(+), 34 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 669af9a..117501f 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -1173,7 +1173,7 @@ class HubDB: raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] - def _read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): @@ -1182,7 +1182,7 @@ class HubDB: break return txs - async def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + async def read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: return await asyncio.get_event_loop().run_in_executor(self._executor, self._read_history, hashX, limit) async def limited_history(self, hashX, *, limit=1000): diff --git a/hub/herald/service.py b/hub/herald/service.py index fefa3ee..df7bedb 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -46,9 +46,12 @@ class HubServerService(BlockchainReaderService): def advance(self, height: int): super().advance(height) touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs + self.session_manager.update_history_caches(touched_hashXs) self.notifications_to_send.append((set(touched_hashXs), height)) def unwind(self): + self.session_manager.hashX_raw_history_cache.clear() + self.session_manager.hashX_history_cache.clear() prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1] self.db.headers.pop() diff --git a/hub/herald/session.py b/hub/herald/session.py index ef0ed51..83745a0 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -1,5 +1,5 @@ import os -import ssl +import sys import math import time import codecs @@ -23,7 +23,7 @@ from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS -from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop +from hub.common import LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -33,6 +33,8 @@ if typing.TYPE_CHECKING: from hub.scribe.daemon import LBCDaemon from hub.herald.mempool import HubMemPool +PYTHON_VERSION = sys.version_info.major, sys.version_info.minor +TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict BAD_REQUEST = 1 DAEMON_ERROR = 2 @@ -43,6 +45,11 @@ SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' 'required_names other_names') +class CachedAddressHistoryItem(TypedDict): + tx_hash: str + height: int + + def scripthash_to_hashX(scripthash: str) -> bytes: try: bin_hash = hex_str_to_hash(scripthash) @@ -205,16 +212,50 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False - self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE) - self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE) - self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) + # hashX: List[int] + self.hashX_raw_history_cache = LRUCacheWithMetrics(2 ** 16, metric_name='raw_history', namespace=NAMESPACE) + # hashX: List[CachedAddressHistoryItem] + self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='full_history', namespace=NAMESPACE) + # tx_num: Tuple[txid, height] + self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 19, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): - self.hashX_history_cache.clear() - self.hashX_full_cache.clear() self.resolve_outputs_cache.clear() self.resolve_cache.clear() + def update_history_caches(self, touched_hashXs: typing.List[bytes]): + update_history_cache = {} + for hashX in set(touched_hashXs): + history_tx_nums = None + # if the history is the raw_history_cache, update it + # TODO: use a reversed iterator for this instead of rescanning it all + if hashX in self.hashX_raw_history_cache: + self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None) + # if it's in hashX_history_cache, prepare to update it in a batch + if hashX in self.hashX_history_cache: + full_cached = self.hashX_history_cache[hashX] + if history_tx_nums is None: + history_tx_nums = self.db._read_history(hashX, None) + new_txs = history_tx_nums[len(full_cached):] + update_history_cache[hashX] = full_cached, new_txs + if update_history_cache: + # get the set of new tx nums that were touched in all of the new histories to be cached + total_tx_nums = set() + for _, new_txs in update_history_cache.values(): + total_tx_nums.update(new_txs) + total_tx_nums = list(total_tx_nums) + # collect the total new tx infos + referenced_new_txs = { + tx_num: (CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num))) + for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums)) + } + # update the cached history lists + get_referenced = referenced_new_txs.__getitem__ + for hashX, (full, new_txs) in update_history_cache.items(): + append_to_full = full.append + for tx_num in new_txs: + append_to_full(get_referenced(tx_num)) + async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -603,45 +644,47 @@ class SessionManager: self.txs_sent += 1 return hex_hash - async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: - cached_full_history = self.hashX_full_cache.get(hashX) + async def _cached_raw_history(self, hashX: bytes, limit: typing.Optional[int] = None): + tx_nums = self.hashX_raw_history_cache.get(hashX) + if tx_nums is None: + self.hashX_raw_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) + return tx_nums + + async def cached_confirmed_history(self, hashX: bytes, + limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]: + cached_full_history = self.hashX_history_cache.get(hashX) + # return the cached history if cached_full_history is not None: self.address_history_size_metric.observe(len(cached_full_history)) return cached_full_history - if hashX not in self.hashX_history_cache: - limit = self.env.max_send // 97 - self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) - else: - tx_nums = self.hashX_history_cache[hashX] - self.address_history_size_metric.observe(len(tx_nums)) + # return the history and update the caches + tx_nums = await self._cached_raw_history(hashX, limit) needed_tx_infos = [] append_needed_tx_info = needed_tx_infos.append tx_infos = {} - cnt = 0 - for tx_num in tx_nums: + for cnt, tx_num in enumerate(tx_nums): # determine which tx_hashes are cached and which we need to look up cached = self.history_tx_info_cache.get(tx_num) if cached is not None: tx_infos[tx_num] = cached else: append_needed_tx_info(tx_num) - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) - if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)): - hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num) + if needed_tx_infos: # request all the needed tx hashes in one batch, cache the txids and heights + for cnt, (tx_num, tx_hash) in enumerate(zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos))): + hist = CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)) tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) + # ensure the ordering of the txs history = [] history_append = history.append - for tx_num in tx_nums: + for cnt, tx_num in enumerate(tx_nums): history_append(tx_infos[tx_num]) - self.hashX_full_cache[hashX] = history - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) + self.hashX_history_cache[hashX] = history + self.address_history_size_metric.observe(len(history)) return history def _notify_peer(self, peer): @@ -1463,13 +1506,8 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_manager.limited_history(hashX) - conf = [ - item async for item in asyncify_for_loop( - ({'tx_hash': txid, 'height': height} for txid, height in history), 1000 - ) - ] - return conf + self.unconfirmed_history(hashX) + history = await self.session_manager.cached_confirmed_history(hashX) + return history + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash): """Return the confirmed and unconfirmed history of a scripthash."""