From 248ea9c08f9e2e907e7a6b5647b73d37c8249ad7 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 30 Jul 2014 09:47:50 -0500 Subject: [PATCH] Send btcdconnected notifications without a wallet. The notified chain server connection state was being passed through the wallet and then notified to the RPC server, which prevented this notification from ever firing if a wallet didn't exist yet. Instead, make the RPC server register for these notifications directly from the chain server RPC client. I'm not happy with this notification and how it's handled in the code, but to not break existing clients this change is being made. Fixing the notifiation mess and modifying existing clients to use a new notification API will need to be done sometime later. --- chain/chain.go | 57 ++++++++++++++++++++++++++++++++++++++-- chainntfns.go | 2 -- rpcserver.go | 71 +++++++++++++++++++++++++++----------------------- wallet.go | 37 +++++--------------------- 4 files changed, 99 insertions(+), 68 deletions(-) diff --git a/chain/chain.go b/chain/chain.go index 15181c4..fa98b15 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -38,6 +38,13 @@ type Client struct { dequeueNotification chan interface{} currentBlock chan *keystore.BlockStamp + // Notification channels regarding the state of the client. These exist + // so other components can listen in on chain activity. These are + // initialized as nil, and must be created by calling one of the Listen* + // methods. + connected chan bool + notificationLock sync.Locker + quit chan struct{} wg sync.WaitGroup started bool @@ -50,6 +57,7 @@ func NewClient(net *btcnet.Params, connect, user, pass string, certs []byte) (*C enqueueNotification: make(chan interface{}), dequeueNotification: make(chan interface{}), currentBlock: make(chan *keystore.BlockStamp), + notificationLock: new(sync.Mutex), quit: make(chan struct{}), } initializedClient := make(chan struct{}) @@ -144,7 +152,6 @@ func (c *Client) BlockStamp() (*keystore.BlockStamp, error) { // btcrpcclient callbacks, which isn't very Go-like and doesn't allow // blocking client calls. type ( - ClientConnected struct{} BlockConnected keystore.BlockStamp BlockDisconnected keystore.BlockStamp RecvTx struct { @@ -188,7 +195,7 @@ func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err err func (c *Client) onClientConnect() { log.Info("Established websocket RPC connection to btcd") - c.enqueueNotification <- ClientConnected{} + c.notifyConnected(true) } func (c *Client) onBlockConnected(hash *btcwire.ShaHash, height int32) { @@ -308,3 +315,49 @@ out: close(c.dequeueNotification) c.wg.Done() } + +// ErrDuplicateListen is returned for any attempts to listen for the same +// notification more than once. If callers must pass along a notifiation to +// multiple places, they must broadcast it themself. +var ErrDuplicateListen = errors.New("duplicate listen") + +type noopLocker struct{} + +func (noopLocker) Lock() {} +func (noopLocker) Unlock() {} + +// ListenConnected returns a channel that passes the current connection state +// of the client. This will be automatically sent to when the client is first +// connected, as well as the current state whenever NotifyConnected is +// forcibly called. +// +// If this is called twice, ErrDuplicateListen is returned. +func (c *Client) ListenConnected() (<-chan bool, error) { + c.notificationLock.Lock() + defer c.notificationLock.Unlock() + + if c.connected != nil { + return nil, ErrDuplicateListen + } + c.connected = make(chan bool) + c.notificationLock = noopLocker{} + return c.connected, nil +} + +func (c *Client) notifyConnected(connected bool) { + c.notificationLock.Lock() + if c.connected != nil { + c.connected <- connected + } + c.notificationLock.Unlock() +} + +// NotifyConnected sends the channel notification for a connected or +// disconnected client. This is exported so it can be called by other +// packages which require notifying the current connection state. +// +// TODO: This shouldn't exist, but the current notification API requires it. +func (c *Client) NotifyConnected() { + connected := !c.Client.Disconnected() + c.notifyConnected(connected) +} diff --git a/chainntfns.go b/chainntfns.go index 8b1e297..07d706b 100644 --- a/chainntfns.go +++ b/chainntfns.go @@ -28,8 +28,6 @@ func (w *Wallet) handleChainNotifications() { for n := range w.chainSvr.Notifications() { var err error switch n := n.(type) { - case chain.ClientConnected: - w.notifyChainServerConnected(true) case chain.BlockConnected: w.connectBlock(keystore.BlockStamp(n)) case chain.BlockDisconnected: diff --git a/rpcserver.go b/rpcserver.go index d018f46..ac7e3e9 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -265,17 +265,18 @@ type rpcServer struct { // Channels read from other components from which notifications are // created. - connectedBlocks <-chan keystore.BlockStamp - disconnectedBlocks <-chan keystore.BlockStamp - newCredits <-chan txstore.Credit - newDebits <-chan txstore.Debits - minedCredits <-chan txstore.Credit - minedDebits <-chan txstore.Debits - keystoreLocked <-chan bool - confirmedBalance <-chan btcutil.Amount - unconfirmedBalance <-chan btcutil.Amount - chainServerConnected <-chan bool - registerWalletNtfns chan struct{} + connectedBlocks <-chan keystore.BlockStamp + disconnectedBlocks <-chan keystore.BlockStamp + newCredits <-chan txstore.Credit + newDebits <-chan txstore.Debits + minedCredits <-chan txstore.Credit + minedDebits <-chan txstore.Debits + keystoreLocked <-chan bool + confirmedBalance <-chan btcutil.Amount + unconfirmedBalance <-chan btcutil.Amount + chainServerConnected <-chan bool + registerWalletNtfns chan struct{} + registerChainSvrNtfns chan struct{} // enqueueNotification and dequeueNotification handle both sides of an // infinitly growing queue for websocket client notifications. @@ -310,6 +311,7 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe registerWSC: make(chan *websocketClient), unregisterWSC: make(chan *websocketClient), registerWalletNtfns: make(chan struct{}), + registerChainSvrNtfns: make(chan struct{}), enqueueNotification: make(chan wsClientNotification), dequeueNotification: make(chan wsClientNotification), notificationHandlerQuit: make(chan struct{}), @@ -514,7 +516,6 @@ func (s *rpcServer) SetWallet(wallet *Wallet) { s.wallet = wallet s.registerWalletNtfns <- struct{}{} - chainSvrConnected := false if s.chainSvr != nil { // If the chain server rpc client is also set, there's no reason // to keep the mutex around. Make the locker simply execute @@ -525,14 +526,10 @@ func (s *rpcServer) SetWallet(wallet *Wallet) { // ok to run. s.handlerLookup = lookupAnyHandler - chainSvrConnected = !s.chainSvr.Disconnected() + // Make sure already connected websocket clients get a notification + // if the chain RPC client connection is set and connected. + s.chainSvr.NotifyConnected() } - - // Make sure already connected websocket clients get a notification - // if the chain RPC client connection is set and connected. This is - // run as a goroutine since it must acquire the handlerLock, which is - // locked here. - go wallet.notifyChainServerConnected(chainSvrConnected) } // SetChainServer sets the chain server client component needed to run a fully @@ -545,6 +542,8 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { defer s.handlerLock.Unlock() s.chainSvr = chainSvr + s.registerChainSvrNtfns <- struct{}{} + if s.wallet != nil { // If the wallet had already been set, there's no reason to keep // the mutex around. Make the locker simply execute noops @@ -554,12 +553,6 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { // With both the chain server and wallet set, all handlers are // ok to run. s.handlerLookup = lookupAnyHandler - - // Make sure already connected websocket clients get a - // notification if the chain RPC client connection is set and - // connected. This is run as a goroutine since it must acquire - // the handlerLock, which is locked here. - go s.wallet.notifyChainServerConnected(!chainSvr.Disconnected()) } } @@ -877,8 +870,8 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) { // TODO(jrick): this is crappy. kill it. s.handlerLock.Lock() - if s.wallet != nil && s.chainSvr != nil { - s.wallet.notifyChainServerConnected(!s.chainSvr.Disconnected()) + if s.chainSvr != nil { + s.chainSvr.NotifyConnected() } s.handlerLock.Unlock() @@ -1137,12 +1130,6 @@ out: "balance changes: %v", err) continue } - chainServerConnected, err := s.wallet.ListenChainServerConnected() - if err != nil { - log.Errorf("Could not register for chain server "+ - "connection changes: %v", err) - continue - } s.connectedBlocks = connectedBlocks s.disconnectedBlocks = disconnectedBlocks s.newCredits = newCredits @@ -1152,8 +1139,25 @@ out: s.keystoreLocked = keystoreLocked s.confirmedBalance = confirmedBalance s.unconfirmedBalance = unconfirmedBalance + + case <-s.registerChainSvrNtfns: + chainServerConnected, err := s.chainSvr.ListenConnected() + if err != nil { + log.Errorf("Could not register for chain server "+ + "connection changes: %v", err) + continue + } s.chainServerConnected = chainServerConnected + // Make sure already connected websocket clients get a + // notification for the current client connection state. + // + // TODO(jrick): I am appalled by doing this but trying + // not to change how notifications work for the moment. + // A revamped notification API without this horror will + // be implemented soon. + go s.chainSvr.NotifyConnected() + case <-s.quit: break out } @@ -1175,6 +1179,7 @@ func (s *rpcServer) drainNotifications() { case <-s.confirmedBalance: case <-s.unconfirmedBalance: case <-s.registerWalletNtfns: + case <-s.registerChainSvrNtfns: } } } diff --git a/wallet.go b/wallet.go index 80b17ba..f41c84b 100644 --- a/wallet.go +++ b/wallet.go @@ -95,13 +95,12 @@ type Wallet struct { // Notification channels so other components can listen in on wallet // activity. These are initialized as nil, and must be created by // calling one of the Listen* methods. - connectedBlocks chan keystore.BlockStamp - disconnectedBlocks chan keystore.BlockStamp - lockStateChanges chan bool // true when locked - confirmedBalance chan btcutil.Amount - unconfirmedBalance chan btcutil.Amount - chainServerConnected chan bool - notificationLock sync.Locker + connectedBlocks chan keystore.BlockStamp + disconnectedBlocks chan keystore.BlockStamp + lockStateChanges chan bool // true when locked + confirmedBalance chan btcutil.Amount + unconfirmedBalance chan btcutil.Amount + notificationLock sync.Locker wg sync.WaitGroup quit chan struct{} @@ -148,8 +147,6 @@ func (w *Wallet) updateNotificationLock() { case w.confirmedBalance == nil: fallthrough case w.unconfirmedBalance == nil: - fallthrough - case w.chainServerConnected == nil: return } w.notificationLock = noopLocker{} @@ -241,18 +238,6 @@ func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) { return w.unconfirmedBalance, nil } -func (w *Wallet) ListenChainServerConnected() (<-chan bool, error) { - w.notificationLock.Lock() - defer w.notificationLock.Unlock() - - if w.chainServerConnected != nil { - return nil, ErrDuplicateListen - } - w.chainServerConnected = make(chan bool) - w.updateNotificationLock() - return w.chainServerConnected, nil -} - func (w *Wallet) notifyConnectedBlock(block keystore.BlockStamp) { w.notificationLock.Lock() if w.connectedBlocks != nil { @@ -293,14 +278,6 @@ func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) { w.notificationLock.Unlock() } -func (w *Wallet) notifyChainServerConnected(connected bool) { - w.notificationLock.Lock() - if w.chainServerConnected != nil { - w.chainServerConnected <- connected - } - w.notificationLock.Unlock() -} - // openWallet opens a new wallet from disk. func openWallet() (*Wallet, error) { netdir := networkDir(activeNet.Params) @@ -376,8 +353,6 @@ func (w *Wallet) Start(chainServer *chain.Client) { w.chainSvrLock.Lock() defer w.chainSvrLock.Unlock() - w.notifyChainServerConnected(!chainServer.Disconnected()) - w.chainSvr = chainServer w.chainSvrLock = noopLocker{}