From c5dc8d5cad305767a90e4369f08acaa02932f91f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 9 Mar 2022 17:26:20 -0500 Subject: [PATCH] refactoring, fix reconnecting to notifier --- scribe/blockchain/block_processor.py | 17 +-- scribe/common.py | 4 + scribe/elasticsearch/notifier_protocol.py | 36 +++++- scribe/hub/common.py | 5 - scribe/hub/jsonrpc.py | 6 - scribe/hub/mempool.py | 4 +- scribe/hub/session.py | 5 +- scribe/reader/elastic_sync.py | 89 ++++----------- scribe/reader/hub_server.py | 86 +++------------ scribe/reader/interface.py | 127 +++++++++++++++++++--- 10 files changed, 190 insertions(+), 189 deletions(-) diff --git a/scribe/blockchain/block_processor.py b/scribe/blockchain/block_processor.py index 41954e7..0edb65b 100644 --- a/scribe/blockchain/block_processor.py +++ b/scribe/blockchain/block_processor.py @@ -3,7 +3,6 @@ import time import asyncio import typing import signal - from bisect import bisect_right from struct import pack from concurrent.futures.thread import ThreadPoolExecutor @@ -11,18 +10,15 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict -from scribe.schema.url import normalize_name - from scribe import __version__, PROMETHEUS_NAMESPACE -from scribe.blockchain.daemon import LBCDaemon -from scribe.blockchain.transaction import Tx, TxOutput, TxInput from scribe.db.db import HubDB from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue -from scribe.common import hash_to_hex_str, hash160, RPCError +from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS +from scribe.blockchain.daemon import LBCDaemon +from scribe.blockchain.transaction import Tx, TxOutput, TxInput from scribe.blockchain.prefetcher import Prefetcher - - +from scribe.schema.url import normalize_name if typing.TYPE_CHECKING: from scribe.env import Env from scribe.db.revertable import RevertableOpStack @@ -57,11 +53,6 @@ class StagedClaimtrieItem(typing.NamedTuple): ) -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) - - NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer" diff --git a/scribe/common.py b/scribe/common.py index 9e767b9..06f486a 100644 --- a/scribe/common.py +++ b/scribe/common.py @@ -18,6 +18,10 @@ HASHX_LEN = 11 CLAIM_HASH_LEN = 20 +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + # class cachedproperty: # def __init__(self, f): # self.f = f diff --git a/scribe/elasticsearch/notifier_protocol.py b/scribe/elasticsearch/notifier_protocol.py index 900668c..adc2f67 100644 --- a/scribe/elasticsearch/notifier_protocol.py +++ b/scribe/elasticsearch/notifier_protocol.py @@ -31,9 +31,40 @@ class ElasticNotifierProtocol(asyncio.Protocol): class ElasticNotifierClientProtocol(asyncio.Protocol): """notifies the reader when ES has written updates""" - def __init__(self, notifications: asyncio.Queue): + def __init__(self, notifications: asyncio.Queue, host: str, port: int): self.notifications = notifications self.transport: typing.Optional[asyncio.Transport] = None + self.host = host + self.port = port + self._lost_connection = asyncio.Event() + self._lost_connection.set() + + async def connect(self): + if self._lost_connection.is_set(): + await asyncio.get_event_loop().create_connection( + lambda: self, self.host, self.port + ) + + async def maintain_connection(self, synchronized: asyncio.Event): + first_connect = True + if not self._lost_connection.is_set(): + synchronized.set() + while True: + try: + await self._lost_connection.wait() + if not first_connect: + log.warning("lost connection to scribe-elastic-sync notifier") + await self.connect() + first_connect = False + synchronized.set() + log.info("connected to es notifier") + except Exception as e: + if not isinstance(e, asyncio.CancelledError): + log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port) + await asyncio.sleep(30) + else: + log.info("stopping the notifier loop") + raise e def close(self): if self.transport and not self.transport.is_closing(): @@ -41,10 +72,11 @@ class ElasticNotifierClientProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport - log.info("connected to es notifier") + self._lost_connection.clear() def connection_lost(self, exc) -> None: self.transport = None + self._lost_connection.set() def data_received(self, data: bytes) -> None: try: diff --git a/scribe/hub/common.py b/scribe/hub/common.py index 3991c9e..7eb2c11 100644 --- a/scribe/hub/common.py +++ b/scribe/hub/common.py @@ -4,11 +4,6 @@ from functools import lru_cache from scribe.common import CodeMessageError -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) - - SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' 'required_names other_names') diff --git a/scribe/hub/jsonrpc.py b/scribe/hub/jsonrpc.py index 1a732a1..cf9f0c4 100644 --- a/scribe/hub/jsonrpc.py +++ b/scribe/hub/jsonrpc.py @@ -9,12 +9,6 @@ from scribe.common import RPCError, CodeMessageError from scribe.hub.common import Notification, Request, Response, Batch, ProtocolError -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) -NAMESPACE = "scribe" - - class JSONRPC: """Abstract base class that interprets and constructs JSON RPC messages.""" diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 40d8793..ab9f88d 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -6,6 +6,7 @@ import logging from collections import defaultdict from prometheus_client import Histogram from scribe import PROMETHEUS_NAMESPACE +from scribe.common import HISTOGRAM_BUCKETS from scribe.blockchain.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: @@ -32,9 +33,6 @@ class MemPoolTxSummary: NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub" -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) mempool_process_time_metric = Histogram( "processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 86d5b08..a204b3d 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -23,7 +23,7 @@ from scribe import __version__, PROTOCOL_MIN, PROTOCOL_MAX, PROMETHEUS_NAMESPACE from scribe.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from scribe.elasticsearch import SearchIndex from scribe.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time -from scribe.common import protocol_version, RPCError, DaemonError, TaskGroup +from scribe.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS from scribe.hub.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from scribe.hub.common import BatchRequest, ProtocolError, Request, Batch, Notification from scribe.hub.framer import NewlineFramer @@ -190,9 +190,6 @@ class SessionGroup: NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub" -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) class SessionManager: diff --git a/scribe/reader/elastic_sync.py b/scribe/reader/elastic_sync.py index 6761f55..ecec17a 100644 --- a/scribe/reader/elastic_sync.py +++ b/scribe/reader/elastic_sync.py @@ -1,18 +1,13 @@ import os -import signal import json import typing -import struct -from collections import defaultdict import asyncio import logging -from decimal import Decimal +from collections import defaultdict from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk -from prometheus_client import Gauge, Histogram from scribe.schema.result import Censor -from scribe import PROMETHEUS_NAMESPACE from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS @@ -24,24 +19,9 @@ from scribe.db.common import TrendingNotification, DB_PREFIXES log = logging.getLogger(__name__) -NAMESPACE = f"{PROMETHEUS_NAMESPACE}_elastic_sync" -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) - class ElasticWriter(BaseBlockchainReader): VERSION = 1 - prometheus_namespace = "" - block_count_metric = Gauge( - "block_count", "Number of processed blocks", namespace=NAMESPACE - ) - block_update_time_metric = Histogram( - "block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS - ) - reorg_count_metric = Gauge( - "reorg_count", "Number of reorgs", namespace=NAMESPACE - ) def __init__(self, env): super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer') @@ -65,6 +45,7 @@ class ElasticWriter(BaseBlockchainReader): self._advanced = True self.synchronized = asyncio.Event() self._listeners: typing.List[ElasticNotifierProtocol] = [] + self._force_reindex = False async def run_es_notifier(self, synchronized: asyncio.Event): server = await asyncio.get_event_loop().create_server( @@ -138,8 +119,10 @@ class ElasticWriter(BaseBlockchainReader): await self.sync_client.indices.refresh(self.index) return False - async def stop_index(self): + async def stop_index(self, delete=False): if self.sync_client: + if delete: + await self.delete_index() await self.sync_client.close() self.sync_client = None @@ -311,60 +294,30 @@ class ElasticWriter(BaseBlockchainReader): def last_synced_height(self) -> int: return self._last_wrote_height - async def start(self, reindex=False): - await super().start() - - def _start_cancellable(run, *args): - _flag = asyncio.Event() - self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) - return _flag.wait() - - self.db.open_db() - await self.db.initialize_caches() - await self.read_es_height() - await self.start_index() - self.last_state = self.db.read_db_state() - - await _start_cancellable(self.run_es_notifier) - - if reindex or self._last_wrote_height == 0 and self.db.db_height > 0: + async def reindex(self, force=False): + if force or self._last_wrote_height == 0 and self.db.db_height > 0: if self._last_wrote_height == 0: self.log.info("running initial ES indexing of rocksdb at block height %i", self.db.db_height) else: self.log.info("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height) - await self.reindex() - await _start_cancellable(self.refresh_blocks_forever) + await self._reindex() - async def stop(self, delete_index=False): - async with self._lock: - while self.cancellable_tasks: - t = self.cancellable_tasks.pop() - if not t.done(): - t.cancel() - if delete_index: - await self.delete_index() - await self.stop_index() - self._executor.shutdown(wait=True) - self._executor = None - self.shutdown_event.set() + def _iter_start_tasks(self): + yield self.read_es_height() + yield self.start_index() + yield self._start_cancellable(self.run_es_notifier) + yield self.reindex(force=self._force_reindex) + yield self._start_cancellable(self.refresh_blocks_forever) + + def _iter_stop_tasks(self): + yield self._stop_cancellable_tasks() + yield self.stop_index() def run(self, reindex=False): - loop = asyncio.get_event_loop() - loop.set_default_executor(self._executor) + self._force_reindex = reindex + return super().run() - def __exit(): - raise SystemExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start(reindex=reindex)) - loop.run_until_complete(self.shutdown_event.wait()) - except (SystemExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(self.stop()) - - async def reindex(self): + async def _reindex(self): async with self._lock: self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys()) await self.delete_index() diff --git a/scribe/reader/hub_server.py b/scribe/reader/hub_server.py index 681c713..b509e7b 100644 --- a/scribe/reader/hub_server.py +++ b/scribe/reader/hub_server.py @@ -1,14 +1,10 @@ -import signal import asyncio -import typing -from scribe import __version__ from scribe.blockchain.daemon import LBCDaemon from scribe.reader import BaseBlockchainReader from scribe.elasticsearch import ElasticNotifierClientProtocol from scribe.hub.session import SessionManager from scribe.hub.mempool import MemPool from scribe.hub.udp import StatusServer -from scribe.hub.prometheus import PrometheusServer class BlockchainReaderServer(BaseBlockchainReader): @@ -21,7 +17,6 @@ class BlockchainReaderServer(BaseBlockchainReader): self.mempool_notifications = set() self.status_server = StatusServer() self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs - self.prometheus_server: typing.Optional[PrometheusServer] = None self.mempool = MemPool(self.env.coin, self.db) self.session_manager = SessionManager( env, self.db, self.mempool, self.history_cache, self.resolve_cache, @@ -32,7 +27,9 @@ class BlockchainReaderServer(BaseBlockchainReader): ) self.mempool.session_manager = self.session_manager self.es_notifications = asyncio.Queue() - self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications) + self.es_notification_client = ElasticNotifierClientProtocol( + self.es_notifications, '127.0.0.1', self.env.elastic_notifier_port + ) self.synchronized = asyncio.Event() self._es_height = None self._es_block_hash = None @@ -75,9 +72,6 @@ class BlockchainReaderServer(BaseBlockchainReader): self.notifications_to_send.clear() async def receive_es_notifications(self, synchronized: asyncio.Event): - await asyncio.get_event_loop().create_connection( - lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port - ) synchronized.set() try: while True: @@ -92,71 +86,23 @@ class BlockchainReaderServer(BaseBlockchainReader): finally: self.es_notification_client.close() - async def start(self): - await super().start() - env = self.env - # min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() - self.log.info(f'software version: {__version__}') - # self.log.info(f'supported protocol versions: {min_str}-{max_str}') - self.log.info(f'event loop policy: {env.loop_policy}') - self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') - await self.daemon.height() - - def _start_cancellable(run, *args): - _flag = asyncio.Event() - self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) - return _flag.wait() - - self.db.open_db() - await self.db.initialize_caches() - - self.last_state = self.db.read_db_state() - - await self.start_prometheus() + async def start_status_server(self): if self.env.udp_port and int(self.env.udp_port): await self.status_server.start( 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, self.env.host, self.env.udp_port, self.env.allow_lan_udp ) - await _start_cancellable(self.receive_es_notifications) - await _start_cancellable(self.refresh_blocks_forever) - await self.session_manager.search_index.start() - await _start_cancellable(self.session_manager.serve, self.mempool) - async def stop(self): - await self.status_server.stop() - async with self._lock: - while self.cancellable_tasks: - t = self.cancellable_tasks.pop() - if not t.done(): - t.cancel() - await self.session_manager.search_index.stop() - self.db.close() - if self.prometheus_server: - await self.prometheus_server.stop() - self.prometheus_server = None - await self.daemon.close() - self._executor.shutdown(wait=True) - self._executor = None - self.shutdown_event.set() + def _iter_start_tasks(self): + yield self.start_status_server() + yield self._start_cancellable(self.es_notification_client.maintain_connection) + yield self._start_cancellable(self.receive_es_notifications) + yield self._start_cancellable(self.refresh_blocks_forever) + yield self.session_manager.search_index.start() + yield self._start_cancellable(self.session_manager.serve, self.mempool) - def run(self): - loop = asyncio.get_event_loop() - loop.set_default_executor(self._executor) - - def __exit(): - raise SystemExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start()) - loop.run_until_complete(self.shutdown_event.wait()) - except (SystemExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(self.stop()) - - async def start_prometheus(self): - if not self.prometheus_server and self.env.prometheus_port: - self.prometheus_server = PrometheusServer() - await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) + def _iter_stop_tasks(self): + yield self.status_server.stop() + yield self._stop_cancellable_tasks() + yield self.session_manager.search_index.stop() + yield self.daemon.close() diff --git a/scribe/reader/interface.py b/scribe/reader/interface.py index 2e0afb0..84841e8 100644 --- a/scribe/reader/interface.py +++ b/scribe/reader/interface.py @@ -1,20 +1,51 @@ import logging import asyncio import typing +import signal from concurrent.futures.thread import ThreadPoolExecutor from prometheus_client import Gauge, Histogram -from scribe import PROMETHEUS_NAMESPACE +from scribe import PROMETHEUS_NAMESPACE, __version__ +from scribe.common import HISTOGRAM_BUCKETS from scribe.db.prefixes import DBState from scribe.db import HubDB +from scribe.reader.prometheus import PrometheusServer -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader" -class BaseBlockchainReader: +class BlockchainReaderInterface: + async def poll_for_changes(self): + """ + Detect and handle if the db has advanced to a new block or unwound during a chain reorganization + + If a reorg is detected, this will first unwind() to the branching height and then advance() forward + to the new block(s). + """ + raise NotImplementedError() + + def clear_caches(self): + """ + Called after finished advancing, used for invalidating caches + """ + pass + + def advance(self, height: int): + """ + Called when advancing to the given block height + Callbacks that look up new values from the added block can be put here + """ + raise NotImplementedError() + + def unwind(self): + """ + Go backwards one block + + """ + raise NotImplementedError() + + +class BaseBlockchainReader(BlockchainReaderInterface): block_count_metric = Gauge( "block_count", "Number of processed blocks", namespace=NAMESPACE ) @@ -41,6 +72,7 @@ class BaseBlockchainReader: self.last_state: typing.Optional[DBState] = None self._refresh_interval = 0.1 self._lock = asyncio.Lock() + self.prometheus_server: typing.Optional[PrometheusServer] = None def _detect_changes(self): try: @@ -105,16 +137,7 @@ class BaseBlockchainReader: await asyncio.sleep(self._refresh_interval) synchronized.set() - def clear_caches(self): - """ - Called after finished advancing, used for invalidating caches - """ - pass - def advance(self, height: int): - """ - Advance to the given block height - """ tx_count = self.db.prefix_db.tx_count.get(height).tx_count assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts' assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}" @@ -122,14 +145,82 @@ class BaseBlockchainReader: self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False)) def unwind(self): - """ - Go backwards one block - """ self.db.tx_counts.pop() self.db.headers.pop() + def _start_cancellable(self, run, *args): + _flag = asyncio.Event() + self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) + return _flag.wait() + + def _iter_start_tasks(self): + yield self._start_cancellable(self.refresh_blocks_forever) + + def _iter_stop_tasks(self): + yield self._stop_cancellable_tasks() + + async def _stop_cancellable_tasks(self): + async with self._lock: + while self.cancellable_tasks: + t = self.cancellable_tasks.pop() + if not t.done(): + t.cancel() + async def start(self): - # TODO: make the method here useful if not self._executor: self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix) self.db._executor = self._executor + + env = self.env + # min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() + self.log.info(f'software version: {__version__}') + # self.log.info(f'supported protocol versions: {min_str}-{max_str}') + self.log.info(f'event loop policy: {env.loop_policy}') + self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') + + self.db.open_db() + self.log.info(f'initializing caches') + await self.db.initialize_caches() + self.last_state = self.db.read_db_state() + self.log.info(f'opened db at block {self.last_state.height}') + self.block_count_metric.set(self.last_state.height) + + await self.start_prometheus() + for start_task in self._iter_start_tasks(): + await start_task + self.log.info("finished starting") + + async def stop(self): + for stop_task in self._iter_stop_tasks(): + await stop_task + await self.stop_prometheus() + self.db.close() + self._executor.shutdown(wait=True) + self._executor = None + self.shutdown_event.set() + + async def start_prometheus(self): + if not self.prometheus_server and self.env.prometheus_port: + self.prometheus_server = PrometheusServer() + await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) + + async def stop_prometheus(self): + if self.prometheus_server: + await self.prometheus_server.stop() + self.prometheus_server = None + + def run(self): + loop = asyncio.get_event_loop() + loop.set_default_executor(self._executor) + + def __exit(): + raise SystemExit() + try: + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + loop.run_until_complete(self.start()) + loop.run_until_complete(self.shutdown_event.wait()) + except (SystemExit, KeyboardInterrupt): + pass + finally: + loop.run_until_complete(self.stop())