diff --git a/blockchain/indexers/cfindex.go b/blockchain/indexers/cfindex.go index bf533a3a..3072445d 100644 --- a/blockchain/indexers/cfindex.go +++ b/blockchain/indexers/cfindex.go @@ -25,6 +25,9 @@ const ( // generated and dropped in pairs, and both are indexed by a block's hash. // Besides holding different content, they also live in different buckets. var ( + // cfIndexParentBucketKey is the name of the parent bucket used to house + // the index. The rest of the buckets live below this bucket. + cfIndexParentBucketKey = []byte("cfindexparentbucket") // cfBasicIndexKey is the name of the db bucket used to house the // block hash -> basic cf index (cf#0). cfBasicIndexKey = []byte("cf0byhashidx") @@ -42,14 +45,14 @@ var ( // dbFetchFilter retrieves a block's basic or extended filter. A filter's // absence is not considered an error. func dbFetchFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) { - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) return idx.Get(h[:]), nil } // dbFetchFilterHeader retrieves a block's basic or extended filter header. // A filter's absence is not considered an error. func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) { - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) fh := idx.Get(h[:]) if len(fh) != fastsha256.Size { return nil, errors.New("invalid filter header length") @@ -59,7 +62,7 @@ func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byt // dbStoreFilter stores a block's basic or extended filter. func dbStoreFilter(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error { - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) return idx.Put(h[:], f) } @@ -68,19 +71,19 @@ func dbStoreFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash, fh []b if len(fh) != fastsha256.Size { return errors.New("invalid filter header length") } - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) return idx.Put(h[:], fh) } // dbDeleteFilter deletes a filter's basic or extended filter. func dbDeleteFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) error { - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) return idx.Delete(h[:]) } // dbDeleteFilterHeader deletes a filter's basic or extended filter header. func dbDeleteFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) error { - idx := dbTx.Metadata().Bucket(key) + idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key) return idx.Delete(h[:]) } @@ -102,7 +105,7 @@ func (idx *CfIndex) Init() error { // Key returns the database key to use for the index as a byte slice. This is // part of the Indexer interface. func (idx *CfIndex) Key() []byte { - return cfBasicIndexKey + return cfIndexParentBucketKey } // Name returns the human-readable name of the index. This is part of the @@ -116,19 +119,23 @@ func (idx *CfIndex) Name() string { // indexes (simple, extended). func (idx *CfIndex) Create(dbTx database.Tx) error { meta := dbTx.Metadata() - _, err := meta.CreateBucket(cfBasicIndexKey) + cfIndexParentBucket, err := meta.CreateBucket(cfIndexParentBucketKey) if err != nil { return err } - _, err = meta.CreateBucket(cfBasicHeaderKey) + _, err = cfIndexParentBucket.CreateBucket(cfBasicIndexKey) if err != nil { return err } - _, err = meta.CreateBucket(cfExtendedIndexKey) + _, err = cfIndexParentBucket.CreateBucket(cfBasicHeaderKey) if err != nil { return err } - _, err = meta.CreateBucket(cfExtendedHeaderKey) + _, err = cfIndexParentBucket.CreateBucket(cfExtendedIndexKey) + if err != nil { + return err + } + _, err = cfIndexParentBucket.CreateBucket(cfExtendedHeaderKey) if err != nil { return err } @@ -317,18 +324,5 @@ func NewCfIndex(db database.DB, chainParams *chaincfg.Params) *CfIndex { // DropCfIndex drops the CF index from the provided database if exists. func DropCfIndex(db database.DB) error { - err := dropIndex(db, cfBasicIndexKey, cfIndexName) - if err != nil { - return err - } - err = dropIndex(db, cfBasicHeaderKey, cfIndexName) - if err != nil { - return err - } - err = dropIndex(db, cfExtendedIndexKey, cfIndexName) - if err != nil { - return err - } - err = dropIndex(db, cfExtendedHeaderKey, cfIndexName) - return err + return dropIndex(db, cfIndexParentBucketKey, cfIndexName) } diff --git a/blockchain/indexers/manager.go b/blockchain/indexers/manager.go index cc996dd8..a7d2af20 100644 --- a/blockchain/indexers/manager.go +++ b/blockchain/indexers/manager.go @@ -609,37 +609,90 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s // the bucket in a single database transaction would result in massive // memory usage and likely crash many systems due to ulimits. In order // to avoid this, use a cursor to delete a maximum number of entries out - // of the bucket at a time. + // of the bucket at a time. Recurse buckets depth-first to delete any + // sub-buckets. const maxDeletions = 2000000 var totalDeleted uint64 - for numDeleted := maxDeletions; numDeleted == maxDeletions; { - numDeleted = 0 - err := db.Update(func(dbTx database.Tx) error { - bucket := dbTx.Metadata().Bucket(idxKey) - cursor := bucket.Cursor() - for ok := cursor.First(); ok; ok = cursor.Next() && - numDeleted < maxDeletions { - if err := cursor.Delete(); err != nil { - return err - } - numDeleted++ - } - return nil - }) - if err != nil { - return err + // Recurse through all buckets in the index, cataloging each for + // later deletion. + var subBuckets [][][]byte + var subBucketClosure func(database.Tx, []byte, [][]byte) error + subBucketClosure = func(dbTx database.Tx, + subBucket []byte, tlBucket [][]byte) error { + // Get full bucket name and append to subBuckets for later + // deletion. + var bucketName [][]byte + if (tlBucket == nil) || (len(tlBucket) == 0) { + bucketName = append(bucketName, subBucket) + } else { + bucketName = append(tlBucket, subBucket) } + subBuckets = append(subBuckets, bucketName) + // Recurse sub-buckets to append to subBuckets slice. + bucket := dbTx.Metadata() + for _, subBucketName := range bucketName { + bucket = bucket.Bucket(subBucketName) + } + return bucket.ForEachBucket(func(k []byte) error { + return subBucketClosure(dbTx, k, bucketName) + }) + } - if numDeleted > 0 { - totalDeleted += uint64(numDeleted) - log.Infof("Deleted %d keys (%d total) from %s", - numDeleted, totalDeleted, idxName) + // Call subBucketClosure with top-level bucket. + err = db.View(func(dbTx database.Tx) error { + return subBucketClosure(dbTx, idxKey, nil) + }) + if err != nil { + return nil + } + + // Iterate through each sub-bucket in reverse, deepest-first, deleting + // all keys inside them and then dropping the buckets themselves. + for i := range subBuckets { + bucketName := subBuckets[len(subBuckets)-1-i] + // Delete maxDeletions key/value pairs at a time. + for numDeleted := maxDeletions; numDeleted == maxDeletions; { + numDeleted = 0 + err := db.Update(func(dbTx database.Tx) error { + subBucket := dbTx.Metadata() + for _, subBucketName := range bucketName { + subBucket = subBucket.Bucket(subBucketName) + } + cursor := subBucket.Cursor() + for ok := cursor.First(); ok; ok = cursor.Next() && + numDeleted < maxDeletions { + + if err := cursor.Delete(); err != nil { + return err + } + numDeleted++ + } + return nil + }) + if err != nil { + return err + } + + if numDeleted > 0 { + totalDeleted += uint64(numDeleted) + log.Infof("Deleted %d keys (%d total) from %s", + numDeleted, totalDeleted, idxName) + } } if interruptRequested(interrupt) { return errInterruptRequested } + + // Drop the bucket itself. + err = db.Update(func(dbTx database.Tx) error { + bucket := dbTx.Metadata() + for j := 0; j < len(bucketName)-1; j++ { + bucket = bucket.Bucket(bucketName[j]) + } + return bucket.DeleteBucket(bucketName[len(bucketName)-1]) + }) } // Call extra index specific deinitialization for the transaction index. @@ -658,10 +711,6 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s return err } - if err := meta.DeleteBucket(idxKey); err != nil { - return err - } - return indexesBucket.Delete(indexDropKey(idxKey)) }) if err != nil {