From e5713dc63c35ef8fc2622b4a1ba0fe709fa4406e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 19 May 2022 12:53:49 -0400 Subject: [PATCH] improve caching for `blockchain.address.get_history` --- hub/db/db.py | 26 +++++++++++++++++----- hub/herald/service.py | 18 +++++++++++++++ hub/herald/session.py | 51 +++++++++++++++++++++++++++++++------------ hub/scribe/service.py | 6 ++--- 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 8a05cf7..669af9a 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -959,11 +959,25 @@ class HubDB: return self.total_transactions[tx_num] return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) - def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: + def _get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: if self._cache_all_tx_hashes: return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums] return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False) + async def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: + if self._cache_all_tx_hashes: + result = [] + append_result = result.append + for tx_num in tx_nums: + append_result(None if tx_num > self.db_tx_count else self.total_transactions[tx_num]) + await asyncio.sleep(0) + return result + + def _get_tx_hashes(): + return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False) + + return await asyncio.get_event_loop().run_in_executor(self._executor, _get_tx_hashes) + def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]: return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False) @@ -1159,7 +1173,7 @@ class HubDB: raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] - def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + def _read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): @@ -1168,6 +1182,9 @@ class HubDB: break return txs + async def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + return await asyncio.get_event_loop().run_in_executor(self._executor, self._read_history, hashX, limit) + async def limited_history(self, hashX, *, limit=1000): """Return an unpruned, sorted list of (tx_hash, height) tuples of confirmed transactions that touched the address, earliest in @@ -1176,13 +1193,12 @@ class HubDB: limit to None to get them all. """ run_in_executor = asyncio.get_event_loop().run_in_executor - tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit) + tx_nums = await run_in_executor(self._executor, self._read_history, hashX, limit) history = [] append_history = history.append while tx_nums: batch, tx_nums = tx_nums[:100], tx_nums[100:] - batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch) - for tx_num, tx_hash in zip(batch, batch_result): + for tx_num, tx_hash in zip(batch, await self.get_tx_hashes(batch)): append_history((tx_hash, bisect_right(self.tx_counts, tx_num))) await asyncio.sleep(0) return history diff --git a/hub/herald/service.py b/hub/herald/service.py index 64f9415..fefa3ee 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -48,6 +48,24 @@ class HubServerService(BlockchainReaderService): touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs self.notifications_to_send.append((set(touched_hashXs), height)) + def unwind(self): + prev_count = self.db.tx_counts.pop() + tx_count = self.db.tx_counts[-1] + self.db.headers.pop() + self.db.block_hashes.pop() + current_count = prev_count + for _ in range(prev_count - tx_count): + if current_count in self.session_manager.history_tx_info_cache: + self.session_manager.history_tx_info_cache.pop(current_count) + current_count -= 1 + if self.db._cache_all_tx_hashes: + for _ in range(prev_count - tx_count): + tx_hash = self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + if tx_hash in self.db.tx_cache: + self.db.tx_cache.pop(tx_hash) + assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" + self.db.merkle_cache.clear() + def _detect_changes(self): super()._detect_changes() start = time.perf_counter() diff --git a/hub/herald/session.py b/hub/herald/session.py index 4907c87..d69a12a 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -183,7 +183,6 @@ class SessionManager: self.cur_group = SessionGroup(0) self.txs_sent = 0 self.start_time = time.time() - self.history_cache = {} self.resolve_outputs_cache = {} self.resolve_cache = {} self.notified_height: typing.Optional[int] = None @@ -198,9 +197,13 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False + self.hashX_history_cache = LRUCache(2 ** 14) + self.hashX_full_cache = LRUCache(2 ** 12) + self.history_tx_info_cache = LRUCache(2 ** 17) def clear_caches(self): - self.history_cache.clear() + self.hashX_history_cache.clear() + self.hashX_full_cache.clear() self.resolve_outputs_cache.clear() self.resolve_cache.clear() @@ -592,16 +595,36 @@ class SessionManager: self.txs_sent += 1 return hex_hash - async def limited_history(self, hashX): - """A caching layer.""" - if hashX not in self.history_cache: - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. + async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: + if hashX in self.hashX_full_cache: + return self.hashX_full_cache[hashX] + if hashX not in self.hashX_history_cache: limit = self.env.max_send // 97 - self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) - return self.history_cache[hashX] + self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) + else: + tx_nums = self.hashX_history_cache[hashX] + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + await asyncio.sleep(0) + if needed_tx_infos: + + for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)): + hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num) + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist + await asyncio.sleep(0) + history = [] + history_append = history.append + for tx_num in tx_nums: + history_append(tx_infos[tx_num]) + await asyncio.sleep(0) + self.hashX_full_cache[hashX] = history + return history def _notify_peer(self, peer): notify_tasks = [ @@ -1419,8 +1442,8 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s history = await self.session_manager.limited_history(hashX) - conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} - for tx_hash, height in history] + conf = [{'tx_hash': txid, 'height': height} + for txid, height in history] return conf + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash): diff --git a/hub/scribe/service.py b/hub/scribe/service.py index b22451c..74bd73c 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -1245,7 +1245,7 @@ class BlockchainProcessorService(BlockchainService): if hashX in self.hashX_full_cache: return self.hashX_full_cache[hashX] if hashX not in self.hashX_history_cache: - self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None) + self.hashX_history_cache[hashX] = tx_nums = self.db._read_history(hashX, limit=None) else: tx_nums = self.hashX_history_cache[hashX] needed_tx_infos = [] @@ -1257,7 +1257,7 @@ class BlockchainProcessorService(BlockchainService): else: append_needed_tx_info(tx_num) if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)): tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' history = '' @@ -1487,7 +1487,7 @@ class BlockchainProcessorService(BlockchainService): else: append_needed_tx_info(tx_num) if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)): tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' tx_infos[tx_num] = tx_info self.history_tx_info_cache[tx_num] = tx_info