diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index bb233e2d5..a9aa1e7e8 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -20,6 +20,7 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 from lbry.wallet.server.mempool import MemPool +from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from lbry.wallet.server.udp import StatusServer @@ -28,12 +29,6 @@ if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB -class TrendingNotification(NamedTuple): - height: int - prev_amount: int - new_amount: int - - class Prefetcher: """Prefetches blocks (in the forward direction only).""" @@ -326,6 +321,22 @@ class BlockProcessor: return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.shield(run_in_thread()) + async def check_mempool(self): + if self.db.prefix_db.closed: + return + current_mempool = { + k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate() + } + for hh in await self.daemon.mempool_hashes(): + tx_hash = bytes.fromhex(hh)[::-1] + if tx_hash in current_mempool: + current_mempool.pop(tx_hash) + else: + raw_tx = bytes.fromhex(await self.daemon.getrawtransaction(hh)) + self.db.prefix_db.mempool_tx.stage_put((tx_hash,), (raw_tx,)) + for tx_hash, raw_tx in current_mempool.items(): + self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), (raw_tx,)) + async def check_and_advance_blocks(self, raw_blocks): """Process the list of raw blocks passed. Detects and handles reorgs. @@ -1413,8 +1424,8 @@ class BlockProcessor: or touched in self.pending_support_amount_change: # exclude sending notifications for claims/supports that activated but # weren't added/spent in this block - self._add_claim_activation_change_notification( - touched.hex(), height, prev_effective_amount, new_effective_amount + self.db.prefix_db.trending_notification.stage_put( + (height, touched), (prev_effective_amount, new_effective_amount) ) for channel_hash, count in self.pending_channel_counts.items(): @@ -1454,6 +1465,12 @@ class BlockProcessor: spent_claims = {} txos = Transaction(tx.raw).outputs + # clean up mempool, delete txs that were already in mempool/staged to be added + # leave txs in mempool that werent in the block + mempool_tx = self.db.prefix_db.mempool_tx.get_pending(tx_hash) + if mempool_tx: + self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), mempool_tx) + self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,)) self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,)) self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,)) @@ -1513,6 +1530,8 @@ class BlockProcessor: # update effective amount and update sets of touched and deleted claims self._get_cumulative_update_ops(height) + self.db.prefix_db.touched_hashX.stage_put((height,), (list(sorted(self.touched_hashXs)),)) + self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) for hashX, new_history in self.hashXs_by_tx.items(): @@ -1594,6 +1613,7 @@ class BlockProcessor: self.pending_support_amount_change.clear() self.resolve_cache.clear() self.resolve_outputs_cache.clear() + self.touched_hashXs.clear() async def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index 7da046edc..8800eb983 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -40,3 +40,6 @@ class DB_PREFIXES(enum.Enum): channel_count = b'Z' support_amount = b'a' block_txs = b'b' + trending_notifications = b'c' + mempool_tx = b'd' + touched_hashX = b'e' diff --git a/lbry/wallet/server/db/common.py b/lbry/wallet/server/db/common.py index dce98711d..22450ae4e 100644 --- a/lbry/wallet/server/db/common.py +++ b/lbry/wallet/server/db/common.py @@ -445,3 +445,9 @@ class ResolveResult(typing.NamedTuple): channel_hash: typing.Optional[bytes] reposted_claim_hash: typing.Optional[bytes] signature_valid: typing.Optional[bool] + + +class TrendingNotification(typing.NamedTuple): + height: int + prev_amount: int + new_amount: int diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index e51313097..39a3f6266 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -5,6 +5,7 @@ import base64 from typing import Union, Tuple, NamedTuple, Optional from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.db import RocksDBStore, PrefixDB +from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.schema.url import normalize_name @@ -230,7 +231,7 @@ class TxValue(NamedTuple): raw_tx: bytes def __str__(self): - return f"{self.__class__.__name__}(raw_tx={base64.b64encode(self.raw_tx)})" + return f"{self.__class__.__name__}(raw_tx={base64.b64encode(self.raw_tx).decode()})" class BlockHeaderKey(NamedTuple): @@ -1595,6 +1596,124 @@ class BlockTxsPrefixRow(PrefixRow): return cls.pack_key(height), cls.pack_value(tx_hashes) +class MempoolTxKey(TxKey): + pass + + +class MempoolTxValue(TxValue): + pass + + +class MempoolTXPrefixRow(PrefixRow): + prefix = DB_PREFIXES.mempool_tx.value + key_struct = struct.Struct(b'>32s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>32s').pack + ] + + @classmethod + def pack_key(cls, tx_hash: bytes) -> bytes: + return super().pack_key(tx_hash) + + @classmethod + def unpack_key(cls, tx_hash: bytes) -> MempoolTxKey: + return MempoolTxKey(*super().unpack_key(tx_hash)) + + @classmethod + def pack_value(cls, tx: bytes) -> bytes: + return tx + + @classmethod + def unpack_value(cls, data: bytes) -> MempoolTxValue: + return MempoolTxValue(data) + + @classmethod + def pack_item(cls, tx_hash: bytes, raw_tx: bytes): + return cls.pack_key(tx_hash), cls.pack_value(raw_tx) + + +class TrendingNotificationKey(typing.NamedTuple): + height: int + claim_hash: bytes + + +class TrendingNotificationValue(typing.NamedTuple): + previous_amount: int + new_amount: int + + +class TrendingNotificationPrefixRow(PrefixRow): + prefix = DB_PREFIXES.trending_notifications.value + key_struct = struct.Struct(b'>L20s') + value_struct = struct.Struct(b'>QQ') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack, + struct.Struct(b'>L20s').pack + ] + + @classmethod + def pack_key(cls, height: int, claim_hash: bytes): + return super().pack_key(height, claim_hash) + + @classmethod + def unpack_key(cls, key: bytes) -> TrendingNotificationKey: + return TrendingNotificationKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, previous_amount: int, new_amount: int) -> bytes: + return super().pack_value(previous_amount, new_amount) + + @classmethod + def unpack_value(cls, data: bytes) -> TrendingNotificationValue: + return TrendingNotificationValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, height, claim_hash, previous_amount, new_amount): + return cls.pack_key(height, claim_hash), cls.pack_value(previous_amount, new_amount) + + +class TouchedHashXKey(NamedTuple): + height: int + + +class TouchedHashXValue(NamedTuple): + touched_hashXs: typing.List[bytes] + + +class TouchedHashXPrefixRow(PrefixRow): + prefix = DB_PREFIXES.touched_hashX.value + key_struct = struct.Struct(b'>L') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack + ] + + @classmethod + def pack_key(cls, height: int): + return super().pack_key(height) + + @classmethod + def unpack_key(cls, key: bytes) -> TouchedHashXKey: + return TouchedHashXKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, touched: typing.List[bytes]) -> bytes: + assert all(map(lambda item: len(item) == 11, touched)) + return b''.join(touched) + + @classmethod + def unpack_value(cls, data: bytes) -> TouchedHashXValue: + return TouchedHashXValue([data[idx*11:(idx*11)+11] for idx in range(len(data) // 11)]) + + @classmethod + def pack_item(cls, height: int, touched: typing.List[bytes]): + return cls.pack_key(height), cls.pack_value(touched) + + class HubDB(PrefixDB): def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): @@ -1630,6 +1749,9 @@ class HubDB(PrefixDB): self.db_state = DBStatePrefixRow(db, self._op_stack) self.support_amount = SupportAmountPrefixRow(db, self._op_stack) self.block_txs = BlockTxsPrefixRow(db, self._op_stack) + self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack) + self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack) + self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: