batch spend utxos

This commit is contained in:
Jack Robison 2022-10-16 14:21:41 -04:00
parent 55eb8818ea
commit 6a9a2ad40f

View file

@ -1391,11 +1391,12 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,)) self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block): def advance_block(self, block: Block):
txi_count = 0
height = self.height + 1 height = self.height + 1
# print("advance ", height) # print("advance ", height)
# Use local vars for speed in the loops # Use local vars for speed in the loops
tx_count = self.tx_count tx_count = self.tx_count
spend_utxo = self.spend_utxo spend_utxos = self.spend_utxos
add_utxo = self.add_utxo add_utxo = self.add_utxo
spend_claim_or_support_txo = self._spend_claim_or_support_txo spend_claim_or_support_txo = self._spend_claim_or_support_txo
add_claim_or_support = self._add_claim_or_support add_claim_or_support = self._add_claim_or_support
@ -1418,18 +1419,17 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.tx_num.stash_put(key_args=(tx_hash,), value_args=(tx_count,)) self.db.prefix_db.tx_num.stash_put(key_args=(tx_hash,), value_args=(tx_count,))
self.db.prefix_db.tx_hash.stash_put(key_args=(tx_count,), value_args=(tx_hash,)) self.db.prefix_db.tx_hash.stash_put(key_args=(tx_count,), value_args=(tx_hash,))
spent_txos = []
append_spent_txo = spent_txos.append
# Spend the inputs # Spend the inputs
txi_count += len(tx.inputs)
for txin in tx.inputs: for txin in tx.inputs:
if txin.is_generation(): if txin.is_generation():
continue continue
# spend utxo for address histories append_spent_txo((txin.prev_hash, txin.prev_idx))
hashX = spend_utxo(txin.prev_hash, txin.prev_idx) support_spent_count += spent_supports_cnt
if hashX: spend_utxos(tx_count, spent_txos)
if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count)
# spend claim/support txo
spend_claim_or_support_txo(height, txin, spent_claims)
# Add the new UTXOs # Add the new UTXOs
for nout, txout in enumerate(tx.outputs): for nout, txout in enumerate(tx.outputs):
@ -1741,31 +1741,71 @@ class BlockchainProcessorService(BlockchainService):
else: else:
return self.db.get_tx_num(tx_hash) return self.db.get_tx_num(tx_hash)
def spend_utxo(self, tx_hash: bytes, nout: int): def spend_utxos(self, tx_count: int, txis: List[Tuple[bytes, int]]):
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None)) tx_nums = self.db.get_tx_nums(
txin_num = self.get_pending_tx_num(tx_hash) list(
if not hashX: {tx_hash for tx_hash, nout in txis if tx_hash not in self.pending_transaction_num_mapping}
hashX_value = self.db.prefix_db.hashX_utxo.get(tx_hash[:4], txin_num, nout) )
if not hashX_value: )
return txo_hashXs = {}
hashX = hashX_value.hashX hashX_utxos_needed = {}
utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout) utxos_needed = {}
if not utxo_value:
self.log.warning( for tx_hash, nout in txis:
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX) if tx_hash in self.pending_transaction_num_mapping:
) txin_num = self.pending_transaction_num_mapping[tx_hash]
raise ChainError( else:
f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}" txin_num = tx_nums[tx_hash]
) hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))
self.touched_hashXs.add(hashX) txo_hashXs[(tx_hash, nout)] = (hashX, amount, txin_num)
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), hashX_value) hashX_utxos_needed[(tx_hash[:4], txin_num, nout)] = tx_hash, nout
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), utxo_value) utxos_needed[(hashX, txin_num, nout)] = tx_hash, nout
return hashX hashX_utxos = {
elif amount is not None: (tx_hash, nout): v for (tx_hash, nout), v in zip(
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), (hashX,)) hashX_utxos_needed.values(), self.db.prefix_db.hashX_utxo.multi_get(list(hashX_utxos_needed.keys()))
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), (amount,)) ) if v is not None
self.touched_hashXs.add(hashX) }
return hashX
for (tx_hash, nout), v in hashX_utxos.items():
if tx_hash in self.pending_transaction_num_mapping:
txin_num = self.pending_transaction_num_mapping[tx_hash]
else:
txin_num = tx_nums[tx_hash]
utxos_needed[(v.hashX, txin_num, nout)] = tx_hash, nout
utxos_needed = {
(hashX, txin_num, nout): v
for (hashX, txin_num, nout), v in utxos_needed.items() if hashX is not None
}
utxos = {
(tx_hash, nout): v for (tx_hash, nout), v in zip(
utxos_needed.values(), self.db.prefix_db.utxo.multi_get(list(utxos_needed.keys()))
)
}
for (tx_hash, nout), (hashX, amount, txin_num) in txo_hashXs.items():
if not hashX:
hashX_value = hashX_utxos.get((tx_hash[:4], txin_num, nout))
if not hashX_value:
continue
hashX = hashX_value.hashX
utxo_value = utxos.get((hashX, txin_num, nout))
if not utxo_value:
self.log.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
)
raise ChainError(
f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}"
)
self.touched_hashXs.add(hashX)
self.db.prefix_db.hashX_utxo.stash_delete((tx_hash[:4], txin_num, nout), hashX_value)
self.db.prefix_db.utxo.stash_delete((hashX, txin_num, nout), utxo_value)
if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count)
elif amount is not None:
self.db.prefix_db.hashX_utxo.stash_delete((tx_hash[:4], txin_num, nout), (hashX,))
self.db.prefix_db.utxo.stash_delete((hashX, txin_num, nout), (amount,))
self.touched_hashXs.add(hashX)
if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count)
async def process_blocks_and_mempool_forever(self, caught_up_event): async def process_blocks_and_mempool_forever(self, caught_up_event):
"""Loop forever processing blocks as they arrive.""" """Loop forever processing blocks as they arrive."""