Greatly simplify design.

This change removes a lot of unnecessary and complicated locking (if
serializing requests is needed in the future, a goroutine will be used
instead) and also shifts the heavy lifting from frontends to btcwallet
itself to handle any notifications when they can be properly handled.
Although it's still legal to, frontends no longer need to explicitly
request account balances as these are calculated and sent as an async
notification on frontend connect, and these notifications will only
occur if btcd is currently connected.  Likewise, when btcd connects,
all frontends are immediately notified of all notifications that
require btcd information, such as the current block height for
calculating account balances.
This commit is contained in:
Josh Rickmar 2013-10-29 02:19:40 -04:00
parent f6af03bf98
commit 540cbb0930
3 changed files with 132 additions and 130 deletions

29
cmd.go
View file

@ -82,7 +82,7 @@ type BtcWallet struct {
// key. A RWMutex is used to protect against incorrect concurrent
// access.
type BtcWalletStore struct {
sync.RWMutex
sync.Mutex
m map[string]*BtcWallet
}
@ -98,11 +98,9 @@ func NewBtcWalletStore() *BtcWalletStore {
// TODO(jrick): This must also roll back the UTXO and TX stores, and notify
// all wallets of new account balances.
func (s *BtcWalletStore) Rollback(height int64, hash *btcwire.ShaHash) {
s.Lock()
for _, w := range s.m {
w.Rollback(height, hash)
}
s.Unlock()
}
// Rollback reverts each stored BtcWallet to a state before the block
@ -276,7 +274,7 @@ func getCurHeight() (height int64) {
// a UTXO must be in a block. If confirmations is 1 or greater,
// the balance will be calculated based on how many how many blocks
// include a UTXO.
func (w *BtcWallet) CalculateBalance(confirmations int) float64 {
func (w *BtcWallet) CalculateBalance(confirms int) float64 {
var bal uint64 // Measured in satoshi
height := getCurHeight()
@ -288,7 +286,7 @@ func (w *BtcWallet) CalculateBalance(confirmations int) float64 {
for _, u := range w.UtxoStore.s {
// Utxos not yet in blocks (height -1) should only be
// added if confirmations is 0.
if confirmations == 0 || (u.Height != -1 && int(height-u.Height+1) >= confirmations) {
if confirms == 0 || (u.Height != -1 && int(height-u.Height+1) >= confirms) {
bal += u.Amt
}
}
@ -528,7 +526,7 @@ func (w *BtcWallet) newBlockTxHandler(result interface{}, e *btcjson.Error) bool
return false
}
go func() {
// Add to TxStore
t := &tx.RecvTx{
Amt: uint64(amt),
}
@ -546,11 +544,9 @@ func (w *BtcWallet) newBlockTxHandler(result interface{}, e *btcjson.Error) bool
if err = w.writeDirtyToDisk(); err != nil {
log.Errorf("cannot sync dirty wallet: %v", err)
}
}()
// Do not add output to utxo store if spent.
// Add to UtxoStore if unspent.
if !spent {
go func() {
// First, iterate through all stored utxos. If an unconfirmed utxo
// (not present in a block) has the same outpoint as this utxo,
// update the block height and hash.
@ -561,10 +557,18 @@ func (w *BtcWallet) newBlockTxHandler(result interface{}, e *btcjson.Error) bool
}
if bytes.Equal(u.Out.Hash[:], txhash[:]) && u.Out.Index == uint32(index) {
// Found it.
w.UtxoStore.RUnlock()
w.UtxoStore.Lock()
copy(u.BlockHash[:], blockhash[:])
u.Height = int64(height)
w.UtxoStore.RUnlock()
return
w.UtxoStore.dirty = true
w.UtxoStore.Unlock()
if err = w.writeDirtyToDisk(); err != nil {
log.Errorf("cannot sync dirty wallet: %v", err)
}
return false
}
}
w.UtxoStore.RUnlock()
@ -576,10 +580,8 @@ func (w *BtcWallet) newBlockTxHandler(result interface{}, e *btcjson.Error) bool
}
copy(u.Out.Hash[:], txhash[:])
u.Out.Index = uint32(index)
copy(u.AddrHash[:], receiverHash)
copy(u.BlockHash[:], blockhash[:])
w.UtxoStore.Lock()
w.UtxoStore.s = append(w.UtxoStore.s, u)
w.UtxoStore.dirty = true
@ -592,7 +594,6 @@ func (w *BtcWallet) newBlockTxHandler(result interface{}, e *btcjson.Error) bool
unconfirmed := w.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, w.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, w.name, unconfirmed)
}()
}
// Never remove this handler.

View file

@ -132,11 +132,7 @@ func GetAddressesByAccount(reply chan []byte, msg *btcjson.Message) {
}
var result interface{}
wallets.RLock()
w := wallets.m[account]
wallets.RUnlock()
if w != nil {
if w := wallets.m[account]; w != nil {
result = w.Wallet.GetActiveAddresses()
} else {
ReplyError(reply, msg.Id, &btcjson.ErrWalletInvalidAccountName)
@ -172,11 +168,8 @@ func GetBalance(reply chan []byte, msg *btcjson.Message) {
}
}
wallets.RLock()
w := wallets.m[wname]
wallets.RUnlock()
var result interface{}
if w != nil {
if w := wallets.m[wname]; w != nil {
result = w.CalculateBalance(conf)
ReplySuccess(reply, msg.Id, result)
} else {
@ -186,20 +179,24 @@ func GetBalance(reply chan []byte, msg *btcjson.Message) {
}
}
// GetBalances notifies each attached wallet of the current confirmed
// GetBalances responds to the extension 'getbalances' command,
// replying with account balances for a single wallet request.
func GetBalances(reply chan []byte, msg *btcjson.Message) {
NotifyBalances(reply)
}
// NotifyBalances notifies an attached wallet of the current confirmed
// and unconfirmed account balances.
//
// TODO(jrick): Switch this to return a JSON object (map) of all accounts
// and their balances, instead of separate notifications for each account.
func GetBalances(reply chan []byte, msg *btcjson.Message) {
wallets.RLock()
func NotifyBalances(reply chan []byte) {
for _, w := range wallets.m {
balance := w.CalculateBalance(1)
unconfirmed := w.CalculateBalance(0) - balance
NotifyWalletBalance(reply, w.name, balance)
NotifyWalletBalanceUnconfirmed(reply, w.name, unconfirmed)
}
wallets.RUnlock()
}
// GetNewAddress gets or generates a new address for an account. If
@ -222,10 +219,7 @@ func GetNewAddress(reply chan []byte, msg *btcjson.Message) {
}
}
wallets.RLock()
w := wallets.m[wname]
wallets.RUnlock()
if w != nil {
if w := wallets.m[wname]; w != nil {
// TODO(jrick): generate new addresses if the address pool is empty.
addr, err := w.NextUnusedAddress()
if err != nil {
@ -265,11 +259,9 @@ func ListAccounts(reply chan []byte, msg *btcjson.Message) {
pairs := make(map[string]float64)
wallets.RLock()
for account, w := range wallets.m {
pairs[account] = w.CalculateBalance(minconf)
}
wallets.RUnlock()
ReplySuccess(reply, msg.Id, pairs)
}
@ -346,9 +338,11 @@ func SendFrom(reply chan []byte, msg *btcjson.Message) {
}
// Is wallet for this account unlocked?
wallets.Lock()
w := wallets.m[fromaccount]
wallets.Unlock()
w, ok := wallets.m[fromaccount]
if !ok {
ReplyError(reply, msg.Id, &btcjson.ErrWalletInvalidAccountName)
return
}
if w.IsLocked() {
ReplyError(reply, msg.Id, &btcjson.ErrWalletUnlockNeeded)
return
@ -507,9 +501,11 @@ func SendMany(reply chan []byte, msg *btcjson.Message) {
}
// Is wallet for this account unlocked?
wallets.Lock()
w := wallets.m[fromaccount]
wallets.Unlock()
w, ok := wallets.m[fromaccount]
if !ok {
ReplyError(reply, msg.Id, &btcjson.ErrWalletInvalidAccountName)
return
}
if w.IsLocked() {
ReplyError(reply, msg.Id, &btcjson.ErrWalletUnlockNeeded)
return
@ -678,12 +674,13 @@ func CreateEncryptedWallet(reply chan []byte, msg *btcjson.Message) {
return
}
// Does this wallet already exist?
// Prevent two wallets with the same account name from being added.
wallets.Lock()
defer wallets.Unlock()
// Does this wallet already exist?
if w := wallets.m[wname]; w != nil {
e := btcjson.ErrWalletInvalidAccountName
e.Message = "Wallet already exists."
ReplyError(reply, msg.Id, &e)
return
}
@ -694,7 +691,7 @@ func CreateEncryptedWallet(reply chan []byte, msg *btcjson.Message) {
} else {
net = btcwire.TestNet3
}
w, err := wallet.NewWallet(wname, desc, []byte(pass), net)
wlt, err := wallet.NewWallet(wname, desc, []byte(pass), net)
if err != nil {
log.Error("Error creating wallet: " + err.Error())
ReplyError(reply, msg.Id, &btcjson.ErrInternal)
@ -704,7 +701,7 @@ func CreateEncryptedWallet(reply chan []byte, msg *btcjson.Message) {
// Grab a new unique sequence number for tx notifications in new blocks.
n := <-NewJSONID
bw := &BtcWallet{
Wallet: w,
Wallet: wlt,
name: wname,
dirty: true,
NewBlockTxSeqN: n,
@ -736,10 +733,8 @@ func WalletIsLocked(reply chan []byte, msg *btcjson.Message) {
return
}
}
wallets.RLock()
w := wallets.m[account]
wallets.RUnlock()
if w != nil {
if w := wallets.m[account]; w != nil {
result := w.IsLocked()
ReplySuccess(reply, msg.Id, result)
} else {
@ -753,10 +748,7 @@ func WalletIsLocked(reply chan []byte, msg *btcjson.Message) {
// with this. Lock all the wallets, like if all accounts are locked
// for one bitcoind wallet?
func WalletLock(reply chan []byte, msg *btcjson.Message) {
wallets.RLock()
w := wallets.m[""]
wallets.RUnlock()
if w != nil {
if w := wallets.m[""]; w != nil {
if err := w.Lock(); err != nil {
ReplyError(reply, msg.Id, &btcjson.ErrWalletWrongEncState)
} else {
@ -787,10 +779,7 @@ func WalletPassphrase(reply chan []byte, msg *btcjson.Message) {
return
}
wallets.RLock()
w := wallets.m[""]
wallets.RUnlock()
if w != nil {
if w := wallets.m[""]; w != nil {
if err := w.Unlock([]byte(passphrase)); err != nil {
ReplyError(reply, msg.Id, &btcjson.ErrWalletPassphraseIncorrect)
return

View file

@ -38,7 +38,8 @@ var (
// process cannot be established.
ErrConnLost = errors.New("connection lost")
// Channel to close to notify that connection to btcd has been lost.
// Channel for updates and boolean with the most recent update of
// whether the connection to btcd is active or not.
btcdConnected = struct {
b bool
c chan bool
@ -104,6 +105,14 @@ func frontendListenerDuplicator() {
frontendListeners[c] = true
mtx.Unlock()
// TODO(jrick): these notifications belong somewhere better.
// Probably want to copy AddWalletListener from btcd, and
// place these notifications in that function.
if btcdConnected.b {
NotifyNewBlockChainHeight(c, getCurHeight())
NotifyBalances(c)
}
case c := <-deleteFrontendListener:
mtx.Lock()
delete(frontendListeners, c)
@ -319,6 +328,18 @@ func ProcessBtcdNotificationReply(b []byte) {
}
}
// NotifyNewBlockChainHeight notifies all frontends of a new
// blockchain height.
func NotifyNewBlockChainHeight(reply chan []byte, height int64) {
var id interface{} = "btcwallet:newblockchainheight"
msgRaw := &btcjson.Reply{
Result: height,
Id: &id,
}
msg, _ := json.Marshal(msgRaw)
reply <- msg
}
// NtfnBlockConnected handles btcd notifications resulting from newly
// connected blocks to the main blockchain. Currently, this only creates
// a new notification for frontends with the new blockchain height.
@ -363,17 +384,9 @@ func NtfnBlockConnected(r interface{}) {
// TODO(jrick): update TxStore and UtxoStore with new hash
_ = hash
var id interface{} = "btcwallet:newblockchainheight"
msgRaw := &btcjson.Reply{
Result: height,
Id: &id,
}
msg, err := json.Marshal(msgRaw)
if err != nil {
log.Error("btcd:blockconnected handler: unable to marshal reply")
return
}
frontendNotificationMaster <- msg
// Notify frontends of new blockchain height.
NotifyNewBlockChainHeight(frontendNotificationMaster, height)
// Remove all mined transactions from pool.
UnminedTxs.Lock()
@ -437,17 +450,8 @@ func NtfnBlockDisconnected(r interface{}) {
wallets.Rollback(height, hash)
}()
var id interface{} = "btcwallet:newblockchainheight"
msgRaw := &btcjson.Reply{
Result: height,
Id: &id,
}
msg, err := json.Marshal(msgRaw)
if err != nil {
log.Error("btcd:blockdisconnected handler: unable to marshal reply")
return
}
frontendNotificationMaster <- msg
// Notify frontends of new blockchain height.
NotifyNewBlockChainHeight(frontendNotificationMaster, height)
}
var duplicateOnce sync.Once
@ -560,11 +564,19 @@ func BtcdHandshake(ws *websocket.Conn) {
}
// Begin tracking wallets against this btcd instance.
wallets.RLock()
for _, w := range wallets.m {
w.Track()
}
wallets.RUnlock()
// Request the new block height, and notify frontends.
//
// TODO(jrick): Check that there was not any reorgs done
// since last connection.
NotifyNewBlockChainHeight(frontendNotificationMaster, getCurHeight())
// Notify frontends of all account balances, calculated based
// from the block height of this new btcd connection.
NotifyBalances(frontendNotificationMaster)
// (Re)send any unmined transactions to btcd in case of a btcd restart.
resendUnminedTxs()