diff --git a/rpcserver.go b/rpcserver.go index 8e788ac..01c50c3 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -477,7 +477,6 @@ func (s *rpcServer) WebsocketClientRead(wsc *websocketClient) { } wsc.allRequests <- request } - s.wg.Done() } type rawRequest struct { @@ -582,40 +581,57 @@ type unauthedRequest struct { func (s *rpcServer) WebsocketClientGateway(wsc *websocketClient) { out: - for request := range wsc.allRequests { - // Get the method of the request and check whether it should be - // handled by wallet or passed down to btcd. If the latter, - // handle in a new goroutine (to not block or be blocked by - // the handling of actual wallet requests). - // - // This is done by unmarshaling the JSON bytes into a rawRequest - // to avoid the mangling of unmarshaling and re-marshaling of - // large JSON numbers, as well as the overhead of unneeded - // unmarshals and marshals. - var raw rawRequest - if err := json.Unmarshal(request, &raw); err != nil { - if !wsc.authenticated { - // Disconnect immediately. + // A for-select with a read of the quit channel is used instead of a + // for-range to provide clean shutdown. This is necessary due to + // WebsocketClientRead (which sends to the allRequests chan) not closing + // allRequests during shutdown if the remote websocket client is still + // connected. + for { + select { + case request, ok := <-wsc.allRequests: + if !ok { + // client disconnected break out } - err = wsc.send(marshalError(idPointer(raw.ID))) - if err != nil { - break out + // Get the method of the request and check whether it + // should be handled by wallet or passed down to btcd. + // If the latter, handle in a new goroutine (to not + // block or be blocked by the handling of actual wallet + // requests). + // + // This is done by unmarshaling the JSON bytes into a + // rawRequest to avoid the mangling of unmarshaling and + // re-marshaling of large JSON numbers, as well as the + // overhead of unneeded unmarshals and marshals. + var raw rawRequest + if err := json.Unmarshal(request, &raw); err != nil { + if !wsc.authenticated { + // Disconnect immediately. + break out + } + err = wsc.send(marshalError(idPointer(raw.ID))) + if err != nil { + break out + } + continue } - continue - } - f, ok := handlerFunc(raw.Method, true) - if ok || raw.Method == "authenticate" { - // unauthedRequests is buffered to the max number of - // concurrent websocket client requests so as to not - // block the passthrough of later btcd requests. - wsc.unauthedRequests <- unauthedRequest{request, f} - } else { - // websocketPassthrough is run as a goroutine to - // send an unhandled request to the chain server without - // blocking the handling of later wallet requests. - go s.websocketPassthrough(wsc, raw) + f, ok := handlerFunc(raw.Method, true) + if ok || raw.Method == "authenticate" { + // unauthedRequests is buffered to the max + // number of concurrent websocket client + // requests so as to not block the passthrough + // of later btcd requests. + wsc.unauthedRequests <- unauthedRequest{request, f} + } else { + // websocketPassthrough is run as a goroutine to + // send an unhandled request to the chain server + // without blocking the handling of later wallet + // requests. + go s.websocketPassthrough(wsc, raw) + } + case <-s.quit: + break out } } close(wsc.unauthedRequests) @@ -773,8 +789,13 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) { return } - s.wg.Add(4) + // WebsocketClientRead is intentionally not run with the waitgroup + // so it is ignored during shutdown. This is to prevent a hang during + // shutdown where the goroutine is blocked on a read of the + // websocket connection if the client is still connected. go s.WebsocketClientRead(wsc) + + s.wg.Add(3) go s.WebsocketClientGateway(wsc) go s.WebsocketClientRespond(wsc) go s.WebsocketClientSend(wsc) @@ -1044,7 +1065,15 @@ out: jsonErr.Code = btcjson.ErrWallet.Code } } - r.response <- handlerResponse{result, jsonErr} + // The goroutine which requested this may not be running + // anymore. If the quit chan is read instead, break out + // of the loop now so more requests aren't potentially + // read after reentering the loop. + select { + case r.response <- handlerResponse{result, jsonErr}: + case <-s.quit: + break out + } case <-s.quit: break out