diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py
index 77666e509..1676b01f1 100644
--- a/lbry/wallet/ledger.py
+++ b/lbry/wallet/ledger.py
@@ -722,7 +722,7 @@ class Ledger(metaclass=LedgerRegistry):
             if cache_item is None:
                 cache_item = TransactionCacheItem()
                 self._tx_cache[txid] = cache_item
-            tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height)
+            tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height)
             tx.height = remote_height
             cache_item.tx = tx
             if 'merkle' in merkle and remote_heights[txid] > 0:
diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 7fe0ae059..bfd70ec50 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -171,7 +171,8 @@ class BlockProcessor:
         # Caches of unflushed items.
         self.headers = []
-        self.tx_hashes = []
+        self.block_hashes = []
+        self.block_txs = []
         self.undo_infos = []

         # UTXO cache
@@ -336,8 +337,8 @@ class BlockProcessor:
     def flush_data(self):
         """The data for a flush. The lock must be taken."""
         assert self.state_lock.locked()
-        return FlushData(self.height, self.tx_count, self.headers,
-                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+        return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
+                         self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)

     async def flush(self, flush_utxos):
@@ -392,7 +393,8 @@
         for block in blocks:
             height += 1
             undo_info = self.advance_txs(
-                height, block.transactions, self.coin.electrum_header(block.header, height)
+                height, block.transactions, self.coin.electrum_header(block.header, height),
+                self.coin.header_hash(block.header)
             )
             if height >= min_height:
                 self.undo_infos.append((undo_info, height))
@@ -403,25 +405,26 @@
         self.headers.extend(headers)
         self.tip = self.coin.header_hash(headers[-1])

-    def advance_txs(self, height, txs, header):
-        self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
+    def advance_txs(self, height, txs, header, block_hash):
+        self.block_hashes.append(block_hash)
+        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))

-        # Use local vars for speed in the loops
         undo_info = []
         tx_num = self.tx_count
-        script_hashX = self.coin.hashX_from_script
-        s_pack = pack
+        hashXs_by_tx = []
+
+        # Use local vars for speed in the loops
         put_utxo = self.utxo_cache.__setitem__
         spend_utxo = self.spend_utxo
         undo_info_append = undo_info.append
         update_touched = self.touched.update
-        hashXs_by_tx = []
-        append_hashXs = hashXs_by_tx.append
+        append_hashX_by_tx = hashXs_by_tx.append
+        hashX_from_script = self.coin.hashX_from_script

         for tx, tx_hash in txs:
             hashXs = []
             append_hashX = hashXs.append
-            tx_numb = s_pack('<I', tx_num)
+            tx_numb = pack('<I', tx_num)

             # Spend the inputs
             for txin in tx.inputs:
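Each `block_txs` entry pairs a block's concatenated 32-byte tx hashes with that block's raw transactions, which is exactly what the flush code in leveldb.py consumes below. A minimal sketch of that shape (illustrative values only; `double_sha256` stands in for how txids are derived from raw transactions):

```python
# Illustrative sketch of the (tx_hashes, raw_txs) entries appended to block_txs.
import hashlib

def double_sha256(raw: bytes) -> bytes:
    # txids are the double SHA-256 of the raw transaction bytes
    return hashlib.sha256(hashlib.sha256(raw).digest()).digest()

raw_txs = [b'raw-tx-one', b'raw-tx-two']                     # tx.raw for each tx
tx_hashes = b''.join(double_sha256(raw) for raw in raw_txs)  # 32 bytes per tx

block_txs_entry = (tx_hashes, raw_txs)  # what block_txs.append() stores

# The flush loop walks the hash blob in 32-byte steps; offset // 32
# picks the matching raw transaction out of the list.
offset = 32
assert tx_hashes[offset:offset + 32] == double_sha256(raw_txs[offset // 32])
```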
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ ... @@
 @attr.s(slots=True)
 class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
-    block_tx_hashes = attr.ib()
+    block_hashes = attr.ib()
+    block_txs = attr.ib()
     undo_infos = attr.ib()
     utxo_cache = attr.ib()
     db_deletes = attr.ib()
     tip = attr.ib()
@@ ... @@ class LevelDB:
     def flush_fs(self, flush_data):
         """Write headers, tx counts and block tx hashes to the filesystem.

         The first height to write is self.fs_height + 1.  The FS metadata is
         all append-only, so in a crash we just pick up again from the height
         stored in the DB.
         """
         prior_tx_count = (self.tx_counts[self.fs_height]
                           if self.fs_height >= 0 else 0)
-        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert len(flush_data.block_txs) == len(flush_data.headers)
         assert flush_data.height == self.fs_height + len(flush_data.headers)
         assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0)
         assert len(self.tx_counts) == flush_data.height + 1
-        assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
+        assert len(
+            b''.join(hashes for hashes, _ in flush_data.block_txs)
+        ) // 32 == flush_data.tx_count - prior_tx_count

         # Write the headers, tx counts, and tx hashes
         start_time = time.perf_counter()
         height_start = self.fs_height + 1
         tx_num = prior_tx_count
-        for header, tx_hashes in zip(flush_data.headers,
-                                     flush_data.block_tx_hashes):
+        for header, block_hash, (tx_hashes, txs) in zip(
+                flush_data.headers, flush_data.block_hashes, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
-            self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+            self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
+            self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
-                self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
+                self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
                 tx_num += 1
                 offset += 32
-        flush_data.block_tx_hashes.clear()
+        flush_data.block_txs.clear()
+        flush_data.block_hashes.clear()
+
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
@@ -362,7 +365,7 @@ class LevelDB:
     def flush_backup(self, flush_data, touched):
         """Like flush_dbs() but when backing up. All UTXOs are flushed."""
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert flush_data.height < self.db_height
         self.history.assert_flushed()

@@ -431,9 +434,54 @@
         if tx_height > self.db_height:
             tx_hash = None
         else:
-            tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+            tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height

+    def _fs_transactions(self, txids):
+        def _iter_transactions():
+            block_txs = {}
+            branch_and_root = self.merkle.branch_and_root
+            tx_iterator = self.tx_db.iterator
+            tx_counts = self.tx_counts
+            tx_db_get = self.tx_db.get
+            unpack_be_uint64 = util.unpack_be_uint64
+            pack_be_uint64 = util.pack_be_uint64
+
+            for tx_hash in txids:
+                tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
+                tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
+                if tx_num is not None:
+                    tx_num = unpack_be_uint64(tx_num)
+                    tx_height = bisect_right(tx_counts, tx_num)
+                else:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                if tx_height >= self.db_height:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
+                if tx_height not in block_txs:
+                    block_txs[tx_height] = list(tx_iterator(
+                        start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
+                        stop=None if tx_height + 1 == len(tx_counts) else
+                        TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]),
+                        include_key=False
+                    ))
+                tx_pos = tx_num - tx_counts[tx_height - 1]
+                branch, root = branch_and_root(block_txs[tx_height], tx_pos)
+                merkle = {
+                    'block_height': tx_height,
+                    'merkle': [hash_to_hex_str(hash) for hash in branch],
+                    'pos': tx_pos
+                }
+                yield tx_hash, (None if not tx else tx.hex(), merkle)
+        return {
+            _tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
+        }
+
+    async def fs_transactions(self, txids):
+        return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
+
     async def fs_block_hashes(self, height, count):
         headers_concat, headers_count = await self.read_headers(height, count)
         if headers_count != count:
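`_fs_transactions` never stores a (height, position) pair explicitly: each transaction gets one global, monotonically increasing `tx_num`, and both coordinates are recovered from the cumulative `tx_counts` list with `bisect_right`. A worked sketch of that mapping, with made-up counts:

```python
# Worked sketch of the tx_num -> (height, position-in-block) mapping.
from bisect import bisect_right

tx_counts = [1, 3, 7]  # cumulative tx count through heights 0, 1, 2

def height_and_pos(tx_num):
    # first height whose cumulative count exceeds tx_num
    height = bisect_right(tx_counts, tx_num)
    # offset from the first tx_num of that block
    pos = tx_num - (tx_counts[height - 1] if height > 0 else 0)
    return height, pos

assert height_and_pos(0) == (0, 0)  # the genesis block's only tx
assert height_and_pos(3) == (2, 0)  # first tx of the block at height 2
assert height_and_pos(6) == (2, 3)  # fourth tx of the block at height 2
```

This in-block offset is also the `tx_pos` handed to `branch_and_root` above: the transaction's index within its block, not its distance from the end of the block.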
@@ -453,25 +501,54 @@ class LevelDB:
         transactions. By default returns at most 1000 entries.
         Set limit to None to get them all.
         """
-        def read_history():
-            hashx_history = []
+        def iter_tx_heights():
+            db_height = self.db_height
+            tx_counts = self.tx_counts
+            tx_db_get = self.tx_db.get
+            pack_be_uint64 = util.pack_be_uint64
+
+            cnt = 0
+
             for key, hist in self.history.db.iterator(prefix=hashX):
                 a = array.array('I')
                 a.frombytes(hist)
                 for tx_num in a:
-                    tx_height = bisect_right(self.tx_counts, tx_num)
-                    if tx_height > self.db_height:
-                        tx_hash = None
-                    else:
-                        tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
-                    hashx_history.append((tx_hash, tx_height))
-                    if limit and len(hashx_history) >= limit:
-                        return hashx_history
-            return hashx_history
+                    tx_height = bisect_right(tx_counts, tx_num)
+                    if tx_height > db_height:
+                        yield None, tx_height
+                        return
+                    yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height
+                    cnt += 1
+                    if limit and cnt >= limit:
+                        return
+                if limit and cnt >= limit:
+                    return
+
+        def read_history():
+            return [
+                (tx_hash, tx_height) for (tx_hash, tx_height) in iter_tx_heights()
+            ]

         while True:
             history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
-            if all(hash is not None for hash, height in history):
+            if not history or history[-1][0] is not None:
                 return history
             self.logger.warning(f'limited_history: tx hash '
                                 f'not found (reorg?), retrying...')
@@ -629,13 +706,14 @@ class LevelDB:
         utxos = []
         utxos_append = utxos.append
         s_unpack = unpack
+        fs_tx_hash = self.fs_tx_hash
         # Key: b'u' + address_hashX + tx_idx + tx_num
         # Value: the UTXO value as a 64-bit unsigned integer
         prefix = b'u' + hashX
         for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
-            tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ ... @@ class LBRYElectrumX(SessionBase):
         if len(tx_hashes) > 100:
             raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
         for tx_hash in tx_hashes:
             assert_tx_hash(tx_hash)
-        batch_result = {}
+        batch_result = await self.db.fs_transactions(tx_hashes)
+        needed_merkles = {}
+
         for tx_hash in tx_hashes:
+            if tx_hash in batch_result and batch_result[tx_hash][0]:
+                continue
             tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
             raw_tx = tx_info['hex']
             block_hash = tx_info.get('blockhash')
-            merkle = {}
             if block_hash:
                 block = await self.daemon.deserialised_block(block_hash)
                 height = block['height']
@@ -1552,12 +1556,21 @@ class LBRYElectrumX(SessionBase):
                 except ValueError:
                     raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
                                    f'block {block_hash} at height {height:,d}')
-                merkle["merkle"] = self._get_merkle_branch(block['tx'], pos)
-                merkle["pos"] = pos
+                needed_merkles[tx_hash] = raw_tx, block['tx'], pos, height
             else:
-                height = -1
-                merkle['block_height'] = height
-            batch_result[tx_hash] = [raw_tx, merkle]
+                batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
+
+        def threaded_get_merkle():
+            for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
+                batch_result[tx_hash] = raw_tx, {
+                    'merkle': self._get_merkle_branch(block_txs, pos),
+                    'pos': pos,
+                    'block_height': block_height
+                }
+        if needed_merkles:
+            await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
+
+        self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
         return batch_result

     async def transaction_get(self, tx_hash, verbose=False):
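The rewritten `transaction_get_batch` answers from the local tx database first, falls back to the daemon only for transactions it could not fully resolve, and pushes the CPU-bound merkle hashing onto the executor so the event loop stays free. A condensed, hypothetical sketch of that flow, with `fetch_from_db`, `fetch_from_daemon` and `compute_merkle` as stand-ins for the calls above:

```python
# Condensed sketch of the db-first / daemon-fallback batch flow.
import asyncio

async def get_batch(tx_hashes, fetch_from_db, fetch_from_daemon,
                    compute_merkle, executor):
    results = await fetch_from_db(tx_hashes)  # flushed txs: raw tx + merkle
    pending = {}
    for h in tx_hashes:
        if not (h in results and results[h][0]):
            pending[h] = await fetch_from_daemon(h)  # (raw, block_txs, pos, height)

    def threaded_get_merkle():
        # CPU-bound merkle hashing runs off the event loop
        for h, (raw, block_txs, pos, height) in pending.items():
            results[h] = raw, {'merkle': compute_merkle(block_txs, pos),
                               'pos': pos, 'block_height': height}

    if pending:
        await asyncio.get_running_loop().run_in_executor(executor, threaded_get_merkle)
    return results
```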
@@ -1572,19 +1585,6 @@
         return await self.daemon_request('getrawtransaction', tx_hash, verbose)

-    async def _block_hash_and_tx_hashes(self, height):
-        """Returns a pair (block_hash, tx_hashes) for the main chain block at
-        the given height.
-
-        block_hash is a hexadecimal string, and tx_hashes is an
-        ordered list of hexadecimal strings.
-        """
-        height = non_negative_integer(height)
-        hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
-        block_hash = hex_hashes[0]
-        block = await self.daemon.deserialised_block(block_hash)
-        return block_hash, block['tx']
-
     def _get_merkle_branch(self, tx_hashes, tx_pos):
         """Return a merkle branch to a transaction.
@@ -1604,35 +1604,11 @@
             height: the height of the block it is in
         """
         assert_tx_hash(tx_hash)
-        block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height)
-        try:
-            pos = tx_hashes.index(tx_hash)
-        except ValueError:
+        result = await self.transaction_get_batch(tx_hash)
+        if tx_hash not in result or result[tx_hash][1]['block_height'] <= 0:
             raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
-                           f'block {block_hash} at height {height:,d}')
-        branch = self._get_merkle_branch(tx_hashes, pos)
-        return {"block_height": height, "merkle": branch, "pos": pos}
-
-    async def transaction_id_from_pos(self, height, tx_pos, merkle=False):
-        """Return the txid and optionally a merkle proof, given
-        a block height and position in the block.
-        """
-        tx_pos = non_negative_integer(tx_pos)
-        if merkle not in (True, False):
-            raise RPCError(BAD_REQUEST, f'"merkle" must be a boolean')
-
-        block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height)
-        try:
-            tx_hash = tx_hashes[tx_pos]
-        except IndexError:
-            raise RPCError(BAD_REQUEST, f'no tx at position {tx_pos:,d} in '
-                           f'block {block_hash} at height {height:,d}')
-
-        if merkle:
-            branch = self._get_merkle_branch(tx_hashes, tx_pos)
-            return {"tx_hash": tx_hash, "merkle": branch}
-        else:
-            return tx_hash
+                           f'block at height {height:,d}')
+        return result[tx_hash][1]


 class LocalRPC(SessionBase):
diff --git a/lbry/wallet/server/storage.py b/lbry/wallet/server/storage.py
index 5e7db97dd..127166204 100644
--- a/lbry/wallet/server/storage.py
+++ b/lbry/wallet/server/storage.py
@@ -77,18 +77,16 @@ class LevelDB(Storage):
         import plyvel
         cls.module = plyvel

-    def open(self, name, create):
-        mof = 512 if self.for_sync else 128
+    def open(self, name, create, lru_cache_size=None):
+        mof = 10000
         path = os.path.join(self.db_dir, name)
         # Use snappy compression (the default)
-        self.db = self.module.DB(path, create_if_missing=create,
-                                 max_open_files=mof)
+        self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof)
         self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
-        self.write_batch = partial(self.db.write_batch, transaction=True,
-                                   sync=True)
+        self.write_batch = partial(self.db.write_batch, transaction=True, sync=True)


 class RocksDB(Storage):
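On the storage side, raising `max_open_files` keeps more LevelDB table-file handles (and their index blocks) cached, which matters once raw transactions, hashes, and counts all live in the same database. A minimal plyvel session showing the options the patched `Storage.open` passes through (path and keys are illustrative):

```python
# Minimal plyvel sketch of the options used by the patched Storage.open.
import plyvel

db = plyvel.DB('/tmp/example-leveldb', create_if_missing=True,
               max_open_files=10000)  # keep SST file handles cached

# write_batch(transaction=True, sync=True) mirrors the partial() above:
# the batch is applied atomically and fsync'd before returning.
with db.write_batch(transaction=True, sync=True) as batch:
    batch.put(b'example-key', b'example-value')

print(db.get(b'example-key'))
db.close()
```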