diff --git a/infrastructure.go b/infrastructure.go index 02b57665..8102fc46 100644 --- a/infrastructure.go +++ b/infrastructure.go @@ -145,8 +145,8 @@ func (c *Client) NextID() uint64 { } // addRequest associates the passed jsonRequest with the passed id. This allows -// the response from the remote server to be unmarshalled to the appropiate type -// and sent to the specified channel when it is received. +// the response from the remote server to be unmarshalled to the appropriate +// type and sent to the specified channel when it is received. // // If the client has already begun shutting down, ErrClientShutdown is returned // and the request is not added. @@ -196,11 +196,8 @@ func (c *Client) removeRequest(id uint64) *jsonRequest { // removeAllRequests removes all the jsonRequests which contain the response // channels for outstanding requests. // -// This function is safe for concurrent access. +// This function MUST be called with the request lock held. func (c *Client) removeAllRequests() { - c.requestLock.Lock() - defer c.requestLock.Unlock() - c.requestMap = make(map[uint64]*list.Element) c.requestList.Init() } @@ -416,11 +413,11 @@ cleanup: // block until the send channel is full. func (c *Client) sendMessage(marshalledJSON []byte) { // Don't send the message if disconnected. - if c.Disconnected() { + select { + case c.sendChan <- marshalledJSON: + case <-c.disconnect: return } - - c.sendChan <- marshalledJSON } // reregisterNtfns creates and sends commands needed to re-establish the current @@ -824,14 +821,14 @@ func (c *Client) Disconnected() bool { return c.disconnected } -// Disconnect disconnects the current websocket associated with the client. The -// connection will automatically be re-established unless the client was -// created with the DisableAutoReconnect flag. +// doDisconnect disconnects the websocket associated with the client if it +// hasn't already been disconnected. It will return false if the disconnect is +// not needed or the client is running in HTTP POST mode. // -// This function has no effect when the client is running in HTTP POST mode. -func (c *Client) Disconnect() { +// This function is safe for concurrent access. +func (c *Client) doDisconnect() bool { if c.config.HttpPostMode { - return + return false } c.mtx.Lock() @@ -839,18 +836,51 @@ func (c *Client) Disconnect() { // Nothing to do if already disconnected. if c.disconnected { - return + return false } log.Tracef("Disconnecting RPC client %s", c.config.Host) close(c.disconnect) c.wsConn.Close() c.disconnected = true + return true +} + +// doShutdown closes the shutdown channel and logs the shutdown unless shutdown +// is already in progress. It will return false if the shutdown is not needed. +// +// This function is safe for concurrent access. +func (c *Client) doShutdown() bool { + // Ignore the shutdown request if the client is already in the process + // of shutting down or already shutdown. + select { + case <-c.shutdown: + return false + default: + } + + log.Tracef("Shutting down RPC client %s", c.config.Host) + close(c.shutdown) + return true +} + +// Disconnect disconnects the current websocket associated with the client. The +// connection will automatically be re-established unless the client was +// created with the DisableAutoReconnect flag. +// +// This function has no effect when the client is running in HTTP POST mode. +func (c *Client) Disconnect() { + // Nothing to do if already disconnected or running in HTTP POST mode. + if !c.doDisconnect() { + return + } + + c.requestLock.Lock() + defer c.requestLock.Unlock() // When operating without auto reconnect, send errors to any pending // requests and shutdown the client. if c.config.DisableAutoReconnect { - c.requestLock.Lock() for e := c.requestList.Front(); e != nil; e = e.Next() { req := e.Value.(*jsonRequest) req.responseChan <- &response{ @@ -858,9 +888,8 @@ func (c *Client) Disconnect() { err: ErrClientDisconnect, } } - c.requestLock.Unlock() c.removeAllRequests() - c.Shutdown() + c.doShutdown() } } @@ -868,19 +897,18 @@ func (c *Client) Disconnect() { // with the client and, when automatic reconnect is enabled, preventing future // attempts to reconnect. It also stops all goroutines. func (c *Client) Shutdown() { + // Do the shutdown under the request lock to prevent clients from + // adding new requests while the client shutdown process is initiated. + c.requestLock.Lock() + defer c.requestLock.Unlock() + // Ignore the shutdown request if the client is already in the process // of shutting down or already shutdown. - select { - case <-c.shutdown: + if !c.doShutdown() { return - default: } - log.Tracef("Shutting down RPC client %s", c.config.Host) - close(c.shutdown) - // Send the ErrClientShutdown error to any pending requests. - c.requestLock.Lock() for e := c.requestList.Front(); e != nil; e = e.Next() { req := e.Value.(*jsonRequest) req.responseChan <- &response{ @@ -888,10 +916,10 @@ func (c *Client) Shutdown() { err: ErrClientShutdown, } } - c.requestLock.Unlock() c.removeAllRequests() - c.Disconnect() + // Disconnect the client if needed. + c.doDisconnect() } // Start begins processing input and output messages.