From b7c5bcbf45bde90d6f74d822deb04fe995bee94d Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 24 Mar 2017 18:11:50 -0600 Subject: [PATCH] More updates on checkpoint, reorg, and sync handling and tests. --- spvsvc/spvchain/blockmanager.go | 63 ++++++++- spvsvc/spvchain/db.go | 23 ++++ spvsvc/spvchain/spvchain.go | 44 +++++- spvsvc/spvchain/sync_test.go | 231 ++++++++++++++++++++++++++++++++ 4 files changed, 350 insertions(+), 11 deletions(-) create mode 100644 spvsvc/spvchain/sync_test.go diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index 2f3498d..49ca53d 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -387,11 +387,11 @@ func (b *blockManager) findPreviousHeaderCheckpoint(height int32) *chaincfg.Chec // Find the latest checkpoint lower than height or return genesis block // if there are none. checkpoints := b.server.chainParams.Checkpoints - for _, ckpt := range checkpoints { - if height <= ckpt.Height { + for i := 0; i < len(checkpoints); i++ { + if height <= checkpoints[i].Height { break } - prevCheckpoint = &ckpt + prevCheckpoint = &checkpoints[i] } return prevCheckpoint } @@ -716,6 +716,55 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { b.startHeader = e } } else { + // The block doesn't connect to the last block we know. + // We will need to do some additional checks to process + // possible reorganizations or incorrect chain on either + // our or the peer's side. + // If we got these headers from a peer that's not our + // sync peer, they might not be aligned correctly or + // even on the right chain. Just ignore the rest of the + // message. + if hmsg.peer != b.syncPeer { + return + } + // Check if this block is known. If so, we continue to + // the next one. + _, _, err := b.server.GetBlockByHash( + blockHeader.BlockHash()) + if err == nil { + continue + } + // Check if the previous block is known. If it is, this + // is probably a reorg based on the estimated latest + // block that matches between us and the sync peer as + // derived from the block locator we sent to request + // these headers. Otherwise, the headers don't connect + // to anything we know and we should disconnect the + // peer. + _, backHeight, err := b.server.GetBlockByHash( + blockHeader.PrevBlock) + if err != nil { + log.Errorf("Couldn't get block by hash from "+ + "the database (%v) -- disconnecting "+ + "peer %s", err, hmsg.peer.Addr()) + hmsg.peer.Disconnect() + return + } + // We've found a branch we weren't aware of. If the + // branch is earlier than the latest synchronized + // checkpoint, it's invalid and we need to disconnect + // the reporting peer. + prevCheckpoint := b.findPreviousHeaderCheckpoint( + prevNode.height) + if backHeight < uint32(prevCheckpoint.Height) { + log.Errorf("Attempt at a reorg earlier (%v) than a "+ + "checkpoint (%v) past which we've already "+ + "synchronized -- disconnecting peer "+ + "%s", backHeight, prevCheckpoint.Height, hmsg.peer.Addr()) + hmsg.peer.Disconnect() + return + } + // TODO: Add real reorg handling here log.Warnf("Received block header that does not "+ "properly connect to the chain from peer %s "+ "-- disconnecting", hmsg.peer.Addr()) @@ -743,7 +792,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { "checkpoint at height %d/hash %s", prevCheckpoint.Height, prevCheckpoint.Hash) - b.server.putMaxBlockHeight(uint32( + b.server.rollbackToHeight(uint32( prevCheckpoint.Height)) hmsg.peer.Disconnect() return @@ -762,9 +811,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // the next header links properly, it must be removed before // fetching the blocks. b.headerList.Remove(b.headerList.Front()) - log.Infof("Received %v block headers: Fetching blocks", - b.headerList.Len()) - b.progressLogger.SetLastLogTime(time.Now()) + //log.Infof("Received %v block headers: Fetching blocks", + // b.headerList.Len()) + //b.progressLogger.SetLastLogTime(time.Now()) b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight) //b.fetchHeaderBlocks() //return diff --git a/spvsvc/spvchain/db.go b/spvsvc/spvchain/db.go index 5b9322b..4595211 100644 --- a/spvsvc/spvchain/db.go +++ b/spvsvc/spvchain/db.go @@ -191,6 +191,29 @@ func putExtHeader(tx walletdb.Tx, blockHash chainhash.Hash, return putHeader(tx, blockHash, extHeaderBucketName, filterTip) } +// rollbackLastBlock rolls back the last known block and returns the BlockStamp +// representing the new last known block. +func rollbackLastBlock(tx walletdb.Tx) (*waddrmgr.BlockStamp, error) { + bs, err := SyncedTo(tx) + if err != nil { + return nil, err + } + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName) + err = bucket.Delete(bs.Hash[:]) + if err != nil { + return nil, err + } + err = bucket.Delete(uint32ToBytes(uint32(bs.Height))) + if err != nil { + return nil, err + } + err = putMaxBlockHeight(tx, uint32(bs.Height-1)) + if err != nil { + return nil, err + } + return SyncedTo(tx) +} + // GetBlockByHash retrieves the block header, filter, and filter tip, based on // the provided block hash, from the database. func GetBlockByHash(tx walletdb.Tx, blockHash chainhash.Hash) (wire.BlockHeader, diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index e5ac9c5..6d57889 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -278,8 +278,8 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // used to examine the inventory being advertised by the remote peer and react // accordingly. We pass the message down to blockmanager which will call // QueueMessage with any appropriate responses. -func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { - log.Tracef("Got inv with %v items", len(msg.InvList)) +func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) { + log.Tracef("Got inv with %d items from %s", len(msg.InvList), p.Addr()) newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) for _, invVect := range msg.InvList { if invVect.Type == wire.InvTypeTx { @@ -307,8 +307,9 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { // OnHeaders is invoked when a peer receives a headers bitcoin // message. The message is passed down to the block manager. -func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - log.Tracef("Got headers with %v items", len(msg.Headers)) +func (sp *serverPeer) OnHeaders(p *peer.Peer, msg *wire.MsgHeaders) { + log.Tracef("Got headers with %d items from %s", len(msg.Headers), + p.Addr()) sp.server.blockManager.QueueHeaders(msg, sp) } @@ -1371,3 +1372,38 @@ func (s *ChainService) putMaxBlockHeight(maxBlockHeight uint32) error { return putMaxBlockHeight(dbTx, maxBlockHeight) }) } + +func (s *ChainService) rollbackLastBlock() (*waddrmgr.BlockStamp, error) { + var bs *waddrmgr.BlockStamp + var err error + err = s.namespace.Update(func(dbTx walletdb.Tx) error { + bs, err = rollbackLastBlock(dbTx) + return err + }) + return bs, err +} + +func (s *ChainService) rollbackToHeight(height uint32) (*waddrmgr.BlockStamp, error) { + var bs *waddrmgr.BlockStamp + var err error + err = s.namespace.Update(func(dbTx walletdb.Tx) error { + bs, err = SyncedTo(dbTx) + if err != nil { + return err + } + for uint32(bs.Height) > height { + bs, err = rollbackLastBlock(dbTx) + if err != nil { + return err + } + } + return nil + }) + return bs, err +} + +// IsCurrent lets the caller know whether the chain service's block manager +// thinks its view of the network is current. +func (s *ChainService) IsCurrent() bool { + return s.blockManager.IsCurrent() +} diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go new file mode 100644 index 0000000..6e66f8f --- /dev/null +++ b/spvsvc/spvchain/sync_test.go @@ -0,0 +1,231 @@ +package spvchain_test + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/aakselrod/btctestlog" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/rpctest" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btclog" + "github.com/btcsuite/btcwallet/spvsvc/spvchain" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/btcsuite/btcwallet/walletdb" + _ "github.com/btcsuite/btcwallet/walletdb/bdb" +) + +func TestSetup(t *testing.T) { + // Create a btcd SimNet node and generate 500 blocks + h1, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) + if err != nil { + t.Fatalf("Couldn't create harness: %v", err) + } + defer h1.TearDown() + err = h1.SetUp(false, 0) + if err != nil { + t.Fatalf("Couldn't set up harness: %v", err) + } + _, err = h1.Node.Generate(500) + if err != nil { + t.Fatalf("Couldn't generate blocks: %v", err) + } + + // Create a second btcd SimNet node + h2, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) + if err != nil { + t.Fatalf("Couldn't create harness: %v", err) + } + defer h2.TearDown() + err = h2.SetUp(false, 0) + if err != nil { + t.Fatalf("Couldn't set up harness: %v", err) + } + + // Create a third btcd SimNet node and generate 900 blocks + h3, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) + if err != nil { + t.Fatalf("Couldn't create harness: %v", err) + } + defer h3.TearDown() + err = h3.SetUp(false, 0) + if err != nil { + t.Fatalf("Couldn't set up harness: %v", err) + } + _, err = h3.Node.Generate(900) + if err != nil { + t.Fatalf("Couldn't generate blocks: %v", err) + } + + // Connect, sync, and disconnect h1 and h2 + err = csd([]*rpctest.Harness{h1, h2}) + if err != nil { + t.Fatalf("Couldn't connect/sync/disconnect h1 and h2: %v", err) + } + + // Generate 300 blocks on the first node and 350 on the second + _, err = h1.Node.Generate(300) + if err != nil { + t.Fatalf("Couldn't generate blocks: %v", err) + } + _, err = h2.Node.Generate(350) + if err != nil { + t.Fatalf("Couldn't generate blocks: %v", err) + } + + // Now we have a node with 800 blocks (h1), 850 blocks (h2), and + // 900 blocks (h3). The chains of nodes h1 and h2 match up to block + // 500. By default, a synchronizing wallet connected to all three + // should synchronize to h3. However, we're going to take checkpoints + // from h1 at 111, 333, 555, and 777, and add those to the + // synchronizing wallet's chain parameters so that it should + // disconnect from h3 at block 111, and from h2 at block 555, and + // then synchronize to block 800 from h1. Order of connection is + // unfortunately not guaranteed, so the reorg may not happen with every + // test. + + // Copy parameters and insert checkpoints + modParams := chaincfg.SimNetParams + for _, height := range []int64{111, 333, 555, 777} { + hash, err := h1.Node.GetBlockHash(height) + if err != nil { + t.Fatalf("Couldn't get block hash for height %v: %v", + height, err) + } + modParams.Checkpoints = append(modParams.Checkpoints, + chaincfg.Checkpoint{ + Hash: hash, + Height: int32(height), + }) + } + + // Create a temporary directory, initialize an empty walletdb with an + // SPV chain namespace, and create a configuration for the ChainService. + tempDir, err := ioutil.TempDir("", "spvchain") + if err != nil { + t.Fatalf("Failed to create temporary directory: %v", err) + } + defer os.RemoveAll(tempDir) + db, err := walletdb.Create("bdb", tempDir+"/weks.db") + defer db.Close() + if err != nil { + t.Fatalf("Error opening DB: %v\n", err) + } + ns, err := db.Namespace([]byte("weks")) + if err != nil { + t.Fatalf("Error geting namespace: %v\n", err) + } + config := spvchain.Config{ + DataDir: tempDir, + Namespace: ns, + ChainParams: modParams, + AddPeers: []string{ + h3.P2PAddress(), + h2.P2PAddress(), + h1.P2PAddress(), + }, + } + + spvchain.Services = 0 + spvchain.MaxPeers = 3 + spvchain.RequiredServices = wire.SFNodeNetwork + logger, err := btctestlog.NewTestLogger(t) + if err != nil { + t.Fatalf("Could not set up logger: %v", err) + } + chainLogger := btclog.NewSubsystemLogger(logger, "CHAIN: ") + chainLogger.SetLevel(btclog.TraceLvl) + spvchain.UseLogger(chainLogger) //*/ + svc, err := spvchain.NewChainService(config) + if err != nil { + t.Fatalf("Error creating ChainService: %v", err) + } + svc.Start() + defer svc.Stop() + + // Make sure the client synchronizes with the correct node + err = waitForSync(t, svc, h1, time.Second, 30*time.Second) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %v", err) + } + + // Generate 150 blocks on h1 to make sure it reorgs the other nodes. + // Ensure the ChainService instance stays caught up. + h1.Node.Generate(150) + err = waitForSync(t, svc, h1, time.Second, 30*time.Second) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %v", err) + } + + // Connect/sync/disconnect the other nodes to make them reorg to the h1 + // chain. + err = csd([]*rpctest.Harness{h1, h2, h3}) + if err != nil { + t.Fatalf("Couldn't sync h2 and h3 to h1: %v", err) + } +} + +// csd does a connect-sync-disconnect between nodes in order to support +// reorg testing. It brings up and tears down a temporary node, otherwise the +// nodes try to reconnect to each other which results in unintended reorgs. +func csd(harnesses []*rpctest.Harness) error { + hTemp, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) + if err != nil { + return err + } + // Tear down node at the end of the function. + defer hTemp.TearDown() + err = hTemp.SetUp(false, 0) + if err != nil { + return err + } + for _, harness := range harnesses { + err = rpctest.ConnectNode(hTemp, harness) + if err != nil { + return err + } + } + return rpctest.JoinNodes(harnesses, rpctest.Blocks) +} + +// waitForSync waits for the ChainService to sync to the current chain state. +func waitForSync(t *testing.T, svc *spvchain.ChainService, + correctSyncNode *rpctest.Harness, checkInterval, + timeout time.Duration) error { + knownBestHash, knownBestHeight, err := + correctSyncNode.Node.GetBestBlock() + if err != nil { + return err + } + t.Logf("Syncing to %v (%v)", knownBestHeight, knownBestHash) + var haveBest *waddrmgr.BlockStamp + haveBest, err = svc.BestSnapshot() + if err != nil { + return err + } + var total time.Duration + for haveBest.Hash != *knownBestHash { + if total > timeout { + return fmt.Errorf("Timed out after %v.", timeout) + } + if haveBest.Height > knownBestHeight { + return fmt.Errorf("Synchronized to the wrong chain.") + } + time.Sleep(checkInterval) + total += checkInterval + haveBest, err = svc.BestSnapshot() + if err != nil { + return fmt.Errorf("Couldn't get best snapshot from "+ + "ChainService: %v", err) + } + t.Logf("Synced to %v (%v)", haveBest.Height, haveBest.Hash) + } + // Check if we're current + if !svc.IsCurrent() { + return fmt.Errorf("ChainService doesn't see itself as current!") + } + return nil +}