From 089cc747db3c573b07fdae8e16b311979bb4d896 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 7 Jan 2019 18:40:30 -0800 Subject: [PATCH 1/4] wallet/wallet: add new syncToBirthday method In this commit, we add a new syncToBirthday method to the wallet. This method intends to sync the wallet's point of the view of the chain until finding its birthday. Most of the logic found within it is heavily borrowed from the existing syncWithChain method. This method is currently unused, but it will end up replacing some of the existing sync logic in a later commit. Co-authored-by: Roei Erez --- wallet/wallet.go | 158 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/wallet/wallet.go b/wallet/wallet.go index a50dbcf..6afb508 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -697,6 +697,164 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { return w.rescanWithTarget(addrs, unspent, nil) } +// scanChain is a helper method that scans the chain from the starting height +// until the tip of the chain. The onBlock callback can be used to perform +// certain operations for every block that we process as we scan the chain. +func (w *Wallet) scanChain(startHeight int32, + onBlock func(int32, *chainhash.Hash, *wire.BlockHeader) error) error { + + chainClient, err := w.requireChainClient() + if err != nil { + return err + } + + // isCurrent is a helper function that we'll use to determine if the + // chain backend is currently synced. When running with a btcd or + // bitcoind backend, It will use the height of the latest checkpoint as + // its lower bound. + var latestCheckptHeight int32 + if len(w.chainParams.Checkpoints) > 0 { + latestCheckptHeight = w.chainParams. + Checkpoints[len(w.chainParams.Checkpoints)-1].Height + } + isCurrent := func(bestHeight int32) bool { + switch c := chainClient.(type) { + case *chain.NeutrinoClient: + return c.CS.IsCurrent() + } + return bestHeight >= latestCheckptHeight + } + + // Determine the latest height known to the chain backend and begin + // scanning the chain from the start height up until this point. + _, bestHeight, err := chainClient.GetBestBlock() + if err != nil { + return err + } + + for height := startHeight; height <= bestHeight; height++ { + hash, err := chainClient.GetBlockHash(int64(height)) + if err != nil { + return err + } + header, err := chainClient.GetBlockHeader(hash) + if err != nil { + return err + } + + if err := onBlock(height, hash, header); err != nil { + return err + } + + // If we've reached our best height and we're not current, we'll + // wait for blocks at tip to ensure we go through all existent + // blocks. + for height == bestHeight && !isCurrent(bestHeight) { + time.Sleep(100 * time.Millisecond) + _, bestHeight, err = chainClient.GetBestBlock() + if err != nil { + return err + } + } + } + + return nil +} + +// syncToBirthday attempts to sync the wallet's point of view of the chain until +// it finds the first block whose timestamp is above the wallet's birthday. The +// wallet's birthday is already two days in the past of its actual birthday, so +// this is relatively safe to do. +func (w *Wallet) syncToBirthday() (*waddrmgr.BlockStamp, error) { + var birthdayStamp *waddrmgr.BlockStamp + birthday := w.Manager.Birthday() + + tx, err := w.db.BeginReadWriteTx() + if err != nil { + return nil, err + } + ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) + + // We'll begin scanning the chain from our last sync point until finding + // the first block with a timestamp greater than our birthday. We'll use + // this block to represent our birthday stamp. errDone is an error we'll + // use to signal that we've found it and no longer need to keep scanning + // the chain. + errDone := errors.New("done") + err = w.scanChain(w.Manager.SyncedTo().Height, func(height int32, + hash *chainhash.Hash, header *wire.BlockHeader) error { + + if header.Timestamp.After(birthday) { + log.Debugf("Found birthday block: height=%d, hash=%v", + height, hash) + + birthdayStamp = &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + } + + err := w.Manager.SetBirthdayBlock( + ns, *birthdayStamp, true, + ) + if err != nil { + return err + } + } + + err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + }) + if err != nil { + return err + } + + // Checkpoint our state every 10K blocks. + if height%10000 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + log.Infof("Caught up to height %d", height) + + tx, err = w.db.BeginReadWriteTx() + if err != nil { + return err + } + ns = tx.ReadWriteBucket(waddrmgrNamespaceKey) + } + + // If we've found our birthday, we can return errDone to signal + // that we should stop scanning the chain and persist our state. + if birthdayStamp != nil { + return errDone + } + + return nil + }) + if err != nil && err != errDone { + tx.Rollback() + return nil, err + } + + // If a birthday stamp has yet to be found, we'll return an error + // indicating so. + if birthdayStamp == nil { + tx.Rollback() + return nil, fmt.Errorf("did not find a suitable birthday "+ + "block with a timestamp greater than %v", birthday) + } + + if err := tx.Commit(); err != nil { + tx.Rollback() + return nil, err + } + + return birthdayStamp, nil +} + // defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the // default set of key scopes. func (w *Wallet) defaultScopeManagers() ( From 29e1f0c4fb7537c040632c7d7b44b5a181787bcf Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 7 Jan 2019 18:40:46 -0800 Subject: [PATCH 2/4] wallet/wallet: add new recovery method In this commit, we add a new recovery method to the wallet. This method attempts to recover any unspent outputs which pay to any of the wallet's addresses. Most of the logic found within it is heavily borrowed from the existing syncWithChain method. This method is currently unused, but it will end up replacing some of the existing sync logic in a later commit. --- wallet/wallet.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/wallet/wallet.go b/wallet/wallet.go index 6afb508..eccda56 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -855,6 +855,130 @@ func (w *Wallet) syncToBirthday() (*waddrmgr.BlockStamp, error) { return birthdayStamp, nil } +// recovery attempts to recover any unspent outputs that pay to any of our +// addresses starting from the specified height. +// +// NOTE: The starting height must be at least the height of the wallet's +// birthday or later. +func (w *Wallet) recovery(startHeight int32) error { + log.Infof("RECOVERY MODE ENABLED -- rescanning for used addresses "+ + "with recovery_window=%d", w.recoveryWindow) + + // We'll initialize the recovery manager with a default batch size of + // 2000. + recoveryMgr := NewRecoveryManager( + w.recoveryWindow, recoveryBatchSize, w.chainParams, + ) + + // In the event that this recovery is being resumed, we will need to + // repopulate all found addresses from the database. For basic recovery, + // we will only do so for the default scopes. + scopedMgrs, err := w.defaultScopeManagers() + if err != nil { + return err + } + tx, err := w.db.BeginReadWriteTx() + if err != nil { + return err + } + txMgrNS := tx.ReadBucket(wtxmgrNamespaceKey) + credits, err := w.TxStore.UnspentOutputs(txMgrNS) + if err != nil { + tx.Rollback() + return err + } + addrMgrNS := tx.ReadWriteBucket(waddrmgrNamespaceKey) + err = recoveryMgr.Resurrect(addrMgrNS, scopedMgrs, credits) + if err != nil { + tx.Rollback() + return err + } + + // We'll also retrieve our chain backend client in order to filter the + // blocks as we go. + chainClient, err := w.requireChainClient() + if err != nil { + tx.Rollback() + return err + } + + // We'll begin scanning the chain from the specified starting height. + // Since we assume that the lowest height we start with will at least be + // that of our birthday, we can just add every block we process from + // this point forward to the recovery batch. + err = w.scanChain(startHeight, func(height int32, + hash *chainhash.Hash, header *wire.BlockHeader) error { + + recoveryMgr.AddToBlockBatch(hash, height, header.Timestamp) + + // We'll checkpoint our current batch every 2K blocks, so we'll + // need to start a new database transaction. If our current + // batch is empty, then this will act as a NOP. + if height%recoveryBatchSize == 0 { + blockBatch := recoveryMgr.BlockBatch() + err := w.recoverDefaultScopes( + chainClient, tx, addrMgrNS, blockBatch, + recoveryMgr.State(), + ) + if err != nil { + return err + } + + // Clear the batch of all processed blocks. + recoveryMgr.ResetBlockBatch() + + if err := tx.Commit(); err != nil { + return err + } + + log.Infof("Recovered addresses from blocks %d-%d", + blockBatch[0].Height, + blockBatch[len(blockBatch)-1].Height) + + tx, err = w.db.BeginReadWriteTx() + if err != nil { + return err + } + addrMgrNS = tx.ReadWriteBucket(waddrmgrNamespaceKey) + } + + // Since the recovery in a way acts as a rescan, we'll update + // the wallet's tip to point to the current block so that we + // don't unnecessarily rescan the same block again later on. + return w.Manager.SetSyncedTo(addrMgrNS, &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + }) + }) + if err != nil { + tx.Rollback() + return err + } + + // Now that we've reached the chain tip, we can process our final batch + // with the remaining blocks if it did not reach its maximum size. + blockBatch := recoveryMgr.BlockBatch() + err = w.recoverDefaultScopes( + chainClient, tx, addrMgrNS, blockBatch, recoveryMgr.State(), + ) + if err != nil { + tx.Rollback() + return err + } + + // With the recovery complete, we can persist our new state and exit. + if err := tx.Commit(); err != nil { + tx.Rollback() + return err + } + + log.Infof("Recovered addresses from blocks %d-%d", blockBatch[0].Height, + blockBatch[len(blockBatch)-1].Height) + + return nil +} + // defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the // default set of key scopes. func (w *Wallet) defaultScopeManagers() ( From db837f1ba3402a6e6bace1d875c3bdfbaf4b1d14 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 7 Jan 2019 18:43:19 -0800 Subject: [PATCH 3/4] wallet/wallet: use new syncToBirthday and recovery methods In this commit, we refactor the wallet's syncing logic with syncWithChain to use the newer, simpler methods: syncToBirthday and recovery. Along the way, we also fix a bug within the wallet where it was possible to sync past the birthday, but not sync to tip completely and restart, which would lead to us starting a rescan from the latest synced height, rather than from the birthday stamp. This commit slightly changes the wallet's syncing behavior to the following: 1. Ensure the wallet is synced to its birthday. 2. Perform a recovery if requested. 3. Check for chain reorgs. 4. Dispatch a rescan from the current synced height. Co-authored-by: Roei Erez --- wallet/wallet.go | 301 +++++------------------------------------------ 1 file changed, 30 insertions(+), 271 deletions(-) diff --git a/wallet/wallet.go b/wallet/wallet.go index eccda56..c7c7baa 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -327,274 +327,27 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { return err } - // Request notifications for transactions sending to all wallet - // addresses. - var ( - addrs []btcutil.Address - unspent []wtxmgr.Credit - ) - err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { + // To start, if we've yet to find our birthday stamp, we'll do so now. + if birthdayStamp == nil { var err error - addrs, unspent, err = w.activeData(dbtx) - return err - }) - if err != nil { - return err + birthdayStamp, err = w.syncToBirthday() + if err != nil { + return err + } } - startHeight := w.Manager.SyncedTo().Height - - // We'll mark this as our first sync if we don't have any unspent - // outputs as known by the wallet. This'll allow us to skip a full - // rescan at this height, and instead wait for the backend to catch up. - isInitialSync := len(unspent) == 0 - - isRecovery := w.recoveryWindow > 0 - birthday := w.Manager.Birthday() - - // TODO(jrick): How should this handle a synced height earlier than - // the chain server best block? - - // When no addresses have been generated for the wallet, the rescan can - // be skipped. - // - // TODO: This is only correct because activeData above returns all - // addresses ever created, including those that don't need to be watched - // anymore. This code should be updated when this assumption is no - // longer true, but worst case would result in an unnecessary rescan. - if isInitialSync || isRecovery { - // Find the latest checkpoint's height. This lets us catch up to - // at least that checkpoint, since we're synchronizing from - // scratch, and lets us avoid a bunch of costly DB transactions - // in the case when we're using BDB for the walletdb backend and - // Neutrino for the chain.Interface backend, and the chain - // backend starts synchronizing at the same time as the wallet. - _, bestHeight, err := chainClient.GetBestBlock() - if err != nil { - return err + // If the wallet requested an on-chain recovery of its funds, we'll do + // so now. + if w.recoveryWindow > 0 { + // We'll start the recovery from our birthday unless we were + // in the middle of a previous recovery attempt. If that's the + // case, we'll resume from that point. + startHeight := birthdayStamp.Height + walletHeight := w.Manager.SyncedTo().Height + if walletHeight > startHeight { + startHeight = walletHeight } - - checkHeight := bestHeight - if len(w.chainParams.Checkpoints) > 0 { - checkHeight = w.chainParams.Checkpoints[len( - w.chainParams.Checkpoints)-1].Height - } - - logHeight := checkHeight - if bestHeight > logHeight { - logHeight = bestHeight - } - - log.Infof("Catching up block hashes to height %d, this will "+ - "take a while...", logHeight) - - // Initialize the first database transaction. - tx, err := w.db.BeginReadWriteTx() - if err != nil { - return err - } - ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) - - // Only allocate the recoveryMgr if we are actually in recovery - // mode. - var recoveryMgr *RecoveryManager - if isRecovery { - log.Infof("RECOVERY MODE ENABLED -- rescanning for "+ - "used addresses with recovery_window=%d", - w.recoveryWindow) - - // Initialize the recovery manager with a default batch - // size of 2000. - recoveryMgr = NewRecoveryManager( - w.recoveryWindow, recoveryBatchSize, - w.chainParams, - ) - - // In the event that this recovery is being resumed, we - // will need to repopulate all found addresses from the - // database. For basic recovery, we will only do so for - // the default scopes. - scopedMgrs, err := w.defaultScopeManagers() - if err != nil { - return err - } - - txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) - credits, err := w.TxStore.UnspentOutputs(txmgrNs) - if err != nil { - return err - } - - err = recoveryMgr.Resurrect(ns, scopedMgrs, credits) - if err != nil { - return err - } - } - - for height := startHeight; height <= bestHeight; height++ { - hash, err := chainClient.GetBlockHash(int64(height)) - if err != nil { - tx.Rollback() - return err - } - - // If we're using the Neutrino backend, we can check if - // it's current or not. For other backends we'll assume - // it is current if the best height has reached the - // last checkpoint. - isCurrent := func(bestHeight int32) bool { - switch c := chainClient.(type) { - case *chain.NeutrinoClient: - return c.CS.IsCurrent() - } - return bestHeight >= checkHeight - } - - // If we've found the best height the backend knows - // about, and the backend is still synchronizing, we'll - // wait. We can give it a little bit of time to - // synchronize further before updating the best height - // based on the backend. Once we see that the backend - // has advanced, we can catch up to it. - for height == bestHeight && !isCurrent(bestHeight) { - time.Sleep(100 * time.Millisecond) - _, bestHeight, err = chainClient.GetBestBlock() - if err != nil { - tx.Rollback() - return err - } - } - - header, err := chainClient.GetBlockHeader(hash) - if err != nil { - return err - } - - // Check to see if this header's timestamp has surpassed - // our birthday or if we've surpassed one previously. - timestamp := header.Timestamp - if timestamp.After(birthday) || birthdayStamp != nil { - // If this is the first block past our birthday, - // record the block stamp so that we can use - // this as the starting point for the rescan. - // This will ensure we don't miss transactions - // that are sent to the wallet during an initial - // sync. - // - // NOTE: The birthday persisted by the wallet is - // two days before the actual wallet birthday, - // to deal with potentially inaccurate header - // timestamps. - if birthdayStamp == nil { - birthdayStamp = &waddrmgr.BlockStamp{ - Height: height, - Hash: *hash, - Timestamp: timestamp, - } - - log.Debugf("Found birthday block: "+ - "height=%d, hash=%v", - birthdayStamp.Height, - birthdayStamp.Hash) - - err := w.Manager.SetBirthdayBlock( - ns, *birthdayStamp, true, - ) - if err != nil { - tx.Rollback() - return err - } - } - - // If we are in recovery mode and the check - // passes, we will add this block to our list of - // blocks to scan for recovered addresses. - if isRecovery { - recoveryMgr.AddToBlockBatch( - hash, height, timestamp, - ) - } - } - - err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{ - Hash: *hash, - Height: height, - Timestamp: timestamp, - }) - if err != nil { - tx.Rollback() - return err - } - - // If we are in recovery mode, attempt a recovery on - // blocks that have been added to the recovery manager's - // block batch thus far. If block batch is empty, this - // will be a NOP. - if isRecovery && height%recoveryBatchSize == 0 { - err := w.recoverDefaultScopes( - chainClient, tx, ns, - recoveryMgr.BlockBatch(), - recoveryMgr.State(), - ) - if err != nil { - tx.Rollback() - return err - } - - // Clear the batch of all processed blocks. - recoveryMgr.ResetBlockBatch() - } - - // Every 10K blocks, commit and start a new database TX. - if height%10000 == 0 { - err = tx.Commit() - if err != nil { - tx.Rollback() - return err - } - - log.Infof("Caught up to height %d", height) - - tx, err = w.db.BeginReadWriteTx() - if err != nil { - return err - } - - ns = tx.ReadWriteBucket(waddrmgrNamespaceKey) - } - } - - // Perform one last recovery attempt for all blocks that were - // not batched at the default granularity of 2000 blocks. - if isRecovery { - err := w.recoverDefaultScopes( - chainClient, tx, ns, recoveryMgr.BlockBatch(), - recoveryMgr.State(), - ) - if err != nil { - tx.Rollback() - return err - } - } - - // Commit (or roll back) the final database transaction. - err = tx.Commit() - if err != nil { - tx.Rollback() - return err - } - log.Info("Done catching up block hashes") - - // Since we've spent some time catching up block hashes, we - // might have new addresses waiting for us that were requested - // during initial sync. Make sure we have those before we - // request a rescan later on. - err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { - var err error - addrs, unspent, err = w.activeData(dbtx) - return err - }) - if err != nil { + if err := w.recovery(startHeight); err != nil { return err } } @@ -685,15 +438,21 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { return err } - // If this was our initial sync, we're recovering from our seed, or our - // birthday was rolled back due to a chain reorg, we'll dispatch a - // rescan from our birthday block to ensure we detect all relevant - // on-chain events from this point. - if isInitialSync || isRecovery || birthdayRollback { - return w.rescanWithTarget(addrs, unspent, birthdayStamp) + // Finally, we'll trigger a wallet rescan from the currently synced tip + // and request notifications for transactions sending to all wallet + // addresses and spending all wallet UTXOs. + var ( + addrs []btcutil.Address + unspent []wtxmgr.Credit + ) + err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { + addrs, unspent, err = w.activeData(dbtx) + return err + }) + if err != nil { + return err } - // Otherwise, we'll rescan from tip. return w.rescanWithTarget(addrs, unspent, nil) } From c853ceaa605603d956c7713ba0fcebda67bc2a0c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 7 Jan 2019 18:44:28 -0800 Subject: [PATCH 4/4] wallet/wallet: consolidate rollback logic In this commit, we consolidate the existing rollback logic to carry out its duties under one database transaction. Co-authored-by: Roei Erez --- wallet/wallet.go | 72 ++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/wallet/wallet.go b/wallet/wallet.go index c7c7baa..221e6a1 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -322,11 +322,6 @@ func (w *Wallet) activeData(dbtx walletdb.ReadTx) ([]btcutil.Address, []wtxmgr.C // finished. The birthday block can be passed in, if set, to ensure we can // properly detect if it gets rolled back. func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { - chainClient, err := w.requireChainClient() - if err != nil { - return err - } - // To start, if we've yet to find our birthday stamp, we'll do so now. if birthdayStamp == nil { var err error @@ -352,14 +347,20 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { } } - // Compare previously-seen blocks against the chain server. If any of + // Compare previously-seen blocks against the current chain. If any of // these blocks no longer exist, rollback all of the missing blocks // before catching up with the rescan. rollback := false rollbackStamp := w.Manager.SyncedTo() + chainClient, err := w.requireChainClient() + if err != nil { + return err + } + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey) txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey) + for height := rollbackStamp.Height; true; height-- { hash, err := w.Manager.BlockHash(addrmgrNs, height) if err != nil { @@ -384,48 +385,41 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { rollback = true } - if rollback { - err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp) - if err != nil { - return err - } - // Rollback unconfirms transactions at and beyond the - // passed height, so add one to the new synced-to height - // to prevent unconfirming txs from the synced-to block. - err = w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1) + // If a rollback did not happen, we can proceed safely. + if !rollback { + return nil + } + + // Otherwise, we'll mark this as our new synced height. + err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp) + if err != nil { + return err + } + + // If the rollback happened to go beyond our birthday stamp, + // we'll need to find a new one by syncing with the chain again + // until finding one. + if rollbackStamp.Height <= birthdayStamp.Height && + rollbackStamp.Hash != birthdayStamp.Hash { + + err := w.Manager.SetBirthdayBlock( + addrmgrNs, rollbackStamp, true, + ) if err != nil { return err } } - return nil + + // Finally, we'll roll back our transaction store to reflect the + // stale state. `Rollback` unconfirms transactions at and beyond + // the passed height, so add one to the new synced-to height to + // prevent unconfirming transactions in the synced-to block. + return w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1) }) if err != nil { return err } - // If a birthday stamp was found during the initial sync and the - // rollback causes us to revert it, update the birthday stamp so that it - // points at the new tip. - birthdayRollback := false - if birthdayStamp != nil && rollbackStamp.Height <= birthdayStamp.Height { - birthdayStamp = &rollbackStamp - birthdayRollback = true - - log.Debugf("Found new birthday block after rollback: "+ - "height=%d, hash=%v", birthdayStamp.Height, - birthdayStamp.Hash) - - err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) - return w.Manager.SetBirthdayBlock( - ns, *birthdayStamp, true, - ) - }) - if err != nil { - return nil - } - } - // Request notifications for connected and disconnected blocks. // // TODO(jrick): Either request this notification only once, or when