diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py
index 7a7fc09..c6c2cdd 100644
--- a/scribe/blockchain/env.py
+++ b/scribe/blockchain/env.py
@@ -5,14 +5,16 @@ class BlockchainEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
                  cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None,
-                 db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None):
+                 db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
+                 index_address_status=None):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
+                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.db_max_open_files = db_max_open_files
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
             else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
+
     @classmethod
     def contribute_to_arg_parser(cls, parser):
         super().contribute_to_arg_parser(parser)
diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py
index ee858c7..767442f 100644
--- a/scribe/blockchain/service.py
+++ b/scribe/blockchain/service.py
@@ -146,8 +146,9 @@ class BlockchainProcessorService(BlockchainService):
         def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
             self.mempool.remove(to_delete)
             touched_hashXs = self.mempool.update_mempool(to_put)
-            for hashX in touched_hashXs:
-                self._get_update_hashX_mempool_status_ops(hashX)
+            if self.env.index_address_status:
+                for hashX in touched_hashXs:
+                    self._get_update_hashX_mempool_status_ops(hashX)
             for tx_hash, raw_tx in to_put:
                 mempool_prefix.stage_put((tx_hash,), (raw_tx,))
             for tx_hash, raw_tx in to_delete.items():
@@ -1290,22 +1291,6 @@ class BlockchainProcessorService(BlockchainService):
             status = sha256(history.encode())
             self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
 
-    def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
-        if height > self.env.reorg_limit:  # compactify existing history
-            hist_txs = b''
-            # accumulate and delete all of the tx histories between height 1 and current - reorg_limit
-            for k, hist in self.db.prefix_db.hashX_history.iterate(
-                    start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
-                    deserialize_key=False, deserialize_value=False):
-                hist_txs += hist
-                self.db.prefix_db.stage_raw_delete(k, hist)
-            if hist_txs:
-                # add the accumulated histories onto the existing compacted history at height 0
-                key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
-                existing = self.db.prefix_db.get(key)
-                if existing is not None:
-                    self.db.prefix_db.stage_raw_delete(key, existing)
-                self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
 
     def advance_block(self, block: Block):
         height = self.height + 1
@@ -1398,15 +1383,11 @@ class BlockchainProcessorService(BlockchainService):
         # clear the mempool tx index
         self._get_clear_mempool_ops()
 
-        for hashX, new_history in self.hashXs_by_tx.items():
-            # TODO: combine this with compaction so that we only read the history once
-            self._get_update_hashX_status_ops(
-                hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
-            )
-            self._get_compactify_hashX_history_ops(height, hashX)
-            if not new_history:
-                continue
-            self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
+        # update hashX history status hashes and compactify the histories
+        self._get_update_hashX_histories_ops(height)
+
+        if not self.db.catching_up and self.env.index_address_status:
+            self._get_compactify_ops(height)
 
         self.tx_count = tx_count
         self.db.tx_counts.append(self.tx_count)
@@ -1455,6 +1436,88 @@ class BlockchainProcessorService(BlockchainService):
                 deserialize_key=False, deserialize_value=False))
         )
 
+    def _get_update_hashX_histories_ops(self, height: int):
+        self.db.prefix_db.hashX_history.stage_multi_put(
+            [((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()]
+        )
+
+    def _get_compactify_ops(self, height: int):
+        existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
+        if existing_hashX_statuses:
+            pack_key = self.db.prefix_db.hashX_status.pack_key
+            keys = [
+                pack_key(hashX) for hashX, existing in zip(
+                    self.hashXs_by_tx, existing_hashX_statuses
+                )
+            ]
+            self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None])
+
+        block_hashX_history_deletes = []
+        append_deletes_hashX_history = block_hashX_history_deletes.append
+        block_hashX_history_puts = []
+
+        for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
+            new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
+
+            tx_nums = []
+            txs_extend = tx_nums.extend
+            compact_hist_txs = []
+            compact_txs_extend = compact_hist_txs.extend
+            history_item_0 = None
+            existing_item_0 = None
+            reorg_limit = self.env.reorg_limit
+            unpack_history = self.db.prefix_db.hashX_history.unpack_value
+            unpack_key = self.db.prefix_db.hashX_history.unpack_key
+            needs_compaction = False
+
+            total_hist_txs = b''
+            for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
+                                                                   deserialize_value=False):
+                hist_txs = unpack_history(hist)
+                total_hist_txs += hist
+                txs_extend(hist_txs)
+                hist_height = unpack_key(k).height
+                if height > reorg_limit and hist_height < height - reorg_limit:
+                    compact_txs_extend(hist_txs)
+                    if hist_height == 0:
+                        history_item_0 = (k, hist)
+                    elif hist_height > 0:
+                        needs_compaction = True
+                        # self.db.prefix_db.stage_raw_delete(k, hist)
+                        append_deletes_hashX_history((k, hist))
+            existing_item_0 = history_item_0
+            if needs_compaction:
+                # add the accumulated histories onto the existing compacted history at height 0
+                if existing_item_0 is not None:  # delete if key 0 exists
+                    key, existing = existing_item_0
+                    append_deletes_hashX_history((key, existing))
+                block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
+            if not new_history:
+                continue
+
+            needed_tx_infos = []
+            append_needed_tx_info = needed_tx_infos.append
+            tx_infos = {}
+            for tx_num in tx_nums:
+                if tx_num in self.history_tx_info_cache:
+                    tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
+                else:
+                    append_needed_tx_info(tx_num)
+            if needed_tx_infos:
+                for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
+                    tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+                    tx_infos[tx_num] = tx_info
+                    self.history_tx_info_cache[tx_num] = tx_info
+            history = ''.join(map(tx_infos.__getitem__, tx_nums))
+            for tx_hash, height in new_history:
+                history += f'{tx_hash[::-1].hex()}:{height:d}:'
+            if history:
+                status = sha256(history.encode())
+                self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
+
+        self.db.prefix_db.multi_delete(block_hashX_history_deletes)
+        self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts)
+
     def clear_after_advance_or_reorg(self):
         self.txo_to_claim.clear()
         self.claim_hash_to_txo.clear()
