faster fs_transactions

This commit is contained in:
Jack Robison 2020-12-10 19:26:34 -05:00
parent 61b4a492c3
commit 2318e6d8e9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -94,8 +94,7 @@ class LevelDB:
self.headers_db = None self.headers_db = None
self.tx_db = None self.tx_db = None
self._block_txs_cache = pylru.lrucache(50000) self._tx_and_merkle_cache = pylru.lrucache(100000)
self._merkle_tx_cache = pylru.lrucache(100000)
self.total_transactions = None self.total_transactions = None
async def _read_tx_counts(self): async def _read_tx_counts(self):
@ -147,7 +146,7 @@ class LevelDB:
async def _open_dbs(self, for_sync, compacting): async def _open_dbs(self, for_sync, compacting):
if self.executor is None: if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) self.executor = ThreadPoolExecutor(1)
coin_path = os.path.join(self.env.db_dir, 'COIN') coin_path = os.path.join(self.env.db_dir, 'COIN')
if not os.path.isfile(coin_path): if not os.path.isfile(coin_path):
with util.open_file(coin_path, create=True) as f: with util.open_file(coin_path, create=True) as f:
@ -470,76 +469,52 @@ class LevelDB:
return None, tx_height return None, tx_height
return self.total_transactions[tx_num], tx_height return self.total_transactions[tx_num], tx_height
async def tx_merkle(self, tx_num, tx_height): def _fs_transactions(self, txids: Iterable[str]):
if tx_height == -1:
return {
'block_height': -1
}
tx_counts = self.tx_counts
tx_pos = tx_num - tx_counts[tx_height - 1]
def _update_block_txs_cache():
block_txs = list(self.tx_db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
))
if tx_height + 100 > self.db_height:
return block_txs
self._block_txs_cache[tx_height] = block_txs
uncached = None
if (tx_num, tx_height) in self._merkle_tx_cache:
return self._merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in self._block_txs_cache:
uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
block_txs = self._block_txs_cache.get(tx_height, uncached)
branch, root = self.merkle.branch_and_root(block_txs, tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 100 < self.db_height:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64 unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get tx_db_get = self.tx_db.get
tx_infos = [] tx_cache = self._tx_and_merkle_cache
tx_infos = {}
for tx_hash in txids: for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] cached_tx = tx_cache.get(tx_hash)
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) if cached_tx:
tx = None tx, merkle = cached_tx
tx_height = -1 else:
if tx_num is not None: tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = unpack_be_uint64(tx_num) tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx_height = bisect_right(tx_counts, tx_num) tx = None
if tx_height < self.db_height: tx_height = -1
tx = tx_db_get(TX_PREFIX + tx_hash_bytes) if tx_num is not None:
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height)) tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
return tx_infos return tx_infos
async def fs_transactions(self, txids): async def fs_transactions(self, txids):
txs = await asyncio.get_event_loop().run_in_executor( return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
self.executor, self._fs_transactions, txids
)
unsorted_result = {}
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
if txs:
await asyncio.gather(*map(add_result, txs))
return {txid: unsorted_result[txid] for txid, _, _, _ in txs}
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
if height + count > len(self.headers): if height + count > len(self.headers):
@ -553,28 +528,10 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set transactions. By default returns at most 1000 entries. Set
limit to None to get them all. limit to None to get them all.
""" """
# def read_history():
# hashx_history = []
# for key, hist in self.history.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# tx_height = bisect_right(self.tx_counts, tx_num)
# if tx_height > self.db_height:
# tx_hash = None
# else:
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
#
# hashx_history.append((tx_hash, tx_height))
# if limit and len(hashx_history) >= limit:
# return hashx_history
# return hashx_history
def read_history(): def read_history():
db_height = self.db_height db_height = self.db_height
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0 cnt = 0
txs = [] txs = []