better separation between filters; comments

This commit is contained in:
pedro martelletto 2017-01-19 14:55:39 +00:00 committed by Olaoluwa Osuntokun
parent 57995fd111
commit 396d28955c
2 changed files with 80 additions and 83 deletions

View file

@ -5,15 +5,11 @@
package indexers package indexers
import ( import (
"fmt"
"github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/database"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs/builder" "github.com/btcsuite/btcutil/gcs/builder"
"os"
) )
const ( const (
@ -21,67 +17,86 @@ const (
cfIndexName = "committed filter index" cfIndexName = "committed filter index"
) )
// Committed filters come in two flavours: basic and extended. They are
// generated and dropped in pairs, and both are indexed by a block's hash.
// Besides holding different content, they also live in different buckets.
var ( var (
// cfBasicIndexKey is the name of the db bucket used to house the // cfBasicIndexKey is the name of the db bucket used to house the
// block hash -> Basic CF index (CF #0). // block hash -> Basic cf index (cf#0).
cfBasicIndexKey = []byte("cf0byhashidx") cfBasicIndexKey = []byte("cf0byhashidx")
// cfExtendedIndexKey is the name of the db bucket used to house the // cfExtendedIndexKey is the name of the db bucket used to house the
// block hash -> Extended CF index (CF #1). // block hash -> Extended cf index (cf#1).
cfExtendedIndexKey = []byte("cf1byhashidx") cfExtendedIndexKey = []byte("cf1byhashidx")
) )
func dbFetchCFIndexEntry(dbTx database.Tx, blockHash *chainhash.Hash) ([]byte, // dbFetchBasicEntry() retrieves a block's basic filter. An entry's absence is
error) { // not considered an error. The filter is returned serialized.
// Load the record from the database and return now if it doesn't exist. func dbFetchBasicEntry(dbTx database.Tx, h *chainhash.Hash) ([]byte, error) {
index := dbTx.Metadata().Bucket(cfBasicIndexKey) idx := dbTx.Metadata().Bucket(cfBasicIndexKey)
serializedFilter := index.Get(blockHash[:]) return idx.Get(h[:]), nil
if len(serializedFilter) == 0 {
return nil, nil
} }
return serializedFilter, nil // dbFetchExtendedEntry() retrieves a block's extended filter. An entry's
// absence is not considered an error. The filter is returned serialized.
func dbFetchExtendedEntry(dbTx database.Tx, h *chainhash.Hash) ([]byte, error) {
idx := dbTx.Metadata().Bucket(cfExtendedIndexKey)
return idx.Get(h[:]), nil
} }
// The serialized format for keys and values in the block hash to CF bucket is: // dbStoreBasicEntry() stores a block's basic filter.
// <hash> = <CF> func dbStoreBasicEntry(dbTx database.Tx, h *chainhash.Hash, f []byte) error {
// idx := dbTx.Metadata().Bucket(cfBasicIndexKey)
// Field Type Size return idx.Put(h[:], f)
// hash chainhash.Hash 32 bytes }
// CF []byte variable
// -----
// Total: > 32 bytes
// CFIndex implements a CF by hash index. // dbStoreBasicEntry() stores a block's extended filter.
type CFIndex struct { func dbStoreExtendedEntry(dbTx database.Tx, h *chainhash.Hash, f []byte) error {
idx := dbTx.Metadata().Bucket(cfExtendedIndexKey)
return idx.Put(h[:], f)
}
// dbDeleteBasicEntry() deletes a block's basic filter.
func dbDeleteBasicEntry(dbTx database.Tx, h *chainhash.Hash) error {
idx := dbTx.Metadata().Bucket(cfBasicIndexKey)
return idx.Delete(h[:])
}
// dbDeleteExtendedEntry() deletes a block's extended filter.
func dbDeleteExtendedEntry(dbTx database.Tx, h *chainhash.Hash) error {
idx := dbTx.Metadata().Bucket(cfExtendedIndexKey)
return idx.Delete(h[:])
}
// CfIndex implements a committed filter (cf) by hash index.
type CfIndex struct {
db database.DB db database.DB
} }
// Ensure the CFIndex type implements the Indexer interface. // Ensure the CfIndex type implements the Indexer interface.
var _ Indexer = (*CFIndex)(nil) var _ Indexer = (*CfIndex)(nil)
// Init initializes the hash-based CF index. // Init initializes the hash-based cf index. This is part of the Indexer
// // interface.
// This is part of the Indexer interface. func (idx *CfIndex) Init() error {
func (idx *CFIndex) Init() error { return nil // Nothing to do.
return nil
} }
// Key returns the database key to use for the index as a byte slice. // Key returns the database key to use for the index as a byte slice. This is
func (idx *CFIndex) Key() []byte { // part of the Indexer interface.
func (idx *CfIndex) Key() []byte {
return cfBasicIndexKey return cfBasicIndexKey
} }
// Name returns the human-readable name of the index. // Name returns the human-readable name of the index. This is part of the
// // Indexer interface.
// This is part of the Indexer interface. func (idx *CfIndex) Name() string {
func (idx *CFIndex) Name() string {
return cfIndexName return cfIndexName
} }
// Create is invoked when the indexer manager determines the index needs to be // Create is invoked when the indexer manager determines the index needs to be
// created for the first time. It creates buckets for the two hash-based CF // created for the first time. It creates buckets for the two hash-based cf
// indexes (simple, extended). // indexes (simple, extended).
func (idx *CFIndex) Create(dbTx database.Tx) error { func (idx *CfIndex) Create(dbTx database.Tx) error {
meta := dbTx.Metadata() meta := dbTx.Metadata()
_, err := meta.CreateBucket(cfBasicIndexKey) _, err := meta.CreateBucket(cfBasicIndexKey)
if err != nil { if err != nil {
@ -91,7 +106,9 @@ func (idx *CFIndex) Create(dbTx database.Tx) error {
return err return err
} }
func generateFilterForBlock(block *btcutil.Block) ([]byte, error) { // makeBasicFilter() builds a block's basic filter, which consists of all
// outpoints referenced by transactions within the block.
func makeBasicFilterForBlock(block *btcutil.Block) ([]byte, error) {
b := builder.WithKeyHash(block.Hash()) b := builder.WithKeyHash(block.Hash())
_, err := b.Key() _, err := b.Key()
if err != nil { if err != nil {
@ -110,67 +127,47 @@ func generateFilterForBlock(block *btcutil.Block) ([]byte, error) {
} }
// ConnectBlock is invoked by the index manager when a new block has been // ConnectBlock is invoked by the index manager when a new block has been
// connected to the main chain. This indexer adds a hash-to-CF mapping for // connected to the main chain. This indexer adds a hash-to-cf mapping for
// every passed block. // every passed block. This is part of the Indexer interface.
// func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
// This is part of the Indexer interface.
func (idx *CFIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
view *blockchain.UtxoViewpoint) error { view *blockchain.UtxoViewpoint) error {
filterBytes, err := generateFilterForBlock(block) f, err := makeBasicFilterForBlock(block)
if err != nil { if err != nil {
return err return err
} }
return dbStoreBasicEntry(dbTx, block.Hash(), f)
meta := dbTx.Metadata()
index := meta.Bucket(cfBasicIndexKey)
err = index.Put(block.Hash()[:], filterBytes)
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "Stored CF for block %v", block.Hash())
return nil
} }
// DisconnectBlock is invoked by the index manager when a block has been // DisconnectBlock is invoked by the index manager when a block has been
// disconnected from the main chain. This indexer removes the hash-to-CF // disconnected from the main chain. This indexer removes the hash-to-cf
// mapping for every passed block. // mapping for every passed block. This is part of the Indexer interface.
// func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
// This is part of the Indexer interface.
func (idx *CFIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
view *blockchain.UtxoViewpoint) error { view *blockchain.UtxoViewpoint) error {
index := dbTx.Metadata().Bucket(cfBasicIndexKey) return dbDeleteBasicEntry(dbTx, block.Hash())
filterBytes := index.Get(block.Hash()[:])
if len(filterBytes) == 0 {
return fmt.Errorf("can't remove non-existent filter %s from " +
"the cfilter index", block.Hash())
}
return index.Delete(block.Hash()[:])
} }
func (idx *CFIndex) FilterByBlockHash(hash *chainhash.Hash) ([]byte, error) { func (idx *CfIndex) FilterByBlockHash(hash *chainhash.Hash) ([]byte, error) {
var filterBytes []byte var filterBytes []byte
err := idx.db.View(func(dbTx database.Tx) error { err := idx.db.View(func(dbTx database.Tx) error {
var err error var err error
filterBytes, err = dbFetchCFIndexEntry(dbTx, hash) filterBytes, err = dbFetchBasicEntry(dbTx, hash)
return err return err
}) })
return filterBytes, err return filterBytes, err
} }
// NewCFIndex returns a new instance of an indexer that is used to create a // NewCfIndex returns a new instance of an indexer that is used to create a
// mapping of the hashes of all blocks in the blockchain to their respective // mapping of the hashes of all blocks in the blockchain to their respective
// committed bloom filters. // committed filters.
// //
// It implements the Indexer interface which plugs into the IndexManager that in // It implements the Indexer interface which plugs into the IndexManager that in
// turn is used by the blockchain package. This allows the index to be // turn is used by the blockchain package. This allows the index to be
// seamlessly maintained along with the chain. // seamlessly maintained along with the chain.
func NewCFIndex(db database.DB) *CFIndex { func NewCfIndex(db database.DB) *CfIndex {
return &CFIndex{db: db} return &CfIndex{db: db}
} }
// DropCFIndex drops the CF index from the provided database if exists. // DropCfIndex drops the CF index from the provided database if exists.
func DropCFIndex(db database.DB) error { func DropCfIndex(db database.DB) error {
return dropIndex(db, cfBasicIndexKey, cfIndexName) return dropIndex(db, cfBasicIndexKey, cfIndexName)
} }

View file

@ -229,7 +229,7 @@ type server struct {
// do not need to be protected for concurrent access. // do not need to be protected for concurrent access.
txIndex *indexers.TxIndex txIndex *indexers.TxIndex
addrIndex *indexers.AddrIndex addrIndex *indexers.AddrIndex
cfIndex *indexers.CFIndex cfIndex *indexers.CfIndex
} }
// serverPeer extends the peer to maintain state shared by the server and // serverPeer extends the peer to maintain state shared by the server and
@ -2244,8 +2244,8 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
indexes = append(indexes, s.addrIndex) indexes = append(indexes, s.addrIndex)
} }
if !cfg.NoCFilters { if !cfg.NoCFilters {
indxLog.Info("CF index is enabled") indxLog.Info("cf index is enabled")
s.cfIndex = indexers.NewCFIndex(db) s.cfIndex = indexers.NewCfIndex(db)
indexes = append(indexes, s.cfIndex) indexes = append(indexes, s.cfIndex)
} }