From 2318e6d8e92110c608a3bbf497da8b29776d5a09 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 10 Dec 2020 19:26:34 -0500 Subject: [PATCH 1/5] faster fs_transactions --- lbry/wallet/server/leveldb.py | 123 +++++++++++----------------------- 1 file changed, 40 insertions(+), 83 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f06517edb..50ac57a96 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -94,8 +94,7 @@ class LevelDB: self.headers_db = None self.tx_db = None - self._block_txs_cache = pylru.lrucache(50000) - self._merkle_tx_cache = pylru.lrucache(100000) + self._tx_and_merkle_cache = pylru.lrucache(100000) self.total_transactions = None async def _read_tx_counts(self): @@ -147,7 +146,7 @@ class LevelDB: async def _open_dbs(self, for_sync, compacting): if self.executor is None: - self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) + self.executor = ThreadPoolExecutor(1) coin_path = os.path.join(self.env.db_dir, 'COIN') if not os.path.isfile(coin_path): with util.open_file(coin_path, create=True) as f: @@ -470,76 +469,52 @@ class LevelDB: return None, tx_height return self.total_transactions[tx_num], tx_height - async def tx_merkle(self, tx_num, tx_height): - if tx_height == -1: - return { - 'block_height': -1 - } - tx_counts = self.tx_counts - tx_pos = tx_num - tx_counts[tx_height - 1] - - def _update_block_txs_cache(): - block_txs = list(self.tx_db.iterator( - start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]), - stop=None if tx_height + 1 == len(tx_counts) else - TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False - )) - if tx_height + 100 > self.db_height: - return block_txs - self._block_txs_cache[tx_height] = block_txs - - uncached = None - if (tx_num, tx_height) in self._merkle_tx_cache: - return self._merkle_tx_cache[(tx_num, tx_height)] - if tx_height not in self._block_txs_cache: - uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache) - block_txs = self._block_txs_cache.get(tx_height, uncached) - branch, root = self.merkle.branch_and_root(block_txs, tx_pos) - merkle = { - 'block_height': tx_height, - 'merkle': [ - hash_to_hex_str(hash) - for hash in branch - ], - 'pos': tx_pos - } - if tx_height + 100 < self.db_height: - self._merkle_tx_cache[(tx_num, tx_height)] = merkle - return merkle - - def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]: + def _fs_transactions(self, txids: Iterable[str]): unpack_be_uint64 = util.unpack_be_uint64 tx_counts = self.tx_counts tx_db_get = self.tx_db.get - tx_infos = [] + tx_cache = self._tx_and_merkle_cache + + tx_infos = {} for tx_hash in txids: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) - tx = None - tx_height = -1 - if tx_num is not None: - tx_num = unpack_be_uint64(tx_num) - tx_height = bisect_right(tx_counts, tx_num) - if tx_height < self.db_height: - tx = tx_db_get(TX_PREFIX + tx_hash_bytes) - tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height)) - + cached_tx = tx_cache.get(tx_hash) + if cached_tx: + tx, merkle = cached_tx + else: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) + tx = None + tx_height = -1 + if tx_num is not None: + tx_num = unpack_be_uint64(tx_num) + tx_height = bisect_right(tx_counts, tx_num) + if tx_height < self.db_height: + tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + if tx_height == -1: + merkle = { + 'block_height': -1 + } + else: + tx_pos = tx_num - tx_counts[tx_height - 1] + branch, root = self.merkle.branch_and_root( + self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos + ) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) + for hash in branch + ], + 'pos': tx_pos + } + if tx_height + 10 < self.db_height: + tx_cache[tx_hash] = tx, merkle + tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle) return tx_infos async def fs_transactions(self, txids): - txs = await asyncio.get_event_loop().run_in_executor( - self.executor, self._fs_transactions, txids - ) - unsorted_result = {} - - async def add_result(item): - _txid, _tx, _tx_num, _tx_height = item - unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height)) - - if txs: - await asyncio.gather(*map(add_result, txs)) - return {txid: unsorted_result[txid] for txid, _, _, _ in txs} + return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) async def fs_block_hashes(self, height, count): if height + count > len(self.headers): @@ -553,28 +528,10 @@ class LevelDB: transactions. By default returns at most 1000 entries. Set limit to None to get them all. """ - # def read_history(): - # hashx_history = [] - # for key, hist in self.history.db.iterator(prefix=hashX): - # a = array.array('I') - # a.frombytes(hist) - # for tx_num in a: - # tx_height = bisect_right(self.tx_counts, tx_num) - # if tx_height > self.db_height: - # tx_hash = None - # else: - # tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - # - # hashx_history.append((tx_hash, tx_height)) - # if limit and len(hashx_history) >= limit: - # return hashx_history - # return hashx_history def read_history(): db_height = self.db_height tx_counts = self.tx_counts - tx_db_get = self.tx_db.get - pack_be_uint64 = util.pack_be_uint64 cnt = 0 txs = [] From 751cc4c44d0b9c60679d8f254e5dec391e600624 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 13 Dec 2020 13:48:50 -0500 Subject: [PATCH 2/5] don't deserialize mempool in a thread --- lbry/wallet/server/mempool.py | 58 ++++++++++++++--------------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 4a0a80f30..3ef6c5628 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -6,13 +6,11 @@ # and warranty status of this software. """Mempool handling.""" -import os import asyncio import itertools import time from abc import ABC, abstractmethod from collections import defaultdict -from concurrent.futures.thread import ThreadPoolExecutor from prometheus_client import Histogram import attr @@ -117,7 +115,6 @@ class MemPool: # Prevents mempool refreshes during fee histogram calculation self.lock = asyncio.Lock() self.wakeup = asyncio.Event() - self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1)) self.mempool_process_time_metric = mempool_process_time_metric async def _logging(self, synchronized_event): @@ -135,15 +132,11 @@ class MemPool: await synchronized_event.wait() async def _refresh_histogram(self, synchronized_event): - try: - while True: - await synchronized_event.wait() - async with self.lock: - # Threaded as can be expensive - await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000) - await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) - finally: - self.executor.shutdown(wait=True) + while True: + await synchronized_event.wait() + async with self.lock: + self._update_histogram(100_000) + await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) def _update_histogram(self, bin_size): # Build a histogram by fee rate @@ -286,30 +279,25 @@ class MemPool: hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) raw_txs = await self.api.raw_transactions(hex_hashes_iter) - def deserialize_txs(): # This function is pure - to_hashX = self.coin.hashX_from_script - deserializer = self.coin.DESERIALIZER + to_hashX = self.coin.hashX_from_script + deserializer = self.coin.DESERIALIZER - txs = {} - for hash, raw_tx in zip(hashes, raw_txs): - # The daemon may have evicted the tx from its - # mempool or it may have gotten in a block - if not raw_tx: - continue - tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() - # Convert the inputs and outputs into (hashX, value) pairs - # Drop generation-like inputs from MemPoolTx.prevouts - txin_pairs = tuple((txin.prev_hash, txin.prev_idx) - for txin in tx.inputs - if not txin.is_generation()) - txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) - for txout in tx.outputs) - txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs, - 0, tx_size) - return txs - - # Thread this potentially slow operation so as not to block - tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs) + tx_map = {} + for hash, raw_tx in zip(hashes, raw_txs): + # The daemon may have evicted the tx from its + # mempool or it may have gotten in a block + if not raw_tx: + continue + tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() + # Convert the inputs and outputs into (hashX, value) pairs + # Drop generation-like inputs from MemPoolTx.prevouts + txin_pairs = tuple((txin.prev_hash, txin.prev_idx) + for txin in tx.inputs + if not txin.is_generation()) + txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) + for txout in tx.outputs) + tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs, + 0, tx_size) # Determine all prevouts not in the mempool, and fetch the # UTXO information from the database. Failed prevout lookups From 20dad7f07f0d18f035ca7e87e2a344db17a3ce41 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 14 Dec 2020 13:42:20 -0500 Subject: [PATCH 3/5] only notify hashxs touched since last notification --- lbry/wallet/server/block_processor.py | 1 + lbry/wallet/server/mempool.py | 17 ++++++++++++----- lbry/wallet/server/server.py | 18 +++++++++++------- lbry/wallet/server/session.py | 5 +++-- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 26eaad00e..e7076a5a7 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -216,6 +216,7 @@ class BlockProcessor: for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() + self.notifications.notified_mempool_txs.clear() await self._maybe_flush() processed_time = time.perf_counter() - start self.block_count_metric.set(self.height) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 3ef6c5628..4ebc22544 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -72,7 +72,7 @@ class MemPoolAPI(ABC): """ @abstractmethod - async def on_mempool(self, touched, height): + async def on_mempool(self, touched, new_touched, height): """Called each time the mempool is synchronized. touched is a set of hashXs touched since the previous call. height is the daemon's height at the time the mempool was obtained.""" @@ -116,6 +116,7 @@ class MemPool: self.lock = asyncio.Lock() self.wakeup = asyncio.Event() self.mempool_process_time_metric = mempool_process_time_metric + self.notified_mempool_txs = set() async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -219,10 +220,15 @@ class MemPool: continue hashes = {hex_str_to_hash(hh) for hh in hex_hashes} async with self.lock: + new_hashes = hashes.difference(self.notified_mempool_txs) touched = await self._process_mempool(hashes) + self.notified_mempool_txs.update(new_hashes) + new_touched = { + touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes) + } synchronized_event.set() synchronized_event.clear() - await self.api.on_mempool(touched, height) + await self.api.on_mempool(touched, new_touched, height) duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) try: @@ -236,7 +242,8 @@ class MemPool: async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes txs = self.txs - hashXs = self.hashXs + + hashXs = self.hashXs # hashX: [tx_hash, ...] touched = set() # First handle txs that have disappeared @@ -267,8 +274,8 @@ class MemPool: # FIXME: this is not particularly efficient while tx_map and len(tx_map) != prior_count: prior_count = len(tx_map) - tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, - touched) + tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched) + if tx_map: self.logger.info(f'{len(tx_map)} txs dropped') diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index cca84c852..56b8ffb9b 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -25,9 +25,10 @@ class Notifications: def __init__(self): self._touched_mp = {} self._touched_bp = {} + self.notified_mempool_txs = set() self._highest_block = -1 - async def _maybe_notify(self): + async def _maybe_notify(self, new_touched): tmp, tbp = self._touched_mp, self._touched_bp common = set(tmp).intersection(tbp) if common: @@ -44,24 +45,24 @@ class Notifications: del tmp[old] for old in [h for h in tbp if h <= height]: touched.update(tbp.pop(old)) - await self.notify(height, touched) + await self.notify(height, touched, new_touched) - async def notify(self, height, touched): + async def notify(self, height, touched, new_touched): pass async def start(self, height, notify_func): self._highest_block = height self.notify = notify_func - await self.notify(height, set()) + await self.notify(height, set(), set()) - async def on_mempool(self, touched, height): + async def on_mempool(self, touched, new_touched, height): self._touched_mp[height] = touched - await self._maybe_notify() + await self._maybe_notify(new_touched) async def on_block(self, touched, height): self._touched_bp[height] = touched self._highest_block = height - await self._maybe_notify() + await self._maybe_notify(set()) class Server: @@ -84,9 +85,12 @@ class Server: notifications.mempool_hashes = daemon.mempool_hashes notifications.raw_transactions = daemon.getrawtransactions notifications.lookup_utxos = db.lookup_utxos + MemPoolAPI.register(Notifications) self.mempool = mempool = MemPool(env.coin, notifications) + notifications.notified_mempool_txs = self.mempool.notified_mempool_txs + self.session_mgr = env.coin.SESSION_MANAGER( env, db, bp, daemon, mempool, self.shutdown_event ) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0a996d68e..6009009b2 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -635,7 +635,7 @@ class SessionManager: self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) return self.history_cache[hashX] - async def _notify_sessions(self, height, touched): + async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" height_changed = height != self.notified_height if height_changed: @@ -660,7 +660,8 @@ class SessionManager: if touched or (height_changed and self.mempool_statuses): notified_hashxs = 0 notified_sessions = 0 - for hashX in touched.union(self.mempool_statuses.keys()): + to_notify = touched.union(self.mempool_statuses.keys() if height_changed else new_touched.intersection(self.mempool_statuses.keys())) + for hashX in to_notify: for session_id in self.hashx_subscriptions_by_session[hashX]: asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) notified_sessions += 1 From 8dfa2767ec0a464fbbee8282bd14fda93b7705b9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 14 Dec 2020 13:52:26 -0500 Subject: [PATCH 4/5] new_touched --- lbry/wallet/server/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6009009b2..4a415ba38 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -660,7 +660,7 @@ class SessionManager: if touched or (height_changed and self.mempool_statuses): notified_hashxs = 0 notified_sessions = 0 - to_notify = touched.union(self.mempool_statuses.keys() if height_changed else new_touched.intersection(self.mempool_statuses.keys())) + to_notify = touched if height_changed else new_touched for hashX in to_notify: for session_id in self.hashx_subscriptions_by_session[hashX]: asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) From 674ce02e58d252f9a7a4af473f1609f02aac439b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 14 Dec 2020 14:38:36 -0500 Subject: [PATCH 5/5] logging --- lbry/wallet/server/session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 4a415ba38..6f9808497 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -666,7 +666,8 @@ class SessionManager: asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) notified_sessions += 1 notified_hashxs += 1 - self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses') + if notified_sessions: + self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses') def add_session(self, session): self.sessions[id(session)] = session