From 8b734ce6966f8d20d11d8c8fe380a17060ac4acd Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 12 May 2017 11:05:30 -0600 Subject: [PATCH] Fix bugs and add SendTransaction, which isn't that great. --- spvsvc/spvchain/query.go | 73 +++++++-- spvsvc/spvchain/rescan.go | 58 ++++--- spvsvc/spvchain/spvchain.go | 119 +-------------- spvsvc/spvchain/sync_test.go | 282 +++++++++++++++++++++-------------- 4 files changed, 271 insertions(+), 261 deletions(-) diff --git a/spvsvc/spvchain/query.go b/spvsvc/spvchain/query.go index f23f468..cb32fba 100644 --- a/spvsvc/spvchain/query.go +++ b/spvsvc/spvchain/query.go @@ -3,6 +3,8 @@ package spvchain import ( + "fmt" + "strings" "sync" "sync/atomic" "time" @@ -207,7 +209,8 @@ func (s *ChainService) queryPeers( // to the channel if we quit before // reading the channel. sentChan := make(chan struct{}, 1) - sp.QueueMessage(queryMsg, sentChan) + sp.QueueMessageWithEncoding(queryMsg, + sentChan, wire.WitnessEncoding) select { case <-sentChan: case <-quit: @@ -279,7 +282,7 @@ checkResponses: // GetCFilter gets a cfilter from the database. Failing that, it requests the // cfilter from the network and writes it to the database. func (s *ChainService) GetCFilter(blockHash chainhash.Hash, - extended bool, options ...QueryOption) *gcs.Filter { + extended bool, options ...QueryOption) (*gcs.Filter, error) { getFilter := s.GetBasicFilter getHeader := s.GetBasicHeader putFilter := s.putBasicFilter @@ -290,22 +293,34 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, } filter, err := getFilter(blockHash) if err == nil && filter != nil { - return filter + return filter, nil } // We didn't get the filter from the DB, so we'll set it to nil and try // to get it from the network. filter = nil block, _, err := s.GetBlockByHash(blockHash) - if err != nil || block.BlockHash() != blockHash { - return nil + if err != nil { + return nil, err + } + if block.BlockHash() != blockHash { + return nil, fmt.Errorf("Couldn't get header for block %s "+ + "from database", blockHash) } curHeader, err := getHeader(blockHash) if err != nil { - return nil + return nil, fmt.Errorf("Couldn't get cfheader for block %s "+ + "from database", blockHash) } prevHeader, err := getHeader(block.PrevBlock) if err != nil { - return nil + return nil, fmt.Errorf("Couldn't get cfheader for block %s "+ + "from database", blockHash) + } + // If we're expecting a zero filter, just return a nil filter and don't + // bother trying to get it from the network. The caller will know + // there's no error because we're also returning a nil error. + if builder.MakeHeaderForFilter(nil, *prevHeader) == *curHeader { + return nil, nil } s.queryPeers( // Send a wire.GetCFilterMsg @@ -364,19 +379,21 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, log.Tracef("Wrote filter for block %s, extended: %t", blockHash, extended) } - return filter + return filter, nil } // GetBlockFromNetwork gets a block by requesting it from the network, one peer // at a time, until one answers. func (s *ChainService) GetBlockFromNetwork( - blockHash chainhash.Hash, options ...QueryOption) *btcutil.Block { + blockHash chainhash.Hash, options ...QueryOption) (*btcutil.Block, + error) { blockHeader, height, err := s.GetBlockByHash(blockHash) if err != nil || blockHeader.BlockHash() != blockHash { - return nil + return nil, fmt.Errorf("Couldn't get header for block %s "+ + "from database", blockHash) } getData := wire.NewMsgGetData() - getData.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, + getData.AddInvVect(wire.NewInvVect(wire.InvTypeWitnessBlock, &blockHash)) // The block is only updated from the checkResponse function argument, // which is always called single-threadedly. We don't check the block @@ -441,5 +458,37 @@ func (s *ChainService) GetBlockFromNetwork( }, options..., ) - return foundBlock + if foundBlock == nil { + return nil, fmt.Errorf("Couldn't retrieve block %s from "+ + "network", blockHash) + } + return foundBlock, nil +} + +// SendTransaction sends a transaction to each peer. It returns an error if any +// peer rejects the transaction for any reason than that it's already known. +// TODO: Better privacy by sending to only one random peer and watching +// propagation, requires better peer selection support in query API. +func (s *ChainService) SendTransaction(tx *wire.MsgTx, + options ...QueryOption) error { + var err error + s.queryPeers( + tx, + func(sp *serverPeer, resp wire.Message, quit chan<- struct{}) { + switch response := resp.(type) { + case *wire.MsgReject: + if response.Hash == tx.TxHash() && + !strings.Contains(response.Reason, + "already have transaction") { + err = log.Errorf("Transaction %s "+ + "rejected by %s: %s", + tx.TxHash(), sp.Addr(), + response.Reason) + close(quit) + } + } + }, + options..., + ) + return err } diff --git a/spvsvc/spvchain/rescan.go b/spvsvc/spvchain/rescan.go index 966d211..00f16e4 100644 --- a/spvsvc/spvchain/rescan.go +++ b/spvsvc/spvchain/rescan.go @@ -320,7 +320,10 @@ rescanLoop: var err error key := builder.DeriveKey(&curStamp.Hash) matched := false - bFilter = s.GetCFilter(curStamp.Hash, false) + bFilter, err = s.GetCFilter(curStamp.Hash, false) + if err != nil { + return err + } if bFilter != nil && bFilter.N() != 0 { // We see if any relevant transactions match. matched, err = bFilter.MatchAny(key, watchList) @@ -329,7 +332,10 @@ rescanLoop: } } if len(ro.watchTXIDs) > 0 { - eFilter = s.GetCFilter(curStamp.Hash, true) + eFilter, err = s.GetCFilter(curStamp.Hash, true) + if err != nil { + return err + } } if eFilter != nil && eFilter.N() != 0 { // We see if any relevant transactions match. @@ -345,11 +351,14 @@ rescanLoop: // We've matched. Now we actually get the block // and cycle through the transactions to see // which ones are relevant. - block = s.GetBlockFromNetwork( + block, err = s.GetBlockFromNetwork( curStamp.Hash, ro.queryOptions...) + if err != nil { + return err + } if block == nil { - return fmt.Errorf("Couldn't get block "+ - "%d (%s)", curStamp.Height, + return fmt.Errorf("Couldn't get block %d "+ + "(%s) from network", curStamp.Height, curStamp.Hash) } relevantTxs, err = notifyBlock(block, @@ -409,9 +418,7 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, } } for outIdx, out := range tx.MsgTx().TxOut { - pushedData, err := - txscript.PushedData( - out.PkScript) + pushedData, err := txscript.PushedData(out.PkScript) if err != nil { continue } @@ -503,42 +510,51 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { } } log.Tracef("Starting scan for output spend from known block %d (%s) "+ - "back to block %d (%s)", curStamp.Height, curStamp.Hash) + "back to block %d (%s)", curStamp.Height, curStamp.Hash, + ro.startBlock.Height, ro.startBlock.Hash) for { // Check the basic filter for the spend and the extended filter // for the transaction in which the outpout is funded. - filter := s.GetCFilter(curStamp.Hash, false, + filter, err := s.GetCFilter(curStamp.Hash, false, ro.queryOptions...) - if filter == nil { + if err != nil { return nil, fmt.Errorf("Couldn't get basic filter for "+ "block %d (%s)", curStamp.Height, curStamp.Hash) } - matched, err := filter.MatchAny(builder.DeriveKey( - &curStamp.Hash), watchList) + matched := false + if filter != nil { + matched, err = filter.MatchAny(builder.DeriveKey( + &curStamp.Hash), watchList) + } if err != nil { return nil, err } if !matched { - filter = s.GetCFilter(curStamp.Hash, true, + filter, err = s.GetCFilter(curStamp.Hash, true, ro.queryOptions...) - if filter == nil { + if err != nil { return nil, fmt.Errorf("Couldn't get extended "+ "filter for block %d (%s)", curStamp.Height, curStamp.Hash) } - matched, err = filter.MatchAny(builder.DeriveKey( - &curStamp.Hash), watchList) + if filter != nil { + matched, err = filter.MatchAny( + builder.DeriveKey(&curStamp.Hash), + watchList) + } } // If either is matched, download the block and check to see // what we have. if matched { - block := s.GetBlockFromNetwork(curStamp.Hash, + block, err := s.GetBlockFromNetwork(curStamp.Hash, ro.queryOptions...) + if err != nil { + return nil, err + } if block == nil { - return nil, fmt.Errorf("Couldn't get "+ - "block %d (%s)", - curStamp.Height, curStamp.Hash) + return nil, fmt.Errorf("Couldn't get block %d "+ + "(%s)", curStamp.Height, curStamp.Hash) } // If we've spent the output in this block, return an // error stating that the output is spent. diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index d1f0b0e..dc63fe1 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -21,7 +21,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/waddrmgr" - "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/walletdb" ) @@ -43,11 +42,11 @@ var ( UserAgentVersion = "0.0.1-alpha" // Services describes the services that are supported by the server. - Services = wire.SFNodeCF + Services = wire.SFNodeWitness | wire.SFNodeCF // RequiredServices describes the services that are required to be // supported by outbound peers. - RequiredServices = wire.SFNodeNetwork | wire.SFNodeCF + RequiredServices = wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeCF // BanThreshold is the maximum ban score before a peer is banned. BanThreshold = uint32(100) @@ -1052,33 +1051,6 @@ func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer return false } -// sendUnminedTxs iterates through all transactions that spend from wallet -// credits that are not known to have been mined into a block, and attempts to -// send each to the chain server for relay. -// -// TODO: This should return an error if any of these lookups or sends fail, but -// since send errors due to double spends need to be handled gracefully and this -// isn't done yet, all sending errors are simply logged. -func (s *ChainService) sendUnminedTxs(w *wallet.Wallet) error { - /*txs, err := w.TxStore.UnminedTxs() - if err != nil { - return err - } - rpcClient := s.rpcClient - for _, tx := range txs { - resp, err := rpcClient.SendRawTransaction(tx, false) - if err != nil { - // TODO(jrick): Check error for if this tx is a double spend, - // remove it if so. - log.Debugf("Could not resend transaction %v: %v", - tx.TxHash(), err) - continue - } - log.Debugf("Resent unmined transaction %v", resp) - }*/ - return nil -} - // PublishTransaction sends the transaction to the consensus RPC server so it // can be propigated to other nodes and eventually mined. func (s *ChainService) PublishTransaction(tx *wire.MsgTx) error { @@ -1087,31 +1059,6 @@ func (s *ChainService) PublishTransaction(tx *wire.MsgTx) error { return nil } -// AnnounceNewTransactions generates and relays inventory vectors and notifies -// both websocket and getblocktemplate long poll clients of the passed -// transactions. This function should be called whenever new transactions -// are added to the mempool. -func (s *ChainService) AnnounceNewTransactions( /*newTxs []*mempool.TxDesc*/ ) { - // Generate and relay inventory vectors for all newly accepted - // transactions into the memory pool due to the original being - // accepted. - /*for _, txD := range newTxs { - // Generate the inventory vector and relay it. - iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) - s.RelayInventory(iv, txD) - - if s.rpcServer != nil { - // Notify websocket clients about mempool transactions. - s.rpcServer.ntfnMgr.NotifyMempoolTx(txD.Tx, true) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - s.rpcServer.gbtWorkState.NotifyMempoolTx( - s.txMemPool.LastUpdated()) - } - }*/ -} - // newPeerConfig returns the configuration for the given serverPeer. func newPeerConfig(sp *serverPeer) *peer.Config { return &peer.Config{ @@ -1189,64 +1136,6 @@ func (s *ChainService) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHe } } -// rebroadcastHandler keeps track of user submitted inventories that we have -// sent out but have not yet made it into a block. We periodically rebroadcast -// them in case our peers restarted or otherwise lost track of them. -func (s *ChainService) rebroadcastHandler() { - // Wait 5 min before first tx rebroadcast. - timer := time.NewTimer(5 * time.Minute) - //pendingInvs := make(map[wire.InvVect]interface{}) - -out: - for { - select { - /*case riv := <-s.modifyRebroadcastInv: - switch msg := riv.(type) { - // Incoming InvVects are added to our map of RPC txs. - case broadcastInventoryAdd: - pendingInvs[*msg.invVect] = msg.data - - // When an InvVect has been added to a block, we can - // now remove it, if it was present. - case broadcastInventoryDel: - if _, ok := pendingInvs[*msg]; ok { - delete(pendingInvs, *msg) - } - }*/ - - case <-timer.C: /* - // Any inventory we have has not made it into a block - // yet. We periodically resubmit them until they have. - for iv, data := range pendingInvs { - ivCopy := iv - s.RelayInventory(&ivCopy, data) - } - - // Process at a random time up to 30mins (in seconds) - // in the future. - timer.Reset(time.Second * - time.Duration(randomUint16Number(1800))) */ - - case <-s.quit: - break out - } - } - - timer.Stop() - - // Drain channels before exiting so nothing is left waiting around - // to send. - /*cleanup: - for { - select { - //case <-s.modifyRebroadcastInv: - default: - break cleanup - } - }*/ - s.wg.Done() -} - // ChainParams returns a copy of the ChainService's chaincfg.Params. func (s *ChainService) ChainParams() chaincfg.Params { return s.chainParams @@ -1261,10 +1150,8 @@ func (s *ChainService) Start() { // Start the peer handler which in turn starts the address and block // managers. - s.wg.Add(2) + s.wg.Add(1) go s.peerHandler() - go s.rebroadcastHandler() - } // Stop gracefully shuts down the server by stopping and disconnecting all diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index d6fbfec..8c8623a 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -23,6 +23,7 @@ import ( "github.com/btcsuite/btclog" "github.com/btcsuite/btcrpcclient" "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcutil/gcs" "github.com/btcsuite/btcutil/gcs/builder" "github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/waddrmgr" @@ -37,7 +38,7 @@ var ( // log messages from the tests themselves as well. Keep in mind some // log messages may not appear in order due to use of multiple query // goroutines in the tests. - logLevel = btclog.Off + logLevel = btclog.TraceLvl syncTimeout = 30 * time.Second syncUpdate = time.Second // Don't set this too high for your platform, or the tests will miss @@ -323,10 +324,8 @@ func TestSetup(t *testing.T) { }, } - spvchain.Services = 0 spvchain.MaxPeers = 3 spvchain.BanDuration = 5 * time.Second - spvchain.RequiredServices = wire.SFNodeNetwork spvchain.WaitForMoreCFHeaders = time.Second svc, err := spvchain.NewChainService(config) if err != nil { @@ -341,14 +340,6 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't sync ChainService: %s", err) } - // Test that we can get blocks and cfilters via P2P and decide which are - // valid and which aren't. - // TODO: Split this out into a benchmark. - err = testRandomBlocks(t, svc, h1) - if err != nil { - t.Fatalf("Testing blocks and cfilters failed: %s", err) - } - // Generate an address and send it some coins on the h1 chain. We use // this to test rescans and notifications. secSrc := newSecSource(&modParams) @@ -568,9 +559,10 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Couldn't sign transaction: %s", err) } - _, err = h1.Node.SendRawTransaction(authTx1.Tx, true) + banPeer(svc, h2) + err = svc.SendTransaction(authTx1.Tx, queryOptions...) if err != nil { - t.Fatalf("Unable to send raw transaction to node: %s", err) + t.Fatalf("Unable to send transaction to network: %s", err) } _, err = h1.Node.Generate(1) if err != nil { @@ -604,9 +596,10 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Couldn't sign transaction: %s", err) } - _, err = h1.Node.SendRawTransaction(authTx2.Tx, true) + banPeer(svc, h2) + err = svc.SendTransaction(authTx2.Tx, queryOptions...) if err != nil { - t.Fatalf("Unable to send raw transaction to node: %s", err) + t.Fatalf("Unable to send transaction to network: %s", err) } _, err = h1.Node.Generate(1) if err != nil { @@ -621,17 +614,20 @@ func TestSetup(t *testing.T) { t.Fatalf("Wrong number of relevant transactions. Want: 4, got:"+ " %d", numTXs) } - // Generate 1 blocks on h1, one at a time, to make sure the - // ChainService instance stays caught up. - for i := 0; i < 1; i++ { - _, err = h1.Node.Generate(1) - if err != nil { - t.Fatalf("Couldn't generate/submit block: %s", err) - } - err = waitForSync(t, svc, h1) - if err != nil { - t.Fatalf("Couldn't sync ChainService: %s", err) - } + // Generate a block with a nonstandard coinbase to generate a basic + // filter with 0 entries. + _, err = h1.GenerateAndSubmitBlockWithCustomCoinbaseOutputs( + []*btcutil.Tx{}, rpctest.BlockVersion, time.Time{}, + []wire.TxOut{{ + Value: 0, + PkScript: []byte{}, + }}) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s", err) + } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) } // Check and make sure the previous UTXO is now spent. @@ -644,8 +640,17 @@ func TestSetup(t *testing.T) { t.Fatalf("UTXO %s not seen as spent: %s", ourOutPoint, err) } + // Test that we can get blocks and cfilters via P2P and decide which are + // valid and which aren't. + // TODO: Split this out into a benchmark. + err = testRandomBlocks(t, svc, h1) + if err != nil { + t.Fatalf("Testing blocks and cfilters failed: %s", err) + } + // Generate 5 blocks on h2 and wait for ChainService to sync to the - // newly-best chain on h2. + // newly-best chain on h2. This includes the transactions sent via + // svc.SendTransaction earlier, so we'll have to _, err = h2.Node.Generate(5) if err != nil { t.Fatalf("Couldn't generate/submit blocks: %s", err) @@ -913,8 +918,12 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, return } // Get block from network. - haveBlock := svc.GetBlockFromNetwork(blockHash, + haveBlock, err := svc.GetBlockFromNetwork(blockHash, queryOptions...) + if err != nil { + errChan <- err + return + } if haveBlock == nil { errChan <- fmt.Errorf("Couldn't get block %d "+ "(%s) from network", height, blockHash) @@ -939,11 +948,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, return } // Get basic cfilter from network. - haveFilter := svc.GetCFilter(blockHash, false, + haveFilter, err := svc.GetCFilter(blockHash, false, queryOptions...) - if haveFilter == nil { - errChan <- fmt.Errorf("Couldn't get basic "+ - "filter for block %d", height) + if err != nil { + errChan <- err return } // Get basic cfilter from RPC. @@ -956,7 +964,11 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, return } // Check that network and RPC cfilters match. - if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { + var haveBytes []byte + if haveFilter != nil { + haveBytes = haveFilter.NBytes() + } + if !bytes.Equal(haveBytes, wantFilter.Data) { errChan <- fmt.Errorf("Basic filter from P2P "+ "network/DB doesn't match RPC value "+ "for block %d", height) @@ -965,7 +977,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, // Calculate basic filter from block. calcFilter, err := builder.BuildBasicFilter( haveBlock.MsgBlock()) - if err != nil { + if err != nil && err != gcs.ErrNoData { errChan <- fmt.Errorf("Couldn't build basic "+ "filter for block %d (%s): %s", height, blockHash, err) @@ -973,7 +985,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, } // Check that the network value matches the calculated // value from the block. - if !reflect.DeepEqual(*haveFilter, *calcFilter) { + if !reflect.DeepEqual(haveFilter, calcFilter) { errChan <- fmt.Errorf("Basic filter from P2P "+ "network/DB doesn't match calculated "+ "value for block %d", height) @@ -1007,11 +1019,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, return } // Get extended cfilter from network - haveFilter = svc.GetCFilter(blockHash, true, + haveFilter, err = svc.GetCFilter(blockHash, true, queryOptions...) - if haveFilter == nil { - errChan <- fmt.Errorf("Couldn't get extended "+ - "filter for block %d", height) + if err != nil { + errChan <- err return } // Get extended cfilter from RPC @@ -1024,7 +1035,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, return } // Check that network and RPC cfilters match - if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) { + if haveFilter != nil { + haveBytes = haveFilter.NBytes() + } + if !bytes.Equal(haveBytes, wantFilter.Data) { errChan <- fmt.Errorf("Extended filter from "+ "P2P network/DB doesn't match RPC "+ "value for block %d", height) @@ -1033,7 +1047,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, // Calculate extended filter from block calcFilter, err = builder.BuildExtFilter( haveBlock.MsgBlock()) - if err != nil { + if err != nil && err != gcs.ErrNoData { errChan <- fmt.Errorf("Couldn't build extended"+ " filter for block %d (%s): %s", height, blockHash, err) @@ -1041,7 +1055,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, } // Check that the network value matches the calculated // value from the block. - if !reflect.DeepEqual(*haveFilter, *calcFilter) { + if !reflect.DeepEqual(haveFilter, calcFilter) { errChan <- fmt.Errorf("Extended filter from "+ "P2P network/DB doesn't match "+ "calculated value for block %d", height) @@ -1104,76 +1118,108 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, // notifications continue until the `quit` channel is closed. func startRescan(t *testing.T, svc *spvchain.ChainService, addr btcutil.Address, startBlock *waddrmgr.BlockStamp, quit <-chan struct{}) error { - go svc.Rescan( - spvchain.QuitChan(quit), - spvchain.WatchAddrs(addr), - spvchain.StartBlock(startBlock), - spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{ - OnBlockConnected: func(hash *chainhash.Hash, - height int32, time time.Time) { - rescanMtx.Lock() - gotLog = append(gotLog, []byte("bc")...) - curBlockHeight = height - rescanMtx.Unlock() - }, - OnBlockDisconnected: func(hash *chainhash.Hash, - height int32, time time.Time) { - rescanMtx.Lock() - delete(ourKnownTxsByBlock, *hash) - gotLog = append(gotLog, []byte("bd")...) - curBlockHeight = height - 1 - rescanMtx.Unlock() - }, - OnRecvTx: func(tx *btcutil.Tx, - details *btcjson.BlockDetails) { - rescanMtx.Lock() - hash, err := chainhash.NewHashFromStr( - details.Hash) - if err != nil { - t.Errorf("Couldn't decode hash %s: %s", - details.Hash, err) - } - ourKnownTxsByBlock[*hash] = append( - ourKnownTxsByBlock[*hash], tx) - gotLog = append(gotLog, []byte("rv")...) - rescanMtx.Unlock() - }, - OnRedeemingTx: func(tx *btcutil.Tx, - details *btcjson.BlockDetails) { - rescanMtx.Lock() - hash, err := chainhash.NewHashFromStr( - details.Hash) - if err != nil { - t.Errorf("Couldn't decode hash %s: %s", - details.Hash, err) - } - ourKnownTxsByBlock[*hash] = append( - ourKnownTxsByBlock[*hash], tx) - gotLog = append(gotLog, []byte("rd")...) - rescanMtx.Unlock() - }, - OnFilteredBlockConnected: func(height int32, - header *wire.BlockHeader, - relevantTxs []*btcutil.Tx) { - rescanMtx.Lock() - ourKnownTxsByFilteredBlock[header.BlockHash()] = - relevantTxs - gotLog = append(gotLog, []byte("fc")...) - gotLog = append(gotLog, uint8(len(relevantTxs))) - curFilteredBlockHeight = height - rescanMtx.Unlock() - }, - OnFilteredBlockDisconnected: func(height int32, - header *wire.BlockHeader) { - rescanMtx.Lock() - delete(ourKnownTxsByFilteredBlock, - header.BlockHash()) - gotLog = append(gotLog, []byte("fd")...) - curFilteredBlockHeight = height - 1 - rescanMtx.Unlock() - }, - }), - ) + go func() { + err := svc.Rescan( + spvchain.QuitChan(quit), + spvchain.WatchAddrs(addr), + spvchain.StartBlock(startBlock), + spvchain.NotificationHandlers( + btcrpcclient.NotificationHandlers{ + OnBlockConnected: func( + hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + gotLog = append(gotLog, + []byte("bc")...) + curBlockHeight = height + rescanMtx.Unlock() + }, + OnBlockDisconnected: func( + hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + delete(ourKnownTxsByBlock, *hash) + gotLog = append(gotLog, + []byte("bd")...) + curBlockHeight = height - 1 + rescanMtx.Unlock() + }, + OnRecvTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash. + NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't "+ + "decode hash "+ + "%s: %s", + details.Hash, + err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], + tx) + gotLog = append(gotLog, + []byte("rv")...) + rescanMtx.Unlock() + }, + OnRedeemingTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash. + NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't "+ + "decode hash "+ + "%s: %s", + details.Hash, + err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], + tx) + gotLog = append(gotLog, + []byte("rd")...) + rescanMtx.Unlock() + }, + OnFilteredBlockConnected: func( + height int32, + header *wire.BlockHeader, + relevantTxs []*btcutil.Tx) { + rescanMtx.Lock() + ourKnownTxsByFilteredBlock[header.BlockHash()] = + relevantTxs + gotLog = append(gotLog, + []byte("fc")...) + gotLog = append(gotLog, + uint8(len(relevantTxs))) + curFilteredBlockHeight = height + rescanMtx.Unlock() + }, + OnFilteredBlockDisconnected: func( + height int32, + header *wire.BlockHeader) { + rescanMtx.Lock() + delete(ourKnownTxsByFilteredBlock, + header.BlockHash()) + gotLog = append(gotLog, + []byte("fd")...) + curFilteredBlockHeight = + height - 1 + rescanMtx.Unlock() + }, + }), + ) + if logLevel != btclog.Off { + if err != nil { + t.Logf("Rescan ended: %s", err) + } else { + t.Logf("Rescan ended successfully") + } + } + }() return nil } @@ -1203,3 +1249,15 @@ func checkRescanStatus() (int, int32, error) { } return txCount[0], curBlockHeight, nil } + +// banPeer bans and disconnects the requested harness from the ChainService +// instance for BanDuration seconds. +func banPeer(svc *spvchain.ChainService, harness *rpctest.Harness) { + peers := svc.Peers() + for _, peer := range peers { + if peer.Addr() == harness.P2PAddress() { + svc.BanPeer(peer) + peer.Disconnect() + } + } +}