diff --git a/blockmanager.go b/blockmanager.go index e085fe62..86d87660 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -372,7 +372,6 @@ func (b *blockManager) current() bool { // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { - defer func() { if b.startBlock != nil && len(bmsg.peer.requestedBlocks) < 10 { @@ -382,11 +381,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } }() - // Keep track of which peer the block was sent from so the notification - // handler can request the parent blocks from the appropriate peer. - blockSha, _ := bmsg.block.Sha() // If we didn't ask for this block then the peer is misbehaving. + blockSha, _ := bmsg.block.Sha() if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok { // The regression test intentionally sends some blocks twice // to test duplicate block insertion fails. Don't disconnect @@ -400,6 +397,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { return } } + + // Keep track of which peer the block was sent from so the notification + // handler can request the parent blocks from the appropriate peer. b.blockPeer[*blockSha] = bmsg.peer fastAdd := false @@ -420,9 +420,12 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } } } - // Process the block to include validation, best chain selection, orphan - // handling, etc. - err := b.blockChain.ProcessBlock(bmsg.block, fastAdd) + + // Remove block from request maps. Either chain will know about it and + // so we shouldn't have any more instances of trying to fetch it, or we + // will fail the insert and thus we'll retry next time we get an inv. + delete(bmsg.peer.requestedBlocks, *blockSha) + delete(b.requestedBlocks, *blockSha) if fastAdd && blockSha.IsEqual(b.lastBlock) { // have processed all blocks, switch to normal handling @@ -434,12 +437,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.headerOrphan = make(map[btcwire.ShaHash]*headerstr) } - // Remove block from request maps. Either chain knows about it and such - // we shouldn't have any more instances of trying to fetch it, or we - // failed to insert and thus we'll retry next time we get an inv. - delete(bmsg.peer.requestedBlocks, *blockSha) - delete(b.requestedBlocks, *blockSha) - + // Process the block to include validation, best chain selection, orphan + // handling, etc. + err := b.blockChain.ProcessBlock(bmsg.block, fastAdd) if err != nil { delete(b.blockPeer, *blockSha) @@ -474,6 +474,116 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.server.db.Sync() } +// fetchHeaderBlocks is creates and sends a request to the syncPeer for +// the next list of blocks to be downloaded. +func (b *blockManager) fetchHeaderBlocks() { + gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg) + numRequested := 0 + startBlock := b.startBlock + for { + if b.startBlock == nil { + break + } + blockHash := b.startBlock + firstblock, ok := b.headerPool[*blockHash] + if !ok { + bmgrLog.Warnf("current fetch block %v missing from headerPool", blockHash) + break + } + iv := btcwire.NewInvVect(btcwire.InvTypeBlock, blockHash) + if !b.haveInventory(iv) { + b.requestedBlocks[*blockHash] = true + b.syncPeer.requestedBlocks[*blockHash] = true + gdmsg.AddInvVect(iv) + numRequested++ + } + + if b.fetchBlock == nil { + b.fetchBlock = b.startBlock + } + if firstblock.next == nil { + b.startBlock = nil + break + } else { + b.startBlock = &firstblock.next.sha + } + + if numRequested >= btcwire.MaxInvPerMsg { + break + } + } + if len(gdmsg.InvList) > 0 { + bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList)) + + b.syncPeer.QueueMessage(gdmsg, nil) + } +} + +// handleHeadersMsghandles headers messages from all peers. +func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) { + msg := bmsg.headers + + nheaders := len(msg.Headers) + if nheaders == 0 { + bmgrLog.Infof("Received %v block headers: Fetching blocks", + len(b.headerPool)) + b.fetchHeaderBlocks() + return + } + var blockhash btcwire.ShaHash + + if b.latestCheckpoint == nil { + b.latestCheckpoint = b.blockChain.LatestCheckpoint() + } + + for hdridx := range msg.Headers { + blockhash, _ = msg.Headers[hdridx].BlockSha() + var headerst headerstr + headerst.header = msg.Headers[hdridx] + headerst.sha = blockhash + prev, ok := b.headerPool[headerst.header.PrevBlock] + if ok { + if prev.next == nil { + prev.next = &headerst + } else { + bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash) + } + headerst.height = prev.height + 1 + } else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) { + ok = true + headerst.height = 1 + b.startBlock = &headerst.sha + } + if int64(headerst.height) == b.latestCheckpoint.Height { + if headerst.sha.IsEqual(b.latestCheckpoint.Hash) { + // we can trust this header first download + // TODO flag this? + } else { + // XXX marker does not match, must throw + // away headers !?!?! + // XXX dont trust peer? + } + } + if ok { + b.headerPool[blockhash] = &headerst + b.lastBlock = &blockhash + } else { + bmgrLog.Infof("found orphan block %v", blockhash) + b.headerOrphan[headerst.header.PrevBlock] = &headerst + } + } + + // Construct the getheaders request and queue it to be sent. + ghmsg := btcwire.NewMsgGetHeaders() + err := ghmsg.AddBlockLocatorHash(&blockhash) + if err != nil { + bmgrLog.Infof("msgheaders bad addheaders", blockhash) + return + } + + b.syncPeer.QueueMessage(ghmsg, nil) +} + // haveInventory returns whether or not the inventory represented by the passed // inventory vector is known. This includes checking all of the various places // inventory can be when it is in different states such as blocks that are part @@ -824,7 +934,8 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { b.msgChan <- &invMsg{inv: inv, peer: p} } -// QueueInv adds the passed headers message and peer to the block handling queue. +// QueueHeaders adds the passed headers message and peer to the block handling +// queue. func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) { // No channel handling here because peers do not need to block on inv // messages. @@ -1056,116 +1167,3 @@ func loadBlockDB() (btcdb.Db, error) { btcdLog.Infof("Block database loaded with block height %d", height) return db, nil } - -// handleHeadersMsg is invoked when a peer receives a headers bitcoin -// message. -func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) { - msg := bmsg.headers - - nheaders := len(msg.Headers) - if nheaders == 0 { - bmgrLog.Infof("Received %v block headers: Fetching blocks", - len(b.headerPool)) - b.fetchHeaderBlocks() - return - } - var blockhash btcwire.ShaHash - - if b.latestCheckpoint == nil { - b.latestCheckpoint = b.blockChain.LatestCheckpoint() - } - - for hdridx := range msg.Headers { - blockhash, _ = msg.Headers[hdridx].BlockSha() - var headerst headerstr - headerst.header = msg.Headers[hdridx] - headerst.sha = blockhash - prev, ok := b.headerPool[headerst.header.PrevBlock] - if ok { - if prev.next == nil { - prev.next = &headerst - } else { - bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash) - } - headerst.height = prev.height + 1 - } else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) { - ok = true - headerst.height = 1 - b.startBlock = &headerst.sha - } - if int64(headerst.height) == b.latestCheckpoint.Height { - if headerst.sha.IsEqual(b.latestCheckpoint.Hash) { - // we can trust this header first download - // TODO flag this? - } else { - // XXX marker does not match, must throw - // away headers !?!?! - // XXX dont trust peer? - } - } - if ok { - b.headerPool[blockhash] = &headerst - b.lastBlock = &blockhash - } else { - bmgrLog.Infof("found orphan block %v", blockhash) - b.headerOrphan[headerst.header.PrevBlock] = &headerst - } - } - - // Construct the getheaders request and queue it to be sent. - ghmsg := btcwire.NewMsgGetHeaders() - err := ghmsg.AddBlockLocatorHash(&blockhash) - if err != nil { - bmgrLog.Infof("msgheaders bad addheaders", blockhash) - return - } - - b.syncPeer.QueueMessage(ghmsg, nil) -} - -// fetchHeaderBlocks is creates and sends a request to the syncPeer for -// the next list of blocks to downloaded. -func (b *blockManager) fetchHeaderBlocks() { - gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg) - numRequested := 0 - startBlock := b.startBlock - for { - if b.startBlock == nil { - break - } - blockhash := b.startBlock - firstblock, ok := b.headerPool[*blockhash] - if !ok { - bmgrLog.Warnf("current fetch block %v missing from headerPool", blockhash) - break - } - var iv btcwire.InvVect - iv.Hash = *blockhash - iv.Type = btcwire.InvTypeBlock - if !b.haveInventory(&iv) { - b.requestedBlocks[*blockhash] = true - b.syncPeer.requestedBlocks[*blockhash] = true - gdmsg.AddInvVect(&iv) - numRequested++ - } - - if b.fetchBlock == nil { - b.fetchBlock = b.startBlock - } - if firstblock.next == nil { - b.startBlock = nil - break - } else { - b.startBlock = &firstblock.next.sha - } - - if numRequested >= btcwire.MaxInvPerMsg { - break - } - } - if len(gdmsg.InvList) > 0 { - bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList)) - - b.syncPeer.QueueMessage(gdmsg, nil) - } -} diff --git a/btcd.go b/btcd.go index d1d5377e..3df423cd 100644 --- a/btcd.go +++ b/btcd.go @@ -64,6 +64,7 @@ func btcdMain(serverChan chan<- *server) error { return err } pprof.StartCPUProfile(f) + defer f.Close() defer pprof.StopCPUProfile() } diff --git a/peer.go b/peer.go index adf2d25a..c041a1bd 100644 --- a/peer.go +++ b/peer.go @@ -460,20 +460,19 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire // and stop hash. It will ignore back-to-back duplicate requests. func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { // Extract the begin hash from the block locator, if one was specified, - // to use for filtering duplicate getblocks requests. - // request. + // to use for filtering duplicate getheaders requests. var beginHash *btcwire.ShaHash if len(locator) > 0 { beginHash = locator[0] } - // Filter duplicate getblocks requests. + // Filter duplicate getheaders requests. if p.prevGetBlocksBegin != nil && beginHash != nil && beginHash.IsEqual(p.prevGetBlocksBegin) { - peerLog.Tracef("PEER: Filtering duplicate [getblocks] with begin "+ - "hash %v", beginHash) + peerLog.Tracef("PEER: Filtering duplicate [getheaders] with "+ + "begin hash %v", beginHash) return nil } @@ -487,7 +486,7 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { } p.QueueMessage(msg, nil) - // Update the previous getblocks request information for filtering + // Update the previous getheaders request information for filtering // duplicates. p.prevGetBlocksBegin = beginHash return nil @@ -578,16 +577,14 @@ func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) { // handleInvMsg is invoked when a peer receives an inv bitcoin message and is // used to examine the inventory being advertised by the remote peer and react -// accordingly. We pass the message down to blockmanager which will call +// accordingly. We pass the message down to blockmanager which will call // QueueMessage with any appropriate responses. func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { p.server.blockManager.QueueInv(msg, p) } -// handleHeadersMsg is invoked when a peer receives an inv bitcoin message and -// is used to examine the inventory being advertised by the remote peer and -// react accordingly. We pass the message down to blockmanager which will call -// QueueMessage with any appropriate responses. +// handleHeadersMsg is invoked when a peer receives a headers bitcoin message. +// The message is passed down to the block manager. func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) { p.server.blockManager.QueueHeaders(msg, p) } @@ -1138,6 +1135,9 @@ out: p.handleInvMsg(msg) markConnected = true + case *btcwire.MsgHeaders: + p.handleHeadersMsg(msg) + case *btcwire.MsgNotFound: // TODO(davec): Ignore this for now, but ultimately // it should probably be used to detect when something @@ -1154,9 +1154,6 @@ out: case *btcwire.MsgGetHeaders: p.handleGetHeadersMsg(msg) - case *btcwire.MsgHeaders: - p.handleHeadersMsg(msg) - default: peerLog.Debugf("Received unhandled message of type %v: Fix Me", rmsg.Command())