From 1badc5f38c20834dca51f5d5405b0a57b7cc13bb Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 21 Mar 2022 22:47:11 -0400 Subject: [PATCH] `BlockchainService` base class for readers and the writer -move base58.py and bip32.py into scribe.schema -fix https://github.com/lbryio/scribe/issues/3 --- README.md | 12 +- ...erfile.wallet_server => Dockerfile.scribe} | 4 +- docker/deploy_scribe_dev.sh | 20 ++ ...ver_entrypoint.sh => scribe_entrypoint.sh} | 6 +- scribe/blockchain/network.py | 4 +- scribe/blockchain/prefetcher.py | 3 +- .../{block_processor.py => service.py} | 163 +++------- scribe/cli.py | 11 +- .../service.py} | 19 +- .../{reader/hub_server.py => hub/service.py} | 16 +- scribe/hub/session.py | 4 +- scribe/{reader/prometheus.py => metrics.py} | 0 scribe/reader/__init__.py | 3 - scribe/schema/attrs.py | 4 +- scribe/{ => schema}/base58.py | 0 scribe/{ => schema}/bip32.py | 2 +- scribe/{reader/interface.py => service.py} | 287 ++++++++++-------- 17 files changed, 260 insertions(+), 298 deletions(-) rename docker/{Dockerfile.wallet_server => Dockerfile.scribe} (94%) create mode 100755 docker/deploy_scribe_dev.sh rename docker/{wallet_server_entrypoint.sh => scribe_entrypoint.sh} (63%) rename scribe/blockchain/{block_processor.py => service.py} (94%) rename scribe/{reader/elastic_sync.py => elasticsearch/service.py} (98%) rename scribe/{reader/hub_server.py => hub/service.py} (91%) rename scribe/{reader/prometheus.py => metrics.py} (100%) delete mode 100644 scribe/reader/__init__.py rename scribe/{ => schema}/base58.py (100%) rename scribe/{ => schema}/bip32.py (99%) rename scribe/{reader/interface.py => service.py} (82%) diff --git a/README.md b/README.md index 5643e11..53d96ad 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ Scribe maintains a [rocksdb](https://github.com/lbryio/lbry-rocksdb) database containing the [LBRY blockchain](https://github.com/lbryio/lbrycrd) and provides an interface for python based services that utilize the blockchain data in an ongoing manner. Scribe includes implementations of this interface to provide an electrum server for thin-wallet clients (such as lbry-sdk) and to maintain an elasticsearch database of metadata for claims in the LBRY blockchain. - * Uses Python 3.7-3.8 + * Uses Python 3.7-3.9 (3.10 probably works but hasn't yet been tested) * Protobuf schema for encoding and decoding metadata stored on the blockchain ([scribe.schema](https://github.com/lbryio/scribe/tree/master/scribe/schema)). 
- * Blockchain processor that maintains an up to date rocksdb database ([scribe.blockchain](https://github.com/lbryio/scribe/tree/master/scribe/blockchain)) - * Rocksdb based database containing the blockchain data ([scribe.db](https://github.com/lbryio/scribe/tree/master/scribe/db)) - * Interface for python services to implement in order for them maintain a read only view of the blockchain data ([scribe.reader.interface](https://github.com/lbryio/scribe/tree/master/scribe/reader/interface.py)) - * Electrum based server for thin-wallet clients like lbry-sdk ([scribe.reader.hub_server](https://github.com/lbryio/scribe/tree/master/scribe/reader/hub_server.py)) - * Elasticsearch sync utility to index all the claim metadata in the blockchain into an easily searchable form ([scribe.reader.elastic_sync](https://github.com/lbryio/scribe/tree/master/scribe/reader/elastic_sync.py)) + * Blockchain processor that maintains an up-to-date rocksdb database ([scribe.blockchain.service](https://github.com/lbryio/scribe/tree/master/scribe/blockchain/service.py)) + * [Rocksdb](https://github.com/lbryio/lbry-rocksdb/) based database containing the blockchain data ([scribe.db](https://github.com/lbryio/scribe/tree/master/scribe/db)) + * Interface for python services to implement in order for them to maintain a read-only view of the blockchain data ([scribe.service](https://github.com/lbryio/scribe/tree/master/scribe/service.py)) + * Electrum-based server for thin-wallet clients like lbry-sdk ([scribe.hub.service](https://github.com/lbryio/scribe/tree/master/scribe/hub/service.py)) + * Elasticsearch sync utility to index all the claim metadata in the blockchain into an easily searchable form ([scribe.elasticsearch.service](https://github.com/lbryio/scribe/tree/master/scribe/elasticsearch/service.py)) ## Installation diff --git a/docker/Dockerfile.wallet_server b/docker/Dockerfile.scribe similarity index 94% rename from docker/Dockerfile.wallet_server rename to docker/Dockerfile.scribe index 0be0ffc..94db423 100644 --- a/docker/Dockerfile.wallet_server +++ b/docker/Dockerfile.scribe @@ -1,5 +1,7 @@ FROM debian:11-slim +STOPSIGNAL SIGINT + ARG user=lbry ARG db_dir=/database ARG projects_dir=/home/$user @@ -49,5 +51,5 @@ ENV MAX_SEND=1000000000000000000 ENV MAX_RECEIVE=1000000000000000000 -COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh +COPY ./docker/scribe_entrypoint.sh /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/deploy_scribe_dev.sh b/docker/deploy_scribe_dev.sh new file mode 100755 index 0000000..016da54 --- /dev/null +++ b/docker/deploy_scribe_dev.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# usage: deploy_scribe_dev.sh <target_host> +TARGET_HOST=$1 + +DOCKER_DIR=`dirname $0` +SCRIBE_DIR=`dirname $DOCKER_DIR` + +# build the image +docker build -f $DOCKER_DIR/Dockerfile.scribe -t lbry/scribe:development $SCRIBE_DIR +IMAGE=`docker image inspect lbry/scribe:development | sed -n "s/^.*Id\":\s*\"sha256:\s*\(\S*\)\".*$/\1/p"` + +# push the image to the server +ssh $TARGET_HOST docker image prune --force +docker save $IMAGE | ssh $TARGET_HOST docker load +ssh $TARGET_HOST docker tag $IMAGE lbry/scribe:development + +## restart the wallet server +ssh $TARGET_HOST docker-compose down +ssh $TARGET_HOST SCRIBE_TAG="development" docker-compose up -d diff --git a/docker/wallet_server_entrypoint.sh b/docker/scribe_entrypoint.sh similarity index 63% rename from docker/wallet_server_entrypoint.sh rename to docker/scribe_entrypoint.sh index 09b836b..63db297 100755 --- a/docker/wallet_server_entrypoint.sh +++ 
b/docker/scribe_entrypoint.sh @@ -10,8 +10,8 @@ if [ -z "$HUB_COMMAND" ]; then fi case "$HUB_COMMAND" in - scribe ) /home/lbry/.local/bin/scribe "$@" ;; - scribe-hub ) /home/lbry/.local/bin/scribe-hub "$@" ;; - scribe-elastic-sync ) /home/lbry/.local/bin/scribe-elastic-sync ;; + scribe ) exec /home/lbry/.local/bin/scribe "$@" ;; + scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;; + scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync ;; * ) echo "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;; esac diff --git a/scribe/blockchain/network.py b/scribe/blockchain/network.py index 5450666..eb63ecf 100644 --- a/scribe/blockchain/network.py +++ b/scribe/blockchain/network.py @@ -4,8 +4,8 @@ import typing from typing import List from hashlib import sha256 from decimal import Decimal -from scribe.base58 import Base58 -from scribe.bip32 import PublicKey +from scribe.schema.base58 import Base58 +from scribe.schema.bip32 import PublicKey from scribe.common import hash160, hash_to_hex_str, double_sha256 from scribe.blockchain.transaction import TxOutput, TxInput, Block from scribe.blockchain.transaction.deserializer import Deserializer diff --git a/scribe/blockchain/prefetcher.py b/scribe/blockchain/prefetcher.py index 68f635a..ab4eaff 100644 --- a/scribe/blockchain/prefetcher.py +++ b/scribe/blockchain/prefetcher.py @@ -34,9 +34,10 @@ class Prefetcher: self.ave_size = self.min_cache_size // 10 self.polling_delay = 0.5 - async def main_loop(self, bp_height): + async def main_loop(self, bp_height, started: asyncio.Event): """Loop forever polling for more blocks.""" await self.reset_height(bp_height) + started.set() try: while True: # Sleep a while if there is nothing to prefetch diff --git a/scribe/blockchain/block_processor.py b/scribe/blockchain/service.py similarity index 94% rename from scribe/blockchain/block_processor.py rename to scribe/blockchain/service.py index 0edb65b..ba0c816 100644 --- a/scribe/blockchain/block_processor.py +++ b/scribe/blockchain/service.py @@ -1,24 +1,21 @@ -import logging import time import asyncio import typing -import signal from bisect import bisect_right from struct import pack -from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict from scribe import __version__, PROMETHEUS_NAMESPACE -from scribe.db.db import HubDB from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS from scribe.blockchain.daemon import LBCDaemon -from scribe.blockchain.transaction import Tx, TxOutput, TxInput +from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.prefetcher import Prefetcher from scribe.schema.url import normalize_name +from scribe.service import BlockchainService if typing.TYPE_CHECKING: from scribe.env import Env from scribe.db.revertable import RevertableOpStack @@ -56,7 +53,7 @@ class StagedClaimtrieItem(typing.NamedTuple): NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer" -class BlockProcessor: +class BlockchainProcessorService(BlockchainService): """Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. 
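The `started: asyncio.Event` parameter added to `Prefetcher.main_loop` above is the startup handshake used throughout this patch: `start_cancellable` in the new `scribe/service.py` creates the event, schedules the coroutine, and returns `event.wait()`, so the caller only proceeds once the loop is actually running. A minimal, self-contained sketch of the pattern (names here are illustrative, not taken from the patch):

```python
import asyncio


class ExampleService:
    def __init__(self):
        self.cancellable_tasks = []

    def start_cancellable(self, run, *args):
        # same shape as BlockchainService.start_cancellable in this patch
        _flag = asyncio.Event()
        self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
        return _flag.wait()  # resolves once run() sets the flag


async def poll_loop(height: int, started: asyncio.Event):
    # one-time setup would happen here (reset_height in the real prefetcher)
    started.set()  # tell the awaiting caller that startup is complete
    while True:
        await asyncio.sleep(0.5)


async def main():
    service = ExampleService()
    await service.start_cancellable(poll_loop, 0)  # returns as soon as poll_loop is live
    for task in service.cancellable_tasks:  # shutdown mirrors _stop_cancellable_tasks()
        task.cancel()
    await asyncio.gather(*service.cancellable_tasks, return_exceptions=True)


asyncio.run(main())
```
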
@@ -74,20 +71,11 @@ class BlockProcessor: ) def __init__(self, env: 'Env'): - self.cancellable_tasks = [] - - self.env = env - self.state_lock = asyncio.Lock() + super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') self.daemon = LBCDaemon(env.coin, env.daemon_url) - self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') - self.db = HubDB( - env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, - max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids, - filtering_channel_ids=env.filtering_channel_ids, executor=self._chain_executor - ) - self.shutdown_event = asyncio.Event() self.coin = env.coin self.wait_for_blocks_duration = 0.1 + self._ready_to_stop = asyncio.Event() self._caught_up_event: Optional[asyncio.Event] = None self.height = 0 @@ -96,7 +84,7 @@ class BlockProcessor: self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event) - self.logger = logging.getLogger(__name__) + # self.logger = logging.getLogger(__name__) # Meta self.touched_hashXs: Set[bytes] = set() @@ -163,9 +151,6 @@ class BlockProcessor: self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - self._stopping = False - self._ready_to_stop = asyncio.Event() - async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -173,13 +158,13 @@ class BlockProcessor: # Take the state lock to be certain in-memory state is # consistent and not being updated elsewhere. async def run_in_thread_locked(): - async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) + async with self.lock: + return await asyncio.get_event_loop().run_in_executor(self._executor, func, *args) return await asyncio.shield(run_in_thread_locked()) async def run_in_thread(self, func, *args): async def run_in_thread(): - return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) + return await asyncio.get_event_loop().run_in_executor(self._executor, func, *args) return await asyncio.shield(run_in_thread()) async def refresh_mempool(self): @@ -195,13 +180,13 @@ class BlockProcessor: mempool_prefix.stage_delete((tx_hash,), (raw_tx,)) unsafe_commit() - async with self.state_lock: + async with self.lock: current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx) _to_put = [] try: mempool_hashes = await self.daemon.mempool_hashes() - except (TypeError, RPCError): - self.logger.warning("failed to get mempool tx hashes, reorg underway?") + except (TypeError, RPCError) as err: + self.log.exception("failed to get mempool tx hashes, reorg underway? 
(%s)", err) return for hh in mempool_hashes: tx_hash = bytes.fromhex(hh)[::-1] @@ -211,7 +196,7 @@ class BlockProcessor: try: _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh)))) except (TypeError, RPCError): - self.logger.warning("failed to get a mempool tx, reorg underway?") + self.log.warning("failed to get a mempool tx, reorg underway?") return if current_mempool: if bytes.fromhex(await self.daemon.getbestblockhash())[::-1] != self.coin.header_hash(self.db.headers[-1]): @@ -243,17 +228,17 @@ class BlockProcessor: start = time.perf_counter() start_count = self.tx_count txo_count = await self.run_in_thread_with_lock(self.advance_block, block) - self.logger.info( + self.log.info( "writer advanced to %i (%i txs, %i txos) in %0.3fs", self.height, self.tx_count - start_count, txo_count, time.perf_counter() - start ) if self.height == self.coin.nExtendedClaimExpirationForkHeight: - self.logger.warning( + self.log.warning( "applying extended claim expiration fork on claims accepted by, %i", self.height ) await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork) except: - self.logger.exception("advance blocks failed") + self.log.exception("advance blocks failed") raise processed_time = time.perf_counter() - total_start self.block_count_metric.set(self.height) @@ -271,29 +256,29 @@ class BlockProcessor: if self.db.get_block_hash(height)[::-1].hex() == block_hash: break count += 1 - self.logger.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks") + self.log.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks") try: assert count > 0, count for _ in range(count): await self.run_in_thread_with_lock(self.backup_block) - self.logger.info(f'backed up to height {self.height:,d}') + self.log.info(f'backed up to height {self.height:,d}') if self.env.cache_all_claim_txos: await self.db._read_claim_txos() # TODO: don't do this await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() except: - self.logger.exception("reorg blocks failed") + self.log.exception("reorg blocks failed") raise finally: - self.logger.info("backed up to block %i", self.height) + self.log.info("backed up to block %i", self.height) else: # It is probably possible but extremely rare that what # bitcoind returns doesn't form a chain because it # reorg-ed the chain as it was processing the batched # block hash requests. Should this happen it's simplest # just to reset the prefetcher and try again. 
- self.logger.warning('daemon blocks do not form a chain; ' + self.log.warning('daemon blocks do not form a chain; ' 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) @@ -365,7 +350,7 @@ class BlockProcessor: # else: # print("\tfailed to validate signed claim") except: - self.logger.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout) + self.log.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout) if txo.is_claim: # it's a root claim root_tx_num, root_idx = tx_num, nout @@ -375,7 +360,7 @@ class BlockProcessor: # print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") return if normalized_name != spent_claims[claim_hash][2]: - self.logger.warning( + self.log.warning( f"{tx_hash[::-1].hex()} contains mismatched name for claim update {claim_hash.hex()}" ) return @@ -1280,7 +1265,7 @@ class BlockProcessor: self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes) - def advance_block(self, block): + def advance_block(self, block: Block): height = self.height + 1 # print("advance ", height) # Use local vars for speed in the loops @@ -1454,7 +1439,7 @@ class BlockProcessor: self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims) # self.db.assert_flushed(self.flush_data()) - self.logger.info("backup block %i", self.height) + self.log.info("backup block %i", self.height) # Check and update self.tip self.db.tx_counts.pop() @@ -1507,7 +1492,7 @@ class BlockProcessor: self.db.assert_db_state() elapsed = self.db.last_flush - start_time - self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' + self.log.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})') def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: @@ -1535,7 +1520,7 @@ class BlockProcessor: hashX = hashX_value.hashX utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout) if not utxo_value: - self.logger.warning( + self.log.warning( "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX) ) raise ChainError( @@ -1551,8 +1536,9 @@ class BlockProcessor: self.touched_hashXs.add(hashX) return hashX - async def process_blocks_and_mempool_forever(self): + async def process_blocks_and_mempool_forever(self, caught_up_event): """Loop forever processing blocks as they arrive.""" + self._caught_up_event = caught_up_event try: while not self._stopping: if self.height == self.daemon.cached_height(): @@ -1573,7 +1559,7 @@ class BlockProcessor: except asyncio.CancelledError: raise except Exception: - self.logger.exception("error while updating mempool txs") + self.log.exception("error while updating mempool txs") raise else: try: @@ -1581,13 +1567,13 @@ class BlockProcessor: except asyncio.CancelledError: raise except Exception: - self.logger.exception("error while processing txs") + self.log.exception("error while processing txs") raise finally: self._ready_to_stop.set() async def _first_caught_up(self): - self.logger.info(f'caught up to height {self.height}') + self.log.info(f'caught up to height {self.height}') # Flush everything but with first_sync->False state. 
first_sync = self.db.first_sync self.db.first_sync = False @@ -1601,86 +1587,19 @@ class BlockProcessor: await self.run_in_thread_with_lock(flush) if first_sync: - self.logger.info(f'{__version__} synced to ' + self.log.info(f'{__version__} synced to ' f'height {self.height:,d}, halting here.') self.shutdown_event.set() - async def open(self): - self.db.open_db() + def _iter_start_tasks(self): self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count - await self.db.initialize_caches() + yield self.daemon.height() + yield self.start_cancellable(self.prefetcher.main_loop, self.height) + yield self.start_cancellable(self.process_blocks_and_mempool_forever) - async def fetch_and_process_blocks(self, caught_up_event): - """Fetch, process and index blocks from the daemon. - - Sets caught_up_event when first caught up. Flushes to disk - and shuts down cleanly if cancelled. - - This is mainly because if, during initial sync ElectrumX is - asked to shut down when a large number of blocks have been - processed but not written to disk, it should write those to - disk before exiting, as otherwise a significant amount of work - could be lost. - """ - - await self.open() - - self._caught_up_event = caught_up_event - try: - await asyncio.wait([ - self.prefetcher.main_loop(self.height), - self.process_blocks_and_mempool_forever() - ]) - except asyncio.CancelledError: - raise - except: - self.logger.exception("Block processing failed!") - raise - finally: - # Shut down block processing - self.logger.info('closing the DB for a clean shutdown...') - self._chain_executor.shutdown(wait=True) - self.db.close() - - async def start(self): - self._stopping = False - env = self.env - self.logger.info(f'software version: {__version__}') - self.logger.info(f'event loop policy: {env.loop_policy}') - self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') - - await self.daemon.height() - - def _start_cancellable(run, *args): - _flag = asyncio.Event() - self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) - return _flag.wait() - - await _start_cancellable(self.fetch_and_process_blocks) - - async def stop(self): - self._stopping = True - await self._ready_to_stop.wait() - for task in reversed(self.cancellable_tasks): - task.cancel() - await asyncio.wait(self.cancellable_tasks) - self.shutdown_event.set() - await self.daemon.close() - - def run(self): - loop = asyncio.get_event_loop() - loop.set_default_executor(self._chain_executor) - - def __exit(): - raise SystemExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start()) - loop.run_until_complete(self.shutdown_event.wait()) - except (SystemExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(self.stop()) + def _iter_stop_tasks(self): + yield self._ready_to_stop.wait() + yield self._stop_cancellable_tasks() + yield self.daemon.close() diff --git a/scribe/cli.py b/scribe/cli.py index a7413be..675a77f 100644 --- a/scribe/cli.py +++ b/scribe/cli.py @@ -2,8 +2,9 @@ import logging import traceback import argparse from scribe.env import Env -from scribe.blockchain.block_processor import BlockProcessor -from scribe.reader import BlockchainReaderServer, ElasticWriter +from scribe.blockchain.service import BlockchainProcessorService +from scribe.hub.service import HubServerService +from scribe.elasticsearch.service import ElasticSyncService def get_arg_parser(name): @@ -24,7 +25,7 @@ def 
run_writer_forever(): setup_logging() args = get_arg_parser('scribe').parse_args() try: - block_processor = BlockProcessor(Env.from_arg_parser(args)) + block_processor = BlockchainProcessorService(Env.from_arg_parser(args)) block_processor.run() except Exception: traceback.print_exc() @@ -38,7 +39,7 @@ def run_server_forever(): args = get_arg_parser('scribe-hub').parse_args() try: - server = BlockchainReaderServer(Env.from_arg_parser(args)) + server = HubServerService(Env.from_arg_parser(args)) server.run() except Exception: traceback.print_exc() @@ -54,7 +55,7 @@ def run_es_sync_forever(): args = parser.parse_args() try: - server = ElasticWriter(Env.from_arg_parser(args)) + server = ElasticSyncService(Env.from_arg_parser(args)) server.run(args.reindex) except Exception: traceback.print_exc() diff --git a/scribe/reader/elastic_sync.py b/scribe/elasticsearch/service.py similarity index 98% rename from scribe/reader/elastic_sync.py rename to scribe/elasticsearch/service.py index ca56b49..6b471ce 100644 --- a/scribe/reader/elastic_sync.py +++ b/scribe/elasticsearch/service.py @@ -2,25 +2,22 @@ import os import json import typing import asyncio -import logging from collections import defaultdict from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk from scribe.schema.result import Censor +from scribe.service import BlockchainReaderService +from scribe.db.revertable import RevertableOp +from scribe.db.common import TrendingNotification, DB_PREFIXES + from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT -from scribe.reader import BaseBlockchainReader -from scribe.db.revertable import RevertableOp -from scribe.db.common import TrendingNotification, DB_PREFIXES -log = logging.getLogger(__name__) - - -class ElasticWriter(BaseBlockchainReader): +class ElasticSyncService(BlockchainReaderService): VERSION = 1 def __init__(self, env): @@ -342,10 +339,10 @@ class ElasticWriter(BaseBlockchainReader): def _iter_start_tasks(self): yield self.read_es_height() yield self.start_index() - yield self._start_cancellable(self.run_es_notifier) + yield self.start_cancellable(self.run_es_notifier) yield self.reindex(force=self._force_reindex) yield self.catch_up() - yield self._start_cancellable(self.refresh_blocks_forever) + yield self.start_cancellable(self.refresh_blocks_forever) def _iter_stop_tasks(self): yield self._stop_cancellable_tasks() @@ -363,7 +360,7 @@ class ElasticWriter(BaseBlockchainReader): self._force_reindex = False async def _reindex(self): - async with self._lock: + async with self.lock: self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys()) await self.delete_index() res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) diff --git a/scribe/reader/hub_server.py b/scribe/hub/service.py similarity index 91% rename from scribe/reader/hub_server.py rename to scribe/hub/service.py index 1bab810..6caef20 100644 --- a/scribe/reader/hub_server.py +++ b/scribe/hub/service.py @@ -1,13 +1,15 @@ import asyncio from scribe.blockchain.daemon import LBCDaemon -from scribe.reader import BaseBlockchainReader -from scribe.elasticsearch import ElasticNotifierClientProtocol + from scribe.hub.session import SessionManager from 
scribe.hub.mempool import MemPool from scribe.hub.udp import StatusServer +from scribe.service import BlockchainReaderService +from scribe.elasticsearch import ElasticNotifierClientProtocol -class BlockchainReaderServer(BaseBlockchainReader): + +class HubServerService(BlockchainReaderService): def __init__(self, env): super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') self.notifications_to_send = [] @@ -89,11 +91,11 @@ class BlockchainReaderServer(BaseBlockchainReader): def _iter_start_tasks(self): yield self.start_status_server() - yield self._start_cancellable(self.es_notification_client.maintain_connection) - yield self._start_cancellable(self.receive_es_notifications) - yield self._start_cancellable(self.refresh_blocks_forever) + yield self.start_cancellable(self.es_notification_client.maintain_connection) + yield self.start_cancellable(self.receive_es_notifications) + yield self.start_cancellable(self.refresh_blocks_forever) yield self.session_manager.search_index.start() - yield self._start_cancellable(self.session_manager.serve, self.mempool) + yield self.start_cancellable(self.session_manager.serve, self.mempool) def _iter_stop_tasks(self): yield self.status_server.stop() diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 4a7ac1e..92ef367 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -12,11 +12,11 @@ from bisect import bisect_right from asyncio import Event, sleep from collections import defaultdict, namedtuple from contextlib import suppress -from functools import partial, lru_cache +from functools import partial from elasticsearch import ConnectionTimeout from prometheus_client import Counter, Info, Histogram, Gauge from scribe.schema.result import Outputs -from scribe.base58 import Base58Error +from scribe.schema.base58 import Base58Error from scribe.error import ResolveCensoredError, TooManyClaimSearchParametersError from scribe import __version__, PROTOCOL_MIN, PROTOCOL_MAX, PROMETHEUS_NAMESPACE from scribe.build_info import BUILD, COMMIT_HASH, DOCKER_TAG diff --git a/scribe/reader/prometheus.py b/scribe/metrics.py similarity index 100% rename from scribe/reader/prometheus.py rename to scribe/metrics.py diff --git a/scribe/reader/__init__.py b/scribe/reader/__init__.py deleted file mode 100644 index 9f464a6..0000000 --- a/scribe/reader/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from scribe.reader.interface import BaseBlockchainReader -from scribe.reader.hub_server import BlockchainReaderServer -from scribe.reader.elastic_sync import ElasticWriter diff --git a/scribe/schema/attrs.py b/scribe/schema/attrs.py index 6ba471b..a855f42 100644 --- a/scribe/schema/attrs.py +++ b/scribe/schema/attrs.py @@ -7,12 +7,12 @@ from string import ascii_letters from decimal import Decimal, ROUND_UP from google.protobuf.json_format import MessageToDict -from scribe.base58 import Base58, b58_encode +from scribe.schema.base58 import Base58, b58_encode from scribe.error import MissingPublishedFileError, EmptyPublishedFileError from scribe.schema.mime_types import guess_media_type from scribe.schema.base import Metadata, BaseMessageList -from scribe.schema.tags import clean_tags, normalize_tag +from scribe.schema.tags import normalize_tag from scribe.schema.types.v2.claim_pb2 import ( Fee as FeeMessage, Location as LocationMessage, diff --git a/scribe/base58.py b/scribe/schema/base58.py similarity index 100% rename from scribe/base58.py rename to scribe/schema/base58.py diff --git a/scribe/bip32.py 
b/scribe/schema/bip32.py similarity index 99% rename from scribe/bip32.py rename to scribe/schema/bip32.py index e4a2456..d12ff1b 100644 --- a/scribe/bip32.py +++ b/scribe/schema/bip32.py @@ -8,7 +8,7 @@ from coincurve.utils import ( pem_to_der, lib as libsecp256k1, ffi as libsecp256k1_ffi ) from coincurve.ecdsa import CDATA_SIG_LENGTH -from scribe.base58 import Base58 +from scribe.schema.base58 import Base58 if (sys.version_info.major, sys.version_info.minor) > (3, 7): diff --git a/scribe/reader/interface.py b/scribe/service.py similarity index 82% rename from scribe/reader/interface.py rename to scribe/service.py index df6a31a..bdac2b1 100644 --- a/scribe/reader/interface.py +++ b/scribe/service.py @@ -4,31 +4,35 @@ import typing import signal from concurrent.futures.thread import ThreadPoolExecutor from prometheus_client import Gauge, Histogram -from scribe import PROMETHEUS_NAMESPACE, __version__ -from scribe.common import HISTOGRAM_BUCKETS -from scribe.db.prefixes import DBState + +from scribe import __version__, PROMETHEUS_NAMESPACE +from scribe.env import Env from scribe.db import HubDB -from scribe.reader.prometheus import PrometheusServer +from scribe.db.prefixes import DBState +from scribe.common import HISTOGRAM_BUCKETS +from scribe.metrics import PrometheusServer -NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader" - - -class BlockchainReaderInterface: - async def poll_for_changes(self): - """ - Detect and handle if the db has advanced to a new block or unwound during a chain reorganization - - If a reorg is detected, this will first unwind() to the branching height and then advance() forward - to the new block(s). - """ - raise NotImplementedError() - - def clear_caches(self): - """ - Called after finished advancing, used for invalidating caches - """ - pass +class BlockchainService: + """ + Base class for blockchain readers and the writer + """ + def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'): + self.env = env + self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) + self.shutdown_event = asyncio.Event() + self.cancellable_tasks = [] + self._thread_workers = thread_workers + self._thread_prefix = thread_prefix + self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) + self.lock = asyncio.Lock() + self.last_state: typing.Optional[DBState] = None + self.db = HubDB( + env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, + secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor + ) + self._stopping = False def advance(self, height: int): """ @@ -44,8 +48,84 @@ class BlockchainReaderInterface: """ raise NotImplementedError() + def start_cancellable(self, run, *args): + _flag = asyncio.Event() + self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) + return _flag.wait() -class BaseBlockchainReader(BlockchainReaderInterface): + def _iter_start_tasks(self): + raise NotImplementedError() + + def _iter_stop_tasks(self): + yield self._stop_cancellable_tasks() + + async def _stop_cancellable_tasks(self): + async with self.lock: + while self.cancellable_tasks: + t = self.cancellable_tasks.pop() + if not t.done(): + t.cancel() + + async def start(self): + if not self._executor: + self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix) + self.db._executor = 
self._executor + + env = self.env + # min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() + self.log.info(f'software version: {__version__}') + # self.log.info(f'supported protocol versions: {min_str}-{max_str}') + self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') + + self.db.open_db() + self.log.info(f'initializing caches') + await self.db.initialize_caches() + self.last_state = self.db.read_db_state() + self.log.info(f'opened db at block {self.db.db_height}') + + for start_task in self._iter_start_tasks(): + await start_task + + async def stop(self): + self.log.info("stopping") + self._stopping = True + for stop_task in self._iter_stop_tasks(): + await stop_task + self.db.close() + self._executor.shutdown(wait=True) + self._executor = None + self.shutdown_event.set() + + async def _run(self): + try: + await self.start() + self.log.info("finished start(), waiting for shutdown event") + await self.shutdown_event.wait() + except (SystemExit, KeyboardInterrupt, asyncio.CancelledError): + self.log.warning("exiting") + self._stopping = True + except Exception as err: + self.log.exception("unexpected fatal error: %s", err) + self._stopping = True + + def run(self): + def __exit(): + raise SystemExit() + + loop = asyncio.get_event_loop() + loop.set_default_executor(self._executor) + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + try: + loop.run_until_complete(self._run()) + finally: + loop.run_until_complete(self.stop()) + + +NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader" + + +class BlockchainReaderService(BlockchainService): block_count_metric = Gauge( "block_count", "Number of processed blocks", namespace=NAMESPACE ) @@ -57,23 +137,56 @@ class BaseBlockchainReader(BlockchainReaderInterface): ) def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'): - self.env = env - self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) - self.shutdown_event = asyncio.Event() - self.cancellable_tasks = [] - self._thread_workers = thread_workers - self._thread_prefix = thread_prefix - self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) - self.db = HubDB( - env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, - secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, - filtering_channel_ids=env.filtering_channel_ids, executor=self._executor - ) - self.last_state: typing.Optional[DBState] = None + super().__init__(env, secondary_name, thread_workers, thread_prefix) self._refresh_interval = 0.1 - self._lock = asyncio.Lock() self.prometheus_server: typing.Optional[PrometheusServer] = None + async def poll_for_changes(self): + """ + Detect and handle if the db has advanced to a new block or unwound during a chain reorganization + + If a reorg is detected, this will first unwind() to the branching height and then advance() forward + to the new block(s). 
+ """ + await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) + + def clear_caches(self): + """ + Called after finished advancing, used for invalidating caches + """ + pass + + def advance(self, height: int): + """ + Called when advancing to the given block height + Callbacks that look up new values from the added block can be put here + """ + tx_count = self.db.prefix_db.tx_count.get(height).tx_count + assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts' + assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}" + prev_count = self.db.tx_counts[-1] + self.db.tx_counts.append(tx_count) + if self.db._cache_all_tx_hashes: + for tx_num in range(prev_count, tx_count): + tx_hash = self.db.prefix_db.tx_hash.get(tx_num).tx_hash + self.db.total_transactions.append(tx_hash) + self.db.tx_num_mapping[tx_hash] = tx_count + assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" + self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False)) + + def unwind(self): + """ + Go backwards one block + + """ + prev_count = self.db.tx_counts.pop() + tx_count = self.db.tx_counts[-1] + self.db.headers.pop() + if self.db._cache_all_tx_hashes: + for _ in range(prev_count - tx_count): + self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" + def _detect_changes(self): try: self.db.prefix_db.try_catch_up_with_primary() @@ -115,19 +228,10 @@ class BaseBlockchainReader(BlockchainReaderInterface): self.db.filtering_channel_hashes ) - async def poll_for_changes(self): - """ - Detect and handle if the db has advanced to a new block or unwound during a chain reorganization - - If a reorg is detected, this will first unwind() to the branching height and then advance() forward - to the new block(s). 
- """ - await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) - async def refresh_blocks_forever(self, synchronized: asyncio.Event): while True: try: - async with self._lock: + async with self.lock: await self.poll_for_changes() except asyncio.CancelledError: raise @@ -137,80 +241,15 @@ class BaseBlockchainReader(BlockchainReaderInterface): await asyncio.sleep(self._refresh_interval) synchronized.set() - def advance(self, height: int): - tx_count = self.db.prefix_db.tx_count.get(height).tx_count - assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts' - assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}" - prev_count = self.db.tx_counts[-1] - self.db.tx_counts.append(tx_count) - if self.db._cache_all_tx_hashes: - for tx_num in range(prev_count, tx_count): - tx_hash = self.db.prefix_db.tx_hash.get(tx_num).tx_hash - self.db.total_transactions.append(tx_hash) - self.db.tx_num_mapping[tx_hash] = tx_count - assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" - self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False)) - - def unwind(self): - prev_count = self.db.tx_counts.pop() - tx_count = self.db.tx_counts[-1] - self.db.headers.pop() - if self.db._cache_all_tx_hashes: - for _ in range(prev_count - tx_count): - self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) - assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" - - def _start_cancellable(self, run, *args): - _flag = asyncio.Event() - self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) - return _flag.wait() - def _iter_start_tasks(self): - yield self._start_cancellable(self.refresh_blocks_forever) + self.block_count_metric.set(self.last_state.height) + yield self.start_prometheus() + yield self.start_cancellable(self.refresh_blocks_forever) def _iter_stop_tasks(self): + yield self.stop_prometheus() yield self._stop_cancellable_tasks() - async def _stop_cancellable_tasks(self): - async with self._lock: - while self.cancellable_tasks: - t = self.cancellable_tasks.pop() - if not t.done(): - t.cancel() - - async def start(self): - if not self._executor: - self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix) - self.db._executor = self._executor - - env = self.env - # min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() - self.log.info(f'software version: {__version__}') - # self.log.info(f'supported protocol versions: {min_str}-{max_str}') - self.log.info(f'event loop policy: {env.loop_policy}') - self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') - - self.db.open_db() - self.log.info(f'initializing caches') - await self.db.initialize_caches() - self.last_state = self.db.read_db_state() - self.log.info(f'opened db at block {self.last_state.height}') - self.block_count_metric.set(self.last_state.height) - - await self.start_prometheus() - for start_task in self._iter_start_tasks(): - await start_task - self.log.info("finished starting") - - async def stop(self): - for stop_task in self._iter_stop_tasks(): - await stop_task - await self.stop_prometheus() - self.db.close() - self._executor.shutdown(wait=True) - self._executor = None - self.shutdown_event.set() - async def start_prometheus(self): if not self.prometheus_server and self.env.prometheus_port: self.prometheus_server = PrometheusServer() @@ -220,19 +259,3 @@ class 
BaseBlockchainReader(BlockchainReaderInterface): if self.prometheus_server: await self.prometheus_server.stop() self.prometheus_server = None - - def run(self): - loop = asyncio.get_event_loop() - loop.set_default_executor(self._executor) - - def __exit(): - raise SystemExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start()) - loop.run_until_complete(self.shutdown_event.wait()) - except (SystemExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(self.stop())
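With the refactor in place, writing a new reader comes down to subclassing `BlockchainReaderService` and filling in a few hooks, as `HubServerService` and `ElasticSyncService` do above. A minimal sketch of a hypothetical reader (the `MyReaderService` class and its method bodies are illustrative, not part of this patch; `setup_logging` is assumed to be the helper already used by `scribe/cli.py`):

```python
import asyncio

from scribe.cli import get_arg_parser, setup_logging
from scribe.env import Env
from scribe.service import BlockchainReaderService


class MyReaderService(BlockchainReaderService):
    """Hypothetical reader that follows the writer's rocksdb as a secondary."""

    def __init__(self, env: Env):
        super().__init__(env, secondary_name='my-reader', thread_workers=1,
                         thread_prefix='my-reader')

    def advance(self, height: int):
        super().advance(height)  # keeps the tx count / header caches in sync
        # look up anything needed from the newly added block here

    def clear_caches(self):
        pass  # invalidate any per-block caches after advancing

    async def watch_forever(self, started: asyncio.Event):
        started.set()  # unblock start_cancellable() during start()
        while True:
            await asyncio.sleep(1)

    def _iter_start_tasks(self):
        yield from super()._iter_start_tasks()  # prometheus + refresh_blocks_forever
        yield self.start_cancellable(self.watch_forever)

    def _iter_stop_tasks(self):
        yield from super()._iter_stop_tasks()  # cancels the tasks started above


if __name__ == '__main__':
    setup_logging()
    args = get_arg_parser('my-reader').parse_args()
    MyReaderService(Env.from_arg_parser(args)).run()
```
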