mirror of
https://github.com/LBRYFoundation/lbcd.git
synced 2025-08-23 17:47:24 +00:00
interrupt initial trie build via ctrl+c
This commit is contained in:
parent
ba05456830
commit
dc230888ef
4 changed files with 30 additions and 12 deletions
|
@ -140,11 +140,10 @@ func New(cfg config.Config) (*ClaimTrie, error) {
|
||||||
}
|
}
|
||||||
err = trie.SetRoot(hash) // keep this after IncrementHeightTo
|
err = trie.SetRoot(hash) // keep this after IncrementHeightTo
|
||||||
if err == merkletrie.ErrFullRebuildRequired {
|
if err == merkletrie.ErrFullRebuildRequired {
|
||||||
// TODO: pass in the interrupt signal here:
|
ct.runFullTrieRebuild(nil, cfg.Interrupt)
|
||||||
ct.runFullTrieRebuild(nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ct.MerkleHash().IsEqual(hash) {
|
if interruptRequested(cfg.Interrupt) || !ct.MerkleHash().IsEqual(hash) {
|
||||||
ct.Close()
|
ct.Close()
|
||||||
return nil, errors.Errorf("unable to restore the claim hash to %s at height %d", hash.String(), previousHeight)
|
return nil, errors.Errorf("unable to restore the claim hash to %s at height %d", hash.String(), previousHeight)
|
||||||
}
|
}
|
||||||
|
@ -247,7 +246,7 @@ func (ct *ClaimTrie) AppendBlock() error {
|
||||||
names = append(names, expirations...)
|
names = append(names, expirations...)
|
||||||
names = removeDuplicates(names)
|
names = removeDuplicates(names)
|
||||||
|
|
||||||
nhns := ct.makeNameHashNext(names, false)
|
nhns := ct.makeNameHashNext(names, false, nil)
|
||||||
for nhn := range nhns {
|
for nhn := range nhns {
|
||||||
|
|
||||||
ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
|
ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
|
||||||
|
@ -284,7 +283,7 @@ func (ct *ClaimTrie) updateTrieForHashForkIfNecessary() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
node.LogOnce(fmt.Sprintf("Rebuilding all trie nodes for the hash fork at %d...", ct.height))
|
node.LogOnce(fmt.Sprintf("Rebuilding all trie nodes for the hash fork at %d...", ct.height))
|
||||||
ct.runFullTrieRebuild(nil)
|
ct.runFullTrieRebuild(nil, nil) // I don't think it's safe to allow interrupt during fork
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,7 +329,7 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
|
||||||
}
|
}
|
||||||
err = ct.merkleTrie.SetRoot(hash)
|
err = ct.merkleTrie.SetRoot(hash)
|
||||||
if err == merkletrie.ErrFullRebuildRequired {
|
if err == merkletrie.ErrFullRebuildRequired {
|
||||||
ct.runFullTrieRebuild(names)
|
ct.runFullTrieRebuild(names, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ct.MerkleHash().IsEqual(hash) {
|
if !ct.MerkleHash().IsEqual(hash) {
|
||||||
|
@ -339,14 +338,14 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte) {
|
func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) {
|
||||||
var nhns chan NameHashNext
|
var nhns chan NameHashNext
|
||||||
if names == nil {
|
if names == nil {
|
||||||
node.LogOnce("Building the entire claim trie in RAM...")
|
node.LogOnce("Building the entire claim trie in RAM...")
|
||||||
|
|
||||||
nhns = ct.makeNameHashNext(nil, true)
|
nhns = ct.makeNameHashNext(nil, true, interrupt)
|
||||||
} else {
|
} else {
|
||||||
nhns = ct.makeNameHashNext(names, false)
|
nhns = ct.makeNameHashNext(names, false, interrupt)
|
||||||
}
|
}
|
||||||
|
|
||||||
for nhn := range nhns {
|
for nhn := range nhns {
|
||||||
|
@ -423,7 +422,17 @@ type NameHashNext struct {
|
||||||
Next int32
|
Next int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool) chan NameHashNext {
|
func interruptRequested(interrupted <-chan struct{}) bool {
|
||||||
|
select {
|
||||||
|
case <-interrupted: // should never block on nil
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext {
|
||||||
inputs := make(chan []byte, 512)
|
inputs := make(chan []byte, 512)
|
||||||
outputs := make(chan NameHashNext, 512)
|
outputs := make(chan NameHashNext, 512)
|
||||||
|
|
||||||
|
@ -448,6 +457,9 @@ func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool) chan NameHashNex
|
||||||
go func() {
|
go func() {
|
||||||
if all {
|
if all {
|
||||||
ct.nodeManager.IterateNames(func(name []byte) bool {
|
ct.nodeManager.IterateNames(func(name []byte) bool {
|
||||||
|
if interruptRequested(interrupt) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
clone := make([]byte, len(name))
|
clone := make([]byte, len(name))
|
||||||
copy(clone, name) // iteration name buffer is reused on future loops
|
copy(clone, name) // iteration name buffer is reused on future loops
|
||||||
inputs <- clone
|
inputs <- clone
|
||||||
|
@ -455,6 +467,9 @@ func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool) chan NameHashNex
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
|
if interruptRequested(interrupt) {
|
||||||
|
break
|
||||||
|
}
|
||||||
inputs <- name
|
inputs <- name
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -240,7 +240,7 @@ func TestRebuild(t *testing.T) {
|
||||||
r.NotEqual(*merkletrie.EmptyTrieHash, *m)
|
r.NotEqual(*merkletrie.EmptyTrieHash, *m)
|
||||||
|
|
||||||
ct.merkleTrie = merkletrie.NewRamTrie()
|
ct.merkleTrie = merkletrie.NewRamTrie()
|
||||||
ct.runFullTrieRebuild(nil)
|
ct.runFullTrieRebuild(nil, nil)
|
||||||
|
|
||||||
m2 := ct.MerkleHash()
|
m2 := ct.MerkleHash()
|
||||||
r.NotNil(m2)
|
r.NotNil(m2)
|
||||||
|
@ -432,7 +432,7 @@ func TestNormalizationRollbackFuzz(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if j > 7 {
|
if j > 7 {
|
||||||
ct.runFullTrieRebuild(nil)
|
ct.runFullTrieRebuild(nil, nil)
|
||||||
h := ct.MerkleHash()
|
h := ct.MerkleHash()
|
||||||
r.True(h.IsEqual(hashes[len(hashes)-1]))
|
r.True(h.IsEqual(hashes[len(hashes)-1]))
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,8 @@ type Config struct {
|
||||||
NodeRepoPebble pebbleConfig
|
NodeRepoPebble pebbleConfig
|
||||||
TemporalRepoPebble pebbleConfig
|
TemporalRepoPebble pebbleConfig
|
||||||
MerkleTrieRepoPebble pebbleConfig
|
MerkleTrieRepoPebble pebbleConfig
|
||||||
|
|
||||||
|
Interrupt <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type pebbleConfig struct {
|
type pebbleConfig struct {
|
||||||
|
|
|
@ -2729,6 +2729,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
|
||||||
|
|
||||||
claimTrieCfg := claimtrieconfig.DefaultConfig
|
claimTrieCfg := claimtrieconfig.DefaultConfig
|
||||||
claimTrieCfg.DataDir = cfg.DataDir
|
claimTrieCfg.DataDir = cfg.DataDir
|
||||||
|
claimTrieCfg.Interrupt = interrupt
|
||||||
|
|
||||||
var ct *claimtrie.ClaimTrie
|
var ct *claimtrie.ClaimTrie
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue