diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index b719cf9..58e83ec 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -35,17 +35,6 @@ func (w *Wallet) handleChainNotifications() { return } - sync := func(w *Wallet, birthdayStamp *waddrmgr.BlockStamp) { - // At the moment there is no recourse if the rescan fails for - // some reason, however, the wallet will not be marked synced - // and many methods will error early since the wallet is known - // to be out of date. - err := w.syncWithChain(birthdayStamp) - if err != nil && !w.ShuttingDown() { - log.Warnf("Unable to synchronize wallet to chain: %v", err) - } - } - catchUpHashes := func(w *Wallet, client chain.Interface, height int32) error { // TODO(aakselrod): There's a race conditon here, which @@ -119,14 +108,16 @@ func (w *Wallet) handleChainNotifications() { chainClient, birthdayStore, ) if err != nil && !waddrmgr.IsError(err, waddrmgr.ErrBirthdayBlockNotSet) { - err := fmt.Errorf("unable to sanity "+ + panic(fmt.Errorf("Unable to sanity "+ "check wallet birthday block: %v", - err) - log.Error(err) - panic(err) + err)) } - go sync(w, birthdayBlock) + err = w.syncWithChain(birthdayBlock) + if err != nil && !w.ShuttingDown() { + panic(fmt.Errorf("Unable to synchronize "+ + "wallet to chain: %v", err)) + } case chain.BlockConnected: err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { return w.connectBlock(tx, wtxmgr.BlockMeta(n)) diff --git a/wallet/rescan.go b/wallet/rescan.go index e3deb5d..1e5dbf4 100644 --- a/wallet/rescan.go +++ b/wallet/rescan.go @@ -56,7 +56,11 @@ type rescanBatch struct { func (w *Wallet) SubmitRescan(job *RescanJob) <-chan error { errChan := make(chan error, 1) job.err = errChan - w.rescanAddJob <- job + select { + case w.rescanAddJob <- job: + case <-w.quitChan(): + errChan <- ErrWalletShuttingDown + } return errChan } @@ -103,10 +107,11 @@ func (b *rescanBatch) done(err error) { // submissions, and possibly batching many waiting requests together so they // can be handled by a single rescan after the current one completes. func (w *Wallet) rescanBatchHandler() { + defer w.wg.Done() + var curBatch, nextBatch *rescanBatch quit := w.quitChan() -out: for { select { case job := <-w.rescanAddJob: @@ -114,7 +119,12 @@ out: // Set current batch as this job and send // request. curBatch = job.batch() - w.rescanBatch <- curBatch + select { + case w.rescanBatch <- curBatch: + case <-quit: + job.err <- ErrWalletShuttingDown + return + } } else { // Create next batch if it doesn't exist, or // merge the job. @@ -134,9 +144,16 @@ out: "currently running") continue } - w.rescanProgress <- &RescanProgressMsg{ + select { + case w.rescanProgress <- &RescanProgressMsg{ Addresses: curBatch.addrs, Notification: n, + }: + case <-quit: + for _, errChan := range curBatch.errChans { + errChan <- ErrWalletShuttingDown + } + return } case *chain.RescanFinished: @@ -146,15 +163,29 @@ out: "currently running") continue } - w.rescanFinished <- &RescanFinishedMsg{ + select { + case w.rescanFinished <- &RescanFinishedMsg{ Addresses: curBatch.addrs, Notification: n, + }: + case <-quit: + for _, errChan := range curBatch.errChans { + errChan <- ErrWalletShuttingDown + } + return } curBatch, nextBatch = nextBatch, nil if curBatch != nil { - w.rescanBatch <- curBatch + select { + case w.rescanBatch <- curBatch: + case <-quit: + for _, errChan := range curBatch.errChans { + errChan <- ErrWalletShuttingDown + } + return + } } default: @@ -163,11 +194,9 @@ out: } case <-quit: - break out + return } } - - w.wg.Done() } // rescanProgressHandler handles notifications for partially and fully completed @@ -280,5 +309,10 @@ func (w *Wallet) rescanWithTarget(addrs []btcutil.Address, } // Submit merged job and block until rescan completes. - return <-w.SubmitRescan(job) + select { + case err := <-w.SubmitRescan(job): + return err + case <-w.quitChan(): + return ErrWalletShuttingDown + } } diff --git a/wallet/wallet.go b/wallet/wallet.go index 1c5755f..9622bee 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -54,13 +54,18 @@ const ( recoveryBatchSize = 2000 ) -// ErrNotSynced describes an error where an operation cannot complete -// due wallet being out of sync (and perhaps currently syncing with) -// the remote chain server. -var ErrNotSynced = errors.New("wallet is not synchronized with the chain server") - -// Namespace bucket keys. var ( + // ErrNotSynced describes an error where an operation cannot complete + // due wallet being out of sync (and perhaps currently syncing with) + // the remote chain server. + ErrNotSynced = errors.New("wallet is not synchronized with the chain server") + + // ErrWalletShuttingDown is an error returned when we attempt to make a + // request to the wallet but it is in the process of or has already shut + // down. + ErrWalletShuttingDown = errors.New("wallet shutting down") + + // Namespace bucket keys. waddrmgrNamespaceKey = []byte("waddrmgr") wtxmgrNamespaceKey = []byte("wtxmgr") )