diff --git a/wallet/wallet.go b/wallet/wallet.go index a50dbcf..221e6a1 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -322,291 +322,45 @@ func (w *Wallet) activeData(dbtx walletdb.ReadTx) ([]btcutil.Address, []wtxmgr.C // finished. The birthday block can be passed in, if set, to ensure we can // properly detect if it gets rolled back. func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { + // To start, if we've yet to find our birthday stamp, we'll do so now. + if birthdayStamp == nil { + var err error + birthdayStamp, err = w.syncToBirthday() + if err != nil { + return err + } + } + + // If the wallet requested an on-chain recovery of its funds, we'll do + // so now. + if w.recoveryWindow > 0 { + // We'll start the recovery from our birthday unless we were + // in the middle of a previous recovery attempt. If that's the + // case, we'll resume from that point. + startHeight := birthdayStamp.Height + walletHeight := w.Manager.SyncedTo().Height + if walletHeight > startHeight { + startHeight = walletHeight + } + if err := w.recovery(startHeight); err != nil { + return err + } + } + + // Compare previously-seen blocks against the current chain. If any of + // these blocks no longer exist, rollback all of the missing blocks + // before catching up with the rescan. + rollback := false + rollbackStamp := w.Manager.SyncedTo() chainClient, err := w.requireChainClient() if err != nil { return err } - // Request notifications for transactions sending to all wallet - // addresses. - var ( - addrs []btcutil.Address - unspent []wtxmgr.Credit - ) - err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { - var err error - addrs, unspent, err = w.activeData(dbtx) - return err - }) - if err != nil { - return err - } - - startHeight := w.Manager.SyncedTo().Height - - // We'll mark this as our first sync if we don't have any unspent - // outputs as known by the wallet. This'll allow us to skip a full - // rescan at this height, and instead wait for the backend to catch up. - isInitialSync := len(unspent) == 0 - - isRecovery := w.recoveryWindow > 0 - birthday := w.Manager.Birthday() - - // TODO(jrick): How should this handle a synced height earlier than - // the chain server best block? - - // When no addresses have been generated for the wallet, the rescan can - // be skipped. - // - // TODO: This is only correct because activeData above returns all - // addresses ever created, including those that don't need to be watched - // anymore. This code should be updated when this assumption is no - // longer true, but worst case would result in an unnecessary rescan. - if isInitialSync || isRecovery { - // Find the latest checkpoint's height. This lets us catch up to - // at least that checkpoint, since we're synchronizing from - // scratch, and lets us avoid a bunch of costly DB transactions - // in the case when we're using BDB for the walletdb backend and - // Neutrino for the chain.Interface backend, and the chain - // backend starts synchronizing at the same time as the wallet. - _, bestHeight, err := chainClient.GetBestBlock() - if err != nil { - return err - } - - checkHeight := bestHeight - if len(w.chainParams.Checkpoints) > 0 { - checkHeight = w.chainParams.Checkpoints[len( - w.chainParams.Checkpoints)-1].Height - } - - logHeight := checkHeight - if bestHeight > logHeight { - logHeight = bestHeight - } - - log.Infof("Catching up block hashes to height %d, this will "+ - "take a while...", logHeight) - - // Initialize the first database transaction. - tx, err := w.db.BeginReadWriteTx() - if err != nil { - return err - } - ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) - - // Only allocate the recoveryMgr if we are actually in recovery - // mode. - var recoveryMgr *RecoveryManager - if isRecovery { - log.Infof("RECOVERY MODE ENABLED -- rescanning for "+ - "used addresses with recovery_window=%d", - w.recoveryWindow) - - // Initialize the recovery manager with a default batch - // size of 2000. - recoveryMgr = NewRecoveryManager( - w.recoveryWindow, recoveryBatchSize, - w.chainParams, - ) - - // In the event that this recovery is being resumed, we - // will need to repopulate all found addresses from the - // database. For basic recovery, we will only do so for - // the default scopes. - scopedMgrs, err := w.defaultScopeManagers() - if err != nil { - return err - } - - txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) - credits, err := w.TxStore.UnspentOutputs(txmgrNs) - if err != nil { - return err - } - - err = recoveryMgr.Resurrect(ns, scopedMgrs, credits) - if err != nil { - return err - } - } - - for height := startHeight; height <= bestHeight; height++ { - hash, err := chainClient.GetBlockHash(int64(height)) - if err != nil { - tx.Rollback() - return err - } - - // If we're using the Neutrino backend, we can check if - // it's current or not. For other backends we'll assume - // it is current if the best height has reached the - // last checkpoint. - isCurrent := func(bestHeight int32) bool { - switch c := chainClient.(type) { - case *chain.NeutrinoClient: - return c.CS.IsCurrent() - } - return bestHeight >= checkHeight - } - - // If we've found the best height the backend knows - // about, and the backend is still synchronizing, we'll - // wait. We can give it a little bit of time to - // synchronize further before updating the best height - // based on the backend. Once we see that the backend - // has advanced, we can catch up to it. - for height == bestHeight && !isCurrent(bestHeight) { - time.Sleep(100 * time.Millisecond) - _, bestHeight, err = chainClient.GetBestBlock() - if err != nil { - tx.Rollback() - return err - } - } - - header, err := chainClient.GetBlockHeader(hash) - if err != nil { - return err - } - - // Check to see if this header's timestamp has surpassed - // our birthday or if we've surpassed one previously. - timestamp := header.Timestamp - if timestamp.After(birthday) || birthdayStamp != nil { - // If this is the first block past our birthday, - // record the block stamp so that we can use - // this as the starting point for the rescan. - // This will ensure we don't miss transactions - // that are sent to the wallet during an initial - // sync. - // - // NOTE: The birthday persisted by the wallet is - // two days before the actual wallet birthday, - // to deal with potentially inaccurate header - // timestamps. - if birthdayStamp == nil { - birthdayStamp = &waddrmgr.BlockStamp{ - Height: height, - Hash: *hash, - Timestamp: timestamp, - } - - log.Debugf("Found birthday block: "+ - "height=%d, hash=%v", - birthdayStamp.Height, - birthdayStamp.Hash) - - err := w.Manager.SetBirthdayBlock( - ns, *birthdayStamp, true, - ) - if err != nil { - tx.Rollback() - return err - } - } - - // If we are in recovery mode and the check - // passes, we will add this block to our list of - // blocks to scan for recovered addresses. - if isRecovery { - recoveryMgr.AddToBlockBatch( - hash, height, timestamp, - ) - } - } - - err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{ - Hash: *hash, - Height: height, - Timestamp: timestamp, - }) - if err != nil { - tx.Rollback() - return err - } - - // If we are in recovery mode, attempt a recovery on - // blocks that have been added to the recovery manager's - // block batch thus far. If block batch is empty, this - // will be a NOP. - if isRecovery && height%recoveryBatchSize == 0 { - err := w.recoverDefaultScopes( - chainClient, tx, ns, - recoveryMgr.BlockBatch(), - recoveryMgr.State(), - ) - if err != nil { - tx.Rollback() - return err - } - - // Clear the batch of all processed blocks. - recoveryMgr.ResetBlockBatch() - } - - // Every 10K blocks, commit and start a new database TX. - if height%10000 == 0 { - err = tx.Commit() - if err != nil { - tx.Rollback() - return err - } - - log.Infof("Caught up to height %d", height) - - tx, err = w.db.BeginReadWriteTx() - if err != nil { - return err - } - - ns = tx.ReadWriteBucket(waddrmgrNamespaceKey) - } - } - - // Perform one last recovery attempt for all blocks that were - // not batched at the default granularity of 2000 blocks. - if isRecovery { - err := w.recoverDefaultScopes( - chainClient, tx, ns, recoveryMgr.BlockBatch(), - recoveryMgr.State(), - ) - if err != nil { - tx.Rollback() - return err - } - } - - // Commit (or roll back) the final database transaction. - err = tx.Commit() - if err != nil { - tx.Rollback() - return err - } - log.Info("Done catching up block hashes") - - // Since we've spent some time catching up block hashes, we - // might have new addresses waiting for us that were requested - // during initial sync. Make sure we have those before we - // request a rescan later on. - err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { - var err error - addrs, unspent, err = w.activeData(dbtx) - return err - }) - if err != nil { - return err - } - } - - // Compare previously-seen blocks against the chain server. If any of - // these blocks no longer exist, rollback all of the missing blocks - // before catching up with the rescan. - rollback := false - rollbackStamp := w.Manager.SyncedTo() err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey) txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey) + for height := rollbackStamp.Height; true; height-- { hash, err := w.Manager.BlockHash(addrmgrNs, height) if err != nil { @@ -631,48 +385,41 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { rollback = true } - if rollback { - err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp) - if err != nil { - return err - } - // Rollback unconfirms transactions at and beyond the - // passed height, so add one to the new synced-to height - // to prevent unconfirming txs from the synced-to block. - err = w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1) + // If a rollback did not happen, we can proceed safely. + if !rollback { + return nil + } + + // Otherwise, we'll mark this as our new synced height. + err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp) + if err != nil { + return err + } + + // If the rollback happened to go beyond our birthday stamp, + // we'll need to find a new one by syncing with the chain again + // until finding one. + if rollbackStamp.Height <= birthdayStamp.Height && + rollbackStamp.Hash != birthdayStamp.Hash { + + err := w.Manager.SetBirthdayBlock( + addrmgrNs, rollbackStamp, true, + ) if err != nil { return err } } - return nil + + // Finally, we'll roll back our transaction store to reflect the + // stale state. `Rollback` unconfirms transactions at and beyond + // the passed height, so add one to the new synced-to height to + // prevent unconfirming transactions in the synced-to block. + return w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1) }) if err != nil { return err } - // If a birthday stamp was found during the initial sync and the - // rollback causes us to revert it, update the birthday stamp so that it - // points at the new tip. - birthdayRollback := false - if birthdayStamp != nil && rollbackStamp.Height <= birthdayStamp.Height { - birthdayStamp = &rollbackStamp - birthdayRollback = true - - log.Debugf("Found new birthday block after rollback: "+ - "height=%d, hash=%v", birthdayStamp.Height, - birthdayStamp.Hash) - - err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) - return w.Manager.SetBirthdayBlock( - ns, *birthdayStamp, true, - ) - }) - if err != nil { - return nil - } - } - // Request notifications for connected and disconnected blocks. // // TODO(jrick): Either request this notification only once, or when @@ -685,18 +432,306 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { return err } - // If this was our initial sync, we're recovering from our seed, or our - // birthday was rolled back due to a chain reorg, we'll dispatch a - // rescan from our birthday block to ensure we detect all relevant - // on-chain events from this point. - if isInitialSync || isRecovery || birthdayRollback { - return w.rescanWithTarget(addrs, unspent, birthdayStamp) + // Finally, we'll trigger a wallet rescan from the currently synced tip + // and request notifications for transactions sending to all wallet + // addresses and spending all wallet UTXOs. + var ( + addrs []btcutil.Address + unspent []wtxmgr.Credit + ) + err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { + addrs, unspent, err = w.activeData(dbtx) + return err + }) + if err != nil { + return err } - // Otherwise, we'll rescan from tip. return w.rescanWithTarget(addrs, unspent, nil) } +// scanChain is a helper method that scans the chain from the starting height +// until the tip of the chain. The onBlock callback can be used to perform +// certain operations for every block that we process as we scan the chain. +func (w *Wallet) scanChain(startHeight int32, + onBlock func(int32, *chainhash.Hash, *wire.BlockHeader) error) error { + + chainClient, err := w.requireChainClient() + if err != nil { + return err + } + + // isCurrent is a helper function that we'll use to determine if the + // chain backend is currently synced. When running with a btcd or + // bitcoind backend, It will use the height of the latest checkpoint as + // its lower bound. + var latestCheckptHeight int32 + if len(w.chainParams.Checkpoints) > 0 { + latestCheckptHeight = w.chainParams. + Checkpoints[len(w.chainParams.Checkpoints)-1].Height + } + isCurrent := func(bestHeight int32) bool { + switch c := chainClient.(type) { + case *chain.NeutrinoClient: + return c.CS.IsCurrent() + } + return bestHeight >= latestCheckptHeight + } + + // Determine the latest height known to the chain backend and begin + // scanning the chain from the start height up until this point. + _, bestHeight, err := chainClient.GetBestBlock() + if err != nil { + return err + } + + for height := startHeight; height <= bestHeight; height++ { + hash, err := chainClient.GetBlockHash(int64(height)) + if err != nil { + return err + } + header, err := chainClient.GetBlockHeader(hash) + if err != nil { + return err + } + + if err := onBlock(height, hash, header); err != nil { + return err + } + + // If we've reached our best height and we're not current, we'll + // wait for blocks at tip to ensure we go through all existent + // blocks. + for height == bestHeight && !isCurrent(bestHeight) { + time.Sleep(100 * time.Millisecond) + _, bestHeight, err = chainClient.GetBestBlock() + if err != nil { + return err + } + } + } + + return nil +} + +// syncToBirthday attempts to sync the wallet's point of view of the chain until +// it finds the first block whose timestamp is above the wallet's birthday. The +// wallet's birthday is already two days in the past of its actual birthday, so +// this is relatively safe to do. +func (w *Wallet) syncToBirthday() (*waddrmgr.BlockStamp, error) { + var birthdayStamp *waddrmgr.BlockStamp + birthday := w.Manager.Birthday() + + tx, err := w.db.BeginReadWriteTx() + if err != nil { + return nil, err + } + ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) + + // We'll begin scanning the chain from our last sync point until finding + // the first block with a timestamp greater than our birthday. We'll use + // this block to represent our birthday stamp. errDone is an error we'll + // use to signal that we've found it and no longer need to keep scanning + // the chain. + errDone := errors.New("done") + err = w.scanChain(w.Manager.SyncedTo().Height, func(height int32, + hash *chainhash.Hash, header *wire.BlockHeader) error { + + if header.Timestamp.After(birthday) { + log.Debugf("Found birthday block: height=%d, hash=%v", + height, hash) + + birthdayStamp = &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + } + + err := w.Manager.SetBirthdayBlock( + ns, *birthdayStamp, true, + ) + if err != nil { + return err + } + } + + err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + }) + if err != nil { + return err + } + + // Checkpoint our state every 10K blocks. + if height%10000 == 0 { + if err := tx.Commit(); err != nil { + return err + } + + log.Infof("Caught up to height %d", height) + + tx, err = w.db.BeginReadWriteTx() + if err != nil { + return err + } + ns = tx.ReadWriteBucket(waddrmgrNamespaceKey) + } + + // If we've found our birthday, we can return errDone to signal + // that we should stop scanning the chain and persist our state. + if birthdayStamp != nil { + return errDone + } + + return nil + }) + if err != nil && err != errDone { + tx.Rollback() + return nil, err + } + + // If a birthday stamp has yet to be found, we'll return an error + // indicating so. + if birthdayStamp == nil { + tx.Rollback() + return nil, fmt.Errorf("did not find a suitable birthday "+ + "block with a timestamp greater than %v", birthday) + } + + if err := tx.Commit(); err != nil { + tx.Rollback() + return nil, err + } + + return birthdayStamp, nil +} + +// recovery attempts to recover any unspent outputs that pay to any of our +// addresses starting from the specified height. +// +// NOTE: The starting height must be at least the height of the wallet's +// birthday or later. +func (w *Wallet) recovery(startHeight int32) error { + log.Infof("RECOVERY MODE ENABLED -- rescanning for used addresses "+ + "with recovery_window=%d", w.recoveryWindow) + + // We'll initialize the recovery manager with a default batch size of + // 2000. + recoveryMgr := NewRecoveryManager( + w.recoveryWindow, recoveryBatchSize, w.chainParams, + ) + + // In the event that this recovery is being resumed, we will need to + // repopulate all found addresses from the database. For basic recovery, + // we will only do so for the default scopes. + scopedMgrs, err := w.defaultScopeManagers() + if err != nil { + return err + } + tx, err := w.db.BeginReadWriteTx() + if err != nil { + return err + } + txMgrNS := tx.ReadBucket(wtxmgrNamespaceKey) + credits, err := w.TxStore.UnspentOutputs(txMgrNS) + if err != nil { + tx.Rollback() + return err + } + addrMgrNS := tx.ReadWriteBucket(waddrmgrNamespaceKey) + err = recoveryMgr.Resurrect(addrMgrNS, scopedMgrs, credits) + if err != nil { + tx.Rollback() + return err + } + + // We'll also retrieve our chain backend client in order to filter the + // blocks as we go. + chainClient, err := w.requireChainClient() + if err != nil { + tx.Rollback() + return err + } + + // We'll begin scanning the chain from the specified starting height. + // Since we assume that the lowest height we start with will at least be + // that of our birthday, we can just add every block we process from + // this point forward to the recovery batch. + err = w.scanChain(startHeight, func(height int32, + hash *chainhash.Hash, header *wire.BlockHeader) error { + + recoveryMgr.AddToBlockBatch(hash, height, header.Timestamp) + + // We'll checkpoint our current batch every 2K blocks, so we'll + // need to start a new database transaction. If our current + // batch is empty, then this will act as a NOP. + if height%recoveryBatchSize == 0 { + blockBatch := recoveryMgr.BlockBatch() + err := w.recoverDefaultScopes( + chainClient, tx, addrMgrNS, blockBatch, + recoveryMgr.State(), + ) + if err != nil { + return err + } + + // Clear the batch of all processed blocks. + recoveryMgr.ResetBlockBatch() + + if err := tx.Commit(); err != nil { + return err + } + + log.Infof("Recovered addresses from blocks %d-%d", + blockBatch[0].Height, + blockBatch[len(blockBatch)-1].Height) + + tx, err = w.db.BeginReadWriteTx() + if err != nil { + return err + } + addrMgrNS = tx.ReadWriteBucket(waddrmgrNamespaceKey) + } + + // Since the recovery in a way acts as a rescan, we'll update + // the wallet's tip to point to the current block so that we + // don't unnecessarily rescan the same block again later on. + return w.Manager.SetSyncedTo(addrMgrNS, &waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + Timestamp: header.Timestamp, + }) + }) + if err != nil { + tx.Rollback() + return err + } + + // Now that we've reached the chain tip, we can process our final batch + // with the remaining blocks if it did not reach its maximum size. + blockBatch := recoveryMgr.BlockBatch() + err = w.recoverDefaultScopes( + chainClient, tx, addrMgrNS, blockBatch, recoveryMgr.State(), + ) + if err != nil { + tx.Rollback() + return err + } + + // With the recovery complete, we can persist our new state and exit. + if err := tx.Commit(); err != nil { + tx.Rollback() + return err + } + + log.Infof("Recovered addresses from blocks %d-%d", blockBatch[0].Height, + blockBatch[len(blockBatch)-1].Height) + + return nil +} + // defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the // default set of key scopes. func (w *Wallet) defaultScopeManagers() (