diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index baa63ca..4cafeef 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -169,8 +169,9 @@ class BlockchainProcessorService(BlockchainService): async def refresh_mempool(self): def fetch_mempool(mempool_prefix): + lower, upper = mempool_prefix.MIN_TX_HASH, mempool_prefix.MAX_TX_HASH return { - k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate() + k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate(start=(lower,), stop=(upper,)) } def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index 6543330..09ae90e 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -1513,6 +1513,9 @@ class MempoolTXPrefixRow(PrefixRow): prefix = DB_PREFIXES.mempool_tx.value key_struct = struct.Struct(b'>32s') + MAX_TX_HASH = b'\xff' * 32 + MIN_TX_HASH = b'\x00' * 32 + key_part_lambdas = [ lambda: b'', struct.Struct(b'>32s').pack diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index eb32224..0a0cc34 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -55,17 +55,20 @@ class MemPool: def refresh(self) -> typing.Set[bytes]: # returns list of new touched hashXs prefix_db = self._db.prefix_db + mempool_tx_hashes = set() try: - new_mempool = {k.tx_hash: v.raw_tx for k, v in prefix_db.mempool_tx.iterate()} # TODO: make this more efficient + lower, upper = prefix_db.mempool_tx.MIN_TX_HASH, prefix_db.mempool_tx.MAX_TX_HASH + for k, v in prefix_db.mempool_tx.iterate(start=(lower,), stop=(upper,)): + self.raw_mempool[k.tx_hash] = v.raw_tx + mempool_tx_hashes.add(k.tx_hash) + for removed_mempool_tx in set(self.raw_mempool.keys()).difference(mempool_tx_hashes): + self.raw_mempool.pop(removed_mempool_tx) except rocksdb.errors.RocksIOError as err: # FIXME: why does this happen? can it happen elsewhere? if err.args[0].startswith(b'IO error: No such file or directory: While open a file for random read:'): self.logger.error("failed to process mempool, retrying later") return set() raise err - else: - self.raw_mempool.clear() - self.raw_mempool.update(new_mempool) # hashXs = self.hashXs # hashX: [tx_hash, ...] touched_hashXs = set()