diff --git a/blockmanager.go b/blockmanager.go index 94db6699..02d6e71c 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -5,7 +5,6 @@ package main import ( - "container/list" "github.com/conformal/btcchain" "github.com/conformal/btcdb" _ "github.com/conformal/btcdb/sqlite3" @@ -21,12 +20,6 @@ const ( chanBufferSize = 50 ) -// inventoryItem is used to track known and requested inventory items. -type inventoryItem struct { - invVect *btcwire.InvVect - peers []*peer -} - // blockMsg packages a bitcoin block message and the peer it came from together // so the block handler has access to that information. type blockMsg struct { @@ -34,13 +27,6 @@ type blockMsg struct { peer *peer } -// invMsg packages a bitcoin inv message and the peer it came from together -// so the block handler has access to that information. -type invMsg struct { - msg *btcwire.MsgInv - peer *peer -} - // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { @@ -49,26 +35,19 @@ type txMsg struct { } // blockManager provides a concurrency safe block manager for handling all -// incoming block inventory advertisement as well as issuing requests to -// download needed blocks of the block chain from other peers. It works by -// forcing all incoming block inventory advertisements through a single -// goroutine which then determines whether the block is needed and how the -// requests should be made amongst multiple peers. +// incoming blocks. type blockManager struct { server *server started bool shutdown bool blockChain *btcchain.BlockChain - requestQueue *list.List - requestMap map[string]*inventoryItem - outstandingBlocks int + blockPeer map[btcwire.ShaHash]*peer receivedLogBlocks int64 receivedLogTx int64 lastBlockLogTime time.Time processingReqs bool newBlocks chan bool blockQueue chan *blockMsg - invQueue chan *invMsg chainNotify chan *btcchain.Notification wg sync.WaitGroup quit chan bool @@ -83,7 +62,7 @@ func (b *blockManager) logBlockHeight(numTx, height int64) { now := time.Now() duration := now.Sub(b.lastBlockLogTime) - if b.outstandingBlocks != 0 && duration < time.Second*10 { + if duration < time.Second*10 { return } @@ -105,103 +84,46 @@ func (b *blockManager) logBlockHeight(numTx, height int64) { b.lastBlockLogTime = now } -// handleInvMsg handles inventory messages for all peers. It adds blocks that -// we need along with which peers know about each block to a request queue -// based upon the advertised inventory. It also attempts to strike a balance -// between the number of in-flight blocks and keeping the request queue full -// by issuing more getblocks (MsgGetBlocks) requests as needed. -func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) { - // Find the last block in the inventory list. - invVects := msg.InvList - var lastHash *btcwire.ShaHash - for i := len(invVects) - 1; i >= 0; i-- { - if invVects[i].Type == btcwire.InvVect_Block { - lastHash = &invVects[i].Hash - break - } - } - - for _, iv := range invVects { - switch iv.Type { - case btcwire.InvVect_Block: - // Ignore this block if we already have it. - // TODO(davec): Need to check orphans too. - if b.server.db.ExistsSha(&iv.Hash) { - log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash) - continue - } - - // Add the peer to the list of peers which can serve the block if - // it's already queued to be fetched. - if item, ok := b.requestMap[iv.Hash.String()]; ok { - item.peers = append(item.peers, p) - continue - } - - // Add the item to the end of the request queue. - item := &inventoryItem{ - invVect: iv, - peers: []*peer{p}, - } - b.requestMap[item.invVect.Hash.String()] = item - b.requestQueue.PushBack(item) - b.outstandingBlocks++ - - case btcwire.InvVect_Tx: - // XXX: Handle transactions here. - } - } - - // Request more blocks if there aren't enough in-flight blocks. - if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 { - stopHash := btcwire.ShaHash{} - gbmsg := btcwire.NewMsgGetBlocks(&stopHash) - gbmsg.AddBlockLocatorHash(lastHash) - p.QueueMessage(gbmsg) - } -} - -// handleBlockMsg handles block messages from all peers. It is currently -// very simple. It doesn't validate the block or handle orphans and side -// chains. It simply inserts the block into the database after ensuring the -// previous block is already inserted. -func (b *blockManager) handleBlockMsg(block *btcutil.Block) { - b.outstandingBlocks-- - msg := block.MsgBlock() +// handleBlockMsg handles block messages from all peers. +func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { + // Keep track of which peer the block was sent from so the notification + // handler can request the parent blocks from the appropriate peer. + blockSha, _ := bmsg.block.Sha() + b.blockPeer[*blockSha] = bmsg.peer // Process the block to include validation, best chain selection, orphan // handling, etc. - err := b.blockChain.ProcessBlock(block) + err := b.blockChain.ProcessBlock(bmsg.block) if err != nil { - blockSha, err2 := block.Sha() - if err2 != nil { - log.Errorf("[BMGR] %v", err2) - } + delete(b.blockPeer, *blockSha) log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err) return } + // Don't keep track of the peer that sent the block any longer if it's + // not an orphan. + if !b.blockChain.IsKnownOrphan(blockSha) { + delete(b.blockPeer, *blockSha) + } + // Log info about the new block height. _, height, err := b.server.db.NewestSha() if err != nil { log.Warnf("[BMGR] Failed to obtain latest sha - %v", err) return } - b.logBlockHeight(int64(len(msg.Transactions)), height) + b.logBlockHeight(int64(len(bmsg.block.MsgBlock().Transactions)), height) - // Sync the db to disk when there are no more outstanding blocks. - // NOTE: Periodic syncs happen as new data is requested as well. - if b.outstandingBlocks <= 0 { - b.server.db.Sync() - } + // Sync the db to disk. + b.server.db.Sync() } -// blockHandler is the main handler for the block manager. It must be run as a -// goroutine. It processes block and inv messages in a separate goroutine from -// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled -// by a single thread without needing to lock memory data structures. This is -// important because the block manager controls which blocks are needed and how -// the fetching should proceed. +// blockHandler is the main handler for the block manager. It must be run +// as a goroutine. It processes block and inv messages in a separate goroutine +// from the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are +// handled by a single thread without needing to lock memory data structures. +// This is important because the block manager controls which blocks are needed +// and how the fetching should proceed. // // NOTE: Tx messages need to be handled here too. // (either that or block and tx need to be handled in separate threads) @@ -211,40 +133,9 @@ out: select { // Handle new block messages. case bmsg := <-b.blockQueue: - b.handleBlockMsg(bmsg.block) + b.handleBlockMsg(bmsg) bmsg.peer.blockProcessed <- true - // Handle new inventory messages. - case msg := <-b.invQueue: - b.handleInvMsg(msg.msg, msg.peer) - // Request the blocks. - if b.requestQueue.Len() > 0 && !b.processingReqs { - b.processingReqs = true - b.newBlocks <- true - } - - case <-b.newBlocks: - numRequested := 0 - gdmsg := btcwire.NewMsgGetData() - var p *peer - for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() { - item := e.Value.(*inventoryItem) - p = item.peers[0] - gdmsg.AddInvVect(item.invVect) - delete(b.requestMap, item.invVect.Hash.String()) - b.requestQueue.Remove(e) - - numRequested++ - if numRequested >= btcwire.MaxInvPerMsg { - break - } - } - b.server.db.Sync() - if len(gdmsg.InvList) > 0 && p != nil { - p.QueueMessage(gdmsg) - } - b.processingReqs = false - case <-b.quit: break out } @@ -256,14 +147,22 @@ out: // handleNotifyMsg handles notifications from btcchain. Currently it doesn't // respond to any notifications, but the idea is that it requests missing blocks // in response to orphan notifications and updates the wallet for blocks -// connected and disconnected to the main chain. +// connected to and disconnected from the main chain. func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { switch notification.Type { case btcchain.NTOrphanBlock: - // TODO(davec): Ask the peer to fill in the missing blocks for the - // orphan root if it's not nil. orphanRoot := notification.Data.(*btcwire.ShaHash) - _ = orphanRoot + if peer, exists := b.blockPeer[*orphanRoot]; exists { + locator, err := b.blockChain.LatestBlockLocator() + if err != nil { + log.Error("[BMGR] Failed to get block locator "+ + "for the latest block: %v", err) + break + } + peer.pushGetBlocksMsg(locator, orphanRoot) + delete(b.blockPeer, *orphanRoot) + break + } case btcchain.NTBlockAccepted: // TODO(davec): Relay inventory, but don't relay old inventory @@ -300,18 +199,6 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { b.blockQueue <- &bmsg } -// QueueInv adds the passed inventory message and peer to the inventory handling -// queue. -func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) { - // Don't accept more inventory if we're shutting down. - if b.shutdown { - return - } - - imsg := invMsg{msg: msg, peer: p} - b.invQueue <- &imsg -} - // Start begins the core block handler which processes block and inv messages. func (b *blockManager) Start() { // Already started? @@ -342,51 +229,6 @@ func (b *blockManager) Stop() error { return nil } -// AddBlockLocators adds block locators to a getblocks message starting with -// the passed hash back to the genesis block hash. In order to keep the list -// of locator hashes to a reasonable number of entries, first it adds the -// most recent 10 block hashes (starting with the passed hash), then doubles the -// step each loop iteration to exponentially decrease the number of hashes the -// further away from head and closer to the genesis block it gets. -func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error { - // XXX(davec): This is fetching the block data too. - block, err := b.server.db.FetchBlockBySha(hash) - if err != nil { - log.Warnf("[BMGR] Lookup of known valid index failed %v", hash) - return err - } - blockIndex := block.Height() - - // We want inventory after the passed hash. - msg.AddBlockLocatorHash(hash) - - // Generate the block locators according to the algorithm described in - // in the function comment and make sure to leave room for the already - // added hash and final genesis hash. - increment := int64(1) - for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ { - if i > 10 { - increment *= 2 - } - blockIndex -= increment - if blockIndex <= 1 { - break - } - - h, err := b.server.db.FetchBlockShaByHeight(blockIndex) - if err != nil { - // This shouldn't happen and it's ok to ignore, so just - // continue to the next. - log.Warnf("[BMGR] Lookup of known valid index failed %v", - blockIndex) - continue - } - msg.AddBlockLocatorHash(h) - } - msg.AddBlockLocatorHash(&btcwire.GenesisHash) - return nil -} - // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. func newBlockManager(s *server) *blockManager { @@ -394,12 +236,10 @@ func newBlockManager(s *server) *blockManager { bm := blockManager{ server: s, blockChain: btcchain.New(s.db, s.btcnet, chainNotify), - requestQueue: list.New(), - requestMap: make(map[string]*inventoryItem), + blockPeer: make(map[btcwire.ShaHash]*peer), lastBlockLogTime: time.Now(), newBlocks: make(chan bool, 1), blockQueue: make(chan *blockMsg, chanBufferSize), - invQueue: make(chan *invMsg, chanBufferSize), chainNotify: chainNotify, quit: make(chan bool), } diff --git a/peer.go b/peer.go index 520ce0ef..2efce380 100644 --- a/peer.go +++ b/peer.go @@ -6,8 +6,10 @@ package main import ( "bytes" + "container/list" "errors" "fmt" + "github.com/conformal/btcchain" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" @@ -91,6 +93,7 @@ type peer struct { versionKnown bool knownAddresses map[string]bool lastBlock int32 + requestQueue *list.List wg sync.WaitGroup outputQueue chan btcwire.Message blockProcessed chan bool @@ -230,18 +233,20 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { } // Request latest blocks if the peer has blocks we're interested in. - // XXX: Ask block manager for latest so we get in-flight too... - sha, lastBlock, err := p.server.db.NewestSha() + _, lastBlock, err := p.server.db.NewestSha() if err != nil { log.Errorf("[PEER] %v", err) p.Disconnect() } // If the peer has blocks we're interested in. if p.lastBlock > int32(lastBlock) { - stopHash := btcwire.ShaHash{} - gbmsg := btcwire.NewMsgGetBlocks(&stopHash) - p.server.blockManager.AddBlockLocators(sha, gbmsg) - p.outputQueue <- gbmsg + locator, err := p.server.blockManager.blockChain.LatestBlockLocator() + if err != nil { + log.Error("[PEER] Failed to get block locator for the "+ + "latest block: %v", err) + p.Disconnect() + } + p.pushGetBlocksMsg(locator, &zeroHash) } // TODO: Relay alerts. @@ -281,6 +286,115 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { return nil } +// pushGetBlocksMsg send a getblocks message for the provided block locator +// and stop hash. +func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { + msg := btcwire.NewMsgGetBlocks(stopHash) + for _, hash := range locator { + err := msg.AddBlockLocatorHash(hash) + if err != nil { + return err + } + } + p.QueueMessage(msg) + return nil +} + +// handleInvMsg is invoked when a peer receives an inv bitcoin message and is +// used to examine the inventory being advertised by the remote peer and react +// accordingly. +// +// NOTE: This will need to have tx handling added as well when they are +// supported. +func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { + // Attempt to find the final block in the inventory list. There may + // not be one. + lastBlock := -1 + invVects := msg.InvList + for i := len(invVects) - 1; i >= 0; i-- { + if invVects[i].Type == btcwire.InvVect_Block { + lastBlock = i + break + } + } + + // Request the advertised inventory if we don't already have it. Also, + // request parent blocks of orphans if we receive one we already have. + // Finally, attempt to detect potential stalls due to long side chains + // we already have and request more blocks to prevent them. + chain := p.server.blockManager.blockChain + for i, iv := range invVects { + switch iv.Type { + case btcwire.InvVect_Block: + if !chain.HaveInventory(iv) { + // Add it to the request queue. + p.requestQueue.PushBack(iv) + continue + } + + // The block is an orphan block that we already have. + // When the existing orphan was processed, it requested + // the missing parent blocks. When this scenario + // happens, it means there were more blocks missing + // than are allowed into a single inventory message. As + // a result, once this peer requested the final + // advertised block, the remote peer noticed and is now + // resending the orphan block as an available block + // to signal there are more missing blocks that need to + // be requested. + if chain.IsKnownOrphan(&iv.Hash) { + // Request blocks starting at the latest known + // up to the root of the orphan that just came + // in. + orphanRoot := chain.GetOrphanRoot(&iv.Hash) + locator, err := chain.LatestBlockLocator() + if err != nil { + log.Error("[PEER] Failed to get block "+ + "locator for the latest block: "+ + "%v", err) + continue + } + p.pushGetBlocksMsg(locator, orphanRoot) + continue + } + + // We already have the final block advertised by this + // inventory message, so force a request for more. This + // should only really happen if we're on a really long + // side chain. + if i == lastBlock { + // Request blocks after this one up to the + // final one the remote peer knows about (zero + // stop hash). + locator := chain.BlockLocatorFromHash(&iv.Hash) + p.pushGetBlocksMsg(locator, &zeroHash) + } + + // Ignore unsupported inventory types. + default: + continue + } + } + + // Request as much as possible at once. Anything that won't fit into + // the request will be requested on the next inv message. + numRequested := 0 + gdmsg := btcwire.NewMsgGetData() + for e := p.requestQueue.Front(); e != nil; e = p.requestQueue.Front() { + iv := e.Value.(*btcwire.InvVect) + gdmsg.AddInvVect(iv) + p.requestQueue.Remove(e) + + numRequested++ + if numRequested >= btcwire.MaxInvPerMsg { + break + } + } + if len(gdmsg.InvList) > 0 { + p.QueueMessage(gdmsg) + } +} + // handleGetData is invoked when a peer receives a getdata bitcoin message and // is used to deliver block and transaction information. func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { @@ -340,8 +454,8 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { } // Don't attempt to fetch more than we can put into a single message. - if endIdx-startIdx > btcwire.MaxInvPerMsg { - endIdx = startIdx + btcwire.MaxInvPerMsg + if endIdx-startIdx > btcwire.MaxBlocksPerMsg { + endIdx = startIdx + btcwire.MaxBlocksPerMsg } // Fetch the inventory from the block database. @@ -677,7 +791,7 @@ out: <-p.blockProcessed case *btcwire.MsgInv: - p.server.blockManager.QueueInv(msg, p) + p.handleInvMsg(msg) case *btcwire.MsgGetData: p.handleGetDataMsg(msg) @@ -793,6 +907,7 @@ func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { inbound: inbound, persistent: persistent, knownAddresses: make(map[string]bool), + requestQueue: list.New(), outputQueue: make(chan btcwire.Message, outputBufferSize), blockProcessed: make(chan bool, 1), quit: make(chan bool),