From bebe29012e0c832f6fedbf3a2d0567aab0babffe Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 4 May 2017 00:05:05 -0600 Subject: [PATCH] Rescan tests pass except an intermittent close of closed channel. --- spvsvc/spvchain/blockmanager.go | 61 +--- spvsvc/spvchain/query.go | 32 +- spvsvc/spvchain/rescan.go | 56 ++-- spvsvc/spvchain/spvchain.go | 1 + spvsvc/spvchain/sync_test.go | 507 ++++++++++++++++++++++++++++---- 5 files changed, 516 insertions(+), 141 deletions(-) diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index ea110d2..c7454e7 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -103,30 +103,6 @@ type txMsg struct { peer *serverPeer } -// getSyncPeerMsg is a message type to be sent across the message channel for -// retrieving the current sync peer. -type getSyncPeerMsg struct { - reply chan *serverPeer -} - -// processBlockResponse is a response sent to the reply channel of a -// processBlockMsg. -type processBlockResponse struct { - isOrphan bool - err error -} - -// processBlockMsg is a message type to be sent across the message channel -// for requested a block is processed. Note this call differs from blockMsg -// above in that blockMsg is intended for blocks that came from peers and have -// extra handling whereas this message essentially is just a concurrent safe -// way to call ProcessBlock on the internal block chain instance. -type processBlockMsg struct { - block *btcutil.Block - flags blockchain.BehaviorFlags - reply chan processBlockResponse -} - // isCurrentMsg is a message type to be sent across the message channel for // requesting whether or not the block manager believes it is synced with // the currently connected peers. @@ -150,6 +126,7 @@ type blockManager struct { requestedBlocks map[chainhash.Hash]struct{} progressLogger *blockProgressLogger syncPeer *serverPeer + syncPeerMutex sync.Mutex // Channel for messages that come from peers peerChan chan interface{} // Channel for messages that come from internal commands @@ -301,7 +278,9 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) { // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the header state. if b.syncPeer != nil && b.syncPeer == sp { + b.syncPeerMutex.Lock() b.syncPeer = nil + b.syncPeerMutex.Unlock() header, height, err := b.server.LatestBlock() if err != nil { return @@ -342,10 +321,6 @@ out: case *newPeerMsg: b.handleNewPeerMsg(candidatePeers, msg.peer) - /*case *blockMsg: - b.handleBlockMsg(msg) - msg.peer.blockProcessed <- struct{}{}*/ - case *invMsg: b.handleInvMsg(msg) @@ -358,24 +333,6 @@ out: case *donePeerMsg: b.handleDonePeerMsg(candidatePeers, msg.peer) - case getSyncPeerMsg: - msg.reply <- b.syncPeer - - /*case processBlockMsg: - _, isOrphan, err := b.chain.ProcessBlock( - msg.block, msg.flags) - if err != nil { - msg.reply <- processBlockResponse{ - isOrphan: false, - err: err, - } - } - - msg.reply <- processBlockResponse{ - isOrphan: isOrphan, - err: nil, - }*/ - case isCurrentMsg: msg.reply <- b.current() @@ -393,8 +350,12 @@ out: log.Trace("Block handler done") } -// queueHandler reads the message channel and queues the message. This allows -// lookahead checks in +// SyncPeer returns the current sync peer. +func (b *blockManager) SyncPeer() *serverPeer { + b.syncPeerMutex.Lock() + defer b.syncPeerMutex.Unlock() + return b.syncPeer +} // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. @@ -561,7 +522,9 @@ func (b *blockManager) startSync(peers *list.List) { // and fully validate them. Finally, regression test mode does // not support the headers-first approach so do normal block // downloads when in regression test mode. + b.syncPeerMutex.Lock() b.syncPeer = bestPeer + b.syncPeerMutex.Unlock() if b.nextCheckpoint != nil && best.Height < b.nextCheckpoint.Height { @@ -935,7 +898,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // We also change the sync peer. Then we can continue // with the rest of the headers in the message as if // nothing has happened. + b.syncPeerMutex.Lock() b.syncPeer = hmsg.peer + b.syncPeerMutex.Unlock() _, err = b.server.rollBackToHeight(backHeight) if err != nil { log.Criticalf("Rollback failed: %s", diff --git a/spvsvc/spvchain/query.go b/spvsvc/spvchain/query.go index 41ee87f..9de6d95 100644 --- a/spvsvc/spvchain/query.go +++ b/spvsvc/spvchain/query.go @@ -4,6 +4,7 @@ package spvchain import ( "sync" + "sync/atomic" "time" "github.com/btcsuite/btcd/blockchain" @@ -114,12 +115,14 @@ func (s *ChainService) queryPeers( // in a single thread. This is the only part of the query framework that // requires access to peerState, so it's done once per query. peers := s.Peers() + syncPeer := s.blockManager.SyncPeer() // This will be shared state between the per-peer goroutines. quit := make(chan struct{}) allQuit := make(chan struct{}) startQuery := make(chan struct{}) var wg sync.WaitGroup + var syncPeerTries uint32 // Increase this number to be able to handle more queries at once as // each channel gets results for all queries, otherwise messages can // get mixed and there's a vicious cycle of retries causing a bigger @@ -146,6 +149,7 @@ func (s *ChainService) queryPeers( return } timeout := make(<-chan time.Time) + queryLoop: for { select { case <-timeout: @@ -175,11 +179,29 @@ func (s *ChainService) queryPeers( case <-startQuery: // We're the lucky peer whose turn it is // to try to answer the current query. - // TODO: Fix this to support either - // querying *all* peers simultaneously - // to avoid timeout delays, or starting - // with the syncPeer when not querying - // *all* peers. + // TODO: Add support for querying *all* + // peers simultaneously to avoid timeout + // delays. + // If the sync peer hasn't tried yet and + // we aren't the sync peer, don't do + // anything but forward the message down + // the startQuery channel until the + // sync peer gets a shot. + if sp == syncPeer { + atomic.StoreUint32( + &syncPeerTries, 1) + } + if atomic.LoadUint32(&syncPeerTries) == + 0 { + select { + case startQuery <- struct{}{}: + case <-quit: + return + case <-allQuit: + return + } + continue queryLoop + } sp.subscribeRecvMsg(subscription) // Don't want the peer hanging on send // to the channel if we quit before diff --git a/spvsvc/spvchain/rescan.go b/spvsvc/spvchain/rescan.go index db94df9..b83fc1f 100644 --- a/spvsvc/spvchain/rescan.go +++ b/spvsvc/spvchain/rescan.go @@ -198,6 +198,8 @@ func (s *ChainService) Rescan(options ...RescanOption) error { } } } + log.Tracef("Starting rescan from known block %d (%s)", curStamp.Height, + curStamp.Hash) // Listen for notifications. blockConnected := make(chan wire.BlockHeader) @@ -244,10 +246,9 @@ rescanLoop: if ro.ntfn. OnFilteredBlockDisconnected != nil { - ro.ntfn. - OnFilteredBlockDisconnected( - curStamp.Height, - &curHeader) + ro.ntfn.OnFilteredBlockDisconnected( + curStamp.Height, + &curHeader) } if ro.ntfn.OnBlockDisconnected != nil { ro.ntfn.OnBlockDisconnected( @@ -273,6 +274,9 @@ rescanLoop: header, err := s.GetBlockByHeight(uint32( curStamp.Height + 1)) if err != nil { + log.Tracef("Rescan became current at %d (%s), "+ + "subscribing to block notifications", + curStamp.Height, curStamp.Hash) current = true // Subscribe to block notifications. s.subscribeBlockMsg(subscription) @@ -316,17 +320,15 @@ rescanLoop: } relevantTxs, err = notifyBlock(block, filter, &ro.watchOutPoints, ro.watchAddrs, - ro.ntfn) + &watchList, ro.ntfn) if err != nil { return err } } } if ro.ntfn.OnFilteredBlockConnected != nil { - ro.ntfn.OnFilteredBlockConnected( - block.Height(), - &(block.MsgBlock().Header), - relevantTxs) + ro.ntfn.OnFilteredBlockConnected(curStamp.Height, + &curHeader, relevantTxs) } } } @@ -336,7 +338,8 @@ rescanLoop: // matched addresses. func notifyBlock(block *btcutil.Block, filter *gcs.Filter, outPoints *[]wire.OutPoint, addrs []btcutil.Address, - ntfn btcrpcclient.NotificationHandlers) ([]*btcutil.Tx, error) { + watchList *[][]byte, ntfn btcrpcclient.NotificationHandlers) ( + []*btcutil.Tx, error) { var relevantTxs []*btcutil.Tx blockHeader := block.MsgBlock().Header details := btcjson.BlockDetails{ @@ -353,23 +356,17 @@ func notifyBlock(block *btcutil.Block, filter *gcs.Filter, break } for _, op := range *outPoints { - if in.PreviousOutPoint == - op { + if in.PreviousOutPoint == op { relevant = true if ntfn.OnRedeemingTx != nil { - ntfn.OnRedeemingTx( - tx, - &txDetails, - ) + ntfn.OnRedeemingTx(tx, + &txDetails) } break } } } for outIdx, out := range tx.MsgTx().TxOut { - if relevant { - break - } pushedData, err := txscript.PushedData( out.PkScript) @@ -384,27 +381,22 @@ func notifyBlock(block *btcutil.Block, filter *gcs.Filter, if bytes.Equal(data, addr.ScriptAddress()) { relevant = true - hash := - tx.Hash() + hash := tx.Hash() outPoint := wire.OutPoint{ Hash: *hash, Index: uint32(outIdx), } - *outPoints = - append( - *outPoints, - outPoint, - ) + *outPoints = append(*outPoints, + outPoint) + *watchList = append(*watchList, + builder.OutPointToFilterEntry( + outPoint)) if ntfn.OnRecvTx != nil { - ntfn.OnRecvTx( - tx, - &txDetails, - ) + ntfn.OnRecvTx(tx, + &txDetails) } - break } } - } } if relevant { diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index 2f575f7..d1f0b0e 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -769,6 +769,7 @@ func NewChainService(cfg Config) (*ChainService, error) { services: Services, userAgentName: UserAgentName, userAgentVersion: UserAgentVersion, + blockSubscribers: make(map[blockSubscription]struct{}), } err := s.createSPVNS() diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index 9067288..94e66bf 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -14,6 +14,7 @@ import ( "github.com/aakselrod/btctestlog" "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpctest" @@ -25,6 +26,7 @@ import ( "github.com/btcsuite/btcutil/gcs/builder" "github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/btcsuite/btcwallet/wallet/txauthor" "github.com/btcsuite/btcwallet/walletdb" _ "github.com/btcsuite/btcwallet/walletdb/bdb" ) @@ -42,39 +44,158 @@ var ( queryOptions = []spvchain.QueryOption{ //spvchain.NumRetries(5), } - // The sequence of connecting blocks. - conn = func() []int32 { - blocks := []int32{} - for i := 801; i <= 928; i++ { - blocks = append(blocks, int32(i)) + // The logged sequence of events we want to see. The value of i + // represents the block for which a loop is generating a log entry, + // given for readability only. + // "bc": OnBlockConnected + // "fc" xx: OnFilteredBlockConnected with xx (uint8) relevant TXs + // "rv": OnRecvTx + // "rd": OnRedeemingTx + // "bd": OnBlockDisconnected + // "fd": OnFilteredBlockDisconnected + wantLog = func() (log []byte) { + for i := 796; i <= 800; i++ { + // BlockConnected and FilteredBlockConnected + log = append(log, []byte("bcfc")...) + // 0 relevant TXs + log = append(log, 0x00) } - for i := 926; i <= 930; i++ { - blocks = append(blocks, int32(i)) + // Block with two relevant (receive) transactions + log = append(log, []byte("bcrvrvfc")...) + log = append(log, 0x02) + // 124 blocks with nothing + for i := 802; i <= 925; i++ { + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) } - for i := 926; i <= 935; i++ { - blocks = append(blocks, int32(i)) + // 2 blocks with 1 redeeming transaction each + for i := 926; i <= 927; i++ { + log = append(log, []byte("bcrdfc")...) + log = append(log, 0x01) } - return blocks - } - // The sequence of disconnecting blocks. - dconn = func() []int32 { - blocks := []int32{} + // Block with nothing + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) + // 3 block rollback for i := 928; i >= 926; i-- { - blocks = append(blocks, int32(i)) + log = append(log, []byte("fdbd")...) } + // 5 block empty reorg + for i := 926; i <= 930; i++ { + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) + } + // 5 block rollback for i := 930; i >= 926; i-- { - blocks = append(blocks, int32(i)) + log = append(log, []byte("fdbd")...) } - return blocks - } - // Blocks with relevant transactions - relevant = []int32{801, 929, 930} - // Blocks with receive transactions - receive = []int32{801} - // Blocks with redeeming transactions - redeem = []int32{929, 930} + // 2 blocks with 1 redeeming transaction each + for i := 926; i <= 927; i++ { + log = append(log, []byte("bcrdfc")...) + log = append(log, 0x01) + } + // 8 block rest of reorg + for i := 928; i <= 935; i++ { + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) + } + return log + }() + + // rescanMtx locks all the variables to which the rescan goroutine's + // notifications write. + rescanMtx sync.RWMutex + + // gotLog is where we accumulate the event log from the rescan. Then we + // compare it to wantLog to see if the series of events the rescan saw + // happened as expected. + gotLog []byte + + // curBlockHeight lets the rescan goroutine track where it thinks the + // chain is based on OnBlockConnected and OnBlockDisconnected. + curBlockHeight int32 + + // curFilteredBlockHeight lets the rescan goroutine track where it + // thinks the chain is based on OnFilteredBlockConnected and + // OnFilteredBlockDisconnected. + curFilteredBlockHeight int32 + + // ourKnownTxsByBlock lets the rescan goroutine keep track of + // transactions we're interested in that are in the blockchain we're + // following as signalled by OnBlockConnected, OnBlockDisconnected, + // OnRecvTx, and OnRedeemingTx. + ourKnownTxsByBlock = make(map[chainhash.Hash][]*btcutil.Tx) + + // ourKnownTxsByFilteredBlock lets the rescan goroutine keep track of + // transactions we're interested in that are in the blockchain we're + // following as signalled by OnFilteredBlockConnected and + // OnFilteredBlockDisconnected. + ourKnownTxsByFilteredBlock = make(map[chainhash.Hash][]*btcutil.Tx) ) +// secSource is an implementation of btcwallet/txauthor/SecretsSource that +// stores WitnessPubKeyHash addresses. +type secSource struct { + keys map[string]*btcec.PrivateKey + scripts map[string]*[]byte + params *chaincfg.Params +} + +func (s *secSource) add(privKey *btcec.PrivateKey) (btcutil.Address, error) { + pubKeyHash := btcutil.Hash160(privKey.PubKey().SerializeCompressed()) + addr, err := btcutil.NewAddressWitnessPubKeyHash(pubKeyHash, s.params) + if err != nil { + return nil, err + } + script, err := txscript.PayToAddrScript(addr) + if err != nil { + return nil, err + } + s.keys[addr.String()] = privKey + s.scripts[addr.String()] = &script + _, addrs, _, err := txscript.ExtractPkScriptAddrs(script, s.params) + if err != nil { + return nil, err + } + if addrs[0].String() != addr.String() { + return nil, fmt.Errorf("Encoded and decoded addresses don't "+ + "match. Encoded: %s, decoded: %s", addr, addrs[0]) + } + return addr, nil +} + +// GetKey is required by the txscript.KeyDB interface +func (s *secSource) GetKey(addr btcutil.Address) (*btcec.PrivateKey, bool, + error) { + privKey, ok := s.keys[addr.String()] + if !ok { + return nil, true, fmt.Errorf("No key for address %s", addr) + } + return privKey, true, nil +} + +// GetScript is required by the txscript.ScriptDB interface +func (s *secSource) GetScript(addr btcutil.Address) ([]byte, error) { + script, ok := s.scripts[addr.String()] + if !ok { + return nil, fmt.Errorf("No script for address %s", addr) + } + return *script, nil +} + +// ChainParams is required by the SecretsSource interface +func (s *secSource) ChainParams() *chaincfg.Params { + return s.params +} + +func newSecSource(params *chaincfg.Params) *secSource { + return &secSource{ + keys: make(map[string]*btcec.PrivateKey), + scripts: make(map[string]*[]byte), + params: params, + } +} + func TestSetup(t *testing.T) { // Set up logging. logger, err := btctestlog.NewTestLogger(t) @@ -224,42 +345,64 @@ func TestSetup(t *testing.T) { // Generate an address and send it some coins on the h1 chain. We use // this to test rescans and notifications. - privKey, err := btcec.NewPrivateKey(btcec.S256()) + secSrc := newSecSource(&modParams) + privKey1, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("Couldn't generate private key: %s", err) } - pubKeyHash := btcutil.Hash160(privKey.PubKey().SerializeCompressed()) - addr, err := btcutil.NewAddressWitnessPubKeyHash(pubKeyHash, &modParams) + addr1, err := secSrc.add(privKey1) if err != nil { t.Fatalf("Couldn't create address from key: %s", err) } - script, err := txscript.PayToAddrScript(addr) + script1, err := secSrc.GetScript(addr1) if err != nil { t.Fatalf("Couldn't create script from address: %s", err) } - out := wire.TxOut{ - PkScript: script, + out1 := wire.TxOut{ + PkScript: script1, Value: 1000000000, } - tx1, err := h1.CreateTransaction([]*wire.TxOut{&out}, 1000) + // Fee rate is satoshis per byte + tx1, err := h1.CreateTransaction([]*wire.TxOut{&out1}, 1000) if err != nil { t.Fatalf("Couldn't create transaction from script: %s", err) } - utx1 := btcutil.NewTx(tx1) - utx1.SetIndex(1) - tx2, err := h1.CreateTransaction([]*wire.TxOut{&out}, 1000) + _, err = h1.Node.SendRawTransaction(tx1, true) + if err != nil { + t.Fatalf("Unable to send raw transaction to node: %s", err) + } + // Fee rate is satoshis per byte + tx2, err := h1.CreateTransaction([]*wire.TxOut{&out1}, 1000) if err != nil { t.Fatalf("Couldn't create transaction from script: %s", err) } - utx2 := btcutil.NewTx(tx2) - utx2.SetIndex(2) - if tx1.TxHash() == tx2.TxHash() { - t.Fatalf("Created two identical transactions") - } - _, err = h1.GenerateAndSubmitBlock([]*btcutil.Tx{utx1, utx2}, - -1, time.Time{}) + _, err = h1.Node.SendRawTransaction(tx2, true) if err != nil { - t.Fatalf("Couldn't generate/submit block: %s") + t.Fatalf("Unable to send raw transaction to node: %s", err) + } + _, err = h1.Node.Generate(1) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } + + // Start a rescan with notifications in another goroutine. We'll kill + // it with a quit channel at the end and make sure we got the expected + // results. + quitRescan := make(chan struct{}) + startBlock := &waddrmgr.BlockStamp{Height: 795} + err = startRescan(t, svc, addr1, startBlock, quitRescan) + if err != nil { + t.Fatalf("Couldn't start a rescan for %s: %s", addr1, err) + } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) + } + + numTXs, _, err := checkRescanStatus() + if numTXs != 2 { + t.Fatalf("Wrong number of relevant transactions. Want: 2, got:"+ + " %d", numTXs) } // Generate 124 blocks on h1 to make sure it reorgs the other nodes. @@ -276,13 +419,13 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't sync h2 to h1: %s", err) } - // Spend the outputs we sent ourselves. - _ = func(tx wire.MsgTx) func(target btcutil.Amount) ( + // Spend the outputs we sent ourselves over two blocks. + inSrc := func(tx wire.MsgTx) func(target btcutil.Amount) ( total btcutil.Amount, inputs []*wire.TxIn, inputValues []btcutil.Amount, scripts [][]byte, err error) { ourIndex := 1 << 30 // Should work on 32-bit systems for i, txo := range tx.TxOut { - if bytes.Equal(txo.PkScript, script) { + if bytes.Equal(txo.PkScript, script1) { ourIndex = i } } @@ -296,7 +439,7 @@ func TestSetup(t *testing.T) { } total = target inputs = []*wire.TxIn{ - &wire.TxIn{ + { PreviousOutPoint: wire.OutPoint{ Hash: tx.TxHash(), Index: uint32(ourIndex), @@ -310,11 +453,104 @@ func TestSetup(t *testing.T) { return } } - - // Generate 3 blocks on h1, one at a time, to make sure the + // Create another address to send to so we don't trip the rescan with + // the old address and we can test monitoring both OutPoint usage and + // receipt by addresses. + privKey2, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Couldn't generate private key: %s", err) + } + addr2, err := secSrc.add(privKey2) + if err != nil { + t.Fatalf("Couldn't create address from key: %s", err) + } + script2, err := secSrc.GetScript(addr2) + if err != nil { + t.Fatalf("Couldn't create script from address: %s", err) + } + out2 := wire.TxOut{ + PkScript: script2, + Value: 500000000, + } + // Spend the first transaction and mine a block. + authTx1, err := txauthor.NewUnsignedTransaction( + []*wire.TxOut{ + &out2, + }, + // Fee rate is satoshis per kilobyte + 1024000, + inSrc(*tx1), + func() ([]byte, error) { + return script2, nil + }, + ) + if err != nil { + t.Fatalf("Couldn't create unsigned transaction: %s", err) + } + err = authTx1.AddAllInputScripts(secSrc) + if err != nil { + t.Fatalf("Couldn't sign transaction: %s", err) + } + _, err = h1.Node.SendRawTransaction(authTx1.Tx, true) + if err != nil { + t.Fatalf("Unable to send raw transaction to node: %s", err) + } + _, err = h1.Node.Generate(1) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) + } + numTXs, _, err = checkRescanStatus() + if numTXs != 3 { + t.Fatalf("Wrong number of relevant transactions. Want: 3, got:"+ + " %d", numTXs) + } + // Spend the second transaction and mine a block. + authTx2, err := txauthor.NewUnsignedTransaction( + []*wire.TxOut{ + &out2, + }, + // Fee rate is satoshis per kilobyte + 1024000, + inSrc(*tx2), + func() ([]byte, error) { + return script2, nil + }, + ) + if err != nil { + t.Fatalf("Couldn't create unsigned transaction: %s", err) + } + err = authTx2.AddAllInputScripts(secSrc) + if err != nil { + t.Fatalf("Couldn't sign transaction: %s", err) + } + _, err = h1.Node.SendRawTransaction(authTx2.Tx, true) + if err != nil { + t.Fatalf("Unable to send raw transaction to node: %s", err) + } + _, err = h1.Node.Generate(1) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) + } + numTXs, _, err = checkRescanStatus() + if numTXs != 4 { + t.Fatalf("Wrong number of relevant transactions. Want: 4, got:"+ + " %d", numTXs) + } + // Generate 1 blocks on h1, one at a time, to make sure the // ChainService instance stays caught up. - for i := 0; i < 3; i++ { - h1.Node.Generate(1) + for i := 0; i < 1; i++ { + _, err = h1.Node.Generate(1) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } err = waitForSync(t, svc, h1) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) @@ -323,19 +559,41 @@ func TestSetup(t *testing.T) { // Generate 5 blocks on h2 and wait for ChainService to sync to the // newly-best chain on h2. - h2.Node.Generate(5) + _, err = h2.Node.Generate(5) + if err != nil { + t.Fatalf("Couldn't generate/submit blocks: %s", err) + } err = waitForSync(t, svc, h2) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) } + numTXs, _, err = checkRescanStatus() + if numTXs != 2 { + t.Fatalf("Wrong number of relevant transactions. Want: 2, got:"+ + " %d", numTXs) + } // Generate 7 blocks on h1 and wait for ChainService to sync to the // newly-best chain on h1. - h1.Node.Generate(7) + _, err = h1.Node.Generate(7) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } err = waitForSync(t, svc, h1) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) } + numTXs, _, err = checkRescanStatus() + if numTXs != 4 { + t.Fatalf("Wrong number of relevant transactions. Want: 4, got:"+ + " %d", numTXs) + } + + close(quitRescan) + if !bytes.Equal(wantLog, gotLog) { + t.Fatalf("Rescan event logs incorrect.\nWant: %s\nGot: %s\n", + wantLog, gotLog) + } } // csd does a connect-sync-disconnect between nodes in order to support @@ -490,13 +748,44 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService, knownExtHeader.HeaderHashes[0]) } } + // At this point, we know we have good cfheaders. Now we wait for the + // rescan, if one is going, to catch up. + for { + time.Sleep(syncUpdate) + total += syncUpdate + rescanMtx.RLock() + // We don't want to do this if we haven't started a rescan + // yet. + if len(gotLog) == 0 { + rescanMtx.RUnlock() + break + } + _, rescanHeight, err := checkRescanStatus() + if err != nil { + rescanMtx.RUnlock() + return err + } + if logLevel != btclog.Off { + t.Logf("Rescan caught up to block %d", rescanHeight) + } + if rescanHeight == haveBest.Height { + rescanMtx.RUnlock() + break + } + if total > syncTimeout { + rescanMtx.RUnlock() + return fmt.Errorf("Timed out after %v waiting for "+ + "rescan to catch up.", syncTimeout) + } + rescanMtx.RUnlock() + } return nil } -// testRandomBlocks goes through numTestBlocks random blocks and ensures we -// can correctly get filters from them. We don't go through *all* the blocks -// because it can be a little slow, but we'll improve that soon-ish hopefully -// to the point where we can do it. +// testRandomBlocks goes through all blocks in random order and ensures we can +// correctly get cfilters from them. It uses numQueryThreads goroutines running +// at the same time to go through this. 50 is comfortable on my somewhat dated +// laptop with default query optimization settings. // TODO: Make this a benchmark instead. func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, correctSyncNode *rpctest.Harness) error { @@ -725,3 +1014,109 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, } return lastErr } + +// startRescan starts a rescan in another goroutine, and logs all notifications +// from the rescan. At the end, the log should match one we precomputed based +// on the flow of the test. The rescan starts at the genesis block and the +// notifications continue until the `quit` channel is closed. +func startRescan(t *testing.T, svc *spvchain.ChainService, addr btcutil.Address, + startBlock *waddrmgr.BlockStamp, quit <-chan struct{}) error { + go svc.Rescan( + spvchain.QuitChan(quit), + spvchain.WatchAddrs(addr), + spvchain.StartBlock(startBlock), + spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{ + OnBlockConnected: func(hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + gotLog = append(gotLog, []byte("bc")...) + curBlockHeight = height + rescanMtx.Unlock() + }, + OnBlockDisconnected: func(hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + delete(ourKnownTxsByBlock, *hash) + gotLog = append(gotLog, []byte("bd")...) + curBlockHeight = height - 1 + rescanMtx.Unlock() + }, + OnRecvTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash.NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't decode hash %s: %s", + details.Hash, err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], tx) + gotLog = append(gotLog, []byte("rv")...) + rescanMtx.Unlock() + }, + OnRedeemingTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash.NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't decode hash %s: %s", + details.Hash, err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], tx) + gotLog = append(gotLog, []byte("rd")...) + rescanMtx.Unlock() + }, + OnFilteredBlockConnected: func(height int32, + header *wire.BlockHeader, + relevantTxs []*btcutil.Tx) { + rescanMtx.Lock() + ourKnownTxsByFilteredBlock[header.BlockHash()] = + relevantTxs + gotLog = append(gotLog, []byte("fc")...) + gotLog = append(gotLog, uint8(len(relevantTxs))) + curFilteredBlockHeight = height + rescanMtx.Unlock() + }, + OnFilteredBlockDisconnected: func(height int32, + header *wire.BlockHeader) { + rescanMtx.Lock() + delete(ourKnownTxsByFilteredBlock, + header.BlockHash()) + gotLog = append(gotLog, []byte("fd")...) + curFilteredBlockHeight = height - 1 + rescanMtx.Unlock() + }, + }), + ) + return nil +} + +// checkRescanStatus returns the number of relevant transactions we currently +// know about and the currently known height. +func checkRescanStatus() (int, int32, error) { + var txCount [2]int + rescanMtx.RLock() + defer rescanMtx.RUnlock() + for _, list := range ourKnownTxsByBlock { + for range list { + txCount[0]++ + } + } + for _, list := range ourKnownTxsByFilteredBlock { + for range list { + txCount[1]++ + } + } + if txCount[0] != txCount[1] { + return 0, 0, fmt.Errorf("Conflicting transaction count " + + "between notifications.") + } + if curBlockHeight != curFilteredBlockHeight { + return 0, 0, fmt.Errorf("Conflicting block height between " + + "notifications.") + } + return txCount[0], curBlockHeight, nil +}