diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index e671caf..1533cf7 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -13,8 +13,6 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/btcsuite/btcutil/gcs" - "github.com/btcsuite/btcutil/gcs/builder" ) const ( @@ -91,13 +89,6 @@ type processCFHeadersMsg struct { extended bool } -// cfilterMsg packages a bitcoin cfilter message and the peer it came from -// together so the block handler has access to that information. -type cfilterMsg struct { - cfilter *wire.MsgCFilter - peer *serverPeer -} - // donePeerMsg signifies a newly disconnected peer to the block handler. type donePeerMsg struct { peer *serverPeer @@ -305,14 +296,6 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) { log.Infof("Lost peer %s", sp) - // Remove requested blocks from the global map so that they will be - // fetched from elsewhere next time we get an inv. - // TODO: we could possibly here check which peers have these blocks - // and request them now to speed things up a little. - for k := range sp.requestedBlocks { - delete(b.requestedBlocks, k) - } - // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the header state. if b.syncPeer != nil && b.syncPeer == sp { @@ -370,9 +353,6 @@ out: case *cfheadersMsg: b.handleCFHeadersMsg(msg) - case *cfilterMsg: - b.handleCFilterMsg(msg) - case *donePeerMsg: b.handleDonePeerMsg(candidatePeers, msg.peer) @@ -1304,91 +1284,6 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { } } -// QueueCFilter adds the passed cfilter message and peer to the block handling -// queue. -func (b *blockManager) QueueCFilter(cfilter *wire.MsgCFilter, sp *serverPeer) { - // No channel handling here because peers do not need to block on - // headers messages. - if atomic.LoadInt32(&b.shutdown) != 0 { - return - } - - // Make sure we've actually requested this message. - req := cfRequest{ - extended: cfilter.Extended, - blockHash: cfilter.BlockHash, - } - if _, ok := sp.requestedCFilters[req]; !ok { - return - } - delete(sp.requestedCFilters, req) - - b.peerChan <- &cfilterMsg{cfilter: cfilter, peer: sp} -} - -// handleCFilterMsg handles cfilter messages from all peers. -// TODO: Refactor for checking adversarial conditions. -func (b *blockManager) handleCFilterMsg(cfmsg *cfilterMsg) { - readFunc := b.server.GetBasicHeader - putFunc := b.server.putBasicFilter - if cfmsg.cfilter.Extended { - readFunc = b.server.GetExtHeader - putFunc = b.server.putExtFilter - } - // Check that the cfilter we received fits correctly into the filter - // chain. - blockHeader, _, err := b.server.GetBlockByHash(cfmsg.cfilter.BlockHash) - if err != nil { - log.Warnf("Received cfilter for unknown block: %s, extended: "+ - "%t", cfmsg.cfilter.BlockHash, cfmsg.cfilter.Extended) - return - } - cfHeader, err := readFunc(cfmsg.cfilter.BlockHash) - if err != nil { - log.Warnf("Received cfilter for block with unknown cfheader: "+ - "%s, extended: %t", cfmsg.cfilter.BlockHash, - cfmsg.cfilter.Extended) - return - } - cfPrevHeader, err := readFunc(blockHeader.PrevBlock) - if err != nil { - log.Warnf("Received cfilter for block with unknown previous "+ - "cfheader: %s, extended: %t", blockHeader.PrevBlock, - cfmsg.cfilter.Extended) - return - } - filter, err := gcs.FromNBytes(builder.DefaultP, cfmsg.cfilter.Data) - if err != nil { - log.Warnf("Couldn't parse cfilter data for block: %s, "+ - "extended: %t", cfmsg.cfilter.BlockHash, - cfmsg.cfilter.Extended) - return - } - if makeHeaderForFilter(filter, *cfPrevHeader) != *cfHeader { - log.Warnf("Got cfilter that doesn't match cfheader chain for "+ - "block: %s, extended: %t", cfmsg.cfilter.BlockHash, - cfmsg.cfilter.Extended) - return - } - // Save the cfilter we received into the database. - err = putFunc(cfmsg.cfilter.BlockHash, filter) - if err != nil { - log.Warnf("Couldn't write cfilter to database for block: %s, "+ - "extended: %t", cfmsg.cfilter.BlockHash, - cfmsg.cfilter.Extended) - // Should we panic here? - return - } - // Notify the ChainService of the newly-found filter. - b.server.query <- processCFilterMsg{ - cfRequest: cfRequest{ - blockHash: cfmsg.cfilter.BlockHash, - extended: cfmsg.cfilter.Extended, - }, - filter: filter, - } -} - // checkHeaderSanity checks the PoW, and timestamp of a block header. func (b *blockManager) checkHeaderSanity(blockHeader *wire.BlockHeader, maxTimestamp time.Time, reorgAttempt bool) error { diff --git a/spvsvc/spvchain/db.go b/spvsvc/spvchain/db.go index 5dcfecd..9e67806 100644 --- a/spvsvc/spvchain/db.go +++ b/spvsvc/spvchain/db.go @@ -464,20 +464,20 @@ func createSPVNS(namespace walletdb.Namespace, params *chaincfg.Params) error { log.Info("Creating wallet SPV namespace.") - basicFilter, err := buildBasicFilter(params.GenesisBlock) + basicFilter, err := BuildBasicFilter(params.GenesisBlock) if err != nil { return err } - basicFilterTip := makeHeaderForFilter(basicFilter, + basicFilterTip := MakeHeaderForFilter(basicFilter, params.GenesisBlock.Header.PrevBlock) - extFilter, err := buildExtFilter(params.GenesisBlock) + extFilter, err := BuildExtFilter(params.GenesisBlock) if err != nil { return err } - extFilterTip := makeHeaderForFilter(extFilter, + extFilterTip := MakeHeaderForFilter(extFilter, params.GenesisBlock.Header.PrevBlock) err = putBlock(tx, params.GenesisBlock.Header, 0) diff --git a/spvsvc/spvchain/filter.go b/spvsvc/spvchain/filter.go index f47bec4..7499bd8 100644 --- a/spvsvc/spvchain/filter.go +++ b/spvsvc/spvchain/filter.go @@ -7,7 +7,10 @@ import ( "github.com/btcsuite/btcutil/gcs/builder" ) -func buildBasicFilter(block *wire.MsgBlock) (*gcs.Filter, error) { +// TODO: Move these functions into github.com/btcsuite/btcutil/gcs/builder. + +// BuildBasicFilter will be factored out into gcs.builder +func BuildBasicFilter(block *wire.MsgBlock) (*gcs.Filter, error) { blockHash := block.BlockHash() b := builder.WithKeyHash(&blockHash) _, err := b.Key() @@ -34,7 +37,8 @@ func buildBasicFilter(block *wire.MsgBlock) (*gcs.Filter, error) { return f, nil } -func buildExtFilter(block *wire.MsgBlock) (*gcs.Filter, error) { +// BuildExtFilter will be factored out into gcs.builder +func BuildExtFilter(block *wire.MsgBlock) (*gcs.Filter, error) { blockHash := block.BlockHash() b := builder.WithKeyHash(&blockHash) _, err := b.Key() @@ -60,13 +64,15 @@ func buildExtFilter(block *wire.MsgBlock) (*gcs.Filter, error) { return f, nil } -func getFilterHash(filter *gcs.Filter) chainhash.Hash { +// GetFilterHash will be factored out into gcs.builder +func GetFilterHash(filter *gcs.Filter) chainhash.Hash { return chainhash.HashH(filter.NBytes()) } -func makeHeaderForFilter(filter *gcs.Filter, prevHeader chainhash.Hash) chainhash.Hash { +// MakeHeaderForFilter will be factored out into gcs.builder +func MakeHeaderForFilter(filter *gcs.Filter, prevHeader chainhash.Hash) chainhash.Hash { filterTip := make([]byte, 2*chainhash.HashSize) - filterHash := getFilterHash(filter) + filterHash := GetFilterHash(filter) copy(filterTip, filterHash[:]) copy(filterTip[chainhash.HashSize:], prevHeader[:]) return chainhash.HashH(filterTip) diff --git a/spvsvc/spvchain/notifications.go b/spvsvc/spvchain/notifications.go index 418a84e..cf6db03 100644 --- a/spvsvc/spvchain/notifications.go +++ b/spvsvc/spvchain/notifications.go @@ -8,9 +8,11 @@ import ( "errors" "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/connmgr" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil/gcs" "github.com/btcsuite/btcutil/gcs/builder" ) @@ -59,9 +61,10 @@ type getCFilterMsg struct { reply chan *gcs.Filter } -type processCFilterMsg struct { - cfRequest - filter *gcs.Filter +type getBlockMsg struct { + blockHeader *wire.BlockHeader + height uint32 + reply chan *btcutil.Block } // TODO: General - abstract out more of blockmanager into queries. It'll make @@ -204,7 +207,7 @@ func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) { // can ignore this message. return } - if makeHeaderForFilter(filter, + if MakeHeaderForFilter(filter, *msg.prevHeader) != *msg.curHeader { // Filter data doesn't match @@ -227,54 +230,70 @@ func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) { if !found { msg.reply <- nil } - /*sent := false - state.forAllPeers(func(sp *serverPeer) { - // Send to one peer at a time. No use flooding the - // network. - if sent { - return - } - // Don't send to a peer that's not connected. - if !sp.Connected() { - return - } - // Don't send to any peer from which we've already - // requested this cfilter. - if _, ok := sp.requestedCFilters[msg.cfRequest]; ok { - return - } - // Request a cfilter from the peer and mark sent as - // true so we don't ask any other peers unless - // necessary. - err := sp.pushGetCFilterMsg( - &msg.cfRequest.blockHash, - msg.cfRequest.extended) - if err == nil { - sent = true - } - - }) - if !sent { + case getBlockMsg: + found := false + getData := wire.NewMsgGetData() + blockHash := msg.blockHeader.BlockHash() + getData.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, + &blockHash)) + state.queryPeers( + // Should we query this peer? + func(sp *serverPeer) bool { + // Don't send requests to disconnected peers. + return sp.Connected() + }, + // Send a wire.GetCFilterMsg + getData, + // Check responses and if we get one that matches, + // end the query early. + func(sp *serverPeer, resp wire.Message, + quit chan<- struct{}) { + switch response := resp.(type) { + // We're only interested in "block" messages. + case *wire.MsgBlock: + // If this isn't our block, ignore it. + if response.BlockHash() != + blockHash { + return + } + block := btcutil.NewBlock(response) + // Only set height if btcutil hasn't + // automagically put one in. + if block.Height() == + btcutil.BlockHeightUnknown { + block.SetHeight( + int32(msg.height)) + } + // If this claims our block but doesn't + // pass the sanity check, the peer is + // trying to bamboozle us. Disconnect + // it. + if err := blockchain.CheckBlockSanity( + block, + // We don't need to check PoW + // because by the time we get + // here, it's been checked + // during header synchronization + s.chainParams.PowLimit, + s.timeSource, + ); err != nil { + log.Warnf("Invalid block for "+ + "%s received from %s "+ + "-- disconnecting peer", + blockHash, sp.Addr()) + sp.Disconnect() + return + } + found = true + close(quit) + msg.reply <- block + default: + } + }, + ) + // We timed out without finding a correct answer to our query. + if !found { msg.reply <- nil - s.signalAllCFilters(msg.cfRequest, nil) - return } - // Record the required header information against which to check - // the cfilter. - s.cfRequestHeaders[msg.cfRequest] = [2]*chainhash.Hash{ - msg.prevHeader, - msg.curHeader, - }*/ - case processCFilterMsg: - s.signalAllCFilters(msg.cfRequest, msg.filter) } } - -func (s *ChainService) signalAllCFilters(req cfRequest, filter *gcs.Filter) { - go func() { - for _, replyChan := range s.cfilterRequests[req] { - replyChan <- filter - } - s.cfilterRequests[req] = make([]chan *gcs.Filter, 0) - }() -} diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index 5014776..c663ff4 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -217,6 +217,11 @@ func (ps *peerState) queryPeers( // to make sure we don't interrupt // another query. We need broadcast // support in OnRead to do this right. + // TODO: Fix this to support either + // querying *all* peers simultaneously + // to avoid timeout delays, or starting + // with the syncPeer when not querying + // *all* peers. sp.subscribeRecvMsg(channel) sp.QueueMessage(queryMsg, nil) timeout = time.After(qo.queryTimeout) @@ -265,14 +270,10 @@ type serverPeer struct { continueHash *chainhash.Hash relayMtx sync.Mutex requestQueue []*wire.InvVect - requestedCFilters map[cfRequest]struct{} requestedCFHeaders map[cfhRequest]int - requestedBlocks map[chainhash.Hash]struct{} knownAddresses map[string]struct{} banScore connmgr.DynamicBanScore quit chan struct{} - // The following chans are used to sync blockmanager and server. - blockProcessed chan struct{} // The following slice of channels is used to subscribe to messages from // the peer. This allows broadcast to multiple subscribers at once, // allowing for multiple queries to be going to multiple peers at any @@ -289,12 +290,9 @@ func newServerPeer(s *ChainService, isPersistent bool) *serverPeer { return &serverPeer{ server: s, persistent: isPersistent, - requestedCFilters: make(map[cfRequest]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), requestedCFHeaders: make(map[cfhRequest]int), knownAddresses: make(map[string]struct{}), quit: make(chan struct{}), - blockProcessed: make(chan struct{}, 1), } } @@ -370,20 +368,6 @@ func (sp *serverPeer) pushGetCFHeadersMsg(locator blockchain.BlockLocator, return nil } -// pushGetCFilterMsg sends a getcfilter message for the provided block hash to -// the connected peer. -func (sp *serverPeer) pushGetCFilterMsg(blockHash *chainhash.Hash, - ext bool) error { - req := cfRequest{ - extended: ext, - blockHash: *blockHash, - } - sp.requestedCFilters[req] = struct{}{} - msg := wire.NewMsgGetCFilter(blockHash, ext) - sp.QueueMessage(msg, nil) - return nil -} - // pushSendHeadersMsg sends a sendheaders message to the connected peer. func (sp *serverPeer) pushSendHeadersMsg() error { if sp.VersionKnown() { @@ -436,33 +420,6 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { sp.server.AddPeer(sp) } -// OnBlock is invoked when a peer receives a block bitcoin message. It -// blocks until the bitcoin block has been fully processed. -func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { - log.Tracef("got block %s", msg.BlockHash()) - // Convert the raw MsgBlock to a btcutil.Block which provides some - // convenience methods and things such as hash caching. - block := btcutil.NewBlockFromBlockAndBytes(msg, buf) - - // Add the block to the known inventory for the peer. - iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - sp.AddKnownInventory(iv) - - // Queue the block up to be handled by the block - // manager and intentionally block further receives - // until the bitcoin block is fully processed and known - // good or bad. This helps prevent a malicious peer - // from queuing up a bunch of bad blocks before - // disconnecting (or being disconnected) and wasting - // memory. Additionally, this behavior is depended on - // by at least the block acceptance test tool as the - // reference implementation processes blocks in the same - // thread and therefore blocks further messages until - // the bitcoin block has been fully processed. - //sp.server.blockManager.QueueBlock(block, sp) - <-sp.blockProcessed -} - // OnInv is invoked when a peer receives an inv bitcoin message and is // used to examine the inventory being advertised by the remote peer and react // accordingly. We pass the message down to blockmanager which will call @@ -602,13 +559,6 @@ func (sp *serverPeer) OnCFHeaders(p *peer.Peer, msg *wire.MsgCFHeaders) { sp.server.blockManager.QueueCFHeaders(msg, sp) } -// OnCFilter is invoked when a peer receives a cfilter bitcoin message and is -// used to notify the server about a committed filter. -func (sp *serverPeer) OnCFilter(p *peer.Peer, msg *wire.MsgCFilter) { - log.Tracef("Got cfilter message from %s", p.Addr()) - sp.server.blockManager.QueueCFilter(msg, sp) -} - // OnAddr is invoked when a peer receives an addr bitcoin message and is // used to notify the server about advertised addresses. func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { @@ -1294,11 +1244,9 @@ func newPeerConfig(sp *serverPeer) *peer.Config { Listeners: peer.MessageListeners{ OnVersion: sp.OnVersion, //OnVerAck: sp.OnVerAck, // Don't use sendheaders yet - OnBlock: sp.OnBlock, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders, OnCFHeaders: sp.OnCFHeaders, - OnCFilter: sp.OnCFilter, OnGetData: sp.OnGetData, OnReject: sp.OnReject, OnFeeFilter: sp.OnFeeFilter, @@ -1784,3 +1732,24 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, } return filter } + +// GetBlockFromNetwork gets a block by requesting it from the network, one peer +// at a time, until one answers. +func (s *ChainService) GetBlockFromNetwork( + blockHash chainhash.Hash) *btcutil.Block { + blockHeader, height, err := s.GetBlockByHash(blockHash) + if err != nil || blockHeader.BlockHash() != blockHash { + return nil + } + replyChan := make(chan *btcutil.Block) + s.query <- getBlockMsg{ + blockHeader: &blockHeader, + height: height, + reply: replyChan, + } + block := <-replyChan + if block != nil { + log.Tracef("Got block %s from network", blockHash) + } + return block +} diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index 72fb11f..a3fcc1a 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "math/rand" "os" + "reflect" + "sync" "testing" "time" @@ -23,9 +25,10 @@ import ( ) const ( - logLevel = btclog.TraceLvl - syncTimeout = 30 * time.Second - syncUpdate = time.Second + logLevel = btclog.TraceLvl + syncTimeout = 30 * time.Second + syncUpdate = time.Second + numTestBlocks = 50 ) func TestSetup(t *testing.T) { @@ -206,6 +209,13 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) } + + // Test that we can get blocks and cfilters via P2P and decide which are + // valid and which aren't. + err = testRandomBlocks(t, svc, h1) + if err != nil { + t.Fatalf("Testing blocks and cfilters failed: %s", err) + } } // csd does a connect-sync-disconnect between nodes in order to support @@ -360,48 +370,217 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService, knownExtHeader.HeaderHashes[0]) } } - // Test getting 15 random filters. - heights := rand.Perm(int(haveBest.Height)) - for i := 0; i < 15; i++ { - height := uint32(heights[i]) - block, _, err := svc.GetBlockByHeight(height) - if err != nil { - return fmt.Errorf("Get block by height %d:"+ - " %s", height, err) - } - blockHash := block.BlockHash() - haveFilter := svc.GetCFilter(blockHash, false) - if haveFilter == nil { - return fmt.Errorf("Couldn't get basic "+ - "filter for block %d", height) - } - t.Logf("%x", haveFilter.NBytes()) - wantFilter, err := correctSyncNode.Node.GetCFilter(&blockHash, - false) - if err != nil { - return fmt.Errorf("Couldn't get basic filter for "+ - "block %d via RPC: %s", height, err) - } - if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { - return fmt.Errorf("Basic filter from P2P network/DB"+ - " doesn't match RPC value for block %d", height) - } - haveFilter = svc.GetCFilter(blockHash, true) - if haveFilter == nil { - return fmt.Errorf("Couldn't get extended "+ - "filter for block %d", height) - } - t.Logf("%x", haveFilter.NBytes()) - wantFilter, err = correctSyncNode.Node.GetCFilter(&blockHash, - true) - if err != nil { - return fmt.Errorf("Couldn't get extended filter for "+ - "block %d via RPC: %s", height, err) - } - if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { - return fmt.Errorf("Extended filter from P2P network/DB"+ - " doesn't match RPC value for block %d", height) - } - } return nil } + +// testRandomBlocks goes through numTestBlocks random blocks and ensures we +// can correctly get filters from them. We don't go through *all* the blocks +// because it can be a little slow, but we'll improve that soon-ish hopefully +// to the point where we can do it. +// TODO: Improve concurrency on framework side. +func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, + correctSyncNode *rpctest.Harness) error { + var haveBest *waddrmgr.BlockStamp + haveBest, err := svc.BestSnapshot() + if err != nil { + return fmt.Errorf("Couldn't get best snapshot from "+ + "ChainService: %s", err) + } + // Keep track of an error channel + errChan := make(chan error) + var lastErr error + go func() { + for err := range errChan { + if err != nil { + t.Errorf("%s", err) + lastErr = fmt.Errorf("Couldn't validate all " + + "blocks, filters, and filter headers.") + } + } + }() + // Test getting numTestBlocks random blocks and filters. + var wg sync.WaitGroup + heights := rand.Perm(int(haveBest.Height)) + for i := 0; i < numTestBlocks; i++ { + wg.Add(1) + height := uint32(heights[i]) + go func() { + defer wg.Done() + // Get block header from database. + blockHeader, blockHeight, err := svc.GetBlockByHeight(height) + if err != nil { + errChan <- fmt.Errorf("Couldn't get block "+ + "header by height %d: %s", height, err) + return + } + if blockHeight != height { + errChan <- fmt.Errorf("Block height retrieved from DB "+ + "doesn't match expected height. Want: %d, "+ + "have: %d", height, blockHeight) + return + } + blockHash := blockHeader.BlockHash() + // Get block via RPC. + wantBlock, err := correctSyncNode.Node.GetBlock(&blockHash) + if err != nil { + errChan <- fmt.Errorf("Couldn't get block %d (%s) by RPC", + height, blockHash) + return + } + // Get block from network. + haveBlock := svc.GetBlockFromNetwork(blockHash) + if haveBlock == nil { + errChan <- fmt.Errorf("Couldn't get block %d (%s) from"+ + "network", height, blockHash) + return + } + // Check that network and RPC blocks match. + if !reflect.DeepEqual(*haveBlock.MsgBlock(), *wantBlock) { + errChan <- fmt.Errorf("Block from network doesn't match "+ + "block from RPC. Want: %s, RPC: %s, network: "+ + "%s", blockHash, wantBlock.BlockHash(), + haveBlock.MsgBlock().BlockHash()) + return + } + // Check that block height matches what we have. + if int32(blockHeight) != haveBlock.Height() { + errChan <- fmt.Errorf("Block height from network doesn't "+ + "match expected height. Want: %s, network: %s", + blockHeight, haveBlock.Height()) + return + } + // Get basic cfilter from network. + haveFilter := svc.GetCFilter(blockHash, false) + if haveFilter == nil { + errChan <- fmt.Errorf("Couldn't get basic "+ + "filter for block %d", height) + return + } + // Get basic cfilter from RPC. + wantFilter, err := correctSyncNode.Node.GetCFilter(&blockHash, + false) + if err != nil { + errChan <- fmt.Errorf("Couldn't get basic filter for "+ + "block %d via RPC: %s", height, err) + return + } + // Check that network and RPC cfilters match. + if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { + errChan <- fmt.Errorf("Basic filter from P2P network/DB"+ + " doesn't match RPC value for block %d", height) + return + } + // Calculate basic filter from block. + calcFilter, err := spvchain.BuildBasicFilter( + haveBlock.MsgBlock()) + if err != nil { + errChan <- fmt.Errorf("Couldn't build basic filter for "+ + "block %d (%s): %s", height, blockHash, err) + return + } + // Check that the network value matches the calculated value + // from the block. + if !reflect.DeepEqual(*haveFilter, *calcFilter) { + errChan <- fmt.Errorf("Basic filter from P2P network/DB "+ + "doesn't match calculated value for block %d", + height) + return + } + // Get previous basic filter header from the database. + prevHeader, err := svc.GetBasicHeader(blockHeader.PrevBlock) + if err != nil { + errChan <- fmt.Errorf("Couldn't get basic filter header "+ + "for block %d (%s) from DB: %s", height-1, + blockHeader.PrevBlock, err) + return + } + // Get current basic filter header from the database. + curHeader, err := svc.GetBasicHeader(blockHash) + if err != nil { + errChan <- fmt.Errorf("Couldn't get basic filter header "+ + "for block %d (%s) from DB: %s", height-1, + blockHash, err) + return + } + // Check that the filter and header line up. + calcHeader := spvchain.MakeHeaderForFilter(calcFilter, + *prevHeader) + if !bytes.Equal(curHeader[:], calcHeader[:]) { + errChan <- fmt.Errorf("Filter header doesn't match. Want: "+ + "%s, got: %s", curHeader, calcHeader) + return + } + // Get extended cfilter from network + haveFilter = svc.GetCFilter(blockHash, true) + if haveFilter == nil { + errChan <- fmt.Errorf("Couldn't get extended "+ + "filter for block %d", height) + return + } + // Get extended cfilter from RPC + wantFilter, err = correctSyncNode.Node.GetCFilter(&blockHash, + true) + if err != nil { + errChan <- fmt.Errorf("Couldn't get extended filter for "+ + "block %d via RPC: %s", height, err) + return + } + // Check that network and RPC cfilters match + if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { + errChan <- fmt.Errorf("Extended filter from P2P network/DB"+ + " doesn't match RPC value for block %d", height) + return + } + // Calculate extended filter from block + calcFilter, err = spvchain.BuildExtFilter( + haveBlock.MsgBlock()) + if err != nil { + errChan <- fmt.Errorf("Couldn't build extended filter for "+ + "block %d (%s): %s", height, blockHash, err) + return + } + // Check that the network value matches the calculated value + // from the block. + if !reflect.DeepEqual(*haveFilter, *calcFilter) { + errChan <- fmt.Errorf("Extended filter from P2P network/DB"+ + " doesn't match calculated value for block %d", + height) + return + } + // Get previous extended filter header from the database. + prevHeader, err = svc.GetExtHeader(blockHeader.PrevBlock) + if err != nil { + errChan <- fmt.Errorf("Couldn't get extended filter header"+ + " for block %d (%s) from DB: %s", height-1, + blockHeader.PrevBlock, err) + return + } + // Get current basic filter header from the database. + curHeader, err = svc.GetExtHeader(blockHash) + if err != nil { + errChan <- fmt.Errorf("Couldn't get extended filter header"+ + " for block %d (%s) from DB: %s", height-1, + blockHash, err) + return + } + // Check that the filter and header line up. + calcHeader = spvchain.MakeHeaderForFilter(calcFilter, + *prevHeader) + if !bytes.Equal(curHeader[:], calcHeader[:]) { + errChan <- fmt.Errorf("Filter header doesn't match. Want: "+ + "%s, got: %s", curHeader, calcHeader) + return + } + }() + } + // Wait for all queries to finish. + wg.Wait() + if logLevel != btclog.Off { + t.Logf("Finished checking %d blocks and their cfilters", + numTestBlocks) + } + // Close the error channel to make the error monitoring goroutine + // finish. + close(errChan) + return lastErr +}