From 82d4b6657bf640924de82fc254fd714f4bed4106 Mon Sep 17 00:00:00 2001 From: Brannon King Date: Tue, 27 Jul 2021 15:10:17 -0400 Subject: [PATCH] force disk flush when caught up to current --- blockchain/chain.go | 27 ++++++++++--------- blockchain/claimtrie.go | 7 ++++- claimtrie/block/blockrepo/pebble.go | 5 ++++ claimtrie/block/repo.go | 1 + claimtrie/chain/chainrepo/pebble.go | 5 ++++ claimtrie/chain/repo.go | 1 + claimtrie/claimtrie.go | 26 ++++++++++++++++++ claimtrie/merkletrie/merkletrie.go | 4 +++ claimtrie/merkletrie/merkletrierepo/pebble.go | 5 ++++ claimtrie/merkletrie/ramtrie.go | 15 +++++++++-- claimtrie/merkletrie/repo.go | 1 + claimtrie/node/log.go | 4 +++ claimtrie/node/manager.go | 5 ++++ claimtrie/node/noderepo/pebble.go | 7 ++++- claimtrie/node/repo.go | 2 ++ claimtrie/temporal/repo.go | 1 + claimtrie/temporal/temporalrepo/memory.go | 4 +++ claimtrie/temporal/temporalrepo/pebble.go | 7 ++++- 18 files changed, 109 insertions(+), 18 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index 5b7caaa6..a9b58cf1 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -574,15 +574,9 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, "spent transaction out information") } - // Handle LBRY Claim Scripts - if b.claimTrie != nil { - if err := b.ParseClaimScripts(block, node, view, false); err != nil { - return ruleError(ErrBadClaimTrie, err.Error()) - } - } - // No warnings about unknown rules until the chain is current. - if b.isCurrent() { + current := b.isCurrent() + if current { // Warn if any unknown new rules are either about to activate or // have already been activated. if err := b.warnUnknownRuleActivations(node); err != nil { @@ -590,6 +584,13 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, } } + // Handle LBRY Claim Scripts + if b.claimTrie != nil { + if err := b.ParseClaimScripts(block, node, view, false, current); err != nil { + return ruleError(ErrBadClaimTrie, err.Error()) + } + } + // Write any block status changes to DB before updating best state. err := b.index.flushToDB() if err != nil { @@ -1223,7 +1224,7 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla // factors are used to guess, but the key factors that allow the chain to // believe it is current are: // - Latest block height is after the latest checkpoint (if enabled) -// - Latest block has a timestamp newer than 24 hours ago +// - Latest block has a timestamp newer than ~6 hours ago (as LBRY block time is one fourth of bitcoin) // // This function MUST be called with the chain state lock held (for reads). func (b *BlockChain) isCurrent() bool { @@ -1234,13 +1235,13 @@ func (b *BlockChain) isCurrent() bool { return false } - // Not current if the latest best block has a timestamp before 24 hours + // Not current if the latest best block has a timestamp before 7 hours // ago. // // The chain appears to be current if none of the checks reported // otherwise. - minus24Hours := b.timeSource.AdjustedTime().Add(-24 * time.Hour).Unix() - return b.bestChain.Tip().timestamp >= minus24Hours + hours := b.timeSource.AdjustedTime().Add(-7 * time.Hour).Unix() + return b.bestChain.Tip().timestamp >= hours } // IsCurrent returns whether or not the chain believes it is current. Several @@ -1879,7 +1880,7 @@ func rebuildMissingClaimTrieData(b *BlockChain, done <-chan struct{}) error { } if h >= b.claimTrie.Height() { - err = b.ParseClaimScripts(block, n, view, true) + err = b.ParseClaimScripts(block, n, view, true, false) if err != nil { return err } diff --git a/blockchain/claimtrie.go b/blockchain/claimtrie.go index 3ca17762..f16c753e 100644 --- a/blockchain/claimtrie.go +++ b/blockchain/claimtrie.go @@ -15,7 +15,8 @@ import ( "github.com/btcsuite/btcd/claimtrie/node" ) -func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view *UtxoViewpoint, failOnHashMiss bool) error { +func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view *UtxoViewpoint, + failOnHashMiss bool, shouldFlush bool) error { ht := block.Height() for _, tx := range block.Transactions() { @@ -40,6 +41,10 @@ func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view } hash := b.claimTrie.MerkleHash() + if shouldFlush { + b.claimTrie.FlushToDisk() + } + if bn.claimTrie != *hash { if failOnHashMiss { return errors.Errorf("height: %d, ct.MerkleHash: %s != node.ClaimTrie: %s", ht, *hash, bn.claimTrie) diff --git a/claimtrie/block/blockrepo/pebble.go b/claimtrie/block/blockrepo/pebble.go index 1623c8d9..9a3ea7a6 100644 --- a/claimtrie/block/blockrepo/pebble.go +++ b/claimtrie/block/blockrepo/pebble.go @@ -69,3 +69,8 @@ func (repo *Pebble) Close() error { err = repo.db.Close() return errors.Wrap(err, "on close") } + +func (repo *Pebble) Flush() error { + _, err := repo.db.AsyncFlush() + return err +} diff --git a/claimtrie/block/repo.go b/claimtrie/block/repo.go index 2ddc46f5..fb5a8c9e 100644 --- a/claimtrie/block/repo.go +++ b/claimtrie/block/repo.go @@ -10,4 +10,5 @@ type Repo interface { Set(height int32, hash *chainhash.Hash) error Get(height int32) (*chainhash.Hash, error) Close() error + Flush() error } diff --git a/claimtrie/chain/chainrepo/pebble.go b/claimtrie/chain/chainrepo/pebble.go index 9dfe9080..ce4417b1 100644 --- a/claimtrie/chain/chainrepo/pebble.go +++ b/claimtrie/chain/chainrepo/pebble.go @@ -69,3 +69,8 @@ func (repo *Pebble) Close() error { err = repo.db.Close() return errors.Wrap(err, "on close") } + +func (repo *Pebble) Flush() error { + _, err := repo.db.AsyncFlush() + return err +} diff --git a/claimtrie/chain/repo.go b/claimtrie/chain/repo.go index 47be2f65..90a59d80 100644 --- a/claimtrie/chain/repo.go +++ b/claimtrie/chain/repo.go @@ -6,4 +6,5 @@ type Repo interface { Save(height int32, changes []change.Change) error Load(height int32) ([]change.Change, error) Close() error + Flush() error } diff --git a/claimtrie/claimtrie.go b/claimtrie/claimtrie.go index a753aec3..1f2094e9 100644 --- a/claimtrie/claimtrie.go +++ b/claimtrie/claimtrie.go @@ -410,3 +410,29 @@ func (ct *ClaimTrie) forwardNodeChange(chg change.Change) error { func (ct *ClaimTrie) Node(name []byte) (*node.Node, error) { return ct.nodeManager.Node(name) } + +func (ct *ClaimTrie) FlushToDisk() { + // maybe the user can fix the file lock shown in the warning before they shut down + if err := ct.nodeManager.Flush(); err != nil { + node.Warn("During nodeManager flush: " + err.Error()) + } + if err := ct.temporalRepo.Flush(); err != nil { + node.Warn("During temporalRepo flush: " + err.Error()) + } + if err := ct.merkleTrie.Flush(); err != nil { + node.Warn("During merkleTrie flush: " + err.Error()) + } + if err := ct.blockRepo.Flush(); err != nil { + node.Warn("During blockRepo flush: " + err.Error()) + } + if ct.reportedBlockRepo != nil { + if err := ct.reportedBlockRepo.Flush(); err != nil { + node.Warn("During reportedBlockRepo flush: " + err.Error()) + } + } + if ct.chainRepo != nil { + if err := ct.chainRepo.Flush(); err != nil { + node.Warn("During chainRepo flush: " + err.Error()) + } + } +} diff --git a/claimtrie/merkletrie/merkletrie.go b/claimtrie/merkletrie/merkletrie.go index 167d8e93..8e0b950f 100644 --- a/claimtrie/merkletrie/merkletrie.go +++ b/claimtrie/merkletrie/merkletrie.go @@ -272,3 +272,7 @@ func (t *PersistentTrie) Dump(s string) { fmt.Printf(" Child %s hash: %s\n", string(key), value.merkleHash.String()) } } + +func (t *PersistentTrie) Flush() error { + return t.repo.Flush() +} diff --git a/claimtrie/merkletrie/merkletrierepo/pebble.go b/claimtrie/merkletrie/merkletrierepo/pebble.go index ee5614dd..90bf320e 100644 --- a/claimtrie/merkletrie/merkletrierepo/pebble.go +++ b/claimtrie/merkletrie/merkletrierepo/pebble.go @@ -59,3 +59,8 @@ func (repo *Pebble) Close() error { err = repo.db.Close() return errors.Wrap(err, "on close") } + +func (repo *Pebble) Flush() error { + _, err := repo.db.AsyncFlush() + return err +} diff --git a/claimtrie/merkletrie/ramtrie.go b/claimtrie/merkletrie/ramtrie.go index 5c4987ae..b032f24d 100644 --- a/claimtrie/merkletrie/ramtrie.go +++ b/claimtrie/merkletrie/ramtrie.go @@ -2,10 +2,12 @@ package merkletrie import ( "bytes" + "runtime" + "strconv" + "sync" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/claimtrie/node" - "runtime" - "sync" ) type MerkleTrie interface { @@ -13,6 +15,7 @@ type MerkleTrie interface { Update(name []byte, restoreChildren bool) MerkleHash() *chainhash.Hash MerkleHashAllClaims() *chainhash.Hash + Flush() error } type RamTrie struct { @@ -50,10 +53,14 @@ func (rt *RamTrie) SetRoot(h *chainhash.Hash, names [][]byte) { runtime.GC() } + c := 0 rt.store.IterateNames(func(name []byte) bool { rt.Update(name, false) + c++ return true }) + + node.LogOnce("Completed claim trie construction. Name count: " + strconv.Itoa(c)) } else { for _, name := range names { rt.Update(name, false) @@ -147,3 +154,7 @@ func (rt *RamTrie) merkleHashAllClaims(v *collapsedVertex) *chainhash.Hash { v.merkleHash = node.HashMerkleBranches(childHash, claimHash) return v.merkleHash } + +func (rt *RamTrie) Flush() error { + return nil +} diff --git a/claimtrie/merkletrie/repo.go b/claimtrie/merkletrie/repo.go index 75c57261..68b6c8d6 100644 --- a/claimtrie/merkletrie/repo.go +++ b/claimtrie/merkletrie/repo.go @@ -9,4 +9,5 @@ type Repo interface { Get(key []byte) ([]byte, io.Closer, error) Set(key, value []byte) error Close() error + Flush() error } diff --git a/claimtrie/node/log.go b/claimtrie/node/log.go index a51dc809..07db0748 100644 --- a/claimtrie/node/log.go +++ b/claimtrie/node/log.go @@ -35,3 +35,7 @@ func LogOnce(s string) { loggedStrings[s] = true log.Info(s) } + +func Warn(s string) { + log.Warn(s) +} diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index dc8b5fc8..8995e7c6 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -25,6 +25,7 @@ type Manager interface { NextUpdateHeightOfNode(name []byte) ([]byte, int32) IterateNames(predicate func(name []byte) bool) Hash(name []byte) *chainhash.Hash + Flush() error } type nodeCacheLeaf struct { @@ -448,3 +449,7 @@ func calculateNodeHash(op wire.OutPoint, takeover int32) *chainhash.Hash { return &hh } + +func (nm *BaseManager) Flush() error { + return nm.repo.Flush() +} diff --git a/claimtrie/node/noderepo/pebble.go b/claimtrie/node/noderepo/pebble.go index e0332dd9..530a2271 100644 --- a/claimtrie/node/noderepo/pebble.go +++ b/claimtrie/node/noderepo/pebble.go @@ -79,7 +79,7 @@ func init() { func NewPebble(path string) (*Pebble, error) { - db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(256 << 20), BytesPerSync: 16 << 20}) + db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(32 << 20), BytesPerSync: 4 << 20}) repo := &Pebble{db: db} return repo, errors.Wrapf(err, "unable to open %s", path) @@ -218,3 +218,8 @@ func (repo *Pebble) Close() error { err = repo.db.Close() return errors.Wrap(err, "on close") } + +func (repo *Pebble) Flush() error { + _, err := repo.db.AsyncFlush() + return err +} diff --git a/claimtrie/node/repo.go b/claimtrie/node/repo.go index 4140fba3..87db4cce 100644 --- a/claimtrie/node/repo.go +++ b/claimtrie/node/repo.go @@ -26,4 +26,6 @@ type Repo interface { // IterateAll iterates keys until the predicate function returns false IterateAll(predicate func(name []byte) bool) + + Flush() error } diff --git a/claimtrie/temporal/repo.go b/claimtrie/temporal/repo.go index d3d45100..6b2df037 100644 --- a/claimtrie/temporal/repo.go +++ b/claimtrie/temporal/repo.go @@ -5,4 +5,5 @@ type Repo interface { SetNodesAt(names [][]byte, heights []int32) error NodesAt(height int32) ([][]byte, error) Close() error + Flush() error } diff --git a/claimtrie/temporal/temporalrepo/memory.go b/claimtrie/temporal/temporalrepo/memory.go index 8ddca4c6..0c1c8591 100644 --- a/claimtrie/temporal/temporalrepo/memory.go +++ b/claimtrie/temporal/temporalrepo/memory.go @@ -39,3 +39,7 @@ func (repo *Memory) NodesAt(height int32) ([][]byte, error) { func (repo *Memory) Close() error { return nil } + +func (repo *Memory) Flush() error { + return nil +} diff --git a/claimtrie/temporal/temporalrepo/pebble.go b/claimtrie/temporal/temporalrepo/pebble.go index 96f013ae..b7a852dd 100644 --- a/claimtrie/temporal/temporalrepo/pebble.go +++ b/claimtrie/temporal/temporalrepo/pebble.go @@ -14,7 +14,7 @@ type Pebble struct { func NewPebble(path string) (*Pebble, error) { - db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(128 << 20)}) + db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(16 << 20)}) repo := &Pebble{db: db} return repo, errors.Wrapf(err, "unable to open %s", path) @@ -79,3 +79,8 @@ func (repo *Pebble) Close() error { err = repo.db.Close() return errors.Wrap(err, "on close") } + +func (repo *Pebble) Flush() error { + _, err := repo.db.AsyncFlush() + return err +}