From 55eb8818ea68ab07f849251312c4412d7df991bc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 16 Oct 2022 14:15:42 -0400 Subject: [PATCH] add `--db_disable_integrity_checks` option to scribe --- hub/db/db.py | 6 ++++-- hub/db/interface.py | 7 +++++-- hub/db/prefixes.py | 6 ++++-- hub/db/revertable.py | 12 +++++++++++- hub/scribe/db.py | 5 +++-- hub/scribe/env.py | 10 ++++++++-- hub/scribe/service.py | 3 ++- 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 36b8da5..92209c6 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -40,7 +40,8 @@ class SecondaryDB: cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, - index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768): + index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768, + enforce_integrity=True): self.logger = logging.getLogger(__name__) self.coin = coin self._executor = executor @@ -53,6 +54,7 @@ class SecondaryDB: assert max_open_files == -1, 'max open files must be -1 for secondary readers' self._db_max_open_files = max_open_files self._index_address_status = index_address_status + self._enforce_integrity = enforce_integrity self.prefix_db: typing.Optional[PrefixDB] = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) @@ -1016,7 +1018,7 @@ class SecondaryDB: self.prefix_db = PrefixDB( db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix}, - secondary_path=secondary_path + secondary_path=secondary_path, enforce_integrity=self._enforce_integrity ) if secondary_path != '': diff --git a/hub/db/interface.py b/hub/db/interface.py index 9a2895c..50a1914 100644 --- a/hub/db/interface.py +++ b/hub/db/interface.py @@ -183,7 +183,8 @@ class BasePrefixDB: UNDO_KEY_STRUCT = struct.Struct(b'>Q32s') PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q') - def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None): + def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None, + enforce_integrity=True): column_family_options = {} for prefix in DB_PREFIXES: settings = COLUMN_SETTINGS[prefix.value] @@ -206,7 +207,9 @@ class BasePrefixDB: cf = self._db.get_column_family(prefix.value) self.column_families[prefix.value] = cf - self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes) + self._op_stack = RevertableOpStack( + self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes, enforce_integrity=enforce_integrity + ) self._max_undo_depth = max_undo_depth def unsafe_commit(self): diff --git a/hub/db/prefixes.py b/hub/db/prefixes.py index d0f929b..1821287 100644 --- a/hub/db/prefixes.py +++ b/hub/db/prefixes.py @@ -1852,9 +1852,11 @@ class FutureEffectiveAmountPrefixRow(PrefixRow): class PrefixDB(BasePrefixDB): def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64, - secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): + secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None, + enforce_integrity: bool = True): super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path, - max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes) + max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes, + enforce_integrity=enforce_integrity) db = self._db self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack) self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack) diff --git a/hub/db/revertable.py b/hub/db/revertable.py index 9e66a27..a3ba7d6 100644 --- a/hub/db/revertable.py +++ b/hub/db/revertable.py @@ -83,7 +83,8 @@ class OpStackIntegrity(Exception): class RevertableOpStack: def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], - multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None): + multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None, + enforce_integrity=True): """ This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity violations when applying the puts and deletes. The integrity checks assure that keys that do not exist @@ -103,6 +104,7 @@ class RevertableOpStack: self._stash: Deque[RevertableOp] = deque() self._stashed_last_op_for_key = {} self._unsafe_prefixes = unsafe_prefixes or set() + self._enforce_integrity = enforce_integrity def stash_ops(self, ops: Iterable[RevertableOp]): self._stash.extend(ops) @@ -129,6 +131,14 @@ class RevertableOpStack: else: append_op_needed(op) unique_keys.add(op.key) + + existing = {} + if self._enforce_integrity and unique_keys: + unique_keys = list(unique_keys) + existing.update({ + k: v for k, v in zip(unique_keys, self._multi_get(unique_keys)) + }) + for op in ops_to_apply: if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1] == op.invert(): self._items[op.key].pop() diff --git a/hub/scribe/db.py b/hub/scribe/db.py index 691a551..bb8a5ca 100644 --- a/hub/scribe/db.py +++ b/hub/scribe/db.py @@ -13,9 +13,10 @@ class PrimaryDB(SecondaryDB): cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, max_open_files: int = 64, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, - index_address_status=False): + index_address_status=False, enforce_integrity=True): super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes, - blocking_channel_ids, filtering_channel_ids, executor, index_address_status) + blocking_channel_ids, filtering_channel_ids, executor, index_address_status, + enforce_integrity=enforce_integrity) def _rebuild_hashX_status_index(self, start_height: int): self.logger.warning("rebuilding the address status index...") diff --git a/hub/scribe/env.py b/hub/scribe/env.py index f49cee2..b1eb667 100644 --- a/hub/scribe/env.py +++ b/hub/scribe/env.py @@ -7,7 +7,8 @@ class BlockchainEnv(Env): blocking_channel_ids=None, filtering_channel_ids=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, index_address_status=None, rebuild_address_status_from_height=None, - daemon_ca_path=None, history_tx_cache_size=None): + daemon_ca_path=None, history_tx_cache_size=None, + db_disable_integrity_checks=False): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files @@ -19,6 +20,7 @@ class BlockchainEnv(Env): self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None self.history_tx_cache_size = history_tx_cache_size if history_tx_cache_size is not None else \ self.integer('HISTORY_TX_CACHE_SIZE', 4194304) + self.db_disable_integrity_checks = db_disable_integrity_checks @classmethod def contribute_to_arg_parser(cls, parser): @@ -30,6 +32,9 @@ class BlockchainEnv(Env): default=env_daemon_url) parser.add_argument('--daemon_ca_path', type=str, default='', help='Path to the lbcd ca file, used for lbcd with ssl') + parser.add_argument('--db_disable_integrity_checks', action='store_true', + help="Disable verifications that no db operation breaks the ability to be rewound", + default=False) parser.add_argument('--db_max_open_files', type=int, default=64, help='This setting translates into the max_open_files option given to rocksdb. ' 'A higher number will use more memory. Defaults to 64.') @@ -55,5 +60,6 @@ class BlockchainEnv(Env): cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, hashX_history_cache_size=args.address_history_cache_size, rebuild_address_status_from_height=args.rebuild_address_status_from_height, - daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size + daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size, + db_disable_integrity_checks=args.db_disable_integrity_checks ) diff --git a/hub/scribe/service.py b/hub/scribe/service.py index a751043..d6509e4 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -138,7 +138,8 @@ class BlockchainProcessorService(BlockchainService): env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos, cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, - executor=self._executor, index_address_status=env.index_address_status + executor=self._executor, index_address_status=env.index_address_status, + enforce_integrity=not env.db_disable_integrity_checks ) async def run_in_thread_with_lock(self, func, *args):