diff --git a/integration/rpctest/memwallet.go b/integration/rpctest/memwallet.go index 9b2999f3..0dccde10 100644 --- a/integration/rpctest/memwallet.go +++ b/integration/rpctest/memwallet.go @@ -56,6 +56,7 @@ func (u *utxo) isMature(height int32) bool { type chainUpdate struct { blockHeight int32 filteredTxns []*btcutil.Tx + isConnect bool // True if connect, false if disconnect } // undoEntry is functionally the opposite of a chainUpdate. An undoEntry is @@ -176,13 +177,14 @@ func (m *memWallet) SetRPCClient(rpcClient *rpcclient.Client) { } // IngestBlock is a call-back which is to be triggered each time a new block is -// connected to the main chain. Ingesting a block updates the wallet's internal -// utxo state based on the outputs created and destroyed within each block. +// connected to the main chain. It queues the update for the chain syncer, +// calling the private version in sequential order. func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filteredTxns []*btcutil.Tx) { // Append this new chain update to the end of the queue of new chain // updates. m.chainMtx.Lock() - m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, filteredTxns}) + m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, + filteredTxns, true}) m.chainMtx.Unlock() // Launch a goroutine to signal the chainSyncer that a new update is @@ -193,6 +195,30 @@ func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filtered }() } +// ingestBlock updates the wallet's internal utxo state based on the outputs +// created and destroyed within each block. +func (m *memWallet) ingestBlock(update *chainUpdate) { + // Update the latest synced height, then process each filtered + // transaction in the block creating and destroying utxos within + // the wallet as a result. + m.currentHeight = update.blockHeight + undo := &undoEntry{ + utxosDestroyed: make(map[wire.OutPoint]*utxo), + } + for _, tx := range update.filteredTxns { + mtx := tx.MsgTx() + isCoinbase := blockchain.IsCoinBaseTx(mtx) + txHash := mtx.TxHash() + m.evalOutputs(mtx.TxOut, &txHash, isCoinbase, undo) + m.evalInputs(mtx.TxIn, undo) + } + + // Finally, record the undo entry for this block so we can + // properly update our internal state in response to the block + // being re-org'd from the main chain. + m.reorgJournal[update.blockHeight] = undo +} + // chainSyncer is a goroutine dedicated to processing new blocks in order to // keep the wallet's utxo state up to date. // @@ -209,26 +235,12 @@ func (m *memWallet) chainSyncer() { m.chainUpdates = m.chainUpdates[1:] m.chainMtx.Unlock() - // Update the latest synced height, then process each filtered - // transaction in the block creating and destroying utxos within - // the wallet as a result. m.Lock() - m.currentHeight = update.blockHeight - undo := &undoEntry{ - utxosDestroyed: make(map[wire.OutPoint]*utxo), + if update.isConnect { + m.ingestBlock(update) + } else { + m.unwindBlock(update) } - for _, tx := range update.filteredTxns { - mtx := tx.MsgTx() - isCoinbase := blockchain.IsCoinBaseTx(mtx) - txHash := mtx.TxHash() - m.evalOutputs(mtx.TxOut, &txHash, isCoinbase, undo) - m.evalInputs(mtx.TxIn, undo) - } - - // Finally, record the undo entry for this block so we can - // properly update our internal state in response to the block - // being re-org'd from the main chain. - m.reorgJournal[update.blockHeight] = undo m.Unlock() } } @@ -285,13 +297,28 @@ func (m *memWallet) evalInputs(inputs []*wire.TxIn, undo *undoEntry) { } // UnwindBlock is a call-back which is to be executed each time a block is -// disconnected from the main chain. Unwinding a block undoes the effect that a -// particular block had on the wallet's internal utxo state. +// disconnected from the main chain. It queues the update for the chain syncer, +// calling the private version in sequential order. func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) { - m.Lock() - defer m.Unlock() + // Append this new chain update to the end of the queue of new chain + // updates. + m.chainMtx.Lock() + m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, + nil, false}) + m.chainMtx.Unlock() - undo := m.reorgJournal[height] + // Launch a goroutine to signal the chainSyncer that a new update is + // available. We do this in a new goroutine in order to avoid blocking + // the main loop of the rpc client. + go func() { + m.chainUpdateSignal <- struct{}{} + }() +} + +// unwindBlock undoes the effect that a particular block had on the wallet's +// internal utxo state. +func (m *memWallet) unwindBlock(update *chainUpdate) { + undo := m.reorgJournal[update.blockHeight] for _, utxo := range undo.utxosCreated { delete(m.utxos, utxo) @@ -301,7 +328,7 @@ func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) { m.utxos[outPoint] = utxo } - delete(m.reorgJournal, height) + delete(m.reorgJournal, update.blockHeight) } // newAddress returns a new address from the wallet's hd key chain. It also