From 8147bbf3b939627a74001c0dfdd1d29b59297b29 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 Jan 2023 13:20:50 -0500 Subject: [PATCH] remove `--cache_all_claim_txos` setting --- hub/db/db.py | 29 +---------------------------- hub/elastic_sync/db.py | 4 ++-- hub/elastic_sync/env.py | 6 +++--- hub/elastic_sync/service.py | 2 +- hub/env.py | 8 +------- hub/herald/db.py | 4 ++-- hub/herald/env.py | 6 +++--- hub/herald/service.py | 2 +- hub/scribe/db.py | 4 ++-- hub/scribe/env.py | 7 +++---- hub/scribe/service.py | 23 ++++------------------- hub/service.py | 2 +- 12 files changed, 24 insertions(+), 73 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index a6903a5..e3e5e40 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -37,7 +37,7 @@ class SecondaryDB: DB_VERSIONS = [7, 8, 9, 10, 11, 12] def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, - cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + cache_all_tx_hashes: bool = False, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768, @@ -47,7 +47,6 @@ class SecondaryDB: self._executor = executor self._db_dir = db_dir self._reorg_limit = reorg_limit - self._cache_all_claim_txos = cache_all_claim_txos self._cache_all_tx_hashes = cache_all_tx_hashes self._secondary_name = secondary_name if secondary_name: @@ -100,9 +99,6 @@ class SecondaryDB: self.total_transactions: List[bytes] = [] self.tx_num_mapping: Dict[bytes, int] = {} - # these are only used if the cache_all_claim_txos setting is on - self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} - self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH) def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]: @@ -956,21 +952,6 @@ class SecondaryDB: else: assert self.db_tx_count == 0 - async def _read_claim_txos(self): - def read_claim_txos(): - set_claim_to_txo = self.claim_to_txo.__setitem__ - for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False): - set_claim_to_txo(k.claim_hash, v) - self.txo_to_claim[v.tx_num][v.position] = k.claim_hash - - self.claim_to_txo.clear() - self.txo_to_claim.clear() - start = time.perf_counter() - self.logger.info("loading claims") - await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos) - ts = time.perf_counter() - start - self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4)) - # async def _read_headers(self): # # if self.headers is not None: # # return @@ -1063,8 +1044,6 @@ class SecondaryDB: async def initialize_caches(self): await self._read_tx_counts() await self._read_block_hashes() - if self._cache_all_claim_txos: - await self._read_claim_txos() if self._cache_all_tx_hashes: await self._read_tx_hashes() if self.db_height > 0: @@ -1154,15 +1133,9 @@ class SecondaryDB: } def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: - if self._cache_all_claim_txos: - return self.claim_to_txo.get(claim_hash) return self.prefix_db.claim_to_txo.get_pending(claim_hash) def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]: - if self._cache_all_claim_txos: - if tx_num not in self.txo_to_claim: - return - return self.txo_to_claim[tx_num].get(position, None) v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) return None if not v else v.claim_hash diff --git a/hub/elastic_sync/db.py b/hub/elastic_sync/db.py index 5fa5856..17b71ea 100644 --- a/hub/elastic_sync/db.py +++ b/hub/elastic_sync/db.py @@ -9,11 +9,11 @@ from hub.db.common import ResolveResult class ElasticSyncDB(SecondaryDB): def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, - cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + cache_all_tx_hashes: bool = False, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, index_address_status=False): - super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos, + super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, index_address_status) self.block_timestamp_cache = LRUCache(1024) diff --git a/hub/elastic_sync/env.py b/hub/elastic_sync/env.py index 478ce98..fc07fc1 100644 --- a/hub/elastic_sync/env.py +++ b/hub/elastic_sync/env.py @@ -3,11 +3,11 @@ from hub.env import Env class ElasticEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, - cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None, + cache_all_tx_hashes=None, elastic_host=None, elastic_port=None, es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, reindex=False): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + blocking_channel_ids, filtering_channel_ids) self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( @@ -43,7 +43,7 @@ class ElasticEnv(Env): elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, - cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids, + blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port ) diff --git a/hub/elastic_sync/service.py b/hub/elastic_sync/service.py index 745e1bd..b65c05c 100644 --- a/hub/elastic_sync/service.py +++ b/hub/elastic_sync/service.py @@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService): def open_db(self): env = self.env self.db = ElasticSyncDB( - env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, index_address_status=env.index_address_status diff --git a/hub/env.py b/hub/env.py index ce18748..d7edbdc 100644 --- a/hub/env.py +++ b/hub/env.py @@ -30,7 +30,7 @@ class Env: pass def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, - prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None): self.logger = logging.getLogger(__name__) @@ -46,7 +46,6 @@ class Env: self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) - self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) # Filtering / Blocking self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( 'BLOCKING_CHANNEL_IDS', '').split(' ') @@ -171,11 +170,6 @@ class Env: "resolve, transaction fetching, and block sync all faster at the expense of higher " "memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.", default=cls.boolean('CACHE_ALL_TX_HASHES', False)) - parser.add_argument('--cache_all_claim_txos', action='store_true', - help="Load all claim txos into memory. This will make address subscriptions and sync, " - "resolve, transaction fetching, and block sync all faster at the expense of higher " - "memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.", - default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False)) parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), help="Port for prometheus metrics to listen on, disabled by default. " "Can be set in env with 'PROMETHEUS_PORT'.") diff --git a/hub/herald/db.py b/hub/herald/db.py index fa0153a..30cb13e 100644 --- a/hub/herald/db.py +++ b/hub/herald/db.py @@ -6,11 +6,11 @@ from hub.db import SecondaryDB class HeraldDB(SecondaryDB): def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, - cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + cache_all_tx_hashes: bool = False, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768): - super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos, + super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, index_address_status, merkle_cache_size, tx_cache_size) # self.headers = None diff --git a/hub/herald/env.py b/hub/herald/env.py index 52d16b6..81a8861 100644 --- a/hub/herald/env.py +++ b/hub/herald/env.py @@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str): class ServerEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, - prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + prometheus_port=None, cache_all_tx_hashes=None, daemon_url=None, host=None, elastic_services=None, es_index_prefix=None, tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, @@ -29,7 +29,7 @@ class ServerEnv(Env): merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None, history_tx_cache_size=None, largest_address_history_cache_size=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) + blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.host = host if host is not None else self.default('HOST', 'localhost') self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080')) @@ -153,7 +153,7 @@ class ServerEnv(Env): es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, - cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address, + country=args.country, payment_address=args.payment_address, donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions, session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, diff --git a/hub/herald/service.py b/hub/herald/service.py index 9aa53e8..8d504b7 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -53,7 +53,7 @@ class HubServerService(BlockchainReaderService): def open_db(self): env = self.env self.db = HeraldDB( - env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size, diff --git a/hub/scribe/db.py b/hub/scribe/db.py index f47259e..788f2e6 100644 --- a/hub/scribe/db.py +++ b/hub/scribe/db.py @@ -11,11 +11,11 @@ from hub.db import SecondaryDB class PrimaryDB(SecondaryDB): def __init__(self, coin, db_dir: str, reorg_limit: int = 200, - cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + cache_all_tx_hashes: bool = False, max_open_files: int = 64, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, index_address_status=False, enforce_integrity=True): - super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes, + super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, index_address_status, enforce_integrity=enforce_integrity) diff --git a/hub/scribe/env.py b/hub/scribe/env.py index b1eb667..9bdacb0 100644 --- a/hub/scribe/env.py +++ b/hub/scribe/env.py @@ -3,14 +3,13 @@ from hub.env import Env class BlockchainEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, - prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, - blocking_channel_ids=None, filtering_channel_ids=None, + prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, index_address_status=None, rebuild_address_status_from_height=None, daemon_ca_path=None, history_tx_cache_size=None, db_disable_integrity_checks=False): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) + blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ @@ -57,7 +56,7 @@ class BlockchainEnv(Env): db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, - cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, + index_address_status=args.index_address_statuses, hashX_history_cache_size=args.address_history_cache_size, rebuild_address_status_from_height=args.rebuild_address_status_from_height, daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size, diff --git a/hub/scribe/service.py b/hub/scribe/service.py index f914210..ac21752 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -138,11 +138,10 @@ class BlockchainProcessorService(BlockchainService): def open_db(self): env = self.env self.db = PrimaryDB( - env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos, - cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files, - blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, - executor=self._executor, index_address_status=env.index_address_status, - enforce_integrity=not env.db_disable_integrity_checks + env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes, + max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids, + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks ) async def run_in_thread_with_lock(self, func, *args): @@ -276,9 +275,6 @@ class BlockchainProcessorService(BlockchainService): for _ in range(count): await self.run_in_thread_with_lock(self.backup_block) self.log.info(f'backed up to height {self.height:,d}') - - if self.env.cache_all_claim_txos: - await self.db._read_claim_txos() # TODO: don't do this await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() except: @@ -407,12 +403,6 @@ class BlockchainProcessorService(BlockchainService): if claim_hash not in self.updated_claim_previous_activations: self.updated_claim_previous_activations[claim_hash] = activation - if self.env.cache_all_claim_txos: - self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( - tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name - ) - self.db.txo_to_claim[tx_num][nout] = claim_hash - pending = StagedClaimtrieItem( claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash @@ -703,11 +693,6 @@ class BlockchainProcessorService(BlockchainService): if 0 < activation <= self.height: self.effective_amount_delta[claim_hash] -= spent.amount self.future_effective_amount_delta[spent.claim_hash] -= spent.amount - if self.env.cache_all_claim_txos: - claim_hash = self.db.txo_to_claim[txin_num].pop(nout) - if not self.db.txo_to_claim[txin_num]: - self.db.txo_to_claim.pop(txin_num) - self.db.claim_to_txo.pop(claim_hash) if spent.reposted_claim_hash: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: diff --git a/hub/service.py b/hub/service.py index 83e6602..f0443c4 100644 --- a/hub/service.py +++ b/hub/service.py @@ -37,7 +37,7 @@ class BlockchainService: def open_db(self): env = self.env self.db = SecondaryDB( - env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, index_address_status=env.index_address_status