diff --git a/hub/common.py b/hub/common.py
index 2216686..10196c2 100644
--- a/hub/common.py
+++ b/hub/common.py
@@ -7,8 +7,10 @@ import logging
 import logging.handlers
 import typing
 import collections
+from bisect import insort_right
+from collections import deque
 from decimal import Decimal
-from typing import Iterable
+from typing import Iterable, Deque
 from asyncio import get_event_loop, Event
 from prometheus_client import Counter
 from hub.schema.tags import clean_tags
@@ -484,6 +486,89 @@ class LFUCacheWithMetrics(LFUCache):
         self.misses.inc()
 
 
+class LargestValueCacheItem:
+    __slots__ = [
+        'key',
+        'value',
+    ]
+
+    def __init__(self, key, value):
+        self.key = key
+        self.value = value
+
+    def __gt__(self, other):
+        return len(self.value) > len(other.value)
+
+    def __ge__(self, other):
+        return len(self.value) >= len(other.value)
+
+    def __lt__(self, other):
+        return len(self.value) < len(other.value)
+
+    def __le__(self, other):
+        return len(self.value) <= len(other.value)
+
+    def __eq__(self, other):
+        return len(self.value) == len(other.value)
+
+
+class LargestValueCache:
+    __slots__ = [
+        '_capacity',
+        '_cache',
+        '_raw_cache'
+    ]
+
+    def __init__(self, capacity: int):
+        self._capacity = max(capacity, 0)
+        self._cache = {}
+        self._raw_cache: Deque[LargestValueCacheItem] = deque()
+
+    def items(self):
+        return self._cache.items()
+
+    def get(self, key, default=None):
+        return self._cache.get(key, default)
+
+    @property
+    def full(self):
+        return len(self._cache) >= self._capacity
+
+    def set(self, key, value) -> bool:
+        if self._capacity == 0:
+            return False
+        if self.full:
+            if len(value) < len(self._raw_cache[0].value):
+                return False
+            popped = self._raw_cache.popleft()
+            self._cache.pop(popped.key)
+        item = LargestValueCacheItem(key, value)
+        insort_right(self._raw_cache, item)
+        self._cache[key] = value
+        return True
+
+    def clear(self):
+        self._cache.clear()
+        self._raw_cache.clear()
+
+    def pop(self, key):
+        value = self._cache.pop(key)
+        self._raw_cache.remove(LargestValueCacheItem(key, value))
+        return value
+
+    def __setitem__(self, key, value):
+        return self.set(key, value)
+
+    def __getitem__(self, item):
+        return self.get(item)
+
+    def __contains__(self, item) -> bool:
+        return item in self._cache
+
+    def __len__(self):
+        return len(self._cache)
+
+
 # the ipaddress module does not show these subnets as reserved
 CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
 IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
diff --git a/hub/herald/service.py b/hub/herald/service.py
index bffe7c9..3a2a71c 100644
--- a/hub/herald/service.py
+++ b/hub/herald/service.py
@@ -62,6 +62,7 @@ class HubServerService(BlockchainReaderService):
 
     def unwind(self):
        self.session_manager.hashX_raw_history_cache.clear()
+        self.session_manager.hashX_history_cache.clear()
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]
         self.db.block_hashes.pop()
diff --git a/hub/herald/session.py b/hub/herald/session.py
index fb702c2..82947cb 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -23,7 +23,7 @@ from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from hub.herald.search import SearchIndex
 from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
 from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop
-from hub.common import LRUCacheWithMetrics, LFUCacheWithMetrics
+from hub.common import LRUCacheWithMetrics, LFUCacheWithMetrics, LargestValueCache
 from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
 from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
 from hub.herald.framer import NewlineFramer
@@ -215,20 +215,47 @@ class SessionManager:
         self.running = False
         # hashX: List[int]
         self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
-        # tx_num: CachedAddressHistoryItem
+        # hashX: List[CachedAddressHistoryItem]
+        self.hashX_history_cache = LargestValueCache(env.hashX_history_cache_size)
+        # tx_num: Tuple[txid, height]
         self.history_tx_info_cache = LFUCacheWithMetrics(env.history_tx_cache_size, metric_name='history_tx', namespace=NAMESPACE)
 
     def clear_caches(self):
         self.resolve_cache.clear()
 
     def update_history_caches(self, touched_hashXs: typing.List[bytes]):
-        # update_history_cache = {}
+        update_history_cache = {}
         for hashX in set(touched_hashXs):
-            # history_tx_nums = None
+            history_tx_nums = None
             # if the history is the raw_history_cache, update it
             # TODO: use a reversed iterator for this instead of rescanning it all
             if hashX in self.hashX_raw_history_cache:
-                self.hashX_raw_history_cache[hashX] = self.db._read_history(hashX, None)
+                self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None)
+            # if it's in hashX_history_cache, prepare to update it in a batch
+            if hashX in self.hashX_history_cache:
+                full_cached = self.hashX_history_cache[hashX]
+                if history_tx_nums is None:
+                    history_tx_nums = self.db._read_history(hashX, None)
+                new_txs = history_tx_nums[len(full_cached):]
+                update_history_cache[hashX] = full_cached, new_txs
+        if update_history_cache:
+            # get the set of new tx nums that were touched in all of the new histories to be cached
+            total_tx_nums = set()
+            for _, new_txs in update_history_cache.values():
+                total_tx_nums.update(new_txs)
+            total_tx_nums = list(total_tx_nums)
+            # collect the total new tx infos
+            referenced_new_txs = {
+                tx_num: (CachedAddressHistoryItem(
+                    tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)
+                )) for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums))
+            }
+            # update the cached history lists
+            get_referenced = referenced_new_txs.__getitem__
+            for hashX, (full, new_txs) in update_history_cache.items():
+                append_to_full = full.append
+                for tx_num in new_txs:
+                    append_to_full(get_referenced(tx_num))
 
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
@@ -626,6 +653,11 @@ class SessionManager:
 
     async def cached_confirmed_history(self, hashX: bytes,
                                        limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
+        cached_full_history = self.hashX_history_cache.get(hashX)
+        # return the cached history
+        if cached_full_history is not None:
+            self.address_history_size_metric.observe(len(cached_full_history))
+            return cached_full_history
         # return the history and update the caches
         tx_nums = await self._cached_raw_history(hashX, limit)
         needed_tx_infos = []
@@ -652,6 +684,7 @@ class SessionManager:
             history_append(tx_infos[tx_num])
             if cnt % 1000 == 0:
                 await asyncio.sleep(0)
+        self.hashX_history_cache[hashX] = history
         self.address_history_size_metric.observe(len(history))
         return history
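
For context, a minimal sketch of how the new LargestValueCache behaves once the hub/common.py hunk above is applied; the byte keys and small list values below are placeholders, not real hub data. The cache keeps up to `capacity` entries ranked by len(value): when full, a new value either evicts the smallest cached entry or is rejected if it is smaller than that entry.

    from hub.common import LargestValueCache

    # toy keys/values; in the herald the keys are hashXs and the values are
    # lists of CachedAddressHistoryItem, so "largest" means longest history
    cache = LargestValueCache(2)   # keep at most the 2 largest values
    cache[b'a'] = [1, 2, 3]        # stored, cache not yet full
    cache[b'b'] = [1]              # stored, cache not yet full
    cache[b'c'] = [1, 2]           # cache full: larger than b'b', so b'b' is evicted
    cache[b'd'] = [9]              # cache full: smaller than every cached value, rejected
    assert len(cache) == 2 and b'a' in cache and b'c' in cache and b'b' not in cache
    assert cache.get(b'd') is None

In SessionManager this bounds hashX_history_cache (sized by env.hashX_history_cache_size), so only the longest confirmed address histories stay cached; update_history_caches extends those cached lists as new blocks touch them, and unwind clears the cache on reorg.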