diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index 4152e58..a97781a 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -131,6 +131,7 @@ type blockManager struct { reorgList *list.List startHeader *list.Element nextCheckpoint *chaincfg.Checkpoint + lastRequested chainhash.Hash minRetargetTimespan int64 // target timespan / adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor @@ -604,19 +605,31 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // If this is the sync peer or we're current, get the headers // for the announced blocks and update the last announced block. if lastBlock != -1 && (imsg.peer == b.syncPeer || b.current()) { - // Make a locator starting from the latest known header we've - // processed. - locator := make(blockchain.BlockLocator, 0, - wire.MaxBlockLocatorsPerMsg) lastHash := b.headerList.Back().Value.(*headerNode).header.BlockHash() - locator = append(locator, &lastHash) - // Add locator from the database as backup. - knownLocator, err := b.server.LatestBlockLocator() - if err == nil { - locator = append(locator, knownLocator...) + // Only send getheaders if we don't already know about the last + // block hash being announced. + if lastHash != invVects[lastBlock].Hash && + b.lastRequested != invVects[lastBlock].Hash { + // Make a locator starting from the latest known header + // we've processed. + locator := make(blockchain.BlockLocator, 0, + wire.MaxBlockLocatorsPerMsg) + locator = append(locator, &lastHash) + // Add locator from the database as backup. + knownLocator, err := b.server.LatestBlockLocator() + if err == nil { + locator = append(locator, knownLocator...) + } + // Get headers based on locator. + err = imsg.peer.PushGetHeadersMsg(locator, + &invVects[lastBlock].Hash) + if err != nil { + log.Warnf("Failed to send getheaders message "+ + "to peer %s: %s", imsg.peer.Addr(), err) + return + } + b.lastRequested = invVects[lastBlock].Hash } - // Get headers based on locator. - imsg.peer.PushGetHeadersMsg(locator, &invVects[lastBlock].Hash) } } @@ -716,6 +729,12 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { if hmsg.peer != b.syncPeer && !b.current() { return } + // Check if this is the last block we know of. This is + // a shortcut for sendheaders so that each redundant + // header doesn't cause a disk read. + if blockHash == prevHash { + continue + } // Check if this block is known. If so, we continue to // the next one. _, _, err := b.server.GetBlockByHash(blockHash) @@ -764,7 +783,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { height: int32(backHeight), }) totalWork := big.NewInt(0) - for _, reorgHeader := range msg.Headers[i:] { + for j, reorgHeader := range msg.Headers[i:] { err = b.checkHeaderSanity(reorgHeader, maxTimestamp, true) if err != nil { @@ -776,6 +795,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } totalWork.Add(totalWork, blockchain.CalcWork(reorgHeader.Bits)) + b.reorgList.PushBack(&headerNode{ + header: reorgHeader, + height: int32(backHeight+1) + int32(j), + }) } log.Tracef("Sane reorg attempted. Total work from "+ "reorg chain: %v", totalWork) @@ -808,13 +831,18 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { log.Tracef("Total work from known chain: %v", knownWork) // Compare the two work totals and reject the new chain // if it doesn't have more work than the previously - // known chain. - if knownWork.Cmp(totalWork) >= 0 { - log.Warnf("Reorg attempt that does not have "+ - "more work than known chain from peer "+ - "%s -- disconnecting", hmsg.peer.Addr()) + // known chain. Disconnect if it's actually less than + // the known chain. + switch knownWork.Cmp(totalWork) { + case 1: + log.Warnf("Reorg attempt that has less work "+ + "than known chain from peer %s -- "+ + "disconnecting", hmsg.peer.Addr()) hmsg.peer.Disconnect() + fallthrough + case 0: return + default: } // At this point, we have a valid reorg, so we roll // back the existing chain and add the new block header. @@ -884,18 +912,20 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { //return } - // Request the next batch of headers starting from the latest known - // header and ending with the next checkpoint. - locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - nextHash := zeroHash - if b.nextCheckpoint != nil { - nextHash = *b.nextCheckpoint.Hash - } - err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) - if err != nil { - log.Warnf("Failed to send getheaders message to "+ - "peer %s: %s", hmsg.peer.Addr(), err) - return + // If not current, request the next batch of headers starting from the + // latest known header and ending with the next checkpoint. + if !b.current() { + locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) + nextHash := zeroHash + if b.nextCheckpoint != nil { + nextHash = *b.nextCheckpoint.Hash + } + err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) + if err != nil { + log.Warnf("Failed to send getheaders message to "+ + "peer %s: %s", hmsg.peer.Addr(), err) + return + } } } diff --git a/spvsvc/spvchain/db.go b/spvsvc/spvchain/db.go index 3d1c70d..cb82fd3 100644 --- a/spvsvc/spvchain/db.go +++ b/spvsvc/spvchain/db.go @@ -292,6 +292,36 @@ func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) { return header, height, nil } +// CheckConnectivity cycles through all of the block headers, from last to +// first, and makes sure they all connect to each other. +func CheckConnectivity(tx walletdb.Tx) error { + header, height, err := LatestBlock(tx) + if err != nil { + return fmt.Errorf("Couldn't retrieve latest block: %s", err) + } + for height > 0 { + newheader, newheight, err := GetBlockByHash(tx, + header.PrevBlock) + if err != nil { + return fmt.Errorf("Couldn't retrieve block %s: %s", + header.PrevBlock, err) + } + if newheader.BlockHash() != header.PrevBlock { + return fmt.Errorf("Block %s doesn't match block %s's "+ + "PrevBlock (%s)", newheader.BlockHash(), + header.BlockHash(), header.PrevBlock) + } + if newheight != height-1 { + return fmt.Errorf("Block %s doesn't have correct "+ + "height: want %d, got %d", + newheader.BlockHash(), height-1, newheight) + } + header = newheader + height = newheight + } + return nil +} + // BlockLocatorFromHash returns a block locator based on the provided hash. func BlockLocatorFromHash(tx walletdb.Tx, hash chainhash.Hash) blockchain.BlockLocator { locator := make(blockchain.BlockLocator, 0, wire.MaxBlockLocatorsPerMsg) diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index 82ceaea..c990ab5 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -212,6 +212,23 @@ func (sp *serverPeer) pushGetCFHeadersMsg(locator blockchain.BlockLocator, return nil } +// pushSendHeadersMsg sends a sendheaders message to the connected peer. +func (sp *serverPeer) pushSendHeadersMsg() error { + if sp.VersionKnown() { + if sp.ProtocolVersion() > wire.SendHeadersVersion { + sp.QueueMessage(wire.NewMsgSendHeaders(), nil) + } + } + return nil +} + +// OnVerAck is invoked when a peer receives a verack bitcoin message and is used +// to send the "sendheaders" command to peers that are of a sufficienty new +// protocol version. +func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) { + sp.pushSendHeadersMsg() +} + // OnVersion is invoked when a peer receives a version bitcoin message // and is used to negotiate the protocol version details as well as kick start // the communications. @@ -1057,7 +1074,8 @@ func (s *ChainService) AnnounceNewTransactions( /*newTxs []*mempool.TxDesc*/ ) { func newPeerConfig(sp *serverPeer) *peer.Config { return &peer.Config{ Listeners: peer.MessageListeners{ - OnVersion: sp.OnVersion, + OnVersion: sp.OnVersion, + //OnVerAck: sp.OnVerAck, // Don't use sendheaders yet OnBlock: sp.OnBlock, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders,