Fix rescans across wallet process restarts.

This change immediately writes a new empty transaction store out to
disk if the old one could not be read.  Since old transaction store
versions are not read in at start, and were previously not written out
until new transaction history was received, it was possible that a
full rescan started and finished without ever marking a synced tx
history for the next wallet start.
This commit is contained in:
Josh Rickmar 2014-05-30 15:17:51 -05:00
parent c7200659d1
commit 368204a58a
5 changed files with 78 additions and 143 deletions

View file

@ -34,8 +34,7 @@ import (
// addresses and keys), and tx and utxo stores, and a mutex to prevent // addresses and keys), and tx and utxo stores, and a mutex to prevent
// incorrect multiple access. // incorrect multiple access.
type Account struct { type Account struct {
name string name string
fullRescan bool
*wallet.Wallet *wallet.Wallet
TxStore *txstore.Store TxStore *txstore.Store
} }
@ -466,16 +465,7 @@ func (a *Account) Track() {
func (a *Account) RescanActiveJob() (*RescanJob, error) { func (a *Account) RescanActiveJob() (*RescanJob, error) {
// Determine the block necesary to start the rescan for all active // Determine the block necesary to start the rescan for all active
// addresses. // addresses.
height := int32(0) height := a.SyncHeight()
if a.fullRescan {
// Need to perform a complete rescan since the wallet creation
// block.
height = a.EarliestBlockHeight()
} else {
// The last synced block height should be used the starting
// point for block rescanning. Grab the block stamp here.
height = a.SyncHeight()
}
actives := a.SortedActiveAddresses() actives := a.SortedActiveAddresses()
addrs := make([]btcutil.Address, 0, len(actives)) addrs := make([]btcutil.Address, 0, len(actives))

View file

@ -25,6 +25,7 @@ import (
"github.com/conformal/btcwallet/txstore" "github.com/conformal/btcwallet/txstore"
"github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"io/ioutil"
"os" "os"
"strings" "strings"
) )
@ -198,38 +199,86 @@ func openSavedAccount(name string, cfg *config) (*Account, error) {
msg := fmt.Sprintf("cannot open wallet file: %s", err) msg := fmt.Sprintf("cannot open wallet file: %s", err)
return nil, &walletOpenError{msg} return nil, &walletOpenError{msg}
} }
defer func() { closeWallet := func() {
if err := wfile.Close(); err != nil { if err := wfile.Close(); err != nil {
log.Warnf("Cannot close wallet file: %v", err) log.Warnf("Cannot close wallet file: %v", err)
} }
}() }
if _, err = wlt.ReadFrom(wfile); err != nil { if _, err = wlt.ReadFrom(wfile); err != nil {
msg := fmt.Sprintf("cannot read wallet: %s", err) msg := fmt.Sprintf("cannot read wallet: %s", err)
closeWallet()
return nil, &walletOpenError{msg} return nil, &walletOpenError{msg}
} }
// Read tx file. If this fails, return a errNoTxs error and let // Read tx file. If this fails, return a errNoTxs error and let
// the caller decide if a rescan is necessary. // the caller decide if a rescan is necessary.
var finalErr error
txfile, err := os.Open(txfilepath) txfile, err := os.Open(txfilepath)
if err != nil { if err != nil {
log.Errorf("cannot open tx file: %s", err) log.Errorf("Cannot open tx file: %s", err)
a.fullRescan = true
// Mark wallet as unsynced and write back to disk.. Later calls
// to SyncHeight will use the wallet creation height, or possibly
// an earlier height for imported keys.
a.SetSyncedWith(nil)
closeWallet()
tmpwallet, err := ioutil.TempFile(netdir, "wallet.bin")
if err != nil {
log.Errorf("Cannot create temporary wallet: %v", err)
return a, errNoTxs
}
if _, err := wlt.WriteTo(tmpwallet); err != nil {
log.Warnf("Cannot write back unsynced wallet: %v", err)
return a, errNoTxs
}
tmpwalletpath := tmpwallet.Name()
if err := tmpwallet.Close(); err != nil {
log.Warnf("Cannot close temporary wallet file: %v", err)
return a, errNoTxs
}
if err := Rename(tmpwalletpath, wfilepath); err != nil {
log.Warnf("Cannot move temporary wallet file: %v", err)
return a, errNoTxs
}
// Create and write empty txstore, if it doesn't exist.
if !fileExists(txfilepath) {
txfile, err = os.Create(txfilepath)
if err != nil {
log.Warnf("Cannot create new new txstore file: %v", err)
return a, errNoTxs
}
defer func() {
if err := txfile.Close(); err != nil {
log.Warnf("Cannot close txstore file: %v", err)
}
}()
if _, err := txs.WriteTo(txfile); err != nil {
log.Warnf("Cannot write new txstore file: %v", err)
}
}
return a, errNoTxs return a, errNoTxs
} }
defer closeWallet()
defer func() { defer func() {
if err := txfile.Close(); err != nil { if err := txfile.Close(); err != nil {
log.Warnf("Cannot close txstore file: %v", err) log.Warnf("Cannot close txstore file: %v", err)
} }
}() }()
if _, err = txs.ReadFrom(txfile); err != nil { if _, err = txs.ReadFrom(txfile); err != nil {
log.Errorf("cannot read tx file: %s", err) log.Errorf("Cannot read tx file: %s", err)
a.fullRescan = true
finalErr = errNoTxs // Mark wallet as unsynced. Later calls to SyncHeight will use the
// wallet creation height, or possibly an earlier height for
// imported keys.
a.SetSyncedWith(nil)
if _, err := txs.WriteTo(txfile); err != nil {
log.Warnf("Cannot write new txstore file: %v", err)
}
return a, errNoTxs
} }
return a, finalErr return a, nil
} }
// openAccounts attempts to open all saved accounts. // openAccounts attempts to open all saved accounts.
@ -394,7 +443,6 @@ func (am *AccountManager) rescanListener() {
continue continue
} }
} }
acct.fullRescan = false
am.ds.ScheduleWalletWrite(acct) am.ds.ScheduleWalletWrite(acct)
err := am.ds.FlushAccount(acct) err := am.ds.FlushAccount(acct)
if err != nil { if err != nil {
@ -420,7 +468,6 @@ func (am *AccountManager) rescanListener() {
continue continue
} }
} }
acct.fullRescan = false
am.ds.ScheduleWalletWrite(acct) am.ds.ScheduleWalletWrite(acct)
err := am.ds.FlushAccount(acct) err := am.ds.FlushAccount(acct)
if err != nil { if err != nil {

View file

@ -809,14 +809,8 @@ func Handshake(rpc ServerConn) error {
return nil return nil
} }
log.Warnf("None of the previous saved blocks in btcd chain. Must perform full rescan.")
// Iterator was invalid (wallet has never been synced) or there was a // Iterator was invalid (wallet has never been synced) or there was a
// huge chain fork + reorg (more than 20 blocks). Since we don't know // huge chain fork + reorg (more than 20 blocks).
// what block (if any) this wallet is synced to, roll back everything
// and start a new rescan since the earliest block wallet must know
// about.
a.fullRescan = true
AcctMgr.Track() AcctMgr.Track()
if err := AcctMgr.RescanActiveAddresses(); err != nil { if err := AcctMgr.RescanActiveAddresses(); err != nil {
return err return err

View file

@ -1259,7 +1259,16 @@ func (w *Wallet) SetSyncStatus(a btcutil.Address, s SyncStatus) error {
// Unsynced addresses are unaffected by this method and must be marked // Unsynced addresses are unaffected by this method and must be marked
// as in sync with MarkAddressSynced or MarkAllSynced to be considered // as in sync with MarkAddressSynced or MarkAllSynced to be considered
// in sync with bs. // in sync with bs.
//
// If bs is nil, the entire wallet is marked unsynced.
func (w *Wallet) SetSyncedWith(bs *BlockStamp) { func (w *Wallet) SetSyncedWith(bs *BlockStamp) {
if bs == nil {
w.recent.hashes = w.recent.hashes[:0]
w.recent.lastHeight = w.keyGenerator.firstBlock
w.keyGenerator.setSyncStatus(Unsynced(w.keyGenerator.firstBlock))
return
}
// Check if we're trying to rollback the last seen history. // Check if we're trying to rollback the last seen history.
// If so, and this bs is already saved, remove anything // If so, and this bs is already saved, remove anything
// after and return. Otherwire, remove previous hashes. // after and return. Otherwire, remove previous hashes.
@ -1299,12 +1308,8 @@ func (w *Wallet) SetSyncedWith(bs *BlockStamp) {
// addresses marked as unsynced, whichever is smaller. This is the // addresses marked as unsynced, whichever is smaller. This is the
// height that rescans on an entire wallet should begin at to fully // height that rescans on an entire wallet should begin at to fully
// sync all wallet addresses. // sync all wallet addresses.
func (w *Wallet) SyncHeight() (height int32) { func (w *Wallet) SyncHeight() int32 {
if len(w.recent.hashes) == 0 { height := w.recent.lastHeight
return 0
}
height = w.recent.lastHeight
for _, a := range w.addrMap { for _, a := range w.addrMap {
var syncHeight int32 var syncHeight int32
switch e := a.SyncStatus().(type) { switch e := a.SyncStatus().(type) {
@ -1324,7 +1329,6 @@ func (w *Wallet) SyncHeight() (height int32) {
} }
} }
} }
return height return height
} }
@ -1335,42 +1339,6 @@ func (w *Wallet) NewIterateRecentBlocks() RecentBlockIterator {
return w.recent.NewIterator() return w.recent.NewIterator()
} }
// EarliestBlockHeight returns the height of the blockchain for when any
// wallet address first appeared. This will usually be the block height
// at the time of wallet creation, unless a private key with an earlier
// block height was imported into the wallet. This is needed when
// performing a full rescan to prevent unnecessary rescanning before
// wallet addresses first appeared.
func (w *Wallet) EarliestBlockHeight() int32 {
height := w.keyGenerator.firstBlock
// Imported keys will be the only ones that may have an earlier
// blockchain height. Check each and set the returned height
for _, addr := range w.importedAddrs {
aheight := addr.FirstBlock()
if aheight < height {
height = aheight
// Can't go any lower than 0.
if height == 0 {
break
}
}
}
return height
}
// SetBetterEarliestBlockHeight sets a better earliest block height.
// At wallet creation time, a earliest block is guessed, but this
// could be incorrect if btcd is out of sync. This function can be
// used to correct a previous guess with a better value.
func (w *Wallet) SetBetterEarliestBlockHeight(height int32) {
if height > w.keyGenerator.firstBlock {
w.keyGenerator.firstBlock = height
}
}
// ImportPrivateKey imports a WIF private key into the keystore. The imported // ImportPrivateKey imports a WIF private key into the keystore. The imported
// address is created using either a compressed or uncompressed serialized // address is created using either a compressed or uncompressed serialized
// public key, depending on the CompressPubKey bool of the WIF. // public key, depending on the CompressPubKey bool of the WIF.
@ -1822,13 +1790,6 @@ func (rb *recentBlocks) ReadFrom(r io.Reader) (int64, error) {
return read, errors.New("number of last seen blocks exceeds maximum of 20") return read, errors.New("number of last seen blocks exceeds maximum of 20")
} }
// If number of blocks is 0, our work here is done.
if nBlocks == 0 {
rb.lastHeight = -1
rb.hashes = nil
return read, nil
}
// Read most recently seen block height. // Read most recently seen block height.
var heightBytes [4]byte // 4 bytes for a int32 var heightBytes [4]byte // 4 bytes for a int32
n, err = io.ReadFull(r, heightBytes[:]) n, err = io.ReadFull(r, heightBytes[:])
@ -1876,9 +1837,6 @@ func (rb *recentBlocks) WriteTo(w io.Writer) (int64, error) {
if nBlocks != 0 && rb.lastHeight < 0 { if nBlocks != 0 && rb.lastHeight < 0 {
return written, errors.New("number of block hashes is positive, but height is negative") return written, errors.New("number of block hashes is positive, but height is negative")
} }
if nBlocks == 0 && rb.lastHeight != -1 {
return written, errors.New("no block hashes available, but height is not -1")
}
var nBlockBytes [4]byte // 4 bytes for a uint32 var nBlockBytes [4]byte // 4 bytes for a uint32
binary.LittleEndian.PutUint32(nBlockBytes[:], nBlocks) binary.LittleEndian.PutUint32(nBlockBytes[:], nBlocks)
n, err := w.Write(nBlockBytes[:]) n, err := w.Write(nBlockBytes[:])
@ -1886,10 +1844,6 @@ func (rb *recentBlocks) WriteTo(w io.Writer) (int64, error) {
if err != nil { if err != nil {
return written, err return written, err
} }
// If number of blocks is 0, our work here is done.
if nBlocks == 0 {
return written, nil
}
// Write most recently seen block height. // Write most recently seen block height.
var heightBytes [4]byte // 4 bytes for a int32 var heightBytes [4]byte // 4 bytes for a int32
@ -1921,7 +1875,7 @@ type RecentBlockIterator interface {
} }
func (rb *recentBlocks) NewIterator() RecentBlockIterator { func (rb *recentBlocks) NewIterator() RecentBlockIterator {
if rb.lastHeight == -1 { if rb.lastHeight == -1 || len(rb.hashes) == 0 {
return nil return nil
} }
return &blockIterator{ return &blockIterator{

View file

@ -751,10 +751,6 @@ func TestImportPrivateKey(t *testing.T) {
// verify that the entire wallet's sync height matches the // verify that the entire wallet's sync height matches the
// expected createHeight. // expected createHeight.
if h := w.EarliestBlockHeight(); h != createHeight {
t.Error("Initial earliest height %v does not match expected %v.", h, createHeight)
return
}
if h := w.SyncHeight(); h != createHeight { if h := w.SyncHeight(); h != createHeight {
t.Error("Initial sync height %v does not match expected %v.", h, createHeight) t.Error("Initial sync height %v does not match expected %v.", h, createHeight)
return return
@ -791,12 +787,7 @@ func TestImportPrivateKey(t *testing.T) {
return return
} }
// verify that the earliest block and sync heights now match the // verify that the sync height now match the (smaller) import height.
// (smaller) import height.
if h := w.EarliestBlockHeight(); h != importHeight {
t.Errorf("After import earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w.SyncHeight(); h != importHeight { if h := w.SyncHeight(); h != importHeight {
t.Errorf("After import sync height %v does not match expected %v.", h, importHeight) t.Errorf("After import sync height %v does not match expected %v.", h, importHeight)
return return
@ -818,11 +809,7 @@ func TestImportPrivateKey(t *testing.T) {
return return
} }
// Verify that the earliest and sync height match expected after the reserialization. // Verify that the sync height match expected after the reserialization.
if h := w2.EarliestBlockHeight(); h != importHeight {
t.Errorf("After reserialization earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w2.SyncHeight(); h != importHeight { if h := w2.SyncHeight(); h != importHeight {
t.Errorf("After reserialization sync height %v does not match expected %v.", h, importHeight) t.Errorf("After reserialization sync height %v does not match expected %v.", h, importHeight)
return return
@ -835,10 +822,6 @@ func TestImportPrivateKey(t *testing.T) {
t.Errorf("Cannot mark address partially synced: %v", err) t.Errorf("Cannot mark address partially synced: %v", err)
return return
} }
if h := w2.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address partial sync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w2.SyncHeight(); h != partialHeight { if h := w2.SyncHeight(); h != partialHeight {
t.Errorf("After address partial sync, sync height %v does not match expected %v.", h, partialHeight) t.Errorf("After address partial sync, sync height %v does not match expected %v.", h, partialHeight)
return return
@ -871,10 +854,6 @@ func TestImportPrivateKey(t *testing.T) {
t.Errorf("Cannot mark address synced: %v", err) t.Errorf("Cannot mark address synced: %v", err)
return return
} }
if h := w3.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address unsync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w3.SyncHeight(); h != importHeight { if h := w3.SyncHeight(); h != importHeight {
t.Errorf("After address unsync, sync height %v does not match expected %v.", h, importHeight) t.Errorf("After address unsync, sync height %v does not match expected %v.", h, importHeight)
return return
@ -887,10 +866,6 @@ func TestImportPrivateKey(t *testing.T) {
t.Errorf("Cannot mark address synced: %v", err) t.Errorf("Cannot mark address synced: %v", err)
return return
} }
if h := w3.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address sync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w3.SyncHeight(); h != createHeight { if h := w3.SyncHeight(); h != createHeight {
t.Errorf("After address sync, sync height %v does not match expected %v.", h, createHeight) t.Errorf("After address sync, sync height %v does not match expected %v.", h, createHeight)
return return
@ -940,10 +915,6 @@ func TestImportScript(t *testing.T) {
// verify that the entire wallet's sync height matches the // verify that the entire wallet's sync height matches the
// expected createHeight. // expected createHeight.
if h := w.EarliestBlockHeight(); h != createHeight {
t.Error("Initial earliest height %v does not match expected %v.", h, createHeight)
return
}
if h := w.SyncHeight(); h != createHeight { if h := w.SyncHeight(); h != createHeight {
t.Error("Initial sync height %v does not match expected %v.", h, createHeight) t.Error("Initial sync height %v does not match expected %v.", h, createHeight)
return return
@ -1017,12 +988,7 @@ func TestImportScript(t *testing.T) {
return return
} }
// verify that the earliest block and sync heights now match the // verify that the sync height now match the (smaller) import height.
// (smaller) import height.
if h := w.EarliestBlockHeight(); h != importHeight {
t.Errorf("After import earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w.SyncHeight(); h != importHeight { if h := w.SyncHeight(); h != importHeight {
t.Errorf("After import sync height %v does not match expected %v.", h, importHeight) t.Errorf("After import sync height %v does not match expected %v.", h, importHeight)
return return
@ -1044,11 +1010,7 @@ func TestImportScript(t *testing.T) {
return return
} }
// Verify that the earliest and sync height match expected after the reserialization. // Verify that the sync height matches expected after the reserialization.
if h := w2.EarliestBlockHeight(); h != importHeight {
t.Errorf("After reserialization earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w2.SyncHeight(); h != importHeight { if h := w2.SyncHeight(); h != importHeight {
t.Errorf("After reserialization sync height %v does not match expected %v.", h, importHeight) t.Errorf("After reserialization sync height %v does not match expected %v.", h, importHeight)
return return
@ -1127,10 +1089,6 @@ func TestImportScript(t *testing.T) {
t.Errorf("Cannot mark address partially synced: %v", err) t.Errorf("Cannot mark address partially synced: %v", err)
return return
} }
if h := w2.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address partial sync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w2.SyncHeight(); h != partialHeight { if h := w2.SyncHeight(); h != partialHeight {
t.Errorf("After address partial sync, sync height %v does not match expected %v.", h, partialHeight) t.Errorf("After address partial sync, sync height %v does not match expected %v.", h, partialHeight)
return return
@ -1163,10 +1121,6 @@ func TestImportScript(t *testing.T) {
t.Errorf("Cannot mark address synced: %v", err) t.Errorf("Cannot mark address synced: %v", err)
return return
} }
if h := w3.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address unsync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w3.SyncHeight(); h != importHeight { if h := w3.SyncHeight(); h != importHeight {
t.Errorf("After address unsync, sync height %v does not match expected %v.", h, importHeight) t.Errorf("After address unsync, sync height %v does not match expected %v.", h, importHeight)
return return
@ -1179,10 +1133,6 @@ func TestImportScript(t *testing.T) {
t.Errorf("Cannot mark address synced: %v", err) t.Errorf("Cannot mark address synced: %v", err)
return return
} }
if h := w3.EarliestBlockHeight(); h != importHeight {
t.Errorf("After address sync, earliest height %v does not match expected %v.", h, importHeight)
return
}
if h := w3.SyncHeight(); h != createHeight { if h := w3.SyncHeight(); h != createHeight {
t.Errorf("After address sync, sync height %v does not match expected %v.", h, createHeight) t.Errorf("After address sync, sync height %v does not match expected %v.", h, createHeight)
return return