diff --git a/account.go b/account.go index 3a43d72..00630f7 100644 --- a/account.go +++ b/account.go @@ -19,7 +19,6 @@ package main import ( "bytes" "encoding/base64" - "errors" "fmt" "github.com/conformal/btcutil" "github.com/conformal/btcwallet/tx" @@ -29,12 +28,10 @@ import ( "sync" ) -// ErrNotFound describes an error where a map lookup failed due to a -// key not being in the map. -var ErrNotFound = errors.New("not found") - // addressAccountMap holds a map of addresses to names of the // accounts that hold each address. +// +// TODO: move this to AccountManager var addressAccountMap = struct { sync.RWMutex m map[string]string @@ -64,28 +61,18 @@ func LookupAccountByAddress(address string) (string, error) { // Account is a structure containing all the components for a // complete wallet. It contains the Armory-style wallet (to store -// addresses and keys), and tx and utxo data stores, along with locks -// to prevent against incorrect multiple access. +// addresses and keys), and tx and utxo stores, and a mutex to prevent +// incorrect multiple access. type Account struct { - *wallet.Wallet - mtx sync.RWMutex name string fullRescan bool - UtxoStore struct { - sync.RWMutex - s tx.UtxoStore - } - TxStore struct { - sync.RWMutex - s tx.TxStore - } + *wallet.Wallet + tx.UtxoStore + tx.TxStore } // Lock locks the underlying wallet for an account. func (a *Account) Lock() error { - a.mtx.Lock() - defer a.mtx.Unlock() - switch err := a.Wallet.Lock(); err { case nil: NotifyWalletLockStateChange(a.Name(), true) @@ -102,9 +89,6 @@ func (a *Account) Lock() error { // Unlock unlocks the underlying wallet for an account. func (a *Account) Unlock(passphrase []byte) error { - a.mtx.Lock() - defer a.mtx.Unlock() - if err := a.Wallet.Unlock(passphrase); err != nil { return err } @@ -113,26 +97,6 @@ func (a *Account) Unlock(passphrase []byte) error { return nil } -// Rollback reverts each stored Account to a state before the block -// with the passed chainheight and block hash was connected to the main -// chain. This is used to remove transactions and utxos for each wallet -// that occured on a chain no longer considered to be the main chain. -func (a *Account) Rollback(height int32, hash *btcwire.ShaHash) { - a.UtxoStore.Lock() - modified := a.UtxoStore.s.Rollback(height, hash) - a.UtxoStore.Unlock() - if modified { - a.ScheduleUtxoStoreWrite() - } - - a.TxStore.Lock() - modified = a.TxStore.s.Rollback(height, hash) - a.TxStore.Unlock() - if modified { - a.ScheduleTxStoreWrite() - } -} - // AddressUsed returns whether there are any recorded transactions spending to // a given address. Assumming correct TxStore usage, this will return true iff // there are any transactions with outputs to this address in the blockchain or @@ -142,13 +106,10 @@ func (a *Account) AddressUsed(addr btcutil.Address) bool { // opening an account, and keeping it up to date each time a new // received tx arrives. - a.TxStore.RLock() - defer a.TxStore.RUnlock() - pkHash := addr.ScriptAddress() - for i := range a.TxStore.s { - rtx, ok := a.TxStore.s[i].(*tx.RecvTx) + for i := range a.TxStore { + rtx, ok := a.TxStore[i].(*tx.RecvTx) if !ok { continue } @@ -170,22 +131,19 @@ func (a *Account) AddressUsed(addr btcutil.Address) bool { // the balance will be calculated based on how many how many blocks // include a UTXO. func (a *Account) CalculateBalance(confirms int) float64 { - var bal uint64 // Measured in satoshi - bs, err := GetCurBlock() if bs.Height == int32(btcutil.BlockHeightUnknown) || err != nil { return 0. } - a.UtxoStore.RLock() - for _, u := range a.UtxoStore.s { + var bal uint64 // Measured in satoshi + for _, u := range a.UtxoStore { // Utxos not yet in blocks (height -1) should only be // added if confirmations is 0. if confirms == 0 || (u.Height != -1 && int(bs.Height-u.Height+1) >= confirms) { bal += u.Amt } } - a.UtxoStore.RUnlock() return float64(bal) / float64(btcutil.SatoshiPerBitcoin) } @@ -199,15 +157,13 @@ func (a *Account) CalculateBalance(confirms int) float64 { // the balance will be calculated based on how many how many blocks // include a UTXO. func (a *Account) CalculateAddressBalance(addr *btcutil.AddressPubKeyHash, confirms int) float64 { - var bal uint64 // Measured in satoshi - bs, err := GetCurBlock() if bs.Height == int32(btcutil.BlockHeightUnknown) || err != nil { return 0. } - a.UtxoStore.RLock() - for _, u := range a.UtxoStore.s { + var bal uint64 // Measured in satoshi + for _, u := range a.UtxoStore { // Utxos not yet in blocks (height -1) should only be // added if confirmations is 0. if confirms == 0 || (u.Height != -1 && int(bs.Height-u.Height+1) >= confirms) { @@ -216,7 +172,6 @@ func (a *Account) CalculateAddressBalance(addr *btcutil.AddressPubKeyHash, confi } } } - a.UtxoStore.RUnlock() return float64(bal) / float64(btcutil.SatoshiPerBitcoin) } @@ -225,9 +180,7 @@ func (a *Account) CalculateAddressBalance(addr *btcutil.AddressPubKeyHash, confi // one transaction spending to it in the blockchain or btcd mempool), the next // chained address is returned. func (a *Account) CurrentAddress() (btcutil.Address, error) { - a.mtx.RLock() addr := a.Wallet.LastChainedAddress() - a.mtx.RUnlock() // Get next chained address if the last one has already been used. if a.AddressUsed(addr) { @@ -243,10 +196,7 @@ func (a *Account) CurrentAddress() (btcutil.Address, error) { // replies. func (a *Account) ListSinceBlock(since, curBlockHeight int32, minconf int) ([]map[string]interface{}, error) { var txInfoList []map[string]interface{} - a.TxStore.RLock() - defer a.TxStore.RUnlock() - - for _, tx := range a.TxStore.s { + for _, tx := range a.TxStore { // check block number. if since != -1 && tx.Height() <= since { continue @@ -271,15 +221,13 @@ func (a *Account) ListTransactions(from, count int) ([]map[string]interface{}, e } var txInfoList []map[string]interface{} - a.TxStore.RLock() - lastLookupIdx := len(a.TxStore.s) - count + lastLookupIdx := len(a.TxStore) - count // Search in reverse order: lookup most recently-added first. - for i := len(a.TxStore.s) - 1; i >= from && i >= lastLookupIdx; i-- { + for i := len(a.TxStore) - 1; i >= from && i >= lastLookupIdx; i-- { txInfoList = append(txInfoList, - a.TxStore.s[i].TxInfo(a.name, bs.Height, a.Net())...) + a.TxStore[i].TxInfo(a.name, bs.Height, a.Net())...) } - a.TxStore.RUnlock() return txInfoList, nil } @@ -298,10 +246,8 @@ func (a *Account) ListAddressTransactions(pkHashes map[string]struct{}) ( } var txInfoList []map[string]interface{} - a.TxStore.RLock() - - for i := range a.TxStore.s { - rtx, ok := a.TxStore.s[i].(*tx.RecvTx) + for i := range a.TxStore { + rtx, ok := a.TxStore[i].(*tx.RecvTx) if !ok { continue } @@ -310,7 +256,6 @@ func (a *Account) ListAddressTransactions(pkHashes map[string]struct{}) ( txInfoList = append(txInfoList, info...) } } - a.TxStore.RUnlock() return txInfoList, nil } @@ -326,15 +271,12 @@ func (a *Account) ListAllTransactions() ([]map[string]interface{}, error) { return nil, err } - var txInfoList []map[string]interface{} - a.TxStore.RLock() - // Search in reverse order: lookup most recently-added first. - for i := len(a.TxStore.s) - 1; i >= 0; i-- { + var txInfoList []map[string]interface{} + for i := len(a.TxStore) - 1; i >= 0; i-- { txInfoList = append(txInfoList, - a.TxStore.s[i].TxInfo(a.name, bs.Height, a.Net())...) + a.TxStore[i].TxInfo(a.name, bs.Height, a.Net())...) } - a.TxStore.RUnlock() return txInfoList, nil } @@ -342,9 +284,6 @@ func (a *Account) ListAllTransactions() ([]map[string]interface{}, error) { // DumpPrivKeys returns the WIF-encoded private keys for all addresses with // private keys in a wallet. func (a *Account) DumpPrivKeys() ([]string, error) { - a.mtx.RLock() - defer a.mtx.RUnlock() - // Iterate over each active address, appending the private // key to privkeys. var privkeys []string @@ -367,9 +306,6 @@ func (a *Account) DumpPrivKeys() ([]string, error) { // DumpWIFPrivateKey returns the WIF encoded private key for a // single wallet address. func (a *Account) DumpWIFPrivateKey(addr btcutil.Address) (string, error) { - a.mtx.RLock() - defer a.mtx.RUnlock() - // Get private key from wallet if it exists. key, err := a.Wallet.AddressKey(addr) if err != nil { @@ -387,58 +323,19 @@ func (a *Account) DumpWIFPrivateKey(addr btcutil.Address) (string, error) { return btcutil.EncodePrivateKey(key.D.Bytes(), a.Net(), info.Compressed) } -// ImportPrivKey imports a WIF-encoded private key into an account's wallet. -// This function is not recommended, as it gives no hints as to when the -// address first appeared (not just in the blockchain, but since the address -// was first generated, or made public), and will cause all future rescans to -// start from the genesis block. -func (a *Account) ImportPrivKey(wif string, rescan bool) error { - bs := &wallet.BlockStamp{} - addr, err := a.ImportWIFPrivateKey(wif, bs) - if err != nil { - return err - } - - if rescan { - // Do not wait for rescan to finish before returning to the - // caller. - go func() { - addrs := map[string]struct{}{ - addr: struct{}{}, - } - - Rescan(CurrentRPCConn(), bs.Height, addrs) - a.WriteScheduledToDisk() - }() - } - return nil -} - -// ImportWIFPrivateKey takes a WIF-encoded private key and adds it to the -// wallet. If the import is successful, the payment address string is -// returned. -func (a *Account) ImportWIFPrivateKey(wif string, bs *wallet.BlockStamp) (string, error) { - // Decode WIF private key and perform sanity checking. - privkey, net, compressed, err := btcutil.DecodePrivateKey(wif) - if err != nil { - return "", err - } - if net != a.Net() { - return "", errors.New("wrong network") - } - +// ImportPrivateKey imports a private key to the account's wallet and +// writes the new wallet to disk. +func (a *Account) ImportPrivateKey(pk []byte, compressed bool, bs *wallet.BlockStamp) (string, error) { // Attempt to import private key into wallet. - a.mtx.Lock() - addr, err := a.Wallet.ImportPrivateKey(privkey, compressed, bs) - a.mtx.Unlock() + addr, err := a.Wallet.ImportPrivateKey(pk, compressed, bs) if err != nil { return "", err } addrStr := addr.String() // Immediately write wallet to disk. - a.ScheduleWalletWrite() - if err := a.WriteScheduledToDisk(); err != nil { + AcctMgr.ds.ScheduleWalletWrite(a) + if err := AcctMgr.ds.FlushAccount(a); err != nil { return "", fmt.Errorf("cannot write account: %v", err) } @@ -451,15 +348,24 @@ func (a *Account) ImportWIFPrivateKey(wif string, bs *wallet.BlockStamp) (string return addrStr, nil } +// ExportToDirectory writes an account to a special export directory. Any +// previous files are overwritten. +func (a *Account) ExportToDirectory(dirBaseName string) error { + dir := filepath.Join(networkDir(cfg.Net()), dirBaseName) + if err := checkCreateDir(dir); err != nil { + return err + } + + return AcctMgr.ds.ExportAccount(a, dir) +} + // ExportWatchingWallet returns a new account with a watching wallet // exported by this a's wallet. Both wallets share the same tx and utxo // stores, so locking one will lock the other as well. The returned account // should be exported quickly, either to file or to an rpc caller, and then // dropped from scope. func (a *Account) ExportWatchingWallet() (*Account, error) { - a.mtx.RLock() ww, err := a.Wallet.ExportWatchingWallet() - a.mtx.RUnlock() if err != nil { return nil, err } @@ -475,27 +381,20 @@ func (a *Account) exportBase64() (map[string]string, error) { buf := &bytes.Buffer{} m := make(map[string]string) - a.mtx.RLock() _, err := a.Wallet.WriteTo(buf) - a.mtx.RUnlock() if err != nil { return nil, err } m["wallet"] = base64.StdEncoding.EncodeToString(buf.Bytes()) buf.Reset() - a.TxStore.RLock() - _, err = a.TxStore.s.WriteTo(buf) - a.TxStore.RUnlock() - if err != nil { + if _, err = a.TxStore.WriteTo(buf); err != nil { return nil, err } m["tx"] = base64.StdEncoding.EncodeToString(buf.Bytes()) buf.Reset() - a.UtxoStore.RLock() - _, err = a.UtxoStore.s.WriteTo(buf) - a.UtxoStore.RUnlock() + _, err = a.UtxoStore.WriteTo(buf) if err != nil { return nil, err } @@ -518,16 +417,14 @@ func (a *Account) Track() { i++ } - err := NotifyNewTXs(CurrentRPCConn(), addrstrs) + err := NotifyNewTXs(CurrentServerConn(), addrstrs) if err != nil { log.Error("Unable to request transaction updates for address.") } - a.UtxoStore.RLock() - for _, utxo := range a.UtxoStore.s { + for _, utxo := range a.UtxoStore { ReqSpentUtxoNtfn(utxo) } - a.UtxoStore.RUnlock() } // RescanActiveAddresses requests btcd to rescan the blockchain for new @@ -538,7 +435,6 @@ func (a *Account) Track() { func (a *Account) RescanActiveAddresses() { // Determine the block to begin the rescan from. beginBlock := int32(0) - a.mtx.RLock() if a.fullRescan { // Need to perform a complete rescan since the wallet creation // block. @@ -556,19 +452,16 @@ func (a *Account) RescanActiveAddresses() { // If we're synced with block x, must scan the blocks x+1 to best block. beginBlock = bs.Height + 1 } - a.mtx.RUnlock() // Rescan active addresses starting at the determined block height. - Rescan(CurrentRPCConn(), beginBlock, a.ActivePaymentAddresses()) - a.WriteScheduledToDisk() + Rescan(CurrentServerConn(), beginBlock, a.ActivePaymentAddresses()) + AcctMgr.ds.FlushAccount(a) } // SortedActivePaymentAddresses returns a slice of all active payment // addresses in an account. func (a *Account) SortedActivePaymentAddresses() []string { - a.mtx.RLock() infos := a.Wallet.SortedActiveAddresses() - a.mtx.RUnlock() addrs := make([]string, len(infos)) for i, info := range infos { @@ -581,9 +474,7 @@ func (a *Account) SortedActivePaymentAddresses() []string { // ActivePaymentAddresses returns a set of all active pubkey hashes // in an account. func (a *Account) ActivePaymentAddresses() map[string]struct{} { - a.mtx.RLock() infos := a.ActiveAddresses() - a.mtx.RUnlock() addrs := make(map[string]struct{}, len(infos)) for _, info := range infos { @@ -602,16 +493,14 @@ func (a *Account) NewAddress() (btcutil.Address, error) { } // Get next address from wallet. - a.mtx.Lock() addr, err := a.Wallet.NextChainedAddress(&bs, cfg.KeypoolSize) - a.mtx.Unlock() if err != nil { return nil, err } // Immediately write updated wallet to disk. - a.ScheduleWalletWrite() - if err := a.WriteScheduledToDisk(); err != nil { + AcctMgr.ds.ScheduleWalletWrite(a) + if err := AcctMgr.ds.FlushAccount(a); err != nil { return nil, fmt.Errorf("account write failed: %v", err) } @@ -628,17 +517,13 @@ func (a *Account) NewAddress() (btcutil.Address, error) { func (a *Account) RecoverAddresses(n int) error { // Get info on the last chained address. The rescan starts at the // earliest block height the last chained address might appear at. - a.mtx.RLock() last := a.Wallet.LastChainedAddress() lastInfo, err := a.Wallet.AddressInfo(last) - a.mtx.RUnlock() if err != nil { return err } - a.mtx.Lock() addrs, err := a.Wallet.ExtendActiveAddresses(n, cfg.KeypoolSize) - a.mtx.Unlock() if err != nil { return err } @@ -649,7 +534,7 @@ func (a *Account) RecoverAddresses(n int) error { m[addrs[i].EncodeAddress()] = struct{}{} } go func(addrs map[string]struct{}) { - jsonErr := Rescan(CurrentRPCConn(), lastInfo.FirstBlock, addrs) + jsonErr := Rescan(CurrentServerConn(), lastInfo.FirstBlock, addrs) if jsonErr != nil { log.Errorf("Rescanning for recovered addresses failed: %v", jsonErr.Message) @@ -670,7 +555,7 @@ func (a *Account) ReqNewTxsForAddress(addr btcutil.Address) { log.Debugf("Requesting notifications of TXs sending to address %v", apkh) - err := NotifyNewTXs(CurrentRPCConn(), []string{apkh.EncodeAddress()}) + err := NotifyNewTXs(CurrentServerConn(), []string{apkh.EncodeAddress()}) if err != nil { log.Error("Unable to request transaction updates for address.") } @@ -682,7 +567,7 @@ func ReqSpentUtxoNtfn(u *tx.Utxo) { log.Debugf("Requesting spent UTXO notifications for Outpoint hash %s index %d", u.Out.Hash, u.Out.Index) - NotifySpent(CurrentRPCConn(), (*btcwire.OutPoint)(&u.Out)) + NotifySpent(CurrentServerConn(), (*btcwire.OutPoint)(&u.Out)) } // accountdir returns the directory containing an account's wallet, utxo, diff --git a/accountstore.go b/accountstore.go deleted file mode 100644 index bcd9099..0000000 --- a/accountstore.go +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Copyright (c) 2013, 2014 Conformal Systems LLC - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -package main - -import ( - "bytes" - "errors" - "fmt" - "github.com/conformal/btcutil" - "github.com/conformal/btcwallet/tx" - "github.com/conformal/btcwallet/wallet" - "github.com/conformal/btcwire" - "os" - "sync" -) - -// Errors relating to accounts. -var ( - ErrAcctExists = errors.New("account already exists") - ErrAcctNotExist = errors.New("account does not exist") -) - -var accountstore = NewAccountStore() - -// AccountStore stores all wallets currently being handled by -// btcwallet. Wallet are stored in a map with the account name as the -// key. A RWMutex is used to protect against incorrect concurrent -// access. -type AccountStore struct { - sync.RWMutex - accounts map[string]*Account -} - -// NewAccountStore returns an initialized and empty AccountStore. -func NewAccountStore() *AccountStore { - return &AccountStore{ - accounts: make(map[string]*Account), - } -} - -// Account returns the account specified by name, or ErrAcctNotExist -// as an error if the account is not found. -func (store *AccountStore) Account(name string) (*Account, error) { - store.RLock() - defer store.RUnlock() - - account, ok := store.accounts[name] - if !ok { - return nil, ErrAcctNotExist - } - return account, nil -} - -// Rollback rolls back each Account saved in the store. -func (store *AccountStore) Rollback(height int32, hash *btcwire.ShaHash) { - log.Debugf("Rolling back tx history since block height %v hash %v", - height, hash) - - store.RLock() - defer store.RUnlock() - - for _, account := range store.accounts { - account.Rollback(height, hash) - } -} - -// BlockNotify runs after btcwallet is notified of a new block connected to -// the best chain. It notifies all frontends of any changes from the new -// block, including changed balances. Each account is then set to be synced -// with the latest block. -func (store *AccountStore) BlockNotify(bs *wallet.BlockStamp) { - store.RLock() - defer store.RUnlock() - - for name, a := range store.accounts { - // TODO: need a flag or check that the utxo store was actually - // modified, or this will notify even if there are no balance - // changes, or sending these notifications as the utxos are added. - confirmed := a.CalculateBalance(1) - unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, - unconfirmed) - - // If this is the default account, update the block all accounts - // are synced with, and schedule a wallet write. - if name == "" { - a.mtx.Lock() - a.Wallet.SetSyncedWith(bs) - a.mtx.Unlock() - a.ScheduleWalletWrite() - } - } -} - -// RecordMinedTx searches through each account's TxStore, searching for a -// sent transaction with the same txid as from a txmined notification. If -// the transaction IDs match, the record in the TxStore is updated with -// the full information about the newly-mined tx, and the TxStore is -// scheduled to be written to disk.. -func (store *AccountStore) RecordMinedTx(txid *btcwire.ShaHash, - blkhash *btcwire.ShaHash, blkheight int32, blkindex int, - blktime int64) error { - - store.RLock() - defer store.RUnlock() - - for _, account := range store.accounts { - // The tx stores will be searched through while holding the - // reader lock, and the writer will only be grabbed if necessary. - account.TxStore.RLock() - - // Search in reverse order. Since more recently-created - // transactions are appended to the end of the store, it's - // more likely to find it when searching from the end. - for i := len(account.TxStore.s) - 1; i >= 0; i-- { - sendtx, ok := account.TxStore.s[i].(*tx.SendTx) - if ok { - if bytes.Equal(txid.Bytes(), sendtx.TxID[:]) { - account.TxStore.RUnlock() - - account.TxStore.Lock() - copy(sendtx.BlockHash[:], blkhash.Bytes()) - sendtx.BlockHeight = blkheight - sendtx.BlockIndex = int32(blkindex) - sendtx.BlockTime = blktime - account.TxStore.Unlock() - - account.ScheduleTxStoreWrite() - - return nil - } - } - } - - account.TxStore.RUnlock() - } - - return errors.New("txid does not match any recorded sent transaction") -} - -// CalculateBalance returns the balance, calculated using minconf -// block confirmations, of an account. -func (store *AccountStore) CalculateBalance(account string, - minconf int) (float64, error) { - - a, err := store.Account(account) - if err != nil { - return 0, err - } - - return a.CalculateBalance(minconf), nil -} - -// CreateEncryptedWallet creates a new account with a wallet file -// encrypted with passphrase. -func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase []byte) error { - store.RLock() - _, ok := store.accounts[name] - store.RUnlock() - if ok { - return ErrAcctExists - } - - // Get current block's height and hash. - bs, err := GetCurBlock() - if err != nil { - return err - } - - // Create new wallet in memory. - wlt, err := wallet.NewWallet(name, desc, passphrase, cfg.Net(), &bs, cfg.KeypoolSize) - if err != nil { - return err - } - - // Create new account with the wallet. A new JSON ID is set for - // transaction notifications. - account := &Account{ - Wallet: wlt, - name: name, - } - account.ScheduleWalletWrite() - account.ScheduleTxStoreWrite() - account.ScheduleUtxoStoreWrite() - - // Mark all active payment addresses as belonging to this account. - for addr := range account.ActivePaymentAddresses() { - MarkAddressForAccount(addr, name) - } - - // Save the account in the global account map. The mutex is - // already held at this point, and will be unlocked when this - // func returns. - store.Lock() - store.accounts[name] = account - store.Unlock() - - // Begin tracking account against a connected btcd. - // - // TODO(jrick): this should *only* happen if btcd is connected. - account.Track() - - // Ensure that the account is written out to disk. - if err := account.WriteScheduledToDisk(); err != nil { - return err - } - - return nil -} - -// ChangePassphrase unlocks all account wallets with the old -// passphrase, and re-encrypts each using the new passphrase. -// -// TODO(jrick): this is a perfect example of how awful the account -// locking is. It must be replaced. -func (store *AccountStore) ChangePassphrase(old, new []byte) error { - // Due to the undefined order of ranging over the accountstore - // map and how all account wallet writer locks are grabbed - // simultaneously and unlocked with a defer, this function is - // unsafe to call simulateously with other accountstore functions, - // even though the store itself is not modified. - store.Lock() - defer store.Unlock() - - if err := store.changePassphrase(old, new); err != nil { - return err - } - - // Immediately write out to disk. - return store.WriteAllToDisk() -} - -// changePassphrase changes all passphrases for all accounts without grabbing -// any accountstore locks. -func (store *AccountStore) changePassphrase(old, new []byte) error { - // Check that each account can be unlocked with the old passphrase. - for _, a := range store.accounts { - a.mtx.Lock() - defer a.mtx.Unlock() - - if locked := a.Wallet.IsLocked(); !locked { - if err := a.Wallet.Lock(); err != nil { - return err - } - } - - if err := a.Wallet.Unlock(old); err != nil { - return err - } - defer a.Wallet.Lock() - } - - // Change passphrase for each unlocked wallet. - for _, a := range store.accounts { - if err := a.Wallet.ChangePassphrase(new); err != nil { - return err - } - } - - return nil -} - -// LockWallets locks all account's wallets in the store. -func (store *AccountStore) LockWallets() error { - store.RLock() - defer store.RUnlock() - - for _, a := range store.accounts { - if err := a.Lock(); err != nil { - return err - } - } - - return nil -} - -// UnlockWallets unlocks all account's wallets in the store with the provided -// passphrase. If any wallet unlocks fail, all successfully unlocked wallets -// are locked again. -func (store *AccountStore) UnlockWallets(passphrase string) error { - store.RLock() - defer store.RUnlock() - - unlockedAccts := make([]*Account, 0, len(store.accounts)) - for _, a := range store.accounts { - if err := a.Unlock([]byte(passphrase)); err != nil { - for _, ua := range unlockedAccts { - ua.Lock() - } - return fmt.Errorf("cannot unlock account %v: %v", - a.name, err) - } - unlockedAccts = append(unlockedAccts, a) - } - - return nil -} - -// DumpKeys returns all WIF-encoded private keys associated with all -// accounts. All wallets must be unlocked for this operation to succeed. -func (store *AccountStore) DumpKeys() ([]string, error) { - store.RLock() - defer store.RUnlock() - - var keys []string - for _, a := range store.accounts { - switch walletKeys, err := a.DumpPrivKeys(); err { - case wallet.ErrWalletLocked: - return nil, err - - case nil: - keys = append(keys, walletKeys...) - - default: // any other non-nil error - return nil, err - } - - } - return keys, nil -} - -// DumpWIFPrivateKey searches through all accounts for the bitcoin -// payment address addr and returns the WIF-encdoded private key. -func (store *AccountStore) DumpWIFPrivateKey(addr btcutil.Address) (string, error) { - store.RLock() - defer store.RUnlock() - - for _, a := range store.accounts { - switch wif, err := a.DumpWIFPrivateKey(addr); err { - case wallet.ErrAddressNotFound: - // Move on to the next account. - continue - - case nil: - return wif, nil - - default: // all other non-nil errors - return "", err - } - } - - return "", errors.New("address does not refer to a key") -} - -// NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed -// account balances. -func (store *AccountStore) NotifyBalances(frontend chan []byte) { - store.RLock() - defer store.RUnlock() - - for _, account := range store.accounts { - balance := account.CalculateBalance(1) - unconfirmed := account.CalculateBalance(0) - balance - NotifyWalletBalance(frontend, account.name, balance) - NotifyWalletBalanceUnconfirmed(frontend, account.name, unconfirmed) - } -} - -// ListAccounts returns a map of account names to their current account -// balances. The balances are calculated using minconf confirmations. -func (store *AccountStore) ListAccounts(minconf int) map[string]float64 { - store.RLock() - defer store.RUnlock() - - // Create and fill a map of account names and their balances. - pairs := make(map[string]float64) - for name, a := range store.accounts { - pairs[name] = a.CalculateBalance(minconf) - } - return pairs -} - -// ListSinceBlock returns a slice of maps of strings to interface containing -// structures defining all transactions in the wallets since the given block. -// To be used for the listsinceblock command. -func (store *AccountStore) ListSinceBlock(since, curBlockHeight int32, minconf int) ([]map[string]interface{}, error) { - store.RLock() - defer store.RUnlock() - - // Create and fill a map of account names and their balances. - txInfoList := []map[string]interface{}{} - for _, a := range store.accounts { - txTmp, err := a.ListSinceBlock(since, curBlockHeight, minconf) - if err != nil { - return nil, err - } - txInfoList = append(txInfoList, txTmp...) - } - return txInfoList, nil -} - -// RescanActiveAddresses begins a rescan for all active addresses for -// each account. -// -// TODO(jrick): batch addresses for all accounts together so multiple -// rescan commands can be avoided. -func (store *AccountStore) RescanActiveAddresses() { - store.RLock() - defer store.RUnlock() - - for _, account := range store.accounts { - account.RescanActiveAddresses() - } -} - -// Track begins tracking all addresses in all accounts for updates from -// btcd. -func (store *AccountStore) Track() { - store.RLock() - defer store.RUnlock() - - for _, account := range store.accounts { - account.Track() - } -} - -// WalletOpenError is a special error type so problems opening wallet -// files can be differentiated (by a type assertion) from other errors. -type WalletOpenError struct { - Err string -} - -// Error satisifies the builtin error interface. -func (e *WalletOpenError) Error() string { - return e.Err -} - -// OpenAccount opens an account described by account in the data -// directory specified by cfg. If the wallet does not exist, ErrNoWallet -// is returned as an error. -// -// Wallets opened from this function are not set to track against a -// btcd connection. -func (store *AccountStore) OpenAccount(name string, cfg *config) error { - wlt := new(wallet.Wallet) - - a := &Account{ - Wallet: wlt, - name: name, - } - - netdir := networkDir(cfg.Net()) - if err := checkCreateDir(netdir); err != nil { - return err - } - - wfilepath := accountFilename("wallet.bin", name, netdir) - utxofilepath := accountFilename("utxo.bin", name, netdir) - txfilepath := accountFilename("tx.bin", name, netdir) - var wfile, utxofile, txfile *os.File - - // Read wallet file. - wfile, err := os.Open(wfilepath) - if err != nil { - if os.IsNotExist(err) { - // Must create and save wallet first. - return ErrNoWallet - } - msg := fmt.Sprintf("cannot open wallet file: %s", err) - return &WalletOpenError{msg} - } - defer wfile.Close() - - if _, err = wlt.ReadFrom(wfile); err != nil { - msg := fmt.Sprintf("cannot read wallet: %s", err) - return &WalletOpenError{msg} - } - - // Read tx file. If this fails, return a ErrNoTxs error and let - // the caller decide if a rescan is necessary. - var finalErr error - if txfile, err = os.Open(txfilepath); err != nil { - log.Errorf("cannot open tx file: %s", err) - // This is not a error we should immediately return with, - // but other errors can be more important, so only return - // this if none of the others are hit. - finalErr = ErrNoTxs - } else { - defer txfile.Close() - var txs tx.TxStore - if _, err = txs.ReadFrom(txfile); err != nil { - log.Errorf("cannot read tx file: %s", err) - finalErr = ErrNoTxs - } else { - a.TxStore.s = txs - } - } - - // Read utxo file. If this fails, return a ErrNoUtxos error so a - // rescan can be done since the wallet creation block. - var utxos tx.UtxoStore - utxofile, err = os.Open(utxofilepath) - if err != nil { - log.Errorf("cannot open utxo file: %s", err) - finalErr = ErrNoUtxos - } else { - defer utxofile.Close() - if _, err = utxos.ReadFrom(utxofile); err != nil { - log.Errorf("cannot read utxo file: %s", err) - finalErr = ErrNoUtxos - } else { - a.UtxoStore.s = utxos - } - } - - store.Lock() - switch finalErr { - case ErrNoTxs: - // Do nothing special for now. This will be implemented when - // the tx history file is properly written. - store.accounts[name] = a - - case ErrNoUtxos: - // Add wallet, but mark wallet as needing a full rescan since - // the wallet creation block. This will take place when btcd - // connects. - a.fullRescan = true - store.accounts[name] = a - case nil: - store.accounts[name] = a - - default: - log.Warnf("cannot open wallet: %v", err) - } - store.Unlock() - - // Mark all active payment addresses as belonging to this account. - for addr := range a.ActivePaymentAddresses() { - MarkAddressForAccount(addr, name) - } - - return nil -} diff --git a/acctmgr.go b/acctmgr.go new file mode 100644 index 0000000..8a5159f --- /dev/null +++ b/acctmgr.go @@ -0,0 +1,494 @@ +/* + * Copyright (c) 2013, 2014 Conformal Systems LLC + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +package main + +import ( + "bytes" + "container/list" + "errors" + "fmt" + "github.com/conformal/btcutil" + "github.com/conformal/btcwallet/tx" + "github.com/conformal/btcwallet/wallet" + "github.com/conformal/btcwire" + "time" +) + +// Errors relating to accounts. +var ( + ErrAccountExists = errors.New("account already exists") + ErrWalletExists = errors.New("wallet already exists") + ErrNotFound = errors.New("not found") +) + +// AcctMgr is the global account manager for all opened accounts. +var AcctMgr = NewAccountManager() + +// AccountManager manages a collection of accounts. +type AccountManager struct { + // The accounts accessed through the account manager are not safe for + // concurrent access. The account manager therefore contains a + // binary semaphore channel to prevent incorrect access. + bsem chan struct{} + + accessAccount chan *accessAccountRequest + accessAll chan *accessAllRequest + add chan *Account + remove chan *Account + + ds *DiskSyncer // might move to inside Start +} + +// NewAccountManager returns a new AccountManager. +func NewAccountManager() *AccountManager { + am := &AccountManager{ + bsem: make(chan struct{}, 1), + accessAccount: make(chan *accessAccountRequest), + accessAll: make(chan *accessAllRequest), + add: make(chan *Account), + remove: make(chan *Account), + } + am.ds = NewDiskSyncer(am) + return am +} + +// Start maintains accounts and structures for quick lookups for account +// information. Access to these structures must be done through with the +// channels in the AccountManger struct fields. This function never returns +// and should be called as a new goroutine. +func (am *AccountManager) Start() { + // Ready the semaphore - can't grab unless the manager has started. + am.bsem <- struct{}{} + + // Start the account manager's disk syncer. + go am.ds.Start() + + // List and map of all accounts. + l := list.New() + m := make(map[string]*Account) + + wait := 10 * time.Second + timer := time.NewTimer(wait) + for { + select { + case access := <-am.accessAccount: + a, ok := m[access.name] + access.resp <- &accessAccountResponse{ + a: a, + ok: ok, + } + + case access := <-am.accessAll: + s := make([]*Account, 0, l.Len()) + for e := l.Front(); e != nil; e = e.Next() { + s = append(s, e.Value.(*Account)) + } + access.resp <- s + + case a := <-am.add: + if _, ok := m[a.name]; ok { + break + } + m[a.name] = a + l.PushBack(a) + + case a := <-am.remove: + if _, ok := m[a.name]; ok { + delete(m, a.name) + for e := l.Front(); e != nil; e = e.Next() { + v := e.Value.(*Account) + if v == a { + l.Remove(e) + break + } + } + } + + case <-timer.C: + if err := am.ds.FlushScheduled(); err != nil { + log.Errorf("Cannot write account: %v", err) + } + timer = time.NewTimer(wait) + } + } +} + +// Grab grabs the account manager's binary semaphore. +func (am *AccountManager) Grab() { + <-am.bsem +} + +// Release releases the account manager's binary semaphore. +func (am *AccountManager) Release() { + am.bsem <- struct{}{} +} + +type accessAccountRequest struct { + name string + resp chan *accessAccountResponse +} + +type accessAccountResponse struct { + a *Account + ok bool +} + +// Account returns the account specified by name, or ErrNotFound +// as an error if the account is not found. +func (am *AccountManager) Account(name string) (*Account, error) { + req := &accessAccountRequest{ + name: name, + resp: make(chan *accessAccountResponse), + } + am.accessAccount <- req + resp := <-req.resp + if !resp.ok { + return nil, ErrNotFound + } + return resp.a, nil +} + +type accessAllRequest struct { + resp chan []*Account +} + +// AllAccounts returns a slice of all managed accounts. +func (am *AccountManager) AllAccounts() []*Account { + req := &accessAllRequest{ + resp: make(chan []*Account), + } + am.accessAll <- req + return <-req.resp +} + +// AddAccount adds an account to the collection managed by an AccountManager. +func (am *AccountManager) AddAccount(a *Account) { + am.add <- a +} + +// RemoveAccount removes an account to the collection managed by an +// AccountManager. +func (am *AccountManager) RemoveAccount(a *Account) { + am.remove <- a +} + +// RegisterNewAccount adds a new memory account to the account manager, +// and immediately writes the account to disk. +func (am *AccountManager) RegisterNewAccount(a *Account) error { + am.AddAccount(a) + + // Ensure that the new account is written out to disk. + am.ds.ScheduleWalletWrite(a) + am.ds.ScheduleTxStoreWrite(a) + am.ds.ScheduleUtxoStoreWrite(a) + if err := am.ds.FlushAccount(a); err != nil { + am.RemoveAccount(a) + return err + } + return nil +} + +// Rollback rolls back each managed Account to the state before the block +// specified by height and hash was connected to the main chain. +func (am *AccountManager) Rollback(height int32, hash *btcwire.ShaHash) { + log.Debugf("Rolling back tx history since block height %v hash %v", + height, hash) + + for _, a := range am.AllAccounts() { + if a.UtxoStore.Rollback(height, hash) { + am.ds.ScheduleUtxoStoreWrite(a) + } + + if a.TxStore.Rollback(height, hash) { + am.ds.ScheduleTxStoreWrite(a) + } + } +} + +// Rollback reverts each stored Account to a state before the block +// with the passed chainheight and block hash was connected to the main +// chain. This is used to remove transactions and utxos for each wallet +// that occured on a chain no longer considered to be the main chain. +func (a *Account) Rollback(height int32, hash *btcwire.ShaHash) { +} + +// BlockNotify notifies all frontends of any changes from the new block, +// including changed balances. Each account is then set to be synced +// with the latest block. +func (am *AccountManager) BlockNotify(bs *wallet.BlockStamp) { + for _, a := range am.AllAccounts() { + // TODO: need a flag or check that the utxo store was actually + // modified, or this will notify even if there are no balance + // changes, or sending these notifications as the utxos are added. + confirmed := a.CalculateBalance(1) + unconfirmed := a.CalculateBalance(0) - confirmed + NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) + NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, + unconfirmed) + + // If this is the default account, update the block all accounts + // are synced with, and schedule a wallet write. + if a.Name() == "" { + a.Wallet.SetSyncedWith(bs) + am.ds.ScheduleWalletWrite(a) + } + } +} + +// RecordMinedTx searches through each account's TxStore, searching for a +// sent transaction with the same txid as from a txmined notification. If +// the transaction IDs match, the record in the TxStore is updated with +// the full information about the newly-mined tx, and the TxStore is +// scheduled to be written to disk.. +func (am *AccountManager) RecordMinedTx(txid *btcwire.ShaHash, + blkhash *btcwire.ShaHash, blkheight int32, blkindex int, + blktime int64) error { + + for _, a := range am.AllAccounts() { + // Search in reverse order. Since more recently-created + // transactions are appended to the end of the store, it's + // more likely to find it when searching from the end. + for i := len(a.TxStore) - 1; i >= 0; i-- { + sendtx, ok := a.TxStore[i].(*tx.SendTx) + if ok { + if bytes.Equal(txid.Bytes(), sendtx.TxID[:]) { + copy(sendtx.BlockHash[:], blkhash.Bytes()) + sendtx.BlockHeight = blkheight + sendtx.BlockIndex = int32(blkindex) + sendtx.BlockTime = blktime + + am.ds.ScheduleTxStoreWrite(a) + + return nil + } + } + } + } + + return errors.New("txid does not match any recorded sent transaction") +} + +// CalculateBalance returns the balance, calculated using minconf block +// confirmations, of an account. +func (am *AccountManager) CalculateBalance(account string, minconf int) (float64, error) { + a, err := am.Account(account) + if err != nil { + return 0, err + } + + return a.CalculateBalance(minconf), nil +} + +// CreateEncryptedWallet creates a new default account with a wallet file +// encrypted with passphrase. +func (am *AccountManager) CreateEncryptedWallet(passphrase []byte) error { + if len(am.AllAccounts()) != 0 { + return ErrWalletExists + } + + // Get current block's height and hash. + bs, err := GetCurBlock() + if err != nil { + return err + } + + // Create new wallet in memory. + wlt, err := wallet.NewWallet("", "Default acccount", passphrase, + cfg.Net(), &bs, cfg.KeypoolSize) + if err != nil { + return err + } + + // Create new account and begin managing with the global account + // manager. Registering will fail if the new account can not be + // written immediately to disk. + a := &Account{ + Wallet: wlt, + } + if err := am.RegisterNewAccount(a); err != nil { + return err + } + + // Mark all active payment addresses as belonging to this account. + // + // TODO(jrick) move this to the account manager + for addr := range a.ActivePaymentAddresses() { + MarkAddressForAccount(addr, "") + } + + // Begin tracking account against a connected btcd. + a.Track() + + return nil +} + +// ChangePassphrase unlocks all account wallets with the old +// passphrase, and re-encrypts each using the new passphrase. +func (am *AccountManager) ChangePassphrase(old, new []byte) error { + accts := am.AllAccounts() + + for _, a := range accts { + if locked := a.Wallet.IsLocked(); !locked { + if err := a.Wallet.Lock(); err != nil { + return err + } + } + + if err := a.Wallet.Unlock(old); err != nil { + return err + } + defer a.Wallet.Lock() + } + + // Change passphrase for each unlocked wallet. + for _, a := range accts { + if err := a.Wallet.ChangePassphrase(new); err != nil { + return err + } + } + + // Immediately write out to disk. + return am.ds.WriteBatch(accts) +} + +// LockWallets locks all managed account wallets. +func (am *AccountManager) LockWallets() error { + for _, a := range am.AllAccounts() { + if err := a.Lock(); err != nil { + return err + } + } + + return nil +} + +// UnlockWallets unlocks all managed account's wallets. If any wallet unlocks +// fail, all successfully unlocked wallets are locked again. +func (am *AccountManager) UnlockWallets(passphrase string) error { + accts := am.AllAccounts() + unlockedAccts := make([]*Account, 0, len(accts)) + + for _, a := range accts { + if err := a.Unlock([]byte(passphrase)); err != nil { + for _, ua := range unlockedAccts { + ua.Lock() + } + return fmt.Errorf("cannot unlock account %v: %v", + a.name, err) + } + unlockedAccts = append(unlockedAccts, a) + } + + return nil +} + +// DumpKeys returns all WIF-encoded private keys associated with all +// accounts. All wallets must be unlocked for this operation to succeed. +func (am *AccountManager) DumpKeys() ([]string, error) { + var keys []string + for _, a := range am.AllAccounts() { + switch walletKeys, err := a.DumpPrivKeys(); err { + case wallet.ErrWalletLocked: + return nil, err + + case nil: + keys = append(keys, walletKeys...) + + default: // any other non-nil error + return nil, err + } + + } + return keys, nil +} + +// DumpWIFPrivateKey searches through all accounts for the bitcoin +// payment address addr and returns the WIF-encdoded private key. +func (am *AccountManager) DumpWIFPrivateKey(addr btcutil.Address) (string, error) { + for _, a := range am.AllAccounts() { + switch wif, err := a.DumpWIFPrivateKey(addr); err { + case wallet.ErrAddressNotFound: + // Move on to the next account. + continue + + case nil: + return wif, nil + + default: // all other non-nil errors + return "", err + } + } + + return "", errors.New("address does not refer to a key") +} + +// NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed +// account balances. +func (am *AccountManager) NotifyBalances(frontend chan []byte) { + for _, a := range am.AllAccounts() { + balance := a.CalculateBalance(1) + unconfirmed := a.CalculateBalance(0) - balance + NotifyWalletBalance(frontend, a.name, balance) + NotifyWalletBalanceUnconfirmed(frontend, a.name, unconfirmed) + } +} + +// ListAccounts returns a map of account names to their current account +// balances. The balances are calculated using minconf confirmations. +func (am *AccountManager) ListAccounts(minconf int) map[string]float64 { + // Create and fill a map of account names and their balances. + pairs := make(map[string]float64) + for _, a := range am.AllAccounts() { + pairs[a.name] = a.CalculateBalance(minconf) + } + return pairs +} + +// ListSinceBlock returns a slice of maps of strings to interface containing +// structures defining all transactions in the wallets since the given block. +// To be used for the listsinceblock command. +func (am *AccountManager) ListSinceBlock(since, curBlockHeight int32, minconf int) ([]map[string]interface{}, error) { + // Create and fill a map of account names and their balances. + txInfoList := []map[string]interface{}{} + for _, a := range am.AllAccounts() { + txTmp, err := a.ListSinceBlock(since, curBlockHeight, minconf) + if err != nil { + return nil, err + } + txInfoList = append(txInfoList, txTmp...) + } + return txInfoList, nil +} + +// RescanActiveAddresses begins a rescan for all active addresses for +// each account. +// +// TODO(jrick): batch addresses for all accounts together so multiple +// rescan commands can be avoided. +func (am *AccountManager) RescanActiveAddresses() { + for _, account := range am.AllAccounts() { + account.RescanActiveAddresses() + } +} + +// Track begins tracking all addresses in all accounts for updates from +// btcd. +func (am *AccountManager) Track() { + for _, a := range am.AllAccounts() { + a.Track() + } +} diff --git a/cmd.go b/cmd.go index 91bb815..ed93725 100644 --- a/cmd.go +++ b/cmd.go @@ -18,8 +18,10 @@ package main import ( "errors" + "fmt" "github.com/conformal/btcjson" "github.com/conformal/btcutil" + "github.com/conformal/btcwallet/tx" "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" "io/ioutil" @@ -72,7 +74,7 @@ func GetCurBlock() (bs wallet.BlockStamp, err error) { return bs, nil } - bb, _ := GetBestBlock(CurrentRPCConn()) + bb, _ := GetBestBlock(CurrentServerConn()) if bb == nil { return wallet.BlockStamp{ Height: int32(btcutil.BlockHeightUnknown), @@ -149,6 +151,8 @@ func main() { // Check and update any old file locations. updateOldFileLocations() + go AcctMgr.Start() + // Open all account saved to disk. OpenAccounts() @@ -159,9 +163,6 @@ func main() { os.Exit(1) } - // Start account disk syncer goroutine. - go AccountDiskSyncer() - go func() { s, err := newServer(cfg.SvrListeners) if err != nil { @@ -177,6 +178,10 @@ func main() { // Begin generating new IDs for JSON calls. go JSONIDGenerator(NewJSONID) + // Begin RPC server goroutines. + go RPCGateway() + go WalletRequestProcessor() + // Begin maintanence goroutines. go SendBeforeReceiveHistorySync(SendTxHistSyncChans.add, SendTxHistSyncChans.done, @@ -207,8 +212,8 @@ func main() { case conn := <-updateBtcd: btcd = conn - case access := <-accessRPC: - access.rpc <- btcd + case access := <-accessServer: + access.server <- btcd } } }() @@ -246,6 +251,99 @@ func main() { } } +// WalletOpenError is a special error type so problems opening wallet +// files can be differentiated (by a type assertion) from other errors. +type WalletOpenError struct { + Err string +} + +// Error satisifies the builtin error interface. +func (e *WalletOpenError) Error() string { + return e.Err +} + +// OpenSavedAccount opens a named account from disk. If the wallet does not +// exist, ErrNoWallet is returned as an error. +func OpenSavedAccount(name string, cfg *config) (*Account, error) { + netdir := networkDir(cfg.Net()) + if err := checkCreateDir(netdir); err != nil { + return nil, err + } + + wlt := new(wallet.Wallet) + a := &Account{ + Wallet: wlt, + name: name, + } + + wfilepath := accountFilename("wallet.bin", name, netdir) + utxofilepath := accountFilename("utxo.bin", name, netdir) + txfilepath := accountFilename("tx.bin", name, netdir) + var wfile, utxofile, txfile *os.File + + // Read wallet file. + wfile, err := os.Open(wfilepath) + if err != nil { + if os.IsNotExist(err) { + // Must create and save wallet first. + return nil, ErrNoWallet + } + msg := fmt.Sprintf("cannot open wallet file: %s", err) + return nil, &WalletOpenError{msg} + } + defer wfile.Close() + + if _, err = wlt.ReadFrom(wfile); err != nil { + msg := fmt.Sprintf("cannot read wallet: %s", err) + return nil, &WalletOpenError{msg} + } + + // Read tx file. If this fails, return a ErrNoTxs error and let + // the caller decide if a rescan is necessary. + var finalErr error + if txfile, err = os.Open(txfilepath); err != nil { + log.Errorf("cannot open tx file: %s", err) + // This is not a error we should immediately return with, + // but other errors can be more important, so only return + // this if none of the others are hit. + finalErr = ErrNoTxs + } else { + defer txfile.Close() + var txs tx.TxStore + if _, err = txs.ReadFrom(txfile); err != nil { + log.Errorf("cannot read tx file: %s", err) + finalErr = ErrNoTxs + } else { + a.TxStore = txs + } + } + + // Read utxo file. If this fails, return a ErrNoUtxos error so a + // rescan can be done since the wallet creation block. + var utxos tx.UtxoStore + utxofile, err = os.Open(utxofilepath) + if err != nil { + log.Errorf("cannot open utxo file: %s", err) + finalErr = ErrNoUtxos + a.fullRescan = true + } else { + defer utxofile.Close() + if _, err = utxos.ReadFrom(utxofile); err != nil { + log.Errorf("cannot read utxo file: %s", err) + finalErr = ErrNoUtxos + } else { + a.UtxoStore = utxos + } + } + + // Mark all active payment addresses as belonging to this account. + for addr := range a.ActivePaymentAddresses() { + MarkAddressForAccount(addr, name) + } + + return a, finalErr +} + // OpenAccounts attempts to open all saved accounts. func OpenAccounts() { // If the network (account) directory is missing, but the temporary @@ -265,7 +363,8 @@ func OpenAccounts() { // The default account must exist, or btcwallet acts as if no // wallets/accounts have been created yet. - if err := accountstore.OpenAccount("", cfg); err != nil { + a, err := OpenSavedAccount("", cfg) + if err != nil { switch err.(type) { case *WalletOpenError: log.Errorf("Default account wallet file unreadable: %v", err) @@ -275,6 +374,7 @@ func OpenAccounts() { log.Warnf("Non-critical problem opening an account file: %v", err) } } + AcctMgr.AddAccount(a) // Read all filenames in the account directory, and look for any // filenames matching '*-wallet.bin'. These are wallets for @@ -305,7 +405,8 @@ func OpenAccounts() { // Log txstore/utxostore errors as these will be recovered // from with a rescan, but wallet errors must be returned // to the caller. - if err := accountstore.OpenAccount(a, cfg); err != nil { + a, err := OpenSavedAccount(a, cfg) + if err != nil { switch err.(type) { case *WalletOpenError: log.Errorf("Error opening account's wallet: %v", err) @@ -313,24 +414,26 @@ func OpenAccounts() { default: log.Warnf("Non-critical error opening an account file: %v", err) } + } else { + AcctMgr.AddAccount(a) } } } -var accessRPC = make(chan *AccessCurrentRPCConn) +var accessServer = make(chan *AccessCurrentServerConn) -// AccessCurrentRPCConn is used to access the current RPC connection +// AccessCurrentServerConn is used to access the current RPC connection // from the goroutine managing btcd-side RPC connections. -type AccessCurrentRPCConn struct { - rpc chan RPCConn +type AccessCurrentServerConn struct { + server chan ServerConn } -// CurrentRPCConn returns the most recently-connected btcd-side +// CurrentServerConn returns the most recently-connected btcd-side // RPC connection. -func CurrentRPCConn() RPCConn { - access := &AccessCurrentRPCConn{ - rpc: make(chan RPCConn), +func CurrentServerConn() ServerConn { + access := &AccessCurrentServerConn{ + server: make(chan ServerConn), } - accessRPC <- access - return <-access.rpc + accessServer <- access + return <-access.server } diff --git a/createtx.go b/createtx.go index 62ac9d6..d204de8 100644 --- a/createtx.go +++ b/createtx.go @@ -158,11 +158,6 @@ func selectInputs(s tx.UtxoStore, amt uint64, minconf int) (inputs []*tx.Utxo, b // block hash) Utxo. ErrInsufficientFunds is returned if there are not // enough eligible unspent outputs to create the transaction. func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, error) { - // Recorded unspent transactions should not be modified until this - // finishes. - a.UtxoStore.RLock() - defer a.UtxoStore.RUnlock() - // Create a new transaction which will include all input scripts. msgtx := btcwire.NewMsgTx() @@ -224,13 +219,13 @@ func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, er // Get the number of satoshis to increment fee by when searching for // the minimum tx fee needed. - var fee int64 = 0 + fee := int64(0) for { msgtx = txNoInputs.Copy() // Select unspent outputs to be used in transaction based on the amount // neededing to sent, and the current fee estimation. - inputs, btcin, err := selectInputs(a.UtxoStore.s, uint64(amt+fee), + inputs, btcin, err := selectInputs(a.UtxoStore, uint64(amt+fee), minconf) if err != nil { return nil, err diff --git a/createtx_test.go b/createtx_test.go index 5e5b8e1..b9f5070 100644 --- a/createtx_test.go +++ b/createtx_test.go @@ -109,7 +109,7 @@ func TestFakeTxs(t *testing.T) { utxo.Subscript = tx.PkScript(ss) utxo.Amt = 1000000 utxo.Height = 12345 - a.UtxoStore.s = append(a.UtxoStore.s, utxo) + a.UtxoStore = append(a.UtxoStore, utxo) // Fake our current block height so btcd doesn't need to be queried. curBlock.BlockStamp.Height = 12346 diff --git a/disksync.go b/disksync.go index dc1b164..1536150 100644 --- a/disksync.go +++ b/disksync.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "os" "path/filepath" - "time" ) // networkDir returns the directory name of a network directory to hold account @@ -122,9 +121,9 @@ func newSyncSchedule(dir string) *syncSchedule { return s } -// FlushAccount writes all scheduled account files to disk for +// flushAccount writes all scheduled account files to disk for // a single account and removes them from the schedule. -func (s *syncSchedule) FlushAccount(a *Account) error { +func (s *syncSchedule) flushAccount(a *Account) error { if _, ok := s.utxos[a]; ok { if err := a.writeUtxoStore(s.dir); err != nil { return err @@ -147,9 +146,9 @@ func (s *syncSchedule) FlushAccount(a *Account) error { return nil } -// Flush writes all scheduled account files and removes each +// flush writes all scheduled account files and removes each // from the schedule. -func (s *syncSchedule) Flush() error { +func (s *syncSchedule) flush() error { for a := range s.utxos { if err := a.writeUtxoStore(s.dir); err != nil { return err @@ -174,22 +173,16 @@ func (s *syncSchedule) Flush() error { return nil } -// Channels for AccountDiskSyncer. -var ( - scheduleWalletWrite = make(chan *Account) - scheduleTxStoreWrite = make(chan *Account) - scheduleUtxoStoreWrite = make(chan *Account) - syncBatch = make(chan *syncBatchRequest) - syncAccount = make(chan *syncRequest) - exportAccount = make(chan *exportRequest) -) +type flushScheduledRequest struct { + err chan error +} -type syncRequest struct { +type flushAccountRequest struct { a *Account err chan error } -type syncBatchRequest struct { +type writeBatchRequest struct { a []*Account err chan error } @@ -200,14 +193,48 @@ type exportRequest struct { err chan error } -// AccountDiskSyncer manages a set of "dirty" account files which must -// be written to disk, and synchronizes all writes in a single goroutine. -// After 10 seconds since the latest sync, all unwritten files are written -// and removed. Writes for a single account may be scheduled immediately by -// calling WriteScheduledToDisk. +// DiskSyncer manages all disk write operations for a collection of accounts. +type DiskSyncer struct { + // Flush scheduled account writes. + flushScheduled chan *flushScheduledRequest + flushAccount chan *flushAccountRequest + + // Schedule file writes for an account. + scheduleWallet chan *Account + scheduleTxStore chan *Account + scheduleUtxoStore chan *Account + + // Write a collection of accounts all at once. + writeBatch chan *writeBatchRequest + + // Write an account export. + exportAccount chan *exportRequest + + // Account manager for this DiskSyncer. This is only + // needed to grab the account manager semaphore. + am *AccountManager +} + +// NewDiskSyncer creates a new DiskSyncer. +func NewDiskSyncer(am *AccountManager) *DiskSyncer { + return &DiskSyncer{ + flushScheduled: make(chan *flushScheduledRequest), + flushAccount: make(chan *flushAccountRequest), + scheduleWallet: make(chan *Account), + scheduleTxStore: make(chan *Account), + scheduleUtxoStore: make(chan *Account), + writeBatch: make(chan *writeBatchRequest), + exportAccount: make(chan *exportRequest), + am: am, + } +} + +// Start starts the disk syncer. It manages a set of "dirty" account files +// which must be written to disk, and synchronizes all writes in a single +// goroutine. Periodic flush operations may be signaled by an AccountManager. // -// This never returns and is meant to be called from a goroutine. -func AccountDiskSyncer() { +// This never returns and is should be called from a new goroutine. +func (ds *DiskSyncer) Start() { netdir := networkDir(cfg.Net()) if err := checkCreateDir(netdir); err != nil { log.Errorf("Unable to create or write to account directory: %v", err) @@ -215,22 +242,24 @@ func AccountDiskSyncer() { tmpnetdir := tmpNetworkDir(cfg.Net()) schedule := newSyncSchedule(netdir) - ticker := time.Tick(10 * time.Second) for { select { - case a := <-scheduleWalletWrite: + case fr := <-ds.flushScheduled: + fr.err <- schedule.flush() + + case fr := <-ds.flushAccount: + fr.err <- schedule.flushAccount(fr.a) + + case a := <-ds.scheduleWallet: schedule.wallets[a] = struct{}{} - case a := <-scheduleTxStoreWrite: + case a := <-ds.scheduleTxStore: schedule.txs[a] = struct{}{} - case a := <-scheduleUtxoStoreWrite: + case a := <-ds.scheduleUtxoStore: schedule.utxos[a] = struct{}{} - case sr := <-syncAccount: - sr.err <- schedule.FlushAccount(sr.a) - - case sr := <-syncBatch: + case sr := <-ds.writeBatch: err := batchWriteAccounts(sr.a, tmpnetdir, netdir) if err == nil { // All accounts have been synced, old schedule @@ -239,40 +268,71 @@ func AccountDiskSyncer() { } sr.err <- err - case er := <-exportAccount: + case er := <-ds.exportAccount: a := er.a dir := er.dir er.err <- a.writeAll(dir) - - case <-ticker: - if err := schedule.Flush(); err != nil { - log.Errorf("Cannot write account: %v", err) - } } } } -// WriteAllToDisk writes all account files for all accounts at once. Unlike -// writing individual account files, this causes each account file to be -// written to a new network directory to replace the old one. Use this -// function when it is needed to ensure an all or nothing write for all -// account files. -// -// It is a runtime error to call this without holding the store writer lock. -func (store *AccountStore) WriteAllToDisk() error { - accts := make([]*Account, 0, len(store.accounts)) - for _, a := range store.accounts { - accts = append(accts, a) - } +// FlushScheduled writes all scheduled account files to disk. +func (ds *DiskSyncer) FlushScheduled() error { + ds.am.Grab() + err := make(chan error) + ds.flushScheduled <- &flushScheduledRequest{err} + ds.am.Release() + return <-err +} - err := make(chan error, 1) - syncBatch <- &syncBatchRequest{ - a: accts, +// FlushAccount writes all scheduled account files to disk for a single +// account. +func (ds *DiskSyncer) FlushAccount(a *Account) error { + err := make(chan error) + ds.flushAccount <- &flushAccountRequest{a: a, err: err} + return <-err +} + +// ScheduleWalletWrite schedules an account's wallet to be written to disk. +func (ds *DiskSyncer) ScheduleWalletWrite(a *Account) { + ds.scheduleWallet <- a +} + +// ScheduleTxStoreWrite schedules an account's transaction store to be +// written to disk. +func (ds *DiskSyncer) ScheduleTxStoreWrite(a *Account) { + ds.scheduleTxStore <- a +} + +// ScheduleUtxoStoreWrite schedules an account's utxo store to be written +// to disk. +func (ds *DiskSyncer) ScheduleUtxoStoreWrite(a *Account) { + ds.scheduleUtxoStore <- a +} + +// WriteBatch safely replaces all account files in the network directory +// with new files created from all accounts in a. +func (ds *DiskSyncer) WriteBatch(a []*Account) error { + err := make(chan error) + ds.writeBatch <- &writeBatchRequest{ + a: a, err: err, } return <-err } +// ExportAccount writes all account files for a to a new directory. +func (ds *DiskSyncer) ExportAccount(a *Account, dir string) error { + err := make(chan error) + er := &exportRequest{ + dir: dir, + a: a, + err: err, + } + ds.exportAccount <- er + return <-err +} + func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error { if err := freshDir(tmpdir); err != nil { return err @@ -294,52 +354,6 @@ func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error { return nil } -// WriteScheduledToDisk signals AccountDiskSyncer to write all scheduled -// account files for a to disk now instead of waiting for the next sync -// interval. This function blocks until all the file writes for a have -// finished, and returns a non-nil error if any of the file writes failed. -func (a *Account) WriteScheduledToDisk() error { - err := make(chan error, 1) - syncAccount <- &syncRequest{ - a: a, - err: err, - } - return <-err -} - -// ScheduleWalletWrite schedules a write of an account's wallet file. -func (a *Account) ScheduleWalletWrite() { - scheduleWalletWrite <- a -} - -// ScheduleTxStoreWrite schedules a write of an account's tx store file. -func (a *Account) ScheduleTxStoreWrite() { - scheduleTxStoreWrite <- a -} - -// ScheduleUtxoStoreWrite schedules a write of an account's utxo store file. -func (a *Account) ScheduleUtxoStoreWrite() { - scheduleUtxoStoreWrite <- a -} - -// ExportToDirectory writes an account to a special export directory. Any -// previous files are overwritten. -func (a *Account) ExportToDirectory(dirBaseName string) error { - dir := filepath.Join(networkDir(cfg.Net()), dirBaseName) - if err := checkCreateDir(dir); err != nil { - return err - } - - err := make(chan error) - er := &exportRequest{ - dir: dir, - a: a, - err: err, - } - exportAccount <- er - return <-err -} - func (a *Account) writeAll(dir string) error { if err := a.writeUtxoStore(dir); err != nil { return err @@ -362,10 +376,7 @@ func (a *Account) writeWallet(dir string) error { } defer tmpfile.Close() - a.mtx.RLock() - _, err = a.Wallet.WriteTo(tmpfile) - a.mtx.RUnlock() - if err != nil { + if _, err = a.Wallet.WriteTo(tmpfile); err != nil { return err } @@ -385,10 +396,7 @@ func (a *Account) writeTxStore(dir string) error { } defer tmpfile.Close() - a.TxStore.RLock() - _, err = a.TxStore.s.WriteTo(tmpfile) - a.TxStore.RUnlock() - if err != nil { + if _, err = a.TxStore.WriteTo(tmpfile); err != nil { return err } @@ -408,10 +416,7 @@ func (a *Account) writeUtxoStore(dir string) error { } defer tmpfile.Close() - a.UtxoStore.RLock() - _, err = a.UtxoStore.s.WriteTo(tmpfile) - a.UtxoStore.RUnlock() - if err != nil { + if _, err = a.UtxoStore.WriteTo(tmpfile); err != nil { return err } diff --git a/btcdrpc.go b/ntfns.go similarity index 51% rename from btcdrpc.go rename to ntfns.go index 22bb666..c25f291 100644 --- a/btcdrpc.go +++ b/ntfns.go @@ -14,15 +14,12 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -// This file implements the websocket RPC connection to a btcd instance. +// This file implements the notification handlers for btcd-side notifications. package main import ( - "code.google.com/p/go.net/websocket" "encoding/hex" - "encoding/json" - "errors" "github.com/conformal/btcjson" "github.com/conformal/btcutil" "github.com/conformal/btcwallet/tx" @@ -33,260 +30,7 @@ import ( "time" ) -// ErrBtcdDisconnected describes an error where an operation cannot -// successfully complete due to btcwallet not being connected to -// btcd. -var ErrBtcdDisconnected = btcjson.Error{ - Code: -1, - Message: "btcd disconnected", -} - -// BtcdRPCConn is a type managing a client connection to a btcd RPC server -// over websockets. -type BtcdRPCConn struct { - ws *websocket.Conn - addRequest chan *AddRPCRequest - closed chan struct{} -} - -// Ensure that BtcdRPCConn can be used as an RPCConn. -var _ RPCConn = &BtcdRPCConn{} - -// NewBtcdRPCConn creates a new RPC connection from a btcd websocket -// connection to btcd. -func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn { - conn := &BtcdRPCConn{ - ws: ws, - addRequest: make(chan *AddRPCRequest), - closed: make(chan struct{}), - } - return conn -} - -// SendRequest sends an RPC request and returns a channel to read the response's -// result and error. Part of the RPCConn interface. -func (btcd *BtcdRPCConn) SendRequest(request *RPCRequest) chan *RPCResponse { - select { - case <-btcd.closed: - // The connection has closed, so instead of adding and sending - // a request, return a channel that just replies with the - // error for a disconnected btcd. - responseChan := make(chan *RPCResponse) - go func() { - response := &RPCResponse{ - Err: &ErrBtcdDisconnected, - } - responseChan <- response - }() - return responseChan - - default: - addRequest := &AddRPCRequest{ - Request: request, - ResponseChan: make(chan chan *RPCResponse, 1), - } - btcd.addRequest <- addRequest - return <-addRequest.ResponseChan - } -} - -// Connected returns whether the connection remains established to the RPC -// server. -// -// This function probably should be removed, as any checks for confirming -// the connection are no longer valid after the check and may result in -// races. -func (btcd *BtcdRPCConn) Connected() bool { - select { - case <-btcd.closed: - return false - - default: - return true - } -} - -// AddRPCRequest is used to add an RPCRequest to the pool of requests -// being manaaged by a btcd RPC connection. -type AddRPCRequest struct { - Request *RPCRequest - ResponseChan chan chan *RPCResponse -} - -// send performs the actual send of the marshaled request over the btcd -// websocket connection. -func (btcd *BtcdRPCConn) send(rpcrequest *RPCRequest) error { - // btcjson.Cmds define their own MarshalJSON which returns an error - // to satisify the json.Marshaler interface, but will never error. - mrequest, _ := rpcrequest.request.MarshalJSON() - return websocket.Message.Send(btcd.ws, mrequest) -} - -type receivedResponse struct { - id uint64 - raw string - reply *btcjson.Reply -} - -// Start starts the goroutines required to send RPC requests and listen for -// replies. -func (btcd *BtcdRPCConn) Start() { - done := btcd.closed - responses := make(chan *receivedResponse) - - // Maintain a map of JSON IDs to RPCRequests currently being waited on. - go func() { - m := make(map[uint64]*RPCRequest) - for { - select { - case addrequest := <-btcd.addRequest: - rpcrequest := addrequest.Request - m[rpcrequest.request.Id().(uint64)] = rpcrequest - - if err := btcd.send(rpcrequest); err != nil { - // Connection lost. - btcd.ws.Close() - close(done) - } - - addrequest.ResponseChan <- rpcrequest.response - - case recvResponse := <-responses: - rpcrequest, ok := m[recvResponse.id] - if !ok { - log.Warnf("Received unexpected btcd response") - continue - } - delete(m, recvResponse.id) - - // If no result var was set, create and send - // send the response unmarshaled by the json - // package. - if rpcrequest.result == nil { - response := &RPCResponse{ - Result: recvResponse.reply.Result, - Err: recvResponse.reply.Error, - } - rpcrequest.response <- response - continue - } - - // A return var was set, so unmarshal again - // into the var before sending the response. - r := &btcjson.Reply{ - Result: rpcrequest.result, - } - json.Unmarshal([]byte(recvResponse.raw), &r) - response := &RPCResponse{ - Result: r.Result, - Err: r.Error, - } - rpcrequest.response <- response - - case <-done: - for _, request := range m { - response := &RPCResponse{ - Err: &ErrBtcdDisconnected, - } - request.response <- response - } - return - } - } - }() - - // Listen for replies/notifications from btcd, and decide how to handle them. - go func() { - // Idea: instead of reading btcd messages from just one websocket - // connection, maybe use two so the same connection isn't used - // for both notifications and responses? Should make handling - // must faster as unnecessary unmarshal attempts could be avoided. - - for { - var m string - if err := websocket.Message.Receive(btcd.ws, &m); err != nil { - log.Debugf("Cannot receive btcd message: %v", err) - close(done) - return - } - - // Try notifications (requests with nil ids) first. - n, err := unmarshalNotification(m) - if err == nil { - // Begin processing the notification. - go processNotification(n, m) - continue - } - - // Must be a response. - r, err := unmarshalResponse(m) - if err == nil { - responses <- r - continue - } - - // Not sure what was received but it isn't correct. - log.Warnf("Received invalid message from btcd") - } - }() -} - -// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC -// response. -func unmarshalResponse(s string) (*receivedResponse, error) { - var r btcjson.Reply - if err := json.Unmarshal([]byte(s), &r); err != nil { - return nil, err - } - - // Check for a valid ID. - if r.Id == nil { - return nil, errors.New("id is nil") - } - fid, ok := (*r.Id).(float64) - if !ok { - return nil, errors.New("id is not a number") - } - response := &receivedResponse{ - id: uint64(fid), - raw: s, - reply: &r, - } - return response, nil -} - -// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC -// notification (Request with a nil or no ID). -func unmarshalNotification(s string) (btcjson.Cmd, error) { - req, err := btcjson.ParseMarshaledCmd([]byte(s)) - if err != nil { - return nil, err - } - - if req.Id() != nil { - return nil, errors.New("id is non-nil") - } - - return req, nil -} - -// processNotification checks for a handler for a notification, and sends -func processNotification(n btcjson.Cmd, s string) { - // Message is a btcd notification. Check the method and dispatch - // correct handler, or if no handler, pass up to each wallet. - if ntfnHandler, ok := notificationHandlers[n.Method()]; ok { - log.Debugf("Running notification handler for method %v", - n.Method()) - ntfnHandler(n, []byte(s)) - } else { - // No handler; send to all wallets. - log.Debugf("Sending notification with method %v to all wallets", - n.Method()) - frontendNotificationMaster <- []byte(s) - } -} - -type notificationHandler func(btcjson.Cmd, []byte) +type notificationHandler func(btcjson.Cmd) var notificationHandlers = map[string]notificationHandler{ btcws.BlockConnectedNtfnMethod: NtfnBlockConnected, @@ -297,7 +41,7 @@ var notificationHandlers = map[string]notificationHandler{ } // NtfnProcessedTx handles the btcws.ProcessedTxNtfn notification. -func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) { +func NtfnProcessedTx(n btcjson.Cmd) { ptn, ok := n.(*btcws.ProcessedTxNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Method()) @@ -332,8 +76,8 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) { log.Warnf("Received rescan result for unknown address %v", ptn.Receiver) return } - a, err := accountstore.Account(aname) - if err == ErrAcctNotExist { + a, err := AcctMgr.Account(aname) + if err == ErrNotFound { log.Errorf("Missing account for rescaned address %v", ptn.Receiver) } @@ -366,11 +110,8 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) { } // Record the tx history. - a.TxStore.Lock() - a.TxStore.s.InsertRecvTx(t) - a.TxStore.Unlock() - a.ScheduleTxStoreWrite() - + a.TxStore.InsertRecvTx(t) + AcctMgr.ds.ScheduleTxStoreWrite(a) // Notify frontends of tx. If the tx is unconfirmed, it is always // notified and the outpoint is marked as notified. If the outpoint // has already been notified and is now in a block, a txmined notifiction @@ -402,10 +143,8 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) { u.Out.Index = uint32(ptn.TxOutIndex) copy(u.AddrHash[:], receiver.ScriptAddress()) copy(u.BlockHash[:], blockHash[:]) - a.UtxoStore.Lock() - a.UtxoStore.s.Insert(u) - a.UtxoStore.Unlock() - a.ScheduleUtxoStoreWrite() + a.UtxoStore.Insert(u) + AcctMgr.ds.ScheduleUtxoStoreWrite(a) // If this notification came from mempool, notify frontends of // the new unconfirmed balance immediately. Otherwise, wait until @@ -431,7 +170,7 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) { // to mark wallet files with a possibly-better earliest block height, // and will greatly reduce rescan times for wallets created with an // out of sync btcd. -func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) { +func NtfnBlockConnected(n btcjson.Cmd) { bcn, ok := n.(*btcws.BlockConnectedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Method()) @@ -465,16 +204,17 @@ func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) { wg.Wait() NotifyBalanceSyncerChans.remove <- *hash } - accountstore.BlockNotify(bs) + AcctMgr.BlockNotify(bs) // Pass notification to frontends too. + marshaled, _ := n.MarshalJSON() frontendNotificationMaster <- marshaled } // NtfnBlockDisconnected handles btcd notifications resulting from // blocks disconnected from the main chain in the event of a chain // switch and notifies frontends of the new blockchain height. -func NtfnBlockDisconnected(n btcjson.Cmd, marshaled []byte) { +func NtfnBlockDisconnected(n btcjson.Cmd) { bdn, ok := n.(*btcws.BlockDisconnectedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Method()) @@ -487,15 +227,16 @@ func NtfnBlockDisconnected(n btcjson.Cmd, marshaled []byte) { } // Rollback Utxo and Tx data stores. - accountstore.Rollback(bdn.Height, hash) + AcctMgr.Rollback(bdn.Height, hash) // Pass notification to frontends too. + marshaled, _ := n.MarshalJSON() frontendNotificationMaster <- marshaled } // NtfnTxMined handles btcd notifications resulting from newly // mined transactions that originated from this wallet. -func NtfnTxMined(n btcjson.Cmd, marshaled []byte) { +func NtfnTxMined(n btcjson.Cmd) { tmn, ok := n.(*btcws.TxMinedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Method()) @@ -513,7 +254,7 @@ func NtfnTxMined(n btcjson.Cmd, marshaled []byte) { return } - err = accountstore.RecordMinedTx(txid, blockhash, + err = AcctMgr.RecordMinedTx(txid, blockhash, tmn.BlockHeight, tmn.Index, tmn.BlockTime) if err != nil { log.Errorf("%v handler: %v", n.Method(), err) @@ -528,7 +269,7 @@ func NtfnTxMined(n btcjson.Cmd, marshaled []byte) { // NtfnTxSpent handles btcd txspent notifications resulting from a block // transaction being processed that spents a wallet UTXO. -func NtfnTxSpent(n btcjson.Cmd, marshaled []byte) { +func NtfnTxSpent(n btcjson.Cmd) { // TODO(jrick): This might actually be useless and maybe it shouldn't // be implemented. } diff --git a/rpc.go b/rpc.go index d1816b8..a8c350b 100644 --- a/rpc.go +++ b/rpc.go @@ -14,149 +14,104 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -// This file implements the RPC connection interface and functions to -// communicate with a bitcoin RPC server. - package main import ( "github.com/conformal/btcjson" - "github.com/conformal/btcwire" - "github.com/conformal/btcws" ) -// RPCRequest is a type responsible for handling RPC requests and providing -// a method to access the response. -type RPCRequest struct { - request btcjson.Cmd - result interface{} - response chan *RPCResponse +// RPCResponse is an interface type covering both server +// (frontend <-> btcwallet) and client (btcwallet <-> btcd) responses. +type RPCResponse interface { + Result() interface{} + Error() *btcjson.Error } -// NewRPCRequest creates a new RPCRequest from a btcjson.Cmd. request may be -// nil to create a new var for the result (with types determined by the -// unmarshaling rules described in the json package), or set to a var with -// an expected type (i.e. *btcjson.BlockResult) to directly unmarshal the -// response's result into a convenient type. -func NewRPCRequest(request btcjson.Cmd, result interface{}) *RPCRequest { - return &RPCRequest{ +// ClientRequest is a type holding a bitcoin client's request and +// a channel to send the response. +type ClientRequest struct { + ws bool + request btcjson.Cmd + response chan RPCResponse +} + +// NewClientRequest creates a new ClientRequest from a btcjson.Cmd. +func NewClientRequest(request btcjson.Cmd, ws bool) *ClientRequest { + return &ClientRequest{ + ws: ws, request: request, - result: result, - response: make(chan *RPCResponse), + response: make(chan RPCResponse), } } -// RPCResponse holds a response's result and error returned from sending a -// RPCRequest. -type RPCResponse struct { +// Handle sends a client request to the RPC gateway for processing, +// and returns the result when handling is finished. +func (r *ClientRequest) Handle() (interface{}, *btcjson.Error) { + clientRequests <- r + resp := <-r.response + return resp.Result(), resp.Error() +} + +// ClientResponse holds a result and error returned from handling a +// client's request. +type ClientResponse struct { + result interface{} + err *btcjson.Error +} + +// Result returns the result of a response to a client. +func (r *ClientResponse) Result() interface{} { + return r.result +} + +// Error returns the error of a response to a client, or nil if +// there is no error. +func (r *ClientResponse) Error() *btcjson.Error { + return r.err +} + +// ServerRequest is a type responsible for handling requests to a bitcoin +// server and providing a method to access the response. +type ServerRequest struct { + request btcjson.Cmd + result interface{} + response chan RPCResponse +} + +// NewServerRequest creates a new ServerRequest from a btcjson.Cmd. request +// may be nil to create a new var for the result (with types determined by +// the unmarshaling rules described in the json package), or set to a var +// with an expected type (i.e. *btcjson.BlockResult) to directly unmarshal +// the response's result into a convenient type. +func NewServerRequest(request btcjson.Cmd, result interface{}) *ServerRequest { + return &ServerRequest{ + request: request, + result: result, + response: make(chan RPCResponse, 1), + } +} + +// ServerResponse holds a response's result and error returned from sending a +// ServerRequest. +type ServerResponse struct { // Result will be set to a concrete type (i.e. *btcjson.BlockResult) // and may be type asserted to that type if a non-nil result was used - // to create the originating RPCRequest. Otherwise, Result will be + // to create the originating ServerRequest. Otherwise, Result will be // set to new memory allocated by json.Unmarshal, and the type rules // for unmarshaling described in the json package should be followed // when type asserting Result. - Result interface{} + result interface{} // Err points to an unmarshaled error, or nil if result is valid. - Err *btcjson.Error + err *btcjson.Error } -// RPCConn is an interface representing a client connection to a bitcoin RPC -// server. -type RPCConn interface { - // SendRequest sends a bitcoin RPC request, returning a channel to - // read the reply. A channel is used so both synchronous and - // asynchronous RPC can be supported. - SendRequest(request *RPCRequest) chan *RPCResponse +// Result returns the result of a server's RPC response. +func (r *ServerResponse) Result() interface{} { + return r.result } -// GetBestBlockResult holds the result of a getbestblock response. -// -// TODO(jrick): shove this in btcws. -type GetBestBlockResult struct { - Hash string `json:"hash"` - Height int32 `json:"height"` -} - -// GetBestBlock gets both the block height and hash of the best block -// in the main chain. -func GetBestBlock(rpc RPCConn) (*GetBestBlockResult, *btcjson.Error) { - cmd := btcws.NewGetBestBlockCmd(<-NewJSONID) - request := NewRPCRequest(cmd, new(GetBestBlockResult)) - response := <-rpc.SendRequest(request) - if response.Err != nil { - return nil, response.Err - } - return response.Result.(*GetBestBlockResult), nil -} - -// GetBlock requests details about a block with the given hash. -func GetBlock(rpc RPCConn, blockHash string) (*btcjson.BlockResult, *btcjson.Error) { - // NewGetBlockCmd cannot fail with no optargs, so omit the check. - cmd, _ := btcjson.NewGetBlockCmd(<-NewJSONID, blockHash) - request := NewRPCRequest(cmd, new(btcjson.BlockResult)) - response := <-rpc.SendRequest(request) - if response.Err != nil { - return nil, response.Err - } - return response.Result.(*btcjson.BlockResult), nil -} - -// GetCurrentNet requests the network a bitcoin RPC server is running on. -func GetCurrentNet(rpc RPCConn) (btcwire.BitcoinNet, *btcjson.Error) { - cmd := btcws.NewGetCurrentNetCmd(<-NewJSONID) - request := NewRPCRequest(cmd, nil) - response := <-rpc.SendRequest(request) - if response.Err != nil { - return 0, response.Err - } - return btcwire.BitcoinNet(uint32(response.Result.(float64))), nil -} - -// NotifyBlocks requests blockconnected and blockdisconnected notifications. -func NotifyBlocks(rpc RPCConn) *btcjson.Error { - cmd := btcws.NewNotifyBlocksCmd(<-NewJSONID) - request := NewRPCRequest(cmd, nil) - response := <-rpc.SendRequest(request) - return response.Err -} - -// NotifyNewTXs requests notifications for new transactions that spend -// to any of the addresses in addrs. -func NotifyNewTXs(rpc RPCConn, addrs []string) *btcjson.Error { - cmd := btcws.NewNotifyNewTXsCmd(<-NewJSONID, addrs) - request := NewRPCRequest(cmd, nil) - response := <-rpc.SendRequest(request) - return response.Err -} - -// NotifySpent requests notifications for when a transaction is processed which -// spends op. -func NotifySpent(rpc RPCConn, op *btcwire.OutPoint) *btcjson.Error { - cmd := btcws.NewNotifySpentCmd(<-NewJSONID, op) - request := NewRPCRequest(cmd, nil) - response := <-rpc.SendRequest(request) - return response.Err -} - -// Rescan requests a blockchain rescan for transactions to any number of -// addresses and notifications to inform wallet about such transactions. -func Rescan(rpc RPCConn, beginBlock int32, addrs map[string]struct{}) *btcjson.Error { - // NewRescanCmd cannot fail with no optargs, so omit the check. - cmd, _ := btcws.NewRescanCmd(<-NewJSONID, beginBlock, addrs) - request := NewRPCRequest(cmd, nil) - response := <-rpc.SendRequest(request) - return response.Err -} - -// SendRawTransaction sends a hex-encoded transaction for relay. -func SendRawTransaction(rpc RPCConn, hextx string) (txid string, error *btcjson.Error) { - // NewSendRawTransactionCmd cannot fail, so omit the check. - cmd, _ := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx) - request := NewRPCRequest(cmd, new(string)) - response := <-rpc.SendRequest(request) - if response.Err != nil { - return "", response.Err - } - return *response.Result.(*string), nil +// Result returns the error of a server's RPC response. +func (r *ServerResponse) Error() *btcjson.Error { + return r.err } diff --git a/rpcclient.go b/rpcclient.go new file mode 100644 index 0000000..6679607 --- /dev/null +++ b/rpcclient.go @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2013, 2014 Conformal Systems LLC + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +// This file implements the websocket client connection to a bitcoin RPC +// server. + +package main + +import ( + "code.google.com/p/go.net/websocket" + "encoding/json" + "errors" + "github.com/conformal/btcjson" + "github.com/conformal/btcwire" + "github.com/conformal/btcws" +) + +// ServerConn is an interface representing a client connection to a bitcoin RPC +// server. +type ServerConn interface { + // SendRequest sends a bitcoin RPC request, returning a channel to + // read the reply. A channel is used so both synchronous and + // asynchronous RPC can be supported. + SendRequest(request *ServerRequest) chan RPCResponse +} + +// ErrBtcdDisconnected describes an error where an operation cannot +// successfully complete due to btcwallet not being connected to +// btcd. +var ErrBtcdDisconnected = btcjson.Error{ + Code: -1, + Message: "btcd disconnected", +} + +// BtcdRPCConn is a type managing a client connection to a btcd RPC server +// over websockets. +type BtcdRPCConn struct { + ws *websocket.Conn + addRequest chan *AddRPCRequest + closed chan struct{} +} + +// Ensure that BtcdRPCConn can be used as an RPCConn. +var _ ServerConn = &BtcdRPCConn{} + +// NewBtcdRPCConn creates a new RPC connection from a btcd websocket +// connection to btcd. +func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn { + conn := &BtcdRPCConn{ + ws: ws, + addRequest: make(chan *AddRPCRequest), + closed: make(chan struct{}), + } + return conn +} + +// SendRequest sends an RPC request and returns a channel to read the response's +// result and error. Part of the RPCConn interface. +func (btcd *BtcdRPCConn) SendRequest(request *ServerRequest) chan RPCResponse { + select { + case <-btcd.closed: + // The connection has closed, so instead of adding and sending + // a request, return a channel that just replies with the + // error for a disconnected btcd. + responseChan := make(chan RPCResponse, 1) + response := &ServerResponse{ + err: &ErrBtcdDisconnected, + } + responseChan <- response + return responseChan + + default: + addRequest := &AddRPCRequest{ + Request: request, + ResponseChan: make(chan chan RPCResponse, 1), + } + btcd.addRequest <- addRequest + return <-addRequest.ResponseChan + } +} + +// Connected returns whether the connection remains established to the RPC +// server. +// +// This function probably should be removed, as any checks for confirming +// the connection are no longer valid after the check and may result in +// races. +func (btcd *BtcdRPCConn) Connected() bool { + select { + case <-btcd.closed: + return false + + default: + return true + } +} + +// AddRPCRequest is used to add an RPCRequest to the pool of requests +// being manaaged by a btcd RPC connection. +type AddRPCRequest struct { + Request *ServerRequest + ResponseChan chan chan RPCResponse +} + +// send performs the actual send of the marshaled request over the btcd +// websocket connection. +func (btcd *BtcdRPCConn) send(rpcrequest *ServerRequest) error { + // btcjson.Cmds define their own MarshalJSON which returns an error + // to satisify the json.Marshaler interface, but will never error. + mrequest, _ := rpcrequest.request.MarshalJSON() + return websocket.Message.Send(btcd.ws, mrequest) +} + +type receivedResponse struct { + id uint64 + raw string + reply *btcjson.Reply +} + +// Start starts the goroutines required to send RPC requests and listen for +// replies. +func (btcd *BtcdRPCConn) Start() { + done := btcd.closed + responses := make(chan *receivedResponse) + + // Maintain a map of JSON IDs to RPCRequests currently being waited on. + go func() { + m := make(map[uint64]*ServerRequest) + for { + select { + case addrequest := <-btcd.addRequest: + rpcrequest := addrequest.Request + m[rpcrequest.request.Id().(uint64)] = rpcrequest + + if err := btcd.send(rpcrequest); err != nil { + // Connection lost. + btcd.ws.Close() + close(done) + } + + addrequest.ResponseChan <- rpcrequest.response + + case recvResponse := <-responses: + rpcrequest, ok := m[recvResponse.id] + if !ok { + log.Warnf("Received unexpected btcd response") + continue + } + delete(m, recvResponse.id) + + // If no result var was set, create and send + // send the response unmarshaled by the json + // package. + if rpcrequest.result == nil { + response := &ServerResponse{ + result: recvResponse.reply.Result, + err: recvResponse.reply.Error, + } + rpcrequest.response <- response + continue + } + + // A return var was set, so unmarshal again + // into the var before sending the response. + r := &btcjson.Reply{ + Result: rpcrequest.result, + } + json.Unmarshal([]byte(recvResponse.raw), &r) + response := &ServerResponse{ + result: r.Result, + err: r.Error, + } + rpcrequest.response <- response + + case <-done: + for _, request := range m { + response := &ServerResponse{ + err: &ErrBtcdDisconnected, + } + request.response <- response + } + return + } + } + }() + + // Listen for replies/notifications from btcd, and decide how to handle them. + go func() { + // Idea: instead of reading btcd messages from just one websocket + // connection, maybe use two so the same connection isn't used + // for both notifications and responses? Should make handling + // must faster as unnecessary unmarshal attempts could be avoided. + + for { + var m string + if err := websocket.Message.Receive(btcd.ws, &m); err != nil { + log.Debugf("Cannot receive btcd message: %v", err) + close(done) + return + } + + // Try notifications (requests with nil ids) first. + n, err := unmarshalNotification(m) + if err == nil { + svrNtfns <- n + continue + } + + // Must be a response. + r, err := unmarshalResponse(m) + if err == nil { + responses <- r + continue + } + + // Not sure what was received but it isn't correct. + log.Warnf("Received invalid message from btcd") + } + }() +} + +// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC +// response. +func unmarshalResponse(s string) (*receivedResponse, error) { + var r btcjson.Reply + if err := json.Unmarshal([]byte(s), &r); err != nil { + return nil, err + } + + // Check for a valid ID. + if r.Id == nil { + return nil, errors.New("id is nil") + } + fid, ok := (*r.Id).(float64) + if !ok { + return nil, errors.New("id is not a number") + } + response := &receivedResponse{ + id: uint64(fid), + raw: s, + reply: &r, + } + return response, nil +} + +// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC +// notification (Request with a nil or no ID). +func unmarshalNotification(s string) (btcjson.Cmd, error) { + req, err := btcjson.ParseMarshaledCmd([]byte(s)) + if err != nil { + return nil, err + } + + if req.Id() != nil { + return nil, errors.New("id is non-nil") + } + + return req, nil +} + +// GetBestBlockResult holds the result of a getbestblock response. +// +// TODO(jrick): shove this in btcws. +type GetBestBlockResult struct { + Hash string `json:"hash"` + Height int32 `json:"height"` +} + +// GetBestBlock gets both the block height and hash of the best block +// in the main chain. +func GetBestBlock(rpc ServerConn) (*GetBestBlockResult, *btcjson.Error) { + cmd := btcws.NewGetBestBlockCmd(<-NewJSONID) + request := NewServerRequest(cmd, new(GetBestBlockResult)) + response := <-rpc.SendRequest(request) + if response.Error() != nil { + return nil, response.Error() + } + return response.Result().(*GetBestBlockResult), nil +} + +// GetBlock requests details about a block with the given hash. +func GetBlock(rpc ServerConn, blockHash string) (*btcjson.BlockResult, *btcjson.Error) { + // NewGetBlockCmd cannot fail with no optargs, so omit the check. + cmd, _ := btcjson.NewGetBlockCmd(<-NewJSONID, blockHash) + request := NewServerRequest(cmd, new(btcjson.BlockResult)) + response := <-rpc.SendRequest(request) + if response.Error() != nil { + return nil, response.Error() + } + return response.Result().(*btcjson.BlockResult), nil +} + +// GetCurrentNet requests the network a bitcoin RPC server is running on. +func GetCurrentNet(rpc ServerConn) (btcwire.BitcoinNet, *btcjson.Error) { + cmd := btcws.NewGetCurrentNetCmd(<-NewJSONID) + request := NewServerRequest(cmd, nil) + response := <-rpc.SendRequest(request) + if response.Error() != nil { + return 0, response.Error() + } + return btcwire.BitcoinNet(uint32(response.Result().(float64))), nil +} + +// NotifyBlocks requests blockconnected and blockdisconnected notifications. +func NotifyBlocks(rpc ServerConn) *btcjson.Error { + cmd := btcws.NewNotifyBlocksCmd(<-NewJSONID) + request := NewServerRequest(cmd, nil) + response := <-rpc.SendRequest(request) + return response.Error() +} + +// NotifyNewTXs requests notifications for new transactions that spend +// to any of the addresses in addrs. +func NotifyNewTXs(rpc ServerConn, addrs []string) *btcjson.Error { + cmd := btcws.NewNotifyNewTXsCmd(<-NewJSONID, addrs) + request := NewServerRequest(cmd, nil) + response := <-rpc.SendRequest(request) + return response.Error() +} + +// NotifySpent requests notifications for when a transaction is processed which +// spends op. +func NotifySpent(rpc ServerConn, op *btcwire.OutPoint) *btcjson.Error { + cmd := btcws.NewNotifySpentCmd(<-NewJSONID, op) + request := NewServerRequest(cmd, nil) + response := <-rpc.SendRequest(request) + return response.Error() +} + +// Rescan requests a blockchain rescan for transactions to any number of +// addresses and notifications to inform wallet about such transactions. +func Rescan(rpc ServerConn, beginBlock int32, addrs map[string]struct{}) *btcjson.Error { + // NewRescanCmd cannot fail with no optargs, so omit the check. + cmd, _ := btcws.NewRescanCmd(<-NewJSONID, beginBlock, addrs) + request := NewServerRequest(cmd, nil) + response := <-rpc.SendRequest(request) + return response.Error() +} + +// SendRawTransaction sends a hex-encoded transaction for relay. +func SendRawTransaction(rpc ServerConn, hextx string) (txid string, error *btcjson.Error) { + // NewSendRawTransactionCmd cannot fail, so omit the check. + cmd, _ := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx) + request := NewServerRequest(cmd, new(string)) + response := <-rpc.SendRequest(request) + if response.Error() != nil { + return "", response.Error() + } + return *response.Result().(*string), nil +} diff --git a/cmdmgr.go b/rpcserver.go similarity index 87% rename from cmdmgr.go rename to rpcserver.go index 4246ff8..dc0e774 100644 --- a/cmdmgr.go +++ b/rpcserver.go @@ -97,57 +97,116 @@ var wsHandlers = map[string]cmdHandler{ "walletislocked": WalletIsLocked, } -// ProcessFrontendRequest checks the requests sent from a frontend. If the -// request method is one that must be handled by btcwallet, the -// request is processed here. Otherwise, the request is sent to btcd -// and btcd's reply is routed back to the frontend. -func ProcessFrontendRequest(msg []byte, ws bool) *btcjson.Reply { - // Parse marshaled command. - cmd, err := btcjson.ParseMarshaledCmd(msg) - if err != nil || cmd.Id() == nil { - // Invalid JSON-RPC request. - response := &btcjson.Reply{ - Error: &btcjson.ErrInvalidRequest, - } - return response - } +// Channels to control RPCGateway +var ( + // Incoming requests from frontends + clientRequests = make(chan *ClientRequest) - id := cmd.Id() - var result interface{} - var jsonErr *btcjson.Error + // Incoming notifications from a bitcoin server (btcd) + svrNtfns = make(chan btcjson.Cmd) +) - // Check for a handler to reply to cmd. If none exist, defer handlng - // to btcd. - if f, ok := rpcHandlers[cmd.Method()]; ok { - result, jsonErr = f(cmd) - } else if f, ok := wsHandlers[cmd.Method()]; ws && ok { - result, jsonErr = f(cmd) - } else { - // btcwallet does not have a handler for the command, so ask - // btcd. - result, jsonErr = DeferToBtcd(cmd) - } - - // Create and return response. - response := &btcjson.Reply{ - Id: &id, - Result: result, - Error: jsonErr, - } - return response +// ErrServerBusy is a custom JSON-RPC error for when a client's request +// could not be added to the server request queue for handling. +var ErrServerBusy = btcjson.Error{ + Code: -32000, + Message: "Server busy", } -// DeferToBtcd sends a marshaled JSON-RPC request to btcd and returns -// the reply. -func DeferToBtcd(cmd btcjson.Cmd) (interface{}, *btcjson.Error) { - // Update cmd with a new ID so replies can be handled without frontend - // IDs clashing with requests originating in btcwallet. The original - // request ID is always used in the frontend's response. - cmd.SetId(<-NewJSONID) +// RPCGateway is the common entry point for all client RPC requests and +// server notifications. If a request needs to be handled by btcwallet, +// it is sent to WalletRequestProcessor's request queue, or dropped if the +// queue is full. If a request is unhandled, it is recreated with a new +// JSON-RPC id and sent to btcd for handling. Notifications are also queued +// if they cannot be immediately handled, but are never dropped (queue may +// grow infinitely large). +func RPCGateway() { + var ntfnQueue []btcjson.Cmd + unreadChan := make(chan btcjson.Cmd) - request := NewRPCRequest(cmd, nil) - response := <-CurrentRPCConn().SendRequest(request) - return response.Result, response.Err + for { + var ntfnOut chan btcjson.Cmd + var oldestNtfn btcjson.Cmd + if len(ntfnQueue) > 0 { + ntfnOut = handleNtfn + oldestNtfn = ntfnQueue[0] + } else { + ntfnOut = unreadChan + } + + select { + case r := <-clientRequests: + // Check whether to handle request or send to btcd. + _, std := rpcHandlers[r.request.Method()] + _, ext := wsHandlers[r.request.Method()] + if std || ext { + select { + case requestQueue <- r: + default: + // Server busy with too many requests. + resp := ClientResponse{ + err: &ErrServerBusy, + } + r.response <- &resp + } + } else { + r.request.SetId(<-NewJSONID) + request := &ServerRequest{ + request: r.request, + result: nil, + response: r.response, + } + CurrentServerConn().SendRequest(request) + } + + case n := <-svrNtfns: + ntfnQueue = append(ntfnQueue, n) + + case ntfnOut <- oldestNtfn: + ntfnQueue = ntfnQueue[1:] + } + } +} + +// Channels to control WalletRequestProcessor +var ( + requestQueue = make(chan *ClientRequest, 100) + handleNtfn = make(chan btcjson.Cmd) +) + +// WalletRequestProcessor processes client requests and btcd notifications. +// Notifications are preferred over client requests. +func WalletRequestProcessor() { + for { + select { + case r := <-requestQueue: + var result interface{} + var jsonErr *btcjson.Error + if f, ok := rpcHandlers[r.request.Method()]; ok { + AcctMgr.Grab() + result, jsonErr = f(r.request) + AcctMgr.Release() + } else if f, ok := wsHandlers[r.request.Method()]; r.ws && ok { + AcctMgr.Grab() + result, jsonErr = f(r.request) + AcctMgr.Release() + } else { + result, jsonErr = Unimplemented(r.request) + } + resp := &ClientResponse{ + result: result, + err: jsonErr, + } + r.response <- resp + + case n := <-handleNtfn: + if f, ok := notificationHandlers[n.Method()]; ok { + AcctMgr.Grab() + f(n) + AcctMgr.Release() + } + } + } } // Unimplemented handles an unimplemented RPC request with the @@ -181,7 +240,7 @@ func DumpPrivKey(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInvalidAddressOrKey } - switch key, err := accountstore.DumpWIFPrivateKey(addr); err { + switch key, err := AcctMgr.DumpWIFPrivateKey(addr); err { case nil: // Key was found. return key, nil @@ -210,7 +269,7 @@ func DumpWallet(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - switch keys, err := accountstore.DumpKeys(); err { + switch keys, err := AcctMgr.DumpKeys(); err { case nil: // Reply with sorted WIF encoded private keys return keys, nil @@ -238,12 +297,12 @@ func ExportWatchingWallet(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -299,12 +358,12 @@ func GetAddressesByAccount(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - switch a, err := accountstore.Account(cmd.Account); err { + switch a, err := AcctMgr.Account(cmd.Account); err { case nil: // Return sorted active payment addresses. return a.SortedActivePaymentAddresses(), nil - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -326,7 +385,7 @@ func GetBalance(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - balance, err := accountstore.CalculateBalance(cmd.Account, cmd.MinConf) + balance, err := AcctMgr.CalculateBalance(cmd.Account, cmd.MinConf) if err != nil { return nil, &btcjson.ErrWalletInvalidAccountName } @@ -339,19 +398,18 @@ func GetBalance(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // information about the current state of btcwallet. // exist. func GetInfo(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { - // Call down to btcd for all of the information in this command known // by them. This call can not realistically ever fail. gicmd, _ := btcjson.NewGetInfoCmd(<-NewJSONID) - response := <-(CurrentRPCConn().SendRequest(NewRPCRequest(gicmd, - make(map[string]interface{})))) - if response.Err != nil { - return nil, response.Err + req := NewServerRequest(gicmd, make(map[string]interface{})) + response := <-CurrentServerConn().SendRequest(req) + if response.Error() != nil { + return nil, response.Error() } - ret := response.Result.(map[string]interface{}) + ret := response.Result().(map[string]interface{}) balance := float64(0.0) - accounts := accountstore.ListAccounts(1) + accounts := AcctMgr.ListAccounts(1) for _, v := range accounts { balance += v } @@ -429,12 +487,12 @@ func GetAccountAddress(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } // Lookup account for this request. - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -494,7 +552,7 @@ func GetAddressBalance(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // Get the account which holds the address in the request. // This should not fail, so if it does, return an internal // error to the frontend. - a, err := accountstore.Account(aname) + a, err := AcctMgr.Account(aname) if err != nil { return nil, &btcjson.ErrInternal } @@ -513,12 +571,12 @@ func GetUnconfirmedBalance(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } // Get the account included in the request. - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: @@ -545,12 +603,12 @@ func ImportPrivKey(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // Get the acount included in the request. Yes, Label is the // account name... - a, err := accountstore.Account(cmd.Label) + a, err := AcctMgr.Account(cmd.Label) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: @@ -561,8 +619,14 @@ func ImportPrivKey(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &e } + pk, net, compressed, err := btcutil.DecodePrivateKey(cmd.PrivKey) + if err != nil || net != a.Net() { + return nil, &btcjson.ErrInvalidAddressOrKey + } + // Import the private key, handling any errors. - switch err := a.ImportPrivKey(cmd.PrivKey, cmd.Rescan); err { + bs := &wallet.BlockStamp{} + switch _, err := a.ImportPrivateKey(pk, compressed, bs); err { case nil: // If the import was successful, reply with nil. return nil, nil @@ -596,7 +660,7 @@ func KeypoolRefill(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // (map[string]interface{}) of all accounts and their balances, instead of // separate notifications for each account. func NotifyBalances(frontend chan []byte) { - accountstore.NotifyBalances(frontend) + AcctMgr.NotifyBalances(frontend) } // GetNewAddress handlesa getnewaddress request by returning a new @@ -609,20 +673,16 @@ func GetNewAddress(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName case ErrBtcdDisconnected: - e := btcjson.Error{ - Code: btcjson.ErrInternal.Code, - Message: "btcd disconnected", - } - return nil, &e + return nil, &ErrBtcdDisconnected default: // all other non-nil errors e := btcjson.Error{ @@ -664,7 +724,7 @@ func ListAccounts(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } // Return the map. This will be marshaled into a JSON object. - return accountstore.ListAccounts(cmd.MinConf), nil + return AcctMgr.ListAccounts(cmd.MinConf), nil } // ListSinceBlock handles a listsinceblock request by returning an array of maps @@ -677,7 +737,7 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { height := int32(-1) if cmd.BlockHash != "" { - br, err := GetBlock(CurrentRPCConn(), cmd.BlockHash) + br, err := GetBlock(CurrentServerConn(), cmd.BlockHash) if err != nil { return nil, err } @@ -704,9 +764,10 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } } - bhChan := CurrentRPCConn().SendRequest(NewRPCRequest(gbh, new(string))) + req := NewServerRequest(gbh, new(string)) + bhChan := CurrentServerConn().SendRequest(req) - txInfoList, err := accountstore.ListSinceBlock(height, bs.Height, + txInfoList, err := AcctMgr.ListSinceBlock(height, bs.Height, cmd.TargetConfirmations) if err != nil { return nil, &btcjson.Error{ @@ -717,11 +778,11 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // Done with work, get the response. response := <-bhChan - if response.Err != nil { - return nil, response.Err + if response.Error() != nil { + return nil, response.Error() } - hash := response.Result.(*string) + hash := response.Result().(*string) res := make(map[string]interface{}) res["transactions"] = txInfoList @@ -739,12 +800,12 @@ func ListTransactions(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -788,12 +849,12 @@ func ListAddressTransactions(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -840,12 +901,12 @@ func ListAllTransactions(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -906,7 +967,7 @@ func SendFrom(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } // Check that the account specified in the request exists. - a, err := accountstore.Account(cmd.FromAccount) + a, err := AcctMgr.Account(cmd.FromAccount) if err != nil { return nil, &btcjson.ErrWalletInvalidAccountName } @@ -945,8 +1006,8 @@ func SendFrom(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // If a change address was added, sync wallet to disk and request // transaction notifications to the change address. if createdTx.changeAddr != nil { - a.ScheduleWalletWrite() - if err := a.WriteScheduledToDisk(); err != nil { + AcctMgr.ds.ScheduleWalletWrite(a) + if err := AcctMgr.ds.FlushAccount(a); err != nil { e := btcjson.Error{ Code: btcjson.ErrWallet.Code, Message: "Cannot write account: " + err.Error(), @@ -959,14 +1020,13 @@ func SendFrom(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { hextx := hex.EncodeToString(createdTx.rawTx) // NewSendRawTransactionCmd will never fail so don't check error. sendtx, _ := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx) - var txid string - request := NewRPCRequest(sendtx, txid) - response := <-CurrentRPCConn().SendRequest(request) - txid = response.Result.(string) + request := NewServerRequest(sendtx, new(string)) + response := <-CurrentServerConn().SendRequest(request) + txid := *response.Result().(*string) - if response.Err != nil { + if response.Error() != nil { SendTxHistSyncChans.remove <- createdTx.txid - return nil, response.Err + return nil, response.Error() } return handleSendRawTxReply(cmd, txid, a, createdTx) @@ -994,7 +1054,7 @@ func SendMany(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { } // Check that the account specified in the request exists. - a, err := accountstore.Account(cmd.FromAccount) + a, err := AcctMgr.Account(cmd.FromAccount) if err != nil { return nil, &btcjson.ErrWalletInvalidAccountName } @@ -1028,8 +1088,8 @@ func SendMany(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // If a change address was added, sync wallet to disk and request // transaction notifications to the change address. if createdTx.changeAddr != nil { - a.ScheduleWalletWrite() - if err := a.WriteScheduledToDisk(); err != nil { + AcctMgr.ds.ScheduleWalletWrite(a) + if err := AcctMgr.ds.FlushAccount(a); err != nil { e := btcjson.Error{ Code: btcjson.ErrWallet.Code, Message: "Cannot write account: " + err.Error(), @@ -1042,14 +1102,13 @@ func SendMany(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { hextx := hex.EncodeToString(createdTx.rawTx) // NewSendRawTransactionCmd will never fail so don't check error. sendtx, _ := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx) - var txid string - request := NewRPCRequest(sendtx, txid) - response := <-CurrentRPCConn().SendRequest(request) - txid = response.Result.(string) + request := NewServerRequest(sendtx, new(string)) + response := <-CurrentServerConn().SendRequest(request) + txid := *response.Result().(*string) - if response.Err != nil { + if response.Error() != nil { SendTxHistSyncChans.remove <- createdTx.txid - return nil, response.Err + return nil, response.Error() } return handleSendRawTxReply(cmd, txid, a, createdTx) @@ -1128,10 +1187,8 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * Fee: txInfo.fee, Receivers: txInfo.outputs, } - a.TxStore.Lock() - a.TxStore.s = append(a.TxStore.s, sendtx) - a.TxStore.Unlock() - a.ScheduleTxStoreWrite() + a.TxStore = append(a.TxStore, sendtx) + AcctMgr.ds.ScheduleTxStoreWrite(a) // Notify frontends of new SendTx. bs, err := GetCurBlock() @@ -1146,15 +1203,12 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * SendTxHistSyncChans.done <- txInfo.txid // Remove previous unspent outputs now spent by the tx. - a.UtxoStore.Lock() - modified := a.UtxoStore.s.Remove(txInfo.inputs) - a.UtxoStore.Unlock() - if modified { - a.ScheduleUtxoStoreWrite() + if a.UtxoStore.Remove(txInfo.inputs) { + AcctMgr.ds.ScheduleUtxoStoreWrite(a) } // Disk sync tx and utxo stores. - if err := a.WriteScheduledToDisk(); err != nil { + if err := AcctMgr.ds.FlushAccount(a); err != nil { log.Errorf("cannot write account: %v", err) } @@ -1236,39 +1290,36 @@ func CreateEncryptedWallet(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - err := accountstore.CreateEncryptedWallet("", "", []byte(cmd.Passphrase)) + err := AcctMgr.CreateEncryptedWallet([]byte(cmd.Passphrase)) switch err { case nil: // A nil reply is sent upon successful wallet creation. return nil, nil - case ErrAcctNotExist: + case ErrWalletExists: return nil, &btcjson.ErrWalletInvalidAccountName case ErrBtcdDisconnected: - e := btcjson.Error{ - Code: btcjson.ErrInternal.Code, - Message: "btcd disconnected", - } - return nil, &e + return nil, &ErrBtcdDisconnected default: // all other non-nil errors return nil, &btcjson.ErrInternal } } +// RecoverAddresses recovers the next n addresses from an account's wallet. func RecoverAddresses(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { cmd, ok := icmd.(*btcws.RecoverAddressesCmd) if !ok { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -1301,12 +1352,12 @@ func WalletIsLocked(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - a, err := accountstore.Account(cmd.Account) + a, err := AcctMgr.Account(cmd.Account) switch err { case nil: break - case ErrAcctNotExist: + case ErrNotFound: return nil, &btcjson.ErrWalletInvalidAccountName default: // all other non-nil errors @@ -1317,19 +1368,14 @@ func WalletIsLocked(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &e } - a.mtx.RLock() - locked := a.Wallet.IsLocked() - a.mtx.RUnlock() - - // Reply with true for a locked wallet, and false for unlocked. - return locked, nil + return a.Wallet.IsLocked(), nil } // WalletLock handles a walletlock request by locking the all account // wallets, returning an error if any wallet is not encrypted (for example, // a watching-only wallet). func WalletLock(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { - if err := accountstore.LockWallets(); err != nil { + if err := AcctMgr.LockWallets(); err != nil { e := btcjson.Error{ Code: btcjson.ErrWallet.Code, Message: err.Error(), @@ -1350,7 +1396,7 @@ func WalletPassphrase(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - if err := accountstore.UnlockWallets(cmd.Passphrase); err != nil { + if err := AcctMgr.UnlockWallets(cmd.Passphrase); err != nil { e := btcjson.Error{ Code: btcjson.ErrWallet.Code, Message: err.Error(), @@ -1360,7 +1406,7 @@ func WalletPassphrase(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { go func(timeout int64) { time.Sleep(time.Second * time.Duration(timeout)) - _ = accountstore.LockWallets() + _ = AcctMgr.LockWallets() }(cmd.Timeout) return nil, nil @@ -1379,7 +1425,7 @@ func WalletPassphraseChange(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - err := accountstore.ChangePassphrase([]byte(cmd.OldPassphrase), + err := AcctMgr.ChangePassphrase([]byte(cmd.OldPassphrase), []byte(cmd.NewPassphrase)) switch err { case nil: diff --git a/sockets.go b/sockets.go index 0d5053f..a6f68f0 100644 --- a/sockets.go +++ b/sockets.go @@ -19,7 +19,6 @@ package main import ( "code.google.com/p/go.net/websocket" "crypto/sha256" - _ "crypto/sha512" // for cert generation "crypto/subtle" "crypto/tls" "crypto/x509" @@ -158,7 +157,7 @@ func newServer(listenAddrs []string) (*server, error) { listeners = append(listeners, listener) } if len(listeners) == 0 { - return nil, errors.New("RPCS: No valid listen address") + return nil, errors.New("no valid listen address") } s.listeners = listeners @@ -201,25 +200,64 @@ func genCertPair(certFile, keyFile string) error { return nil } -// handleRPCRequest processes a JSON-RPC request from a frontend. -func (s *server) handleRPCRequest(w http.ResponseWriter, r *http.Request) { +// ParseRequest parses a command or notification out of a JSON-RPC request, +// returning any errors as a JSON-RPC error. +func ParseRequest(msg []byte) (btcjson.Cmd, *btcjson.Error) { + cmd, err := btcjson.ParseMarshaledCmd(msg) + if err != nil || cmd.Id() == nil { + return cmd, &btcjson.ErrInvalidRequest + } + return cmd, nil +} + +// ReplyToFrontend responds to a marshaled JSON-RPC request with a +// marshaled JSON-RPC response for both standard and extension +// (websocket) clients. +func ReplyToFrontend(msg []byte, ws bool) []byte { + cmd, jsonErr := ParseRequest(msg) + var id interface{} + if cmd != nil { + id = cmd.Id() + } + if jsonErr != nil { + response := btcjson.Reply{ + Id: &id, + Error: jsonErr, + } + mresponse, _ := json.Marshal(response) + return mresponse + } + + cReq := NewClientRequest(cmd, ws) + result, jsonErr := cReq.Handle() + + response := btcjson.Reply{ + Id: &id, + Result: result, + Error: jsonErr, + } + mresponse, err := json.Marshal(response) + if err != nil { + log.Errorf("Cannot marhal response: %v", err) + response = btcjson.Reply{ + Id: &id, + Error: &btcjson.ErrInternal, + } + mresponse, _ = json.Marshal(&response) + } + + return mresponse +} + +// ServeRPCRequest processes and replies to a JSON-RPC client request. +func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) { body, err := btcjson.GetRaw(r.Body) if err != nil { log.Errorf("RPCS: Error getting JSON message: %v", err) } - response := ProcessFrontendRequest(body, false) - mresponse, err := json.Marshal(response) - if err != nil { - id := response.Id - response = &btcjson.Reply{ - Id: id, - Error: &btcjson.ErrInternal, - } - mresponse, _ = json.Marshal(response) - } - - if _, err := w.Write(mresponse); err != nil { + resp := ReplyToFrontend(body, false) + if _, err := w.Write(resp); err != nil { log.Warnf("RPCS: could not respond to RPC request: %v", err) } } @@ -234,18 +272,19 @@ func frontendListenerDuplicator() { frontendListeners := make(map[chan []byte]bool) // Don't want to add or delete a wallet listener while iterating - // through each to propigate to every attached wallet. Use a mutex to - // prevent this. - var mtx sync.Mutex + // through each to propigate to every attached wallet. Use a binary + // semaphore to prevent this. + sem := make(chan struct{}, 1) + sem <- struct{}{} // Check for listener channels to add or remove from set. go func() { for { select { case c := <-addFrontendListener: - mtx.Lock() + <-sem frontendListeners[c] = true - mtx.Unlock() + sem <- struct{}{} NotifyBtcdConnection(c) bs, err := GetCurBlock() @@ -255,9 +294,9 @@ func frontendListenerDuplicator() { } case c := <-deleteFrontendListener: - mtx.Lock() + <-sem delete(frontendListeners, c) - mtx.Unlock() + sem <- struct{}{} } } }() @@ -267,18 +306,18 @@ func frontendListenerDuplicator() { for { ntfn := <-frontendNotificationMaster - mtx.Lock() + <-sem for c := range frontendListeners { c <- ntfn } - mtx.Unlock() + sem <- struct{}{} } } // NotifyBtcdConnection notifies a frontend of the current connection // status of btcwallet to btcd. func NotifyBtcdConnection(reply chan []byte) { - if btcd, ok := CurrentRPCConn().(*BtcdRPCConn); ok { + if btcd, ok := CurrentServerConn().(*BtcdRPCConn); ok { ntfn := btcws.NewBtcdConnectedNtfn(btcd.Connected()) mntfn, _ := ntfn.MarshalJSON() reply <- mntfn @@ -286,12 +325,10 @@ func NotifyBtcdConnection(reply chan []byte) { } -// frontendSendRecv is the handler function for websocket connections from -// a btcwallet instance. It reads requests and sends responses to a -// frontend, as well as notififying wallets of chain updates. There can -// possibly be many of these running, one for each currently connected -// frontend. -func frontendSendRecv(ws *websocket.Conn) { +// WSSendRecv is the handler for websocket client connections. It loops +// forever (until disconnected), reading JSON-RPC requests and sending +// sending responses and notifications. +func WSSendRecv(ws *websocket.Conn) { // Add frontend notification channel to set so this handler receives // updates. frontendNotification := make(chan []byte) @@ -324,11 +361,10 @@ func frontendSendRecv(ws *websocket.Conn) { return } // Handle request here. - go func() { - reply := ProcessFrontendRequest(m, true) - mreply, _ := json.Marshal(reply) - frontendNotification <- mreply - }() + go func(m []byte) { + resp := ReplyToFrontend(m, true) + frontendNotification <- resp + }(m) case ntfn, _ := <-frontendNotification: if err := websocket.Message.Send(ws, ntfn); err != nil { @@ -370,14 +406,14 @@ func (s *server) Start() { http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) return } - s.handleRPCRequest(w, r) + s.ServeRPCRequest(w, r) }) serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) { if err := s.checkAuth(r); err != nil { http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) return } - websocket.Handler(frontendSendRecv).ServeHTTP(w, r) + websocket.Handler(WSSendRecv).ServeHTTP(w, r) }) for _, listener := range s.listeners { s.wg.Add(1) @@ -480,7 +516,7 @@ func BtcdConnect(certificates []byte) (*BtcdRPCConn, error) { func resendUnminedTxs() { for _, createdTx := range UnminedTxs.m { hextx := hex.EncodeToString(createdTx.rawTx) - if txid, err := SendRawTransaction(CurrentRPCConn(), hextx); err != nil { + if txid, err := SendRawTransaction(CurrentServerConn(), hextx); err != nil { // TODO(jrick): Check error for if this tx is a double spend, // remove it if so. } else { @@ -498,7 +534,7 @@ func resendUnminedTxs() { // TODO(jrick): Track and Rescan commands should be replaced with a // single TrackSince function (or similar) which requests address // notifications and performs the rescan since some block height. -func Handshake(rpc RPCConn) error { +func Handshake(rpc ServerConn) error { net, jsonErr := GetCurrentNet(rpc) if jsonErr != nil { return jsonErr @@ -523,7 +559,7 @@ func Handshake(rpc RPCConn) error { // Get default account. Only the default account is used to // track recently-seen blocks. - a, err := accountstore.Account("") + a, err := AcctMgr.Account("") if err != nil { // No account yet is not a handshake error, but means our // handshake is done. @@ -555,7 +591,7 @@ func Handshake(rpc RPCConn) error { // try to write new tx and utxo files on each rollback. if it.Next() { bs := it.BlockStamp() - accountstore.Rollback(bs.Height, &bs.Hash) + AcctMgr.Rollback(bs.Height, &bs.Hash) } // Set default account to be marked in sync with the current @@ -563,8 +599,8 @@ func Handshake(rpc RPCConn) error { a.Wallet.SetSyncedWith(bs) // Begin tracking wallets against this btcd instance. - accountstore.Track() - accountstore.RescanActiveAddresses() + AcctMgr.Track() + AcctMgr.RescanActiveAddresses() // (Re)send any unmined transactions to btcd in case of a btcd restart. resendUnminedTxs() @@ -581,8 +617,8 @@ func Handshake(rpc RPCConn) error { // and start a new rescan since the earliest block wallet must know // about. a.fullRescan = true - accountstore.Track() - accountstore.RescanActiveAddresses() + AcctMgr.Track() + AcctMgr.RescanActiveAddresses() resendUnminedTxs() return nil }