From 5d8a32368a41c17bc5414355876377b5d32ba6f4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Apr 2022 22:01:55 -0400 Subject: [PATCH] add a migrator to build the hashX status index and to compactify histories --- scribe/blockchain/service.py | 10 ++++ scribe/db/db.py | 2 +- scribe/db/migrators/__init__.py | 0 scribe/db/migrators/migrate7to8.py | 89 ++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 scribe/db/migrators/__init__.py create mode 100644 scribe/db/migrators/migrate7to8.py diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 8642505..b539ff0 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -1637,6 +1637,16 @@ class BlockchainProcessorService(BlockchainService): await self.run_in_thread_with_lock(flush) def _iter_start_tasks(self): + while self.db.db_version < max(self.db.DB_VERSIONS): + if self.db.db_version == 7: + from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION + else: + raise RuntimeError("unknown db version") + self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}") + migrate(self.db) + self.log.info("finished migration") + self.db.read_db_state() + self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/scribe/db/db.py b/scribe/db/db.py index 5587636..98fccae 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -32,7 +32,7 @@ TXO_STRUCT_pack = TXO_STRUCT.pack class HubDB: - DB_VERSIONS = HIST_DB_VERSIONS = [7] + DB_VERSIONS = [7, 8] def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, diff --git a/scribe/db/migrators/__init__.py b/scribe/db/migrators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scribe/db/migrators/migrate7to8.py b/scribe/db/migrators/migrate7to8.py new file mode 100644 index 0000000..8a04d5d --- /dev/null +++ b/scribe/db/migrators/migrate7to8.py @@ -0,0 +1,89 @@ +import logging +import time +import array +import typing +from bisect import bisect_right +from scribe.common import sha256 +if typing.TYPE_CHECKING: + from scribe.db.db import HubDB + +FROM_VERSION = 7 +TO_VERSION = 8 + +log = logging.getLogger() + + +def get_all_hashXs(db): + def iterator(): + last_hashX = None + for k in db.prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False): + hashX = k[1:12] + if last_hashX is None: + last_hashX = hashX + if last_hashX != hashX: + yield hashX + last_hashX = hashX + if last_hashX: + yield last_hashX + return [hashX for hashX in iterator()] + + +def hashX_history(db: 'HubDB', hashX: bytes): + history = b'' + to_delete = [] + for k, v in db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, deserialize_key=False): + to_delete.append((k, v)) + history += v + return history, to_delete + + +def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes: + tx_counts = db.tx_counts + hist_tx_nums = array.array('I') + hist_tx_nums.frombytes(history) + hist = '' + for tx_num in hist_tx_nums: + hist += f'{db.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' + return sha256(hist.encode()) + + +def migrate(db): + start = time.perf_counter() + prefix_db = db.prefix_db + hashXs = get_all_hashXs(db) + log.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " + f"now building the status index...") + op_cnt = 0 + hashX_cnt = 0 + for hashX in hashXs: + hashX_cnt += 1 + key = prefix_db.hashX_status.pack_key(hashX) + history, to_delete = hashX_history(db, hashX) + status = hashX_status_from_history(db, history) + existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) + if existing_status and existing_status != status: + prefix_db.stage_raw_delete(key, existing_status) + op_cnt += 1 + elif existing_status == status: + pass + else: + prefix_db.stage_raw_put(key, status) + op_cnt += 1 + if len(to_delete) > 1: + for k, v in to_delete: + prefix_db.stage_raw_delete(k, v) + op_cnt += 1 + if history: + prefix_db.stage_raw_put(prefix_db.hashX_history.pack_key(hashX, 0), history) + op_cnt += 1 + if op_cnt > 100000: + prefix_db.unsafe_commit() + log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses") + op_cnt = 0 + if op_cnt: + prefix_db.unsafe_commit() + log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses") + db.db_version = 8 + db.write_db_state() + db.prefix_db.unsafe_commit() + log.info("finished migration")