From e837ca5b6485d7345b6983056074e58de34aeda0 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Mon, 17 Feb 2014 22:18:30 -0500 Subject: [PATCH] Cleanly remove disconnected clients. Also fixes a bug where responses for a single client would be sent to every connected client. --- acctmgr.go | 4 +-- cmd.go | 4 +-- ntfns.go | 13 ++++--- rpcserver.go | 10 +++--- sockets.go | 98 ++++++++++++++++++++-------------------------------- 5 files changed, 53 insertions(+), 76 deletions(-) diff --git a/acctmgr.go b/acctmgr.go index 9d0af21..7abbfde 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -229,8 +229,8 @@ func (am *AccountManager) BlockNotify(bs *wallet.BlockStamp) { // changes, or sending these notifications as the utxos are added. confirmed := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, + NotifyWalletBalance(allClients, a.name, confirmed) + NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) // If this is the default account, update the block all accounts diff --git a/cmd.go b/cmd.go index ed93725..7e3c3eb 100644 --- a/cmd.go +++ b/cmd.go @@ -227,7 +227,7 @@ func main() { } updateBtcd <- btcd - NotifyBtcdConnection(frontendNotificationMaster) + NotifyBtcdConnection(allClients) log.Info("Established connection to btcd") // Perform handshake. @@ -246,7 +246,7 @@ func main() { // Block goroutine until the connection is lost. <-btcd.closed - NotifyBtcdConnection(frontendNotificationMaster) + NotifyBtcdConnection(allClients) log.Info("Lost btcd connection") } } diff --git a/ntfns.go b/ntfns.go index c25f291..80059bc 100644 --- a/ntfns.go +++ b/ntfns.go @@ -129,7 +129,7 @@ func NtfnProcessedTx(n btcjson.Cmd) { } else { // Notify frontends of new recv tx and mark as notified. NotifiedRecvTxChans.add <- *recvTxOP - NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(), + NotifyNewTxDetails(allClients, a.Name(), t.TxInfo(a.Name(), ptn.BlockHeight, a.Wallet.Net())[0]) } @@ -151,16 +151,15 @@ func NtfnProcessedTx(n btcjson.Cmd) { // the blockconnected notifiation is processed. if u.Height == -1 { bal := a.CalculateBalance(0) - a.CalculateBalance(1) - NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, - a.name, bal) + NotifyWalletBalanceUnconfirmed(allClients, a.name, bal) } } // Notify frontends of new account balance. confirmed := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed) + NotifyWalletBalance(allClients, a.name, confirmed) + NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) } // NtfnBlockConnected handles btcd notifications resulting from newly @@ -208,7 +207,7 @@ func NtfnBlockConnected(n btcjson.Cmd) { // Pass notification to frontends too. marshaled, _ := n.MarshalJSON() - frontendNotificationMaster <- marshaled + allClients <- marshaled } // NtfnBlockDisconnected handles btcd notifications resulting from @@ -231,7 +230,7 @@ func NtfnBlockDisconnected(n btcjson.Cmd) { // Pass notification to frontends too. marshaled, _ := n.MarshalJSON() - frontendNotificationMaster <- marshaled + allClients <- marshaled } // NtfnTxMined handles btcd notifications resulting from newly diff --git a/rpcserver.go b/rpcserver.go index f0b3a7d..6477bab 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1357,7 +1357,7 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * bs, err := GetCurBlock() if err == nil { for _, details := range sendtx.TxInfo(a.Name(), bs.Height, a.Net()) { - NotifyNewTxDetails(frontendNotificationMaster, a.Name(), + NotifyNewTxDetails(allClients, a.Name(), details) } } @@ -1379,8 +1379,8 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * // confirmed balance. confirmed := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed) + NotifyWalletBalance(allClients, a.name, confirmed) + NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) // btcd cannot be trusted to successfully relay the tx to the // Bitcoin network. Even if this succeeds, the rawtx must be @@ -1796,7 +1796,7 @@ type AccountNtfn struct { func NotifyWalletLockStateChange(account string, locked bool) { ntfn := btcws.NewWalletLockStateNtfn(account, locked) mntfn, _ := ntfn.MarshalJSON() - frontendNotificationMaster <- mntfn + allClients <- mntfn } // NotifyWalletBalance sends a confirmed account balance notification @@ -1892,7 +1892,7 @@ func NotifyMinedTxSender(in chan *tx.RecvTx) { recv.BlockHash.String(), recv.BlockHeight, recv.BlockTime, int(recv.BlockIndex)) mntfn, _ := ntfn.MarshalJSON() - frontendNotificationMaster <- mntfn + allClients <- mntfn // Mark as sent. m[recv.TxID] = struct{}{} diff --git a/sockets.go b/sockets.go index 5ee1939..444397c 100644 --- a/sockets.go +++ b/sockets.go @@ -52,13 +52,10 @@ var ( ErrConnLost = errors.New("connection lost") // Adds a frontend listener channel - addFrontendListener = make(chan (chan []byte)) - - // Removes a frontend listener channel - deleteFrontendListener = make(chan (chan []byte)) + addClient = make(chan clientContext) // Messages sent to this channel are sent to each connected frontend. - frontendNotificationMaster = make(chan []byte, 100) + allClients = make(chan []byte, 100) ) // server holds the items the RPC server may need to access (auth, @@ -69,6 +66,11 @@ type server struct { authsha [sha256.Size]byte } +type clientContext struct { + send chan []byte + disconnected chan struct{} // closed on disconnect +} + // parseListeners splits the list of listen addresses passed in addrs into // IPv4 and IPv6 slices and returns them. This allows easy creation of the // listeners on the correct interface "tcp4" and "tcp6". It also properly @@ -262,55 +264,26 @@ func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) { } } -// frontendListenerDuplicator listens for new wallet listener channels -// and duplicates messages sent to frontendNotificationMaster to all -// connected listeners. -func frontendListenerDuplicator() { - // frontendListeners is a map holding each currently connected frontend - // listener as the key. The value is ignored, as this is only used as - // a set. - frontendListeners := make(map[chan []byte]bool) +// clientResponseDuplicator listens for new wallet listener channels +// and duplicates messages sent to allClients to all connected clients. +func clientResponseDuplicator() { + clients := make(map[clientContext]struct{}) - // Don't want to add or delete a wallet listener while iterating - // through each to propigate to every attached wallet. Use a binary - // semaphore to prevent this. - sem := make(chan struct{}, 1) - sem <- struct{}{} + for { + select { + case cc := <-addClient: + clients[cc] = struct{}{} - // Check for listener channels to add or remove from set. - go func() { - for { - select { - case c := <-addFrontendListener: - <-sem - frontendListeners[c] = true - sem <- struct{}{} - - NotifyBtcdConnection(c) - bs, err := GetCurBlock() - if err == nil { - NotifyNewBlockChainHeight(c, bs) - NotifyBalances(c) + case n := <-allClients: + for cc := range clients { + select { + case <-cc.disconnected: + delete(clients, cc) + default: + cc.send <- n } - - case c := <-deleteFrontendListener: - <-sem - delete(frontendListeners, c) - sem <- struct{}{} } } - }() - - // Duplicate all messages sent across frontendNotificationMaster, as - // well as internal btcwallet notifications, to each listening wallet. - for { - ntfn := <-frontendNotificationMaster - - <-sem - for c := range frontendListeners { - c <- ntfn - } - sem <- struct{}{} } } @@ -331,11 +304,12 @@ func NotifyBtcdConnection(reply chan []byte) { func WSSendRecv(ws *websocket.Conn) { // Add frontend notification channel to set so this handler receives // updates. - frontendNotification := make(chan []byte) - addFrontendListener <- frontendNotification - defer func() { - deleteFrontendListener <- frontendNotification - }() + cc := clientContext{ + send: make(chan []byte), + disconnected: make(chan struct{}), + } + addClient <- cc + defer close(cc.disconnected) // jsonMsgs receives JSON messages from the currently connected frontend. jsonMsgs := make(chan []byte) @@ -363,11 +337,15 @@ func WSSendRecv(ws *websocket.Conn) { // Handle request here. go func(m []byte) { resp := ReplyToFrontend(m, true) - frontendNotification <- resp + + select { + case cc.send <- resp: + case <-cc.disconnected: + } }(m) - case ntfn, _ := <-frontendNotification: - if err := websocket.Message.Send(ws, ntfn); err != nil { + case m := <-cc.send: + if err := websocket.Message.Send(ws, m); err != nil { // Frontend disconnected. return } @@ -395,7 +373,7 @@ func (s *server) Start() { // requests for each channel in the set. // // Use a sync.Once to insure no extra duplicators run. - go duplicateOnce.Do(frontendListenerDuplicator) + go duplicateOnce.Do(clientResponseDuplicator) log.Trace("Starting RPC server") @@ -554,8 +532,8 @@ func Handshake(rpc ServerConn) error { if err != nil { return fmt.Errorf("cannot get best block: %v", err) } - NotifyNewBlockChainHeight(frontendNotificationMaster, bs) - NotifyBalances(frontendNotificationMaster) + NotifyNewBlockChainHeight(allClients, bs) + NotifyBalances(allClients) // Get default account. Only the default account is used to // track recently-seen blocks.