diff --git a/chain/chain.go b/chain/chain.go index 15181c4..fa98b15 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -38,6 +38,13 @@ type Client struct { dequeueNotification chan interface{} currentBlock chan *keystore.BlockStamp + // Notification channels regarding the state of the client. These exist + // so other components can listen in on chain activity. These are + // initialized as nil, and must be created by calling one of the Listen* + // methods. + connected chan bool + notificationLock sync.Locker + quit chan struct{} wg sync.WaitGroup started bool @@ -50,6 +57,7 @@ func NewClient(net *btcnet.Params, connect, user, pass string, certs []byte) (*C enqueueNotification: make(chan interface{}), dequeueNotification: make(chan interface{}), currentBlock: make(chan *keystore.BlockStamp), + notificationLock: new(sync.Mutex), quit: make(chan struct{}), } initializedClient := make(chan struct{}) @@ -144,7 +152,6 @@ func (c *Client) BlockStamp() (*keystore.BlockStamp, error) { // btcrpcclient callbacks, which isn't very Go-like and doesn't allow // blocking client calls. type ( - ClientConnected struct{} BlockConnected keystore.BlockStamp BlockDisconnected keystore.BlockStamp RecvTx struct { @@ -188,7 +195,7 @@ func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err err func (c *Client) onClientConnect() { log.Info("Established websocket RPC connection to btcd") - c.enqueueNotification <- ClientConnected{} + c.notifyConnected(true) } func (c *Client) onBlockConnected(hash *btcwire.ShaHash, height int32) { @@ -308,3 +315,49 @@ out: close(c.dequeueNotification) c.wg.Done() } + +// ErrDuplicateListen is returned for any attempts to listen for the same +// notification more than once. If callers must pass along a notifiation to +// multiple places, they must broadcast it themself. +var ErrDuplicateListen = errors.New("duplicate listen") + +type noopLocker struct{} + +func (noopLocker) Lock() {} +func (noopLocker) Unlock() {} + +// ListenConnected returns a channel that passes the current connection state +// of the client. This will be automatically sent to when the client is first +// connected, as well as the current state whenever NotifyConnected is +// forcibly called. +// +// If this is called twice, ErrDuplicateListen is returned. +func (c *Client) ListenConnected() (<-chan bool, error) { + c.notificationLock.Lock() + defer c.notificationLock.Unlock() + + if c.connected != nil { + return nil, ErrDuplicateListen + } + c.connected = make(chan bool) + c.notificationLock = noopLocker{} + return c.connected, nil +} + +func (c *Client) notifyConnected(connected bool) { + c.notificationLock.Lock() + if c.connected != nil { + c.connected <- connected + } + c.notificationLock.Unlock() +} + +// NotifyConnected sends the channel notification for a connected or +// disconnected client. This is exported so it can be called by other +// packages which require notifying the current connection state. +// +// TODO: This shouldn't exist, but the current notification API requires it. +func (c *Client) NotifyConnected() { + connected := !c.Client.Disconnected() + c.notifyConnected(connected) +} diff --git a/chainntfns.go b/chainntfns.go index 8b1e297..07d706b 100644 --- a/chainntfns.go +++ b/chainntfns.go @@ -28,8 +28,6 @@ func (w *Wallet) handleChainNotifications() { for n := range w.chainSvr.Notifications() { var err error switch n := n.(type) { - case chain.ClientConnected: - w.notifyChainServerConnected(true) case chain.BlockConnected: w.connectBlock(keystore.BlockStamp(n)) case chain.BlockDisconnected: diff --git a/rpcserver.go b/rpcserver.go index d018f46..ac7e3e9 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -265,17 +265,18 @@ type rpcServer struct { // Channels read from other components from which notifications are // created. - connectedBlocks <-chan keystore.BlockStamp - disconnectedBlocks <-chan keystore.BlockStamp - newCredits <-chan txstore.Credit - newDebits <-chan txstore.Debits - minedCredits <-chan txstore.Credit - minedDebits <-chan txstore.Debits - keystoreLocked <-chan bool - confirmedBalance <-chan btcutil.Amount - unconfirmedBalance <-chan btcutil.Amount - chainServerConnected <-chan bool - registerWalletNtfns chan struct{} + connectedBlocks <-chan keystore.BlockStamp + disconnectedBlocks <-chan keystore.BlockStamp + newCredits <-chan txstore.Credit + newDebits <-chan txstore.Debits + minedCredits <-chan txstore.Credit + minedDebits <-chan txstore.Debits + keystoreLocked <-chan bool + confirmedBalance <-chan btcutil.Amount + unconfirmedBalance <-chan btcutil.Amount + chainServerConnected <-chan bool + registerWalletNtfns chan struct{} + registerChainSvrNtfns chan struct{} // enqueueNotification and dequeueNotification handle both sides of an // infinitly growing queue for websocket client notifications. @@ -310,6 +311,7 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe registerWSC: make(chan *websocketClient), unregisterWSC: make(chan *websocketClient), registerWalletNtfns: make(chan struct{}), + registerChainSvrNtfns: make(chan struct{}), enqueueNotification: make(chan wsClientNotification), dequeueNotification: make(chan wsClientNotification), notificationHandlerQuit: make(chan struct{}), @@ -514,7 +516,6 @@ func (s *rpcServer) SetWallet(wallet *Wallet) { s.wallet = wallet s.registerWalletNtfns <- struct{}{} - chainSvrConnected := false if s.chainSvr != nil { // If the chain server rpc client is also set, there's no reason // to keep the mutex around. Make the locker simply execute @@ -525,14 +526,10 @@ func (s *rpcServer) SetWallet(wallet *Wallet) { // ok to run. s.handlerLookup = lookupAnyHandler - chainSvrConnected = !s.chainSvr.Disconnected() + // Make sure already connected websocket clients get a notification + // if the chain RPC client connection is set and connected. + s.chainSvr.NotifyConnected() } - - // Make sure already connected websocket clients get a notification - // if the chain RPC client connection is set and connected. This is - // run as a goroutine since it must acquire the handlerLock, which is - // locked here. - go wallet.notifyChainServerConnected(chainSvrConnected) } // SetChainServer sets the chain server client component needed to run a fully @@ -545,6 +542,8 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { defer s.handlerLock.Unlock() s.chainSvr = chainSvr + s.registerChainSvrNtfns <- struct{}{} + if s.wallet != nil { // If the wallet had already been set, there's no reason to keep // the mutex around. Make the locker simply execute noops @@ -554,12 +553,6 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { // With both the chain server and wallet set, all handlers are // ok to run. s.handlerLookup = lookupAnyHandler - - // Make sure already connected websocket clients get a - // notification if the chain RPC client connection is set and - // connected. This is run as a goroutine since it must acquire - // the handlerLock, which is locked here. - go s.wallet.notifyChainServerConnected(!chainSvr.Disconnected()) } } @@ -877,8 +870,8 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) { // TODO(jrick): this is crappy. kill it. s.handlerLock.Lock() - if s.wallet != nil && s.chainSvr != nil { - s.wallet.notifyChainServerConnected(!s.chainSvr.Disconnected()) + if s.chainSvr != nil { + s.chainSvr.NotifyConnected() } s.handlerLock.Unlock() @@ -1137,12 +1130,6 @@ out: "balance changes: %v", err) continue } - chainServerConnected, err := s.wallet.ListenChainServerConnected() - if err != nil { - log.Errorf("Could not register for chain server "+ - "connection changes: %v", err) - continue - } s.connectedBlocks = connectedBlocks s.disconnectedBlocks = disconnectedBlocks s.newCredits = newCredits @@ -1152,8 +1139,25 @@ out: s.keystoreLocked = keystoreLocked s.confirmedBalance = confirmedBalance s.unconfirmedBalance = unconfirmedBalance + + case <-s.registerChainSvrNtfns: + chainServerConnected, err := s.chainSvr.ListenConnected() + if err != nil { + log.Errorf("Could not register for chain server "+ + "connection changes: %v", err) + continue + } s.chainServerConnected = chainServerConnected + // Make sure already connected websocket clients get a + // notification for the current client connection state. + // + // TODO(jrick): I am appalled by doing this but trying + // not to change how notifications work for the moment. + // A revamped notification API without this horror will + // be implemented soon. + go s.chainSvr.NotifyConnected() + case <-s.quit: break out } @@ -1175,6 +1179,7 @@ func (s *rpcServer) drainNotifications() { case <-s.confirmedBalance: case <-s.unconfirmedBalance: case <-s.registerWalletNtfns: + case <-s.registerChainSvrNtfns: } } } diff --git a/wallet.go b/wallet.go index 80b17ba..f41c84b 100644 --- a/wallet.go +++ b/wallet.go @@ -95,13 +95,12 @@ type Wallet struct { // Notification channels so other components can listen in on wallet // activity. These are initialized as nil, and must be created by // calling one of the Listen* methods. - connectedBlocks chan keystore.BlockStamp - disconnectedBlocks chan keystore.BlockStamp - lockStateChanges chan bool // true when locked - confirmedBalance chan btcutil.Amount - unconfirmedBalance chan btcutil.Amount - chainServerConnected chan bool - notificationLock sync.Locker + connectedBlocks chan keystore.BlockStamp + disconnectedBlocks chan keystore.BlockStamp + lockStateChanges chan bool // true when locked + confirmedBalance chan btcutil.Amount + unconfirmedBalance chan btcutil.Amount + notificationLock sync.Locker wg sync.WaitGroup quit chan struct{} @@ -148,8 +147,6 @@ func (w *Wallet) updateNotificationLock() { case w.confirmedBalance == nil: fallthrough case w.unconfirmedBalance == nil: - fallthrough - case w.chainServerConnected == nil: return } w.notificationLock = noopLocker{} @@ -241,18 +238,6 @@ func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) { return w.unconfirmedBalance, nil } -func (w *Wallet) ListenChainServerConnected() (<-chan bool, error) { - w.notificationLock.Lock() - defer w.notificationLock.Unlock() - - if w.chainServerConnected != nil { - return nil, ErrDuplicateListen - } - w.chainServerConnected = make(chan bool) - w.updateNotificationLock() - return w.chainServerConnected, nil -} - func (w *Wallet) notifyConnectedBlock(block keystore.BlockStamp) { w.notificationLock.Lock() if w.connectedBlocks != nil { @@ -293,14 +278,6 @@ func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) { w.notificationLock.Unlock() } -func (w *Wallet) notifyChainServerConnected(connected bool) { - w.notificationLock.Lock() - if w.chainServerConnected != nil { - w.chainServerConnected <- connected - } - w.notificationLock.Unlock() -} - // openWallet opens a new wallet from disk. func openWallet() (*Wallet, error) { netdir := networkDir(activeNet.Params) @@ -376,8 +353,6 @@ func (w *Wallet) Start(chainServer *chain.Client) { w.chainSvrLock.Lock() defer w.chainSvrLock.Unlock() - w.notifyChainServerConnected(!chainServer.Disconnected()) - w.chainSvr = chainServer w.chainSvrLock = noopLocker{}