diff --git a/scribe/db/db.py b/scribe/db/db.py
index ee34112..e4db988 100644
--- a/scribe/db/db.py
+++ b/scribe/db/db.py
@@ -39,7 +39,8 @@ class HubDB:
     def __init__(self, coin, db_dir: str, reorg_limit: int = 200, cache_all_claim_txos: bool = False,
                  cache_all_tx_hashes: bool = False, secondary_name: str = '', max_open_files: int = 64,
                  blocking_channel_ids: List[str] = None,
-                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
+                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
+                 index_address_status=False):
         self.logger = logging.getLogger(__name__)
         self.coin = coin
         self._executor = executor
@@ -52,6 +53,7 @@ class HubDB:
         if secondary_name:
             assert max_open_files == -1, 'max open files must be -1 for secondary readers'
         self._db_max_open_files = max_open_files
+        self._index_address_status = index_address_status
         self.prefix_db: typing.Optional[PrefixDB] = None
 
         self.hist_unflushed = defaultdict(partial(array.array, 'I'))
diff --git a/scribe/env.py b/scribe/env.py
index a019540..6ca1146 100644
--- a/scribe/env.py
+++ b/scribe/env.py
@@ -31,7 +31,7 @@ class Env:
 
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
                  prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
-                 blocking_channel_ids=None, filtering_channel_ids=None):
+                 blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
         self.logger = logging.getLogger(__name__)
 
         self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@@ -52,6 +52,8 @@ class Env:
             'BLOCKING_CHANNEL_IDS', '').split(' ')
         self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
             'FILTERING_CHANNEL_IDS', '').split(' ')
+        self.index_address_status = index_address_status if index_address_status is not None else \
+            self.boolean('INDEX_ADDRESS_STATUS', False)
 
     @classmethod
     def default(cls, envvar, default):
@@ -187,6 +189,12 @@ class Env:
                                  "Claims that are reposted by these channels aren't returned in search results. "
                                  "Can be set in env with 'FILTERING_CHANNEL_IDS'",
                             default=cls.default('FILTERING_CHANNEL_IDS', '').split(' '))
+        parser.add_argument('--index_address_statuses', action='store_true',
+                            help="Use precomputed address statuses; this must be enabled on both the writer and the "
+                                 "reader to take effect. If disabled (the default), the status of an address is "
+                                 "calculated at runtime when clients request it (address subscriptions, address "
+                                 "history sync). If enabled, scribe maintains an index of precomputed statuses.",
+                            default=cls.boolean('INDEX_ADDRESS_STATUS', False))
" + "If enabled, scribe will maintain an index of precomputed statuses", + default=cls.boolean('INDEX_ADDRESS_STATUS', False)) @classmethod def from_arg_parser(cls, args): diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 8d7dbd9..a2db139 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -157,6 +157,14 @@ class HubMemPool: result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result + def mempool_history(self, hashX: bytes) -> str: + result = '' + for tx_hash in self.touched_hashXs.get(hashX, ()): + if tx_hash not in self.txs: + continue # the tx hash for the touched address is an input that isn't in mempool anymore + result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:' + return result + def unordered_UTXOs(self, hashX): """Return an unordered list of UTXO named tuples from mempool transactions that pay to hashX. @@ -276,7 +284,6 @@ class HubMemPool: if session.subscribe_headers and height_changed: sent_headers += 1 self._notification_q.put_nowait((session_id, height_changed, hashXes)) - if sent_headers: self.logger.info(f'notified {sent_headers} sessions of new block header') if session_hashxes_to_notify: diff --git a/scribe/hub/session.py b/scribe/hub/session.py index e350d5c..21df5be 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1089,7 +1089,18 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX) + self.session_manager.db.last_flush + if self.env.index_address_status: + loop = self.loop + return await loop.run_in_executor(None, self.db.get_hashX_status, hashX) + history = ''.join( + f"{tx_hash[::-1].hex()}:{height:d}:" + for tx_hash, height in await self.db.limited_history(hashX, limit=None) + ) + self.mempool.mempool_history(hashX) + if not history: + return + status = sha256(history.encode()) + return status.hex() async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): notifications = [] diff --git a/scribe/service.py b/scribe/service.py index bcde306..d2227fd 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -30,7 +30,8 @@ class BlockchainService: self.db = HubDB( env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, - filtering_channel_ids=env.filtering_channel_ids, executor=self._executor + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status ) self._stopping = False