diff --git a/blockmanager.go b/blockmanager.go index 1df250dc..93344ad8 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/mempool" + peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" ) @@ -49,14 +50,14 @@ var zeroHash chainhash.Hash // newPeerMsg signifies a newly connected peer to the block handler. type newPeerMsg struct { - peer *serverPeer + peer *peerpkg.Peer } // blockMsg packages a bitcoin block message and the peer it came from together // so the block handler has access to that information. type blockMsg struct { block *btcutil.Block - peer *serverPeer + peer *peerpkg.Peer reply chan struct{} } @@ -64,33 +65,33 @@ type blockMsg struct { // so the block handler has access to that information. type invMsg struct { inv *wire.MsgInv - peer *serverPeer + peer *peerpkg.Peer } // headersMsg packages a bitcoin headers message and the peer it came from // together so the block handler has access to that information. type headersMsg struct { headers *wire.MsgHeaders - peer *serverPeer + peer *peerpkg.Peer } // donePeerMsg signifies a newly disconnected peer to the block handler. type donePeerMsg struct { - peer *serverPeer + peer *peerpkg.Peer } // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { tx *btcutil.Tx - peer *serverPeer + peer *peerpkg.Peer reply chan struct{} } // getSyncPeerMsg is a message type to be sent across the message channel for // retrieving the current sync peer. type getSyncPeerMsg struct { - reply chan *serverPeer + reply chan int32 } // processBlockResponse is a response sent to the reply channel of a @@ -138,7 +139,7 @@ type headerNode struct { type PeerNotifier interface { AnnounceNewTransactions(newTxs []*mempool.TxDesc) - UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer) + UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peerpkg.Peer) RelayInventory(invVect *wire.InvVect, data interface{}) @@ -157,23 +158,35 @@ type blockManagerConfig struct { MaxPeers int } +// peerSyncState stores additional information that the blockManager tracks +// about a peer. +type peerSyncState struct { + syncCandidate bool + requestQueue []*wire.InvVect + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} +} + // blockManager provides a concurrency safe block manager for handling all // incoming blocks. type blockManager struct { - peerNotifier PeerNotifier - started int32 - shutdown int32 - chain *blockchain.BlockChain - txMemPool *mempool.TxPool - chainParams *chaincfg.Params + peerNotifier PeerNotifier + started int32 + shutdown int32 + chain *blockchain.BlockChain + txMemPool *mempool.TxPool + chainParams *chaincfg.Params + progressLogger *blockProgressLogger + msgChan chan interface{} + wg sync.WaitGroup + quit chan struct{} + + // These fields should only be accessed from the blockHandler thread rejectedTxns map[chainhash.Hash]struct{} requestedTxns map[chainhash.Hash]struct{} requestedBlocks map[chainhash.Hash]struct{} - progressLogger *blockProgressLogger - syncPeer *serverPeer - msgChan chan interface{} - wg sync.WaitGroup - quit chan struct{} + syncPeer *peerpkg.Peer + peerStates map[*peerpkg.Peer]*peerSyncState // The following fields are used for headers-first mode. headersFirstMode bool @@ -230,30 +243,30 @@ func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoi // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. -func (b *blockManager) startSync(peers *list.List) { +func (b *blockManager) startSync() { // Return now if we're already syncing. if b.syncPeer != nil { return } - best := b.chain.BestSnapshot() - var bestPeer *serverPeer - var enext *list.Element - for e := peers.Front(); e != nil; e = enext { - enext = e.Next() - sp := e.Value.(*serverPeer) + // Once the segwit soft-fork package has activated, we only + // want to sync from peers which are witness enabled to ensure + // that we fully validate all blockchain data. + segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + if err != nil { + bmgrLog.Errorf("Unable to query for segwit soft-fork state: %v", err) + return + } - // Once the segwit soft-fork package has activated, we only - // want to sync from peers which are witness enabled to ensure - // that we fully validate all blockchain data. - segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) - if err != nil { - bmgrLog.Errorf("Unable to query for segwit "+ - "soft-fork state: %v", err) + best := b.chain.BestSnapshot() + var bestPeer *peerpkg.Peer + for peer, state := range b.peerStates { + if !state.syncCandidate { continue } - if segwitActive && !sp.IsWitnessEnabled() { - bmgrLog.Infof("peer %v not witness enabled, skipping", sp) + + if segwitActive && !peer.IsWitnessEnabled() { + bmgrLog.Debugf("peer %v not witness enabled, skipping", peer) continue } @@ -263,14 +276,14 @@ func (b *blockManager) startSync(peers *list.List) { // doesn't have a later block when it's equal, it will likely // have one soon so it is a reasonable choice. It also allows // the case where both are at 0 such as during regression test. - if sp.LastBlock() < best.Height { - peers.Remove(e) + if peer.LastBlock() < best.Height { + state.syncCandidate = false continue } // TODO(davec): Use a better algorithm to choose the best peer. // For now, just pick the first available candidate. - bestPeer = sp + bestPeer = peer } // Start syncing from the best peer if one was selected. @@ -327,14 +340,14 @@ func (b *blockManager) startSync(peers *list.List) { // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. -func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { +func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool { // Typically a peer is not a candidate for sync if it's not a full node, // however regression test is special in that the regression tool is // not a full node and still needs to be considered a sync candidate. if b.chainParams == &chaincfg.RegressionNetParams { // The peer is not a candidate if it's not coming from localhost // or the hostname can't be determined for some reason. - host, _, err := net.SplitHostPort(sp.Addr()) + host, _, err := net.SplitHostPort(peer.Addr()) if err != nil { return false } @@ -351,9 +364,9 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { bmgrLog.Errorf("Unable to query for segwit "+ "soft-fork state: %v", err) } - nodeServices := sp.Services() + nodeServices := peer.Services() if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork || - (segwitActive && !sp.IsWitnessEnabled()) { + (segwitActive && !peer.IsWitnessEnabled()) { return false } } @@ -365,70 +378,80 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *serverPeer) { +func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Ignore if in the process of shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } - bmgrLog.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) + bmgrLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) - // Ignore the peer if it's not a sync candidate. - if !b.isSyncCandidate(sp) { - return + // Initialize the peer state + isSyncCandidate := b.isSyncCandidate(peer) + b.peerStates[peer] = &peerSyncState{ + syncCandidate: isSyncCandidate, + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), } - // Add the peer as a candidate to sync from. - peers.PushBack(sp) - // Start syncing by choosing the best candidate if needed. - b.startSync(peers) + if isSyncCandidate && b.syncPeer == nil { + b.startSync() + } } // handleDonePeerMsg deals with peers that have signalled they are done. It // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. -func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) { - // Remove the peer from the list of candidate peers. - for e := peers.Front(); e != nil; e = e.Next() { - if e.Value == sp { - peers.Remove(e) - break - } +func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) { + state, exists := b.peerStates[peer] + if !exists { + bmgrLog.Warnf("Received done peer message for unknown peer %s", peer) + return } - bmgrLog.Infof("Lost peer %s", sp) + // Remove the peer from the list of candidate peers. + delete(b.peerStates, peer) + + bmgrLog.Infof("Lost peer %s", peer) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. - for k := range sp.requestedTxns { - delete(b.requestedTxns, k) + for txHash := range state.requestedTxns { + delete(b.requestedTxns, txHash) } // Remove requested blocks from the global map so that they will be // fetched from elsewhere next time we get an inv. // TODO: we could possibly here check which peers have these blocks // and request them now to speed things up a little. - for k := range sp.requestedBlocks { - delete(b.requestedBlocks, k) + for blockHash := range state.requestedBlocks { + delete(b.requestedBlocks, blockHash) } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the headers-first state if in headers-first // mode so - if b.syncPeer != nil && b.syncPeer == sp { + if b.syncPeer == peer { b.syncPeer = nil if b.headersFirstMode { best := b.chain.BestSnapshot() b.resetHeaderState(&best.Hash, best.Height) } - b.startSync(peers) + b.startSync() } } // handleTxMsg handles transaction messages from all peers. func (b *blockManager) handleTxMsg(tmsg *txMsg) { + peer := tmsg.peer + state, exists := b.peerStates[peer] + if !exists { + bmgrLog.Warnf("Received tx message from unknown peer %s", peer) + return + } + // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of // sending an inventory message and allowing the remote peer to decide // whether or not they want to request the transaction via a getdata @@ -442,22 +465,22 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already // rejected, the transaction was unsolicited. - if _, exists := b.rejectedTxns[*txHash]; exists { + if _, exists = b.rejectedTxns[*txHash]; exists { bmgrLog.Debugf("Ignoring unsolicited previously rejected "+ - "transaction %v from %s", txHash, tmsg.peer) + "transaction %v from %s", txHash, peer) return } // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. acceptedTxs, err := b.txMemPool.ProcessTransaction(tmsg.tx, - true, true, mempool.Tag(tmsg.peer.ID())) + true, true, mempool.Tag(peer.ID())) // Remove transaction from request maps. Either the mempool/chain // already knows about it and as such we shouldn't have any more // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. - delete(tmsg.peer.requestedTxns, *txHash) + delete(state.requestedTxns, *txHash) delete(b.requestedTxns, *txHash) if err != nil { @@ -472,7 +495,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // so log it as an actual error. if _, ok := err.(mempool.RuleError); ok { bmgrLog.Debugf("Rejected transaction %v from %s: %v", - txHash, tmsg.peer, err) + txHash, peer, err) } else { bmgrLog.Errorf("Failed to process transaction %v: %v", txHash, err) @@ -481,8 +504,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Convert the error into an appropriate reject message and // send it. code, reason := mempool.ErrToRejectErr(err) - tmsg.peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, - false) + peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false) return } @@ -512,9 +534,16 @@ func (b *blockManager) current() bool { // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { + peer := bmsg.peer + state, exists := b.peerStates[peer] + if !exists { + bmgrLog.Warnf("Received block message from unknown peer %s", peer) + return + } + // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() - if _, exists := bmsg.peer.requestedBlocks[*blockHash]; !exists { + if _, exists = state.requestedBlocks[*blockHash]; !exists { // The regression test intentionally sends some blocks twice // to test duplicate block insertion fails. Don't disconnect // the peer or ignore the block when we're in regression test @@ -522,8 +551,8 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // duplicate blocks. if b.chainParams != &chaincfg.RegressionNetParams { bmgrLog.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, bmsg.peer.Addr()) - bmsg.peer.Disconnect() + "disconnecting", blockHash, peer.Addr()) + peer.Disconnect() return } } @@ -555,7 +584,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Remove block from request maps. Either chain will know about it and // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. - delete(bmsg.peer.requestedBlocks, *blockHash) + delete(state.requestedBlocks, *blockHash) delete(b.requestedBlocks, *blockHash) // Process the block to include validation, best chain selection, orphan @@ -568,7 +597,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // it as an actual error. if _, ok := err.(blockchain.RuleError); ok { bmgrLog.Infof("Rejected block %v from %s: %v", blockHash, - bmsg.peer, err) + peer, err) } else { bmgrLog.Errorf("Failed to process block %v: %v", blockHash, err) @@ -581,8 +610,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Convert the error into an appropriate reject message and // send it. code, reason := mempool.ErrToRejectErr(err) - bmsg.peer.PushRejectMsg(wire.CmdBlock, code, reason, - blockHash, false) + peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) return } @@ -626,7 +654,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { bmgrLog.Warnf("Failed to get block locator for the "+ "latest block: %v", err) } else { - bmsg.peer.PushGetBlocksMsg(locator, orphanRoot) + peer.PushGetBlocksMsg(locator, orphanRoot) } } else { // When the block is not an orphan, log information about it and @@ -648,9 +676,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // chain is "current". This avoids sending a spammy amount of messages // if we're syncing the chain from scratch. if blkHashUpdate != nil && heightUpdate != 0 { - bmsg.peer.UpdateLastBlockHeight(heightUpdate) + peer.UpdateLastBlockHeight(heightUpdate) if isOrphan || b.current() { - go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) + go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, + peer) } } @@ -664,7 +693,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // getting short. if !isCheckpointBlock { if b.startHeader != nil && - len(bmsg.peer.requestedBlocks) < minInFlightBlocks { + len(state.requestedBlocks) < minInFlightBlocks { b.fetchHeaderBlocks() } return @@ -679,10 +708,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) if b.nextCheckpoint != nil { locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { bmgrLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", bmsg.peer.Addr(), err) + "peer %s: %v", peer.Addr(), err) return } bmgrLog.Infof("Downloading headers for blocks %d to %d from "+ @@ -698,10 +727,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.headerList.Init() bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) + err = peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v", - bmsg.peer.Addr(), err) + peer.Addr(), err) return } } @@ -735,8 +764,10 @@ func (b *blockManager) fetchHeaderBlocks() { "fetch: %v", err) } if !haveInv { + syncPeerState := b.peerStates[b.syncPeer] + b.requestedBlocks[*node.hash] = struct{}{} - b.syncPeer.requestedBlocks[*node.hash] = struct{}{} + syncPeerState.requestedBlocks[*node.hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the @@ -761,13 +792,20 @@ func (b *blockManager) fetchHeaderBlocks() { // handleHeadersMsg handles block header messages from all peers. Headers are // requested when performing a headers-first sync. func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { + peer := hmsg.peer + _, exists := b.peerStates[peer] + if !exists { + bmgrLog.Warnf("Received headers message from unknown peer %s", peer) + return + } + // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) if !b.headersFirstMode { bmgrLog.Warnf("Got %d unrequested headers from %s -- "+ - "disconnecting", numHeaders, hmsg.peer.Addr()) - hmsg.peer.Disconnect() + "disconnecting", numHeaders, peer.Addr()) + peer.Disconnect() return } @@ -789,7 +827,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { if prevNodeEl == nil { bmgrLog.Warnf("Header list does not contain a previous" + "element as expected -- disconnecting peer") - hmsg.peer.Disconnect() + peer.Disconnect() return } @@ -806,8 +844,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } else { bmgrLog.Warnf("Received block header that does not "+ "properly connect to the chain from peer %s "+ - "-- disconnecting", hmsg.peer.Addr()) - hmsg.peer.Disconnect() + "-- disconnecting", peer.Addr()) + peer.Disconnect() return } @@ -823,9 +861,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { "%s from peer %s does NOT match "+ "expected checkpoint hash of %s -- "+ "disconnecting", node.height, - node.hash, hmsg.peer.Addr(), + node.hash, peer.Addr(), b.nextCheckpoint.Hash) - hmsg.peer.Disconnect() + peer.Disconnect() return } break @@ -851,10 +889,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // headers starting from the latest known header and ending with the // next checkpoint. locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { bmgrLog.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", hmsg.peer.Addr(), err) + "peer %s: %v", peer.Addr(), err) return } } @@ -899,6 +937,13 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. func (b *blockManager) handleInvMsg(imsg *invMsg) { + peer := imsg.peer + state, exists := b.peerStates[peer] + if !exists { + bmgrLog.Warnf("Received inv message from unknown peer %s", peer) + return + } + // Attempt to find the final block in the inventory list. There may // not be one. lastBlock := -1 @@ -915,13 +960,13 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // announced block for this peer. We'll use this information later to // update the heights of peers based on blocks we've accepted that they // previously announced. - if lastBlock != -1 && (imsg.peer != b.syncPeer || b.current()) { - imsg.peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) + if lastBlock != -1 && (peer != b.syncPeer || b.current()) { + peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) } // Ignore invs from peers that aren't the sync if we are not current. // Helps prevent fetching a mass of orphans. - if imsg.peer != b.syncPeer && !b.current() { + if peer != b.syncPeer && !b.current() { return } @@ -930,7 +975,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { if lastBlock != -1 && b.current() { blkHeight, err := b.chain.BlockHeightByHash(&invVects[lastBlock].Hash) if err == nil { - imsg.peer.UpdateLastBlockHeight(blkHeight) + peer.UpdateLastBlockHeight(blkHeight) } } @@ -951,7 +996,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Add the inventory to the cache of known inventory // for the peer. - imsg.peer.AddKnownInventory(iv) + peer.AddKnownInventory(iv) // Ignore inventory when we're in headers-first mode. if b.headersFirstMode { @@ -979,12 +1024,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // peers, as after segwit activation we only want to // download from peers that can provide us full witness // data for blocks. - if !imsg.peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock { + if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock { continue } // Add it to the request queue. - imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv) + state.requestQueue = append(state.requestQueue, iv) continue } @@ -1011,7 +1056,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { "%v", err) continue } - imsg.peer.PushGetBlocksMsg(locator, orphanRoot) + peer.PushGetBlocksMsg(locator, orphanRoot) continue } @@ -1024,7 +1069,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // final one the remote peer knows about (zero // stop hash). locator := b.chain.BlockLocatorFromHash(&iv.Hash) - imsg.peer.PushGetBlocksMsg(locator, &zeroHash) + peer.PushGetBlocksMsg(locator, &zeroHash) } } } @@ -1033,7 +1078,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // the request will be requested on the next inv message. numRequested := 0 gdmsg := wire.NewMsgGetData() - requestQueue := imsg.peer.requestQueue + requestQueue := state.requestQueue for len(requestQueue) != 0 { iv := requestQueue[0] requestQueue[0] = nil @@ -1048,9 +1093,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { if _, exists := b.requestedBlocks[iv.Hash]; !exists { b.requestedBlocks[iv.Hash] = struct{}{} b.limitMap(b.requestedBlocks, maxRequestedBlocks) - imsg.peer.requestedBlocks[iv.Hash] = struct{}{} + state.requestedBlocks[iv.Hash] = struct{}{} - if imsg.peer.IsWitnessEnabled() { + if peer.IsWitnessEnabled() { iv.Type = wire.InvTypeWitnessBlock } @@ -1066,11 +1111,11 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { if _, exists := b.requestedTxns[iv.Hash]; !exists { b.requestedTxns[iv.Hash] = struct{}{} b.limitMap(b.requestedTxns, maxRequestedTxns) - imsg.peer.requestedTxns[iv.Hash] = struct{}{} + state.requestedTxns[iv.Hash] = struct{}{} // If the peer is capable, request the txn // including all witness data. - if imsg.peer.IsWitnessEnabled() { + if peer.IsWitnessEnabled() { iv.Type = wire.InvTypeWitnessTx } @@ -1083,9 +1128,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { break } } - imsg.peer.requestQueue = requestQueue + state.requestQueue = requestQueue if len(gdmsg.InvList) > 0 { - imsg.peer.QueueMessage(gdmsg, nil) + peer.QueueMessage(gdmsg, nil) } } @@ -1114,14 +1159,13 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { // important because the block manager controls which blocks are needed and how // the fetching should proceed. func (b *blockManager) blockHandler() { - candidatePeers := list.New() out: for { select { case m := <-b.msgChan: switch msg := m.(type) { case *newPeerMsg: - b.handleNewPeerMsg(candidatePeers, msg.peer) + b.handleNewPeerMsg(msg.peer) case *txMsg: b.handleTxMsg(msg) @@ -1138,10 +1182,14 @@ out: b.handleHeadersMsg(msg) case *donePeerMsg: - b.handleDonePeerMsg(candidatePeers, msg.peer) + b.handleDonePeerMsg(msg.peer) case getSyncPeerMsg: - msg.reply <- b.syncPeer + var peerID int32 + if b.syncPeer != nil { + peerID = b.syncPeer.ID() + } + msg.reply <- peerID case processBlockMsg: _, isOrphan, err := b.chain.ProcessBlock( @@ -1251,71 +1299,71 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not } // NewPeer informs the block manager of a newly active peer. -func (b *blockManager) NewPeer(sp *serverPeer) { +func (b *blockManager) NewPeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } - b.msgChan <- &newPeerMsg{peer: sp} + b.msgChan <- &newPeerMsg{peer: peer} } // QueueTx adds the passed transaction message and peer to the block handling // queue. Responds to the done channel argument after the tx message is // processed. -func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer, done chan struct{}) { +func (b *blockManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more transactions if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: sp, reply: done} + b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } // QueueBlock adds the passed block message and peer to the block handling // queue. Responds to the done channel argument after the block message is // processed. -func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer, done chan struct{}) { +func (b *blockManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) { // Don't accept more blocks if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: sp, reply: done} + b.msgChan <- &blockMsg{block: block, peer: peer, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. -func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) { +func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on inv // messages. if atomic.LoadInt32(&b.shutdown) != 0 { return } - b.msgChan <- &invMsg{inv: inv, peer: sp} + b.msgChan <- &invMsg{inv: inv, peer: peer} } // QueueHeaders adds the passed headers message and peer to the block handling // queue. -func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) { +func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { // No channel handling here because peers do not need to block on // headers messages. if atomic.LoadInt32(&b.shutdown) != 0 { return } - b.msgChan <- &headersMsg{headers: headers, peer: sp} + b.msgChan <- &headersMsg{headers: headers, peer: peer} } // DonePeer informs the blockmanager that a peer has disconnected. -func (b *blockManager) DonePeer(sp *serverPeer) { +func (b *blockManager) DonePeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } - b.msgChan <- &donePeerMsg{peer: sp} + b.msgChan <- &donePeerMsg{peer: peer} } // Start begins the core block handler which processes block and inv messages. @@ -1345,9 +1393,9 @@ func (b *blockManager) Stop() error { return nil } -// SyncPeer returns the current sync peer. -func (b *blockManager) SyncPeer() *serverPeer { - reply := make(chan *serverPeer) +// SyncPeerID returns the ID of the current sync peer, or 0 if there is none. +func (b *blockManager) SyncPeerID() int32 { + reply := make(chan int32) b.msgChan <- getSyncPeerMsg{reply: reply} return <-reply } @@ -1391,6 +1439,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) { rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), + peerStates: make(map[*peerpkg.Peer]*peerSyncState), progressLogger: newBlockProgressLogger("Processed", bmgrLog), msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), diff --git a/rpcadaptors.go b/rpcadaptors.go index eebf47d8..0e8fa393 100644 --- a/rpcadaptors.go +++ b/rpcadaptors.go @@ -258,12 +258,13 @@ func (b *rpcSyncMgr) Pause() chan<- struct{} { return b.blockMgr.Pause() } -// SyncPeer returns the peer that is currently the peer being used to sync from. +// SyncPeerID returns the peer that is currently the peer being used to sync +// from. // // This function is safe for concurrent access and is part of the // rpcserverSyncManager interface implementation. -func (b *rpcSyncMgr) SyncPeer() rpcserverPeer { - return (*rpcPeer)(b.blockMgr.SyncPeer()) +func (b *rpcSyncMgr) SyncPeerID() int32 { + return b.blockMgr.SyncPeerID() } // LocateBlocks returns the hashes of the blocks after the first known block in diff --git a/rpcserver.go b/rpcserver.go index b115cb61..07b4674e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2402,7 +2402,7 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // handleGetPeerInfo implements the getpeerinfo command. func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { peers := s.cfg.ConnMgr.ConnectedPeers() - syncPeer := s.cfg.SyncMgr.SyncPeer().ToPeer() + syncPeerID := s.cfg.SyncMgr.SyncPeerID() infos := make([]*btcjson.GetPeerInfoResult, 0, len(peers)) for _, p := range peers { statsSnap := p.ToPeer().StatsSnapshot() @@ -2426,7 +2426,7 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) CurrentHeight: statsSnap.LastBlock, BanScore: int32(p.BanScore()), FeeFilter: p.FeeFilter(), - SyncNode: p.ToPeer() == syncPeer, + SyncNode: statsSnap.ID == syncPeerID, } if p.ToPeer().LastPingNonce() != 0 { wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds()) @@ -4139,9 +4139,9 @@ type rpcserverSyncManager interface { // Pause pauses the sync manager until the returned channel is closed. Pause() chan<- struct{} - // SyncPeer returns the peer that is currently the peer being used to - // sync from. - SyncPeer() rpcserverPeer + // SyncPeerID returns the ID of the peer that is currently the peer being + // used to sync from or 0 if there is none. + SyncPeerID() int32 // LocateBlocks returns the hashes of the blocks after the first known // block in the provided locators until the provided stop hash or the diff --git a/server.go b/server.go index 457c25f1..9e45870f 100644 --- a/server.go +++ b/server.go @@ -120,7 +120,7 @@ type relayMsg struct { type updatePeerHeightsMsg struct { newHash *chainhash.Hash newHeight int32 - originPeer *serverPeer + originPeer *peer.Peer } // peerState maintains state of inbound, persistent, outbound peers as well @@ -212,20 +212,17 @@ type serverPeer struct { *peer.Peer - connReq *connmgr.ConnReq - server *server - persistent bool - continueHash *chainhash.Hash - relayMtx sync.Mutex - disableRelayTx bool - sentAddrs bool - requestQueue []*wire.InvVect - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} - filter *bloom.Filter - knownAddresses map[string]struct{} - banScore connmgr.DynamicBanScore - quit chan struct{} + connReq *connmgr.ConnReq + server *server + persistent bool + continueHash *chainhash.Hash + relayMtx sync.Mutex + disableRelayTx bool + sentAddrs bool + filter *bloom.Filter + knownAddresses map[string]struct{} + banScore connmgr.DynamicBanScore + quit chan struct{} // The following chans are used to sync blockmanager and server. txProcessed chan struct{} blockProcessed chan struct{} @@ -235,15 +232,13 @@ type serverPeer struct { // the caller. func newServerPeer(s *server, isPersistent bool) *serverPeer { return &serverPeer{ - server: s, - persistent: isPersistent, - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - filter: bloom.LoadFilter(nil), - knownAddresses: make(map[string]struct{}), - quit: make(chan struct{}), - txProcessed: make(chan struct{}, 1), - blockProcessed: make(chan struct{}, 1), + server: s, + persistent: isPersistent, + filter: bloom.LoadFilter(nil), + knownAddresses: make(map[string]struct{}), + quit: make(chan struct{}), + txProcessed: make(chan struct{}, 1), + blockProcessed: make(chan struct{}, 1), } } @@ -349,7 +344,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) // Signal the block manager this peer is a new sync candidate. - sp.server.blockManager.NewPeer(sp) + sp.server.blockManager.NewPeer(sp.Peer) // Choose whether or not to relay transactions before a filter command // is received. @@ -485,7 +480,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp, sp.txProcessed) + sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed) <-sp.txProcessed } @@ -511,7 +506,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // reference implementation processes blocks in the same // thread and therefore blocks further messages until // the bitcoin block has been fully processed. - sp.server.blockManager.QueueBlock(block, sp, sp.blockProcessed) + sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed) <-sp.blockProcessed } @@ -522,7 +517,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { if !cfg.BlocksOnly { if len(msg.InvList) > 0 { - sp.server.blockManager.QueueInv(msg, sp) + sp.server.blockManager.QueueInv(msg, sp.Peer) } return } @@ -548,14 +543,14 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { } if len(newInv.InvList) > 0 { - sp.server.blockManager.QueueInv(newInv, sp) + sp.server.blockManager.QueueInv(newInv, sp.Peer) } } // OnHeaders is invoked when a peer receives a headers bitcoin // message. The message is passed down to the block manager. func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - sp.server.blockManager.QueueHeaders(msg, sp) + sp.server.blockManager.QueueHeaders(msg, sp.Peer) } // handleGetData is invoked when a peer receives a getdata bitcoin message and @@ -1250,7 +1245,7 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) { state.forAllPeers(func(sp *serverPeer) { // The origin peer should already have the updated height. - if sp == umsg.originPeer { + if sp.Peer == umsg.originPeer { return } @@ -1717,7 +1712,7 @@ func (s *server) peerDoneHandler(sp *serverPeer) { // Only tell block manager we are gone if we ever told it we existed. if sp.VersionKnown() { - s.blockManager.DonePeer(sp) + s.blockManager.DonePeer(sp.Peer) // Evict any remaining orphans that were sent by the peer. numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) @@ -1895,7 +1890,7 @@ func (s *server) NetTotals() (uint64, uint64) { // the latest connected main chain block, or a recognized orphan. These height // updates allow us to dynamically refresh peer heights, ensuring sync peer // selection has access to the latest block heights for each peer. -func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer) { +func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) { s.peerHeightsUpdate <- updatePeerHeightsMsg{ newHash: latestBlkHash, newHeight: latestHeight,