diff --git a/blockchain/accept.go b/blockchain/accept.go index 2e877bd6..f85d6558 100644 --- a/blockchain/accept.go +++ b/blockchain/accept.go @@ -60,12 +60,18 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) return false, err } - // Create a new block node for the block and add it to the in-memory - // block chain (could be either a side chain or the main chain). + // Create a new block node for the block and add it to the node index. Even + // if the block ultimately gets connected to the main chain, it starts out + // on a side chain. blockHeader := &block.MsgBlock().Header newNode := newBlockNode(blockHeader, prevNode) newNode.status = statusDataStored + b.index.AddNode(newNode) + err = b.index.flushToDB() + if err != nil { + return false, err + } // Connect the passed block to the chain while respecting proper chain // selection according to the chain with the most proof of work. This diff --git a/blockchain/blockindex.go b/blockchain/blockindex.go index f3a060f1..2ff2fa27 100644 --- a/blockchain/blockindex.go +++ b/blockchain/blockindex.go @@ -231,6 +231,7 @@ type blockIndex struct { sync.RWMutex index map[chainhash.Hash]*blockNode + dirty map[*blockNode]struct{} } // newBlockIndex returns a new empty instance of a block index. The index will @@ -241,6 +242,7 @@ func newBlockIndex(db database.DB, chainParams *chaincfg.Params) *blockIndex { db: db, chainParams: chainParams, index: make(map[chainhash.Hash]*blockNode), + dirty: make(map[*blockNode]struct{}), } } @@ -265,16 +267,25 @@ func (bi *blockIndex) LookupNode(hash *chainhash.Hash) *blockNode { return node } -// AddNode adds the provided node to the block index. Duplicate entries are not -// checked so it is up to caller to avoid adding them. +// AddNode adds the provided node to the block index and marks it as dirty. +// Duplicate entries are not checked so it is up to caller to avoid adding them. // // This function is safe for concurrent access. func (bi *blockIndex) AddNode(node *blockNode) { bi.Lock() - bi.index[node.hash] = node + bi.addNode(node) + bi.dirty[node] = struct{}{} bi.Unlock() } +// addNode adds the provided node to the block index, but does not mark it as +// dirty. This can be used while initializing the block index. +// +// This function is NOT safe for concurrent access. +func (bi *blockIndex) addNode(node *blockNode) { + bi.index[node.hash] = node +} + // NodeStatus provides concurrent-safe access to the status field of a node. // // This function is safe for concurrent access. @@ -293,6 +304,7 @@ func (bi *blockIndex) NodeStatus(node *blockNode) blockStatus { func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) { bi.Lock() node.status |= flags + bi.dirty[node] = struct{}{} bi.Unlock() } @@ -303,5 +315,34 @@ func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) { func (bi *blockIndex) UnsetStatusFlags(node *blockNode, flags blockStatus) { bi.Lock() node.status &^= flags + bi.dirty[node] = struct{}{} bi.Unlock() } + +// flushToDB writes all dirty block nodes to the database. If all writes +// succeed, this clears the dirty set. +func (bi *blockIndex) flushToDB() error { + bi.Lock() + if len(bi.dirty) == 0 { + bi.Unlock() + return nil + } + + err := bi.db.Update(func(dbTx database.Tx) error { + for node := range bi.dirty { + err := dbStoreBlockNode(dbTx, node) + if err != nil { + return err + } + } + return nil + }) + + // If write was successful, clear the dirty set. + if err == nil { + bi.dirty = make(map[*blockNode]struct{}) + } + + bi.Unlock() + return err +} diff --git a/blockchain/chain.go b/blockchain/chain.go index df90d5fd..d00557e7 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -495,6 +495,8 @@ func LockTimeToSequence(isSeconds bool, locktime uint32) uint32 { // passed node is the new end of the main chain. The lists will be empty if the // passed node is not on a side chain. // +// This function may modify node statuses in the block index without flushing. +// // This function MUST be called with the chain state lock held (for reads). func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List) { attachNodes := list.New() @@ -585,6 +587,12 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U } } + // Write any block status changes to DB before updating best state. + err := b.index.flushToDB() + if err != nil { + return err + } + // Generate a new best state snapshot that will be used to update the // database and later memory if all database updates are successful. b.stateLock.RLock() @@ -597,7 +605,7 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U curTotalTxns+numTxns, node.CalcPastMedianTime()) // Atomically insert info into the database. - err := b.db.Update(func(dbTx database.Tx) error { + err = b.db.Update(func(dbTx database.Tx) error { // Update best block state. err := dbPutBestState(dbTx, state, node.workSum) if err != nil { @@ -691,6 +699,12 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view return err } + // Write any block status changes to DB before updating best state. + err = b.index.flushToDB() + if err != nil { + return err + } + // Generate a new best state snapshot that will be used to update the // database and later memory if all database updates are successful. b.stateLock.RLock() @@ -792,6 +806,8 @@ func countSpentOutputs(block *btcutil.Block) int { // the chain) and nodes the are being attached must be in forwards order // (think pushing them onto the end of the chain). // +// This function may modify node statuses in the block index without flushing. +// // This function MUST be called with the chain state lock held (for writes). func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error { // All of the blocks to detach and related spend journal entries needed @@ -1030,13 +1046,26 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla stxos := make([]spentTxOut, 0, countSpentOutputs(block)) if !fastAdd { err := b.checkConnectBlock(node, block, view, &stxos) - if err != nil { - if _, ok := err.(RuleError); ok { - b.index.SetStatusFlags(node, statusValidateFailed) - } + if err == nil { + b.index.SetStatusFlags(node, statusValid) + } else if _, ok := err.(RuleError); ok { + b.index.SetStatusFlags(node, statusValidateFailed) + } else { + return false, err + } + + // Intentionally ignore errors writing updated node status to DB. If + // it fails to write, it's not the end of the world. If the block is + // valid, we flush in connectBlock and if the block is invalid, the + // worst that can happen is we revalidate the block after a restart. + if writeErr := b.index.flushToDB(); writeErr != nil { + log.Warnf("Error flushing block index changes to disk: %v", + writeErr) + } + + if err != nil { return false, err } - b.index.SetStatusFlags(node, statusValid) } // In the fast add case the code to check the block connection @@ -1097,11 +1126,16 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla // Reorganize the chain. log.Infof("REORGANIZE: Block %v is causing a reorganize.", node.hash) err := b.reorganizeChain(detachNodes, attachNodes) - if err != nil { - return false, err + + // Either getReorganizeNodes or reorganizeChain could have made unsaved + // changes to the block index, so flush regardless of whether there was an + // error. The index would only be dirty if the block failed to connect, so + // we can ignore any errors writing. + if writeErr := b.index.flushToDB(); writeErr != nil { + log.Warnf("Error flushing block index changes to disk: %v", writeErr) } - return true, nil + return err == nil, err } // isCurrent returns whether or not the chain believes it is current. Several diff --git a/blockchain/chainio.go b/blockchain/chainio.go index d7ff6ba1..44558494 100644 --- a/blockchain/chainio.go +++ b/blockchain/chainio.go @@ -1076,7 +1076,7 @@ func (b *BlockChain) createChainState() error { b.bestChain.SetTip(node) // Add the new node to the index which is used for faster lookups. - b.index.AddNode(node) + b.index.addNode(node) // Initialize the state related to the best block. Since it is the // genesis block, use its timestamp for the median time. @@ -1150,8 +1150,7 @@ func (b *BlockChain) createChainState() error { func (b *BlockChain) initChainState() error { // Determine the state of the chain database. We may need to initialize // everything from scratch or upgrade certain buckets. - var initialized bool - var hasBlockIndex bool + var initialized, hasBlockIndex bool err := b.db.View(func(dbTx database.Tx) error { initialized = dbTx.Metadata().Get(chainStateKeyName) != nil hasBlockIndex = dbTx.Metadata().Bucket(blockIndexBucketName) != nil @@ -1209,9 +1208,7 @@ func (b *BlockChain) initChainState() error { var lastNode *blockNode cursor = blockIndexBucket.Cursor() for ok := cursor.First(); ok; ok = cursor.Next() { - var header wire.BlockHeader - headerBytes := cursor.Value() - err := header.Deserialize(bytes.NewReader(headerBytes)) + header, status, err := deserializeBlockRow(cursor.Value()) if err != nil { return err } @@ -1243,9 +1240,9 @@ func (b *BlockChain) initChainState() error { // Initialize the block node for the block, connect it, // and add it to the block index. node := &blockNodes[i] - initBlockNode(node, &header, parent) - node.status = statusDataStored | statusValid - b.index.AddNode(node) + initBlockNode(node, header, parent) + node.status = status + b.index.addNode(node) lastNode = node i++ @@ -1281,6 +1278,25 @@ func (b *BlockChain) initChainState() error { }) } +// deserializeBlockRow parses a value in the block index bucket into a block +// header and block status bitfield. +func deserializeBlockRow(blockRow []byte) (*wire.BlockHeader, blockStatus, error) { + buffer := bytes.NewReader(blockRow) + + var header wire.BlockHeader + err := header.Deserialize(buffer) + if err != nil { + return nil, statusNone, err + } + + statusByte, err := buffer.ReadByte() + if err != nil { + return nil, statusNone, err + } + + return &header, blockStatus(statusByte), nil +} + // dbFetchHeaderByHash uses an existing database transaction to retrieve the // block header for the provided hash. func dbFetchHeaderByHash(dbTx database.Tx, hash *chainhash.Hash) (*wire.BlockHeader, error) { @@ -1329,40 +1345,31 @@ func dbFetchBlockByNode(dbTx database.Tx, node *blockNode) (*btcutil.Block, erro return block, nil } -// dbStoreBlock stores the provided block in the database. The block header is -// written to the block index bucket and full block data is written to ffldb. -func dbStoreBlockHeader(dbTx database.Tx, blockHeader *wire.BlockHeader, height uint32) error { - // Serialize block data to be stored. This is just the serialized header. - w := bytes.NewBuffer(make([]byte, 0, blockHdrSize)) - err := blockHeader.Serialize(w) +// dbStoreBlockNode stores the block header and validation status to the block +// index bucket. This overwrites the current entry if there exists one. +func dbStoreBlockNode(dbTx database.Tx, node *blockNode) error { + // Serialize block data to be stored. + w := bytes.NewBuffer(make([]byte, 0, blockHdrSize+1)) + header := node.Header() + err := header.Serialize(w) + if err != nil { + return err + } + err = w.WriteByte(byte(node.status)) if err != nil { return err } value := w.Bytes() // Write block header data to block index bucket. - blockHash := blockHeader.BlockHash() blockIndexBucket := dbTx.Metadata().Bucket(blockIndexBucketName) - key := blockIndexKey(&blockHash, height) + key := blockIndexKey(&node.hash, uint32(node.height)) return blockIndexBucket.Put(key, value) } -// dbStoreBlock stores the provided block in the database. The block header is -// written to the block index bucket and full block data is written to ffldb. +// dbStoreBlock stores the provided block in the database if it is not already +// there. The full block data is written to ffldb. func dbStoreBlock(dbTx database.Tx, block *btcutil.Block) error { - if block.Height() == btcutil.BlockHeightUnknown { - return fmt.Errorf("cannot store block %s with unknown height", - block.Hash()) - } - - // First store block header in the block index bucket. - err := dbStoreBlockHeader(dbTx, &block.MsgBlock().Header, - uint32(block.Height())) - if err != nil { - return err - } - - // Then store block data in ffldb if we haven't already. hasBlock, err := dbTx.HasBlock(block.Hash()) if err != nil { return err