diff --git a/database/db.go b/database/db.go index 2b8a3fae..2486a3fc 100644 --- a/database/db.go +++ b/database/db.go @@ -14,7 +14,7 @@ import ( // Errors that the various database functions may return. var ( - ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet") + ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built or is an older version") ErrUnsupportedAddressType = errors.New("address type is not supported " + "by the address-index") ErrPrevShaMissing = errors.New("previous sha missing from database") diff --git a/database/ldb/block.go b/database/ldb/block.go index ac755ca7..7e2f8ff2 100644 --- a/database/ldb/block.go +++ b/database/ldb/block.go @@ -286,6 +286,28 @@ func (db *LevelDb) NewestSha() (rsha *wire.ShaHash, rblkid int64, err error) { return &sha, db.lastBlkIdx, nil } +// checkAddrIndexVersion returns an error if the address index version stored +// in the database is less than the current version, or if it doesn't exist. +// This function is used on startup to signal OpenDB to drop the address index +// if it's in an old, incompatible format. +func (db *LevelDb) checkAddrIndexVersion() error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + data, err := db.lDb.Get(addrIndexVersionKey, db.ro) + if err != nil { + return database.ErrAddrIndexDoesNotExist + } + + indexVersion := binary.LittleEndian.Uint16(data) + + if indexVersion != uint16(addrIndexCurrentVersion) { + return database.ErrAddrIndexDoesNotExist + } + + return nil +} + // fetchAddrIndexTip returns the last block height and block sha to be indexed. // Meta-data about the address tip is currently cached in memory, and will be // updated accordingly by functions that modify the state. This function is diff --git a/database/ldb/internal_test.go b/database/ldb/internal_test.go index 7601abc6..c0a954bd 100644 --- a/database/ldb/internal_test.go +++ b/database/ldb/internal_test.go @@ -28,7 +28,7 @@ func TestAddrIndexKeySerialization(t *testing.T) { } serializedKey := addrIndexToKey(&fakeIndex) - copy(packedIndex[:], serializedKey[22:34]) + copy(packedIndex[:], serializedKey[23:35]) unpackedIndex := unpackTxIndex(packedIndex) if unpackedIndex.blkHeight != fakeIndex.blkHeight { diff --git a/database/ldb/leveldb.go b/database/ldb/leveldb.go index 11632bbf..6440b5b6 100644 --- a/database/ldb/leveldb.go +++ b/database/ldb/leveldb.go @@ -80,6 +80,9 @@ func parseArgs(funcName string, args ...interface{}) (string, error) { return dbPath, nil } +// CurrentDBVersion is the database version. +var CurrentDBVersion int32 = 1 + // OpenDB opens an existing database for use. func OpenDB(args ...interface{}) (database.Db, error) { dbpath, err := parseArgs("OpenDB", args...) @@ -141,10 +144,20 @@ blocknarrow: } } + log.Infof("Checking address index") + // Load the last block whose transactions have been indexed by address. if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil { - ldb.lastAddrIndexBlkSha = *sha - ldb.lastAddrIndexBlkIdx = idx + if err = ldb.checkAddrIndexVersion(); err == nil { + ldb.lastAddrIndexBlkSha = *sha + ldb.lastAddrIndexBlkIdx = idx + log.Infof("Address index good, continuing") + } else { + log.Infof("Address index in old, incompatible format, dropping...") + ldb.deleteOldAddrIndex() + ldb.DeleteAddrIndex() + log.Infof("Old, incompatible address index dropped and can now be rebuilt") + } } else { ldb.lastAddrIndexBlkIdx = -1 } @@ -156,9 +169,6 @@ blocknarrow: return db, nil } -// CurrentDBVersion is the database version. -var CurrentDBVersion int32 = 1 - func openDB(dbpath string, create bool) (pbdb database.Db, err error) { var db LevelDb var tlDb *leveldb.DB @@ -630,15 +640,20 @@ func shaBlkToKey(sha *wire.ShaHash) []byte { return shaB } +// These are used here and in tx.go's deleteOldAddrIndex() to prevent deletion +// of indexes other than the addrindex now. +var recordSuffixTx = []byte{'t', 'x'} +var recordSuffixSpentTx = []byte{'s', 'x'} + func shaTxToKey(sha *wire.ShaHash) []byte { shaB := sha.Bytes() - shaB = append(shaB, "tx"...) + shaB = append(shaB, recordSuffixTx...) return shaB } func shaSpentTxToKey(sha *wire.ShaHash) []byte { shaB := sha.Bytes() - shaB = append(shaB, "sx"...) + shaB = append(shaB, recordSuffixSpentTx...) return shaB } diff --git a/database/ldb/tx.go b/database/ldb/tx.go index eedce301..4f834e76 100644 --- a/database/ldb/tx.go +++ b/database/ldb/tx.go @@ -22,18 +22,27 @@ const ( // -------------------------------------------------------- // | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size | // -------------------------------------------------------- - // | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes | + // | 3 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes | // -------------------------------------------------------- - addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4 + addrIndexKeyLength = 3 + ripemd160.Size + 4 + 4 + 4 batchDeleteThreshold = 10000 + + addrIndexCurrentVersion = 1 ) var addrIndexMetaDataKey = []byte("addrindex") // All address index entries share this prefix to facilitate the use of // iterators. -var addrIndexKeyPrefix = []byte("a-") +var addrIndexKeyPrefix = []byte("a+-") + +// Address index version is required to drop/rebuild address index if version +// is older than current as the format of the index may have changed. This is +// true when going from no version to version 1 as the address index is stored +// as big endian in version 1 and little endian in the original code. Version +// is stored as two bytes, little endian (to match all the code but the index). +var addrIndexVersionKey = []byte("addrindexversion") type txUpdateObj struct { txSha *wire.ShaHash @@ -372,15 +381,19 @@ func (db *LevelDb) FetchTxBySha(txsha *wire.ShaHash) ([]*database.TxListReply, e } // addrIndexToKey serializes the passed txAddrIndex for storage within the DB. +// We want to use BigEndian to store at least block height and TX offset +// in order to ensure that the transactions are sorted in the index. +// This gives us the ability to use the index in more client-side +// applications that are order-dependent (specifically by dependency). func addrIndexToKey(index *txAddrIndex) []byte { record := make([]byte, addrIndexKeyLength, addrIndexKeyLength) - copy(record[0:2], addrIndexKeyPrefix) - copy(record[2:22], index.hash160[:]) + copy(record[0:3], addrIndexKeyPrefix) + copy(record[3:23], index.hash160[:]) // The index itself. - binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight)) - binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset)) - binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen)) + binary.BigEndian.PutUint32(record[23:27], uint32(index.blkHeight)) + binary.BigEndian.PutUint32(record[27:31], uint32(index.txoffset)) + binary.BigEndian.PutUint32(record[31:35], uint32(index.txlen)) return record } @@ -388,9 +401,9 @@ func addrIndexToKey(index *txAddrIndex) []byte { // unpackTxIndex deserializes the raw bytes of a address tx index. func unpackTxIndex(rawIndex [12]byte) *txAddrIndex { return &txAddrIndex{ - blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])), - txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])), - txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])), + blkHeight: int64(binary.BigEndian.Uint32(rawIndex[0:4])), + txoffset: int(binary.BigEndian.Uint32(rawIndex[4:8])), + txlen: int(binary.BigEndian.Uint32(rawIndex[8:12])), } } @@ -446,9 +459,9 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int, } // Create the prefix for our search. - addrPrefix := make([]byte, 22, 22) - copy(addrPrefix[0:2], addrIndexKeyPrefix) - copy(addrPrefix[2:22], addrKey) + addrPrefix := make([]byte, 23, 23) + copy(addrPrefix[0:3], addrIndexKeyPrefix) + copy(addrPrefix[3:23], addrKey) iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil) for skip != 0 && iter.Next() { @@ -459,7 +472,7 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int, var replies []*database.TxListReply var rawIndex [12]byte for iter.Next() && limit != 0 { - copy(rawIndex[:], iter.Key()[22:34]) + copy(rawIndex[:], iter.Key()[23:35]) addrIndex := unpackTxIndex(rawIndex) tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight, @@ -528,6 +541,12 @@ func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *wire.ShaHash, blkHeight int64 binary.LittleEndian.PutUint64(newIndexTip[32:40], uint64(blkHeight)) batch.Put(addrIndexMetaDataKey, newIndexTip) + // Ensure we're writing an address index version + newIndexVersion := make([]byte, 2, 2) + binary.LittleEndian.PutUint16(newIndexVersion[0:2], + uint16(addrIndexCurrentVersion)) + batch.Put(addrIndexVersionKey, newIndexVersion) + if err := db.lDb.Write(batch, db.wo); err != nil { return err } @@ -552,9 +571,12 @@ func (db *LevelDb) DeleteAddrIndex() error { numInBatch := 0 for iter.Next() { key := iter.Key() - batch.Delete(key) - - numInBatch++ + // With a 24-bit index key prefix, 1 in every 2^24 keys is a collision. + // We check the length to make sure we only delete address index keys. + if len(key) == addrIndexKeyLength { + batch.Delete(key) + numInBatch++ + } // Delete in chunks to potentially avoid very large batches. if numInBatch >= batchDeleteThreshold { @@ -572,6 +594,61 @@ func (db *LevelDb) DeleteAddrIndex() error { } batch.Delete(addrIndexMetaDataKey) + batch.Delete(addrIndexVersionKey) + + if err := db.lDb.Write(batch, db.wo); err != nil { + return err + } + + db.lastAddrIndexBlkIdx = -1 + db.lastAddrIndexBlkSha = wire.ShaHash{} + + return nil +} + +// deleteOldAddrIndex deletes the entire addrindex stored within the DB for a +// 2-byte addrIndexKeyPrefix. It also resets the cached in-memory metadata about +// the addr index. +func (db *LevelDb) deleteOldAddrIndex() error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + batch := db.lBatch() + defer batch.Reset() + + // Delete the entire index along with any metadata about it. + iter := db.lDb.NewIterator(bytesPrefix([]byte("a-")), db.ro) + numInBatch := 0 + for iter.Next() { + key := iter.Key() + // With a 24-bit index key prefix, 1 in every 2^24 keys is a collision. + // We check the length to make sure we only delete address index keys. + // We also check the last two bytes to make sure the suffix doesn't + // match other types of index that are 34 bytes long. + if len(key) == 34 && !bytes.HasSuffix(key, recordSuffixTx) && + !bytes.HasSuffix(key, recordSuffixSpentTx) { + batch.Delete(key) + numInBatch++ + } + + // Delete in chunks to potentially avoid very large batches. + if numInBatch >= batchDeleteThreshold { + if err := db.lDb.Write(batch, db.wo); err != nil { + iter.Release() + return err + } + batch.Reset() + numInBatch = 0 + } + } + iter.Release() + if err := iter.Error(); err != nil { + return err + } + + batch.Delete(addrIndexMetaDataKey) + batch.Delete(addrIndexVersionKey) + if err := db.lDb.Write(batch, db.wo); err != nil { return err } diff --git a/mempool.go b/mempool.go index 5c1004fd..59667375 100644 --- a/mempool.go +++ b/mempool.go @@ -97,7 +97,7 @@ type txMemPool struct { pool map[wire.ShaHash]*TxDesc orphans map[wire.ShaHash]*btcutil.Tx orphansByPrev map[wire.ShaHash]*list.List - addrindex map[string]map[*btcutil.Tx]struct{} // maps address to txs + addrindex map[string]map[wire.ShaHash]struct{} // maps address to txs outpoints map[wire.OutPoint]*btcutil.Tx lastUpdated time.Time // last time pool was updated pennyTotal float64 // exponentially decaying total for penny spends. @@ -653,7 +653,7 @@ func (mp *txMemPool) removeScriptFromAddrIndex(pkScript []byte, tx *btcutil.Tx) return err } for _, addr := range addresses { - delete(mp.addrindex[addr.EncodeAddress()], tx) + delete(mp.addrindex[addr.EncodeAddress()], *tx.Sha()) } return nil @@ -777,9 +777,9 @@ func (mp *txMemPool) indexScriptAddressToTx(pkScript []byte, tx *btcutil.Tx) err for _, addr := range addresses { if mp.addrindex[addr.EncodeAddress()] == nil { - mp.addrindex[addr.EncodeAddress()] = make(map[*btcutil.Tx]struct{}) + mp.addrindex[addr.EncodeAddress()] = make(map[wire.ShaHash]struct{}) } - mp.addrindex[addr.EncodeAddress()][tx] = struct{}{} + mp.addrindex[addr.EncodeAddress()][*tx.Sha()] = struct{}{} } return nil @@ -965,8 +965,10 @@ func (mp *txMemPool) FilterTransactionsByAddress(addr btcutil.Address) ([]*btcut if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists { addressTxs := make([]*btcutil.Tx, 0, len(txs)) - for tx := range txs { - addressTxs = append(addressTxs, tx) + for txHash := range txs { + if tx, exists := mp.pool[txHash]; exists { + addressTxs = append(addressTxs, tx.Tx) + } } return addressTxs, nil } @@ -1494,7 +1496,7 @@ func newTxMemPool(server *server) *txMemPool { outpoints: make(map[wire.OutPoint]*btcutil.Tx), } if cfg.AddrIndex { - memPool.addrindex = make(map[string]map[*btcutil.Tx]struct{}) + memPool.addrindex = make(map[string]map[wire.ShaHash]struct{}) } return memPool } diff --git a/rpcserver.go b/rpcserver.go index 87791f31..7d4b017e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2598,15 +2598,6 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan var addressTxs []*database.TxListReply - // First check the mempool for relevent transactions. - memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr) - if err == nil && len(memPoolTxs) != 0 { - for _, tx := range memPoolTxs { - txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()} - addressTxs = append(addressTxs, txReply) - } - } - var numRequested, numToSkip int if c.Count != nil { numRequested = *c.Count @@ -2620,17 +2611,31 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan numToSkip = 0 } } - if len(addressTxs) >= numRequested { - // Tx's in the mempool exceed the requested number of tx's. - // Slice off any possible overflow. - addressTxs = addressTxs[:numRequested] - } else { - // Otherwise, we'll also take a look into the database. - dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip, - numRequested-len(addressTxs)) - if err == nil && len(dbTxs) != 0 { - for _, txReply := range dbTxs { + + // While it's more efficient to check the mempool for relevant transactions + // first, we want to return results in order of occurrence/dependency so + // we'll check the mempool only if there aren't enough results returned + // by the database. + dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip, + numRequested-len(addressTxs)) + if err == nil { + for _, txReply := range dbTxs { + addressTxs = append(addressTxs, txReply) + } + } + + // This code (and txMemPool.FilterTransactionsByAddress()) doesn't sort by + // dependency. This might be something we want to do in the future when we + // return results for the client's convenience, or leave it to the client. + if len(addressTxs) < numRequested { + memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr) + if err == nil { + for _, tx := range memPoolTxs { + txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()} addressTxs = append(addressTxs, txReply) + if len(addressTxs) == numRequested { + break + } } } }