mirror of
https://github.com/LBRYFoundation/lbcd.git
synced 2025-08-23 17:47:24 +00:00
blockchain: fix --dropcfindex
This commit is contained in:
parent
8ad7aa5d5d
commit
860100019f
2 changed files with 93 additions and 50 deletions
|
@ -25,6 +25,9 @@ const (
|
||||||
// generated and dropped in pairs, and both are indexed by a block's hash.
|
// generated and dropped in pairs, and both are indexed by a block's hash.
|
||||||
// Besides holding different content, they also live in different buckets.
|
// Besides holding different content, they also live in different buckets.
|
||||||
var (
|
var (
|
||||||
|
// cfIndexParentBucketKey is the name of the parent bucket used to house
|
||||||
|
// the index. The rest of the buckets live below this bucket.
|
||||||
|
cfIndexParentBucketKey = []byte("cfindexparentbucket")
|
||||||
// cfBasicIndexKey is the name of the db bucket used to house the
|
// cfBasicIndexKey is the name of the db bucket used to house the
|
||||||
// block hash -> basic cf index (cf#0).
|
// block hash -> basic cf index (cf#0).
|
||||||
cfBasicIndexKey = []byte("cf0byhashidx")
|
cfBasicIndexKey = []byte("cf0byhashidx")
|
||||||
|
@ -42,14 +45,14 @@ var (
|
||||||
// dbFetchFilter retrieves a block's basic or extended filter. A filter's
|
// dbFetchFilter retrieves a block's basic or extended filter. A filter's
|
||||||
// absence is not considered an error.
|
// absence is not considered an error.
|
||||||
func dbFetchFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
|
func dbFetchFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
return idx.Get(h[:]), nil
|
return idx.Get(h[:]), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbFetchFilterHeader retrieves a block's basic or extended filter header.
|
// dbFetchFilterHeader retrieves a block's basic or extended filter header.
|
||||||
// A filter's absence is not considered an error.
|
// A filter's absence is not considered an error.
|
||||||
func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
|
func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
fh := idx.Get(h[:])
|
fh := idx.Get(h[:])
|
||||||
if len(fh) != fastsha256.Size {
|
if len(fh) != fastsha256.Size {
|
||||||
return nil, errors.New("invalid filter header length")
|
return nil, errors.New("invalid filter header length")
|
||||||
|
@ -59,7 +62,7 @@ func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byt
|
||||||
|
|
||||||
// dbStoreFilter stores a block's basic or extended filter.
|
// dbStoreFilter stores a block's basic or extended filter.
|
||||||
func dbStoreFilter(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error {
|
func dbStoreFilter(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error {
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
return idx.Put(h[:], f)
|
return idx.Put(h[:], f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,19 +71,19 @@ func dbStoreFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash, fh []b
|
||||||
if len(fh) != fastsha256.Size {
|
if len(fh) != fastsha256.Size {
|
||||||
return errors.New("invalid filter header length")
|
return errors.New("invalid filter header length")
|
||||||
}
|
}
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
return idx.Put(h[:], fh)
|
return idx.Put(h[:], fh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbDeleteFilter deletes a filter's basic or extended filter.
|
// dbDeleteFilter deletes a filter's basic or extended filter.
|
||||||
func dbDeleteFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
|
func dbDeleteFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
return idx.Delete(h[:])
|
return idx.Delete(h[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbDeleteFilterHeader deletes a filter's basic or extended filter header.
|
// dbDeleteFilterHeader deletes a filter's basic or extended filter header.
|
||||||
func dbDeleteFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
|
func dbDeleteFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
|
||||||
idx := dbTx.Metadata().Bucket(key)
|
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
|
||||||
return idx.Delete(h[:])
|
return idx.Delete(h[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +105,7 @@ func (idx *CfIndex) Init() error {
|
||||||
// Key returns the database key to use for the index as a byte slice. This is
|
// Key returns the database key to use for the index as a byte slice. This is
|
||||||
// part of the Indexer interface.
|
// part of the Indexer interface.
|
||||||
func (idx *CfIndex) Key() []byte {
|
func (idx *CfIndex) Key() []byte {
|
||||||
return cfBasicIndexKey
|
return cfIndexParentBucketKey
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns the human-readable name of the index. This is part of the
|
// Name returns the human-readable name of the index. This is part of the
|
||||||
|
@ -116,19 +119,23 @@ func (idx *CfIndex) Name() string {
|
||||||
// indexes (simple, extended).
|
// indexes (simple, extended).
|
||||||
func (idx *CfIndex) Create(dbTx database.Tx) error {
|
func (idx *CfIndex) Create(dbTx database.Tx) error {
|
||||||
meta := dbTx.Metadata()
|
meta := dbTx.Metadata()
|
||||||
_, err := meta.CreateBucket(cfBasicIndexKey)
|
cfIndexParentBucket, err := meta.CreateBucket(cfIndexParentBucketKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = meta.CreateBucket(cfBasicHeaderKey)
|
_, err = cfIndexParentBucket.CreateBucket(cfBasicIndexKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = meta.CreateBucket(cfExtendedIndexKey)
|
_, err = cfIndexParentBucket.CreateBucket(cfBasicHeaderKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = meta.CreateBucket(cfExtendedHeaderKey)
|
_, err = cfIndexParentBucket.CreateBucket(cfExtendedIndexKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = cfIndexParentBucket.CreateBucket(cfExtendedHeaderKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -317,18 +324,5 @@ func NewCfIndex(db database.DB, chainParams *chaincfg.Params) *CfIndex {
|
||||||
|
|
||||||
// DropCfIndex drops the CF index from the provided database if exists.
|
// DropCfIndex drops the CF index from the provided database if exists.
|
||||||
func DropCfIndex(db database.DB) error {
|
func DropCfIndex(db database.DB) error {
|
||||||
err := dropIndex(db, cfBasicIndexKey, cfIndexName)
|
return dropIndex(db, cfIndexParentBucketKey, cfIndexName)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = dropIndex(db, cfBasicHeaderKey, cfIndexName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = dropIndex(db, cfExtendedIndexKey, cfIndexName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = dropIndex(db, cfExtendedHeaderKey, cfIndexName)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -609,37 +609,90 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s
|
||||||
// the bucket in a single database transaction would result in massive
|
// the bucket in a single database transaction would result in massive
|
||||||
// memory usage and likely crash many systems due to ulimits. In order
|
// memory usage and likely crash many systems due to ulimits. In order
|
||||||
// to avoid this, use a cursor to delete a maximum number of entries out
|
// to avoid this, use a cursor to delete a maximum number of entries out
|
||||||
// of the bucket at a time.
|
// of the bucket at a time. Recurse buckets depth-first to delete any
|
||||||
|
// sub-buckets.
|
||||||
const maxDeletions = 2000000
|
const maxDeletions = 2000000
|
||||||
var totalDeleted uint64
|
var totalDeleted uint64
|
||||||
for numDeleted := maxDeletions; numDeleted == maxDeletions; {
|
|
||||||
numDeleted = 0
|
|
||||||
err := db.Update(func(dbTx database.Tx) error {
|
|
||||||
bucket := dbTx.Metadata().Bucket(idxKey)
|
|
||||||
cursor := bucket.Cursor()
|
|
||||||
for ok := cursor.First(); ok; ok = cursor.Next() &&
|
|
||||||
numDeleted < maxDeletions {
|
|
||||||
|
|
||||||
if err := cursor.Delete(); err != nil {
|
// Recurse through all buckets in the index, cataloging each for
|
||||||
return err
|
// later deletion.
|
||||||
}
|
var subBuckets [][][]byte
|
||||||
numDeleted++
|
var subBucketClosure func(database.Tx, []byte, [][]byte) error
|
||||||
}
|
subBucketClosure = func(dbTx database.Tx,
|
||||||
return nil
|
subBucket []byte, tlBucket [][]byte) error {
|
||||||
})
|
// Get full bucket name and append to subBuckets for later
|
||||||
if err != nil {
|
// deletion.
|
||||||
return err
|
var bucketName [][]byte
|
||||||
|
if (tlBucket == nil) || (len(tlBucket) == 0) {
|
||||||
|
bucketName = append(bucketName, subBucket)
|
||||||
|
} else {
|
||||||
|
bucketName = append(tlBucket, subBucket)
|
||||||
}
|
}
|
||||||
|
subBuckets = append(subBuckets, bucketName)
|
||||||
|
// Recurse sub-buckets to append to subBuckets slice.
|
||||||
|
bucket := dbTx.Metadata()
|
||||||
|
for _, subBucketName := range bucketName {
|
||||||
|
bucket = bucket.Bucket(subBucketName)
|
||||||
|
}
|
||||||
|
return bucket.ForEachBucket(func(k []byte) error {
|
||||||
|
return subBucketClosure(dbTx, k, bucketName)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if numDeleted > 0 {
|
// Call subBucketClosure with top-level bucket.
|
||||||
totalDeleted += uint64(numDeleted)
|
err = db.View(func(dbTx database.Tx) error {
|
||||||
log.Infof("Deleted %d keys (%d total) from %s",
|
return subBucketClosure(dbTx, idxKey, nil)
|
||||||
numDeleted, totalDeleted, idxName)
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through each sub-bucket in reverse, deepest-first, deleting
|
||||||
|
// all keys inside them and then dropping the buckets themselves.
|
||||||
|
for i := range subBuckets {
|
||||||
|
bucketName := subBuckets[len(subBuckets)-1-i]
|
||||||
|
// Delete maxDeletions key/value pairs at a time.
|
||||||
|
for numDeleted := maxDeletions; numDeleted == maxDeletions; {
|
||||||
|
numDeleted = 0
|
||||||
|
err := db.Update(func(dbTx database.Tx) error {
|
||||||
|
subBucket := dbTx.Metadata()
|
||||||
|
for _, subBucketName := range bucketName {
|
||||||
|
subBucket = subBucket.Bucket(subBucketName)
|
||||||
|
}
|
||||||
|
cursor := subBucket.Cursor()
|
||||||
|
for ok := cursor.First(); ok; ok = cursor.Next() &&
|
||||||
|
numDeleted < maxDeletions {
|
||||||
|
|
||||||
|
if err := cursor.Delete(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
numDeleted++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if numDeleted > 0 {
|
||||||
|
totalDeleted += uint64(numDeleted)
|
||||||
|
log.Infof("Deleted %d keys (%d total) from %s",
|
||||||
|
numDeleted, totalDeleted, idxName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if interruptRequested(interrupt) {
|
if interruptRequested(interrupt) {
|
||||||
return errInterruptRequested
|
return errInterruptRequested
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Drop the bucket itself.
|
||||||
|
err = db.Update(func(dbTx database.Tx) error {
|
||||||
|
bucket := dbTx.Metadata()
|
||||||
|
for j := 0; j < len(bucketName)-1; j++ {
|
||||||
|
bucket = bucket.Bucket(bucketName[j])
|
||||||
|
}
|
||||||
|
return bucket.DeleteBucket(bucketName[len(bucketName)-1])
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call extra index specific deinitialization for the transaction index.
|
// Call extra index specific deinitialization for the transaction index.
|
||||||
|
@ -658,10 +711,6 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := meta.DeleteBucket(idxKey); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return indexesBucket.Delete(indexDropKey(idxKey))
|
return indexesBucket.Delete(indexDropKey(idxKey))
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Add table
Reference in a new issue