diff --git a/rpcserver.go b/rpcserver.go index e7372118..205a933f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -23,6 +23,7 @@ import ( "github.com/conformal/btcws" "github.com/conformal/fastsha256" "github.com/conformal/websocket" + "io" "io/ioutil" "math/big" "math/rand" @@ -70,7 +71,7 @@ var ( ErrBadParamsField = errors.New("bad params field") ) -type commandHandler func(*rpcServer, btcjson.Cmd) (interface{}, error) +type commandHandler func(*rpcServer, btcjson.Cmd, <-chan struct{}) (interface{}, error) // handlers maps RPC command strings to appropriate handler functions. // this is copied by init because help references rpcHandlers and thus causes @@ -210,6 +211,8 @@ type rpcServer struct { ntfnMgr *wsNotificationManager numClients int numClientsMutex sync.Mutex + statusLines map[int]string + statusLock sync.RWMutex wg sync.WaitGroup listeners []net.Listener workState *workState @@ -285,6 +288,65 @@ func (s *rpcServer) Start() { s.ntfnMgr.Start() } +// httpStatusLine returns a response Status-Line (RFC 2616 Section 6.1) +// for the given request and response status code. This function was lifted and +// adapted from the standard library HTTP server code since it's not exported. +func (s *rpcServer) httpStatusLine(req *http.Request, code int) string { + // Fast path: + key := code + proto11 := req.ProtoAtLeast(1, 1) + if !proto11 { + key = -key + } + s.statusLock.RLock() + line, ok := s.statusLines[key] + s.statusLock.RUnlock() + if ok { + return line + } + + // Slow path: + proto := "HTTP/1.0" + if proto11 { + proto = "HTTP/1.1" + } + codeStr := strconv.Itoa(code) + text := http.StatusText(code) + if text != "" { + line = proto + " " + codeStr + " " + text + "\r\n" + s.statusLock.Lock() + s.statusLines[key] = line + s.statusLock.Unlock() + } else { + text = "status code " + codeStr + line = proto + " " + codeStr + " " + text + "\r\n" + } + + return line +} + +// writeHTTPResponseHeaders writes the necessary response headers prior to +// writing an HTTP body given a request to use for protocol negotiation, headers +// to write, a status code, and a writer. +func (s *rpcServer) writeHTTPResponseHeaders(req *http.Request, headers http.Header, code int, w io.Writer) error { + _, err := io.WriteString(w, s.httpStatusLine(req, code)) + if err != nil { + return err + } + + err = headers.Write(w) + if err != nil { + return err + } + + _, err = io.WriteString(w, "\r\n") + if err != nil { + return err + } + + return nil +} + // limitConnections responds with a 503 service unavailable and returns true if // adding another client would exceed the maximum allow RPC clients. // @@ -406,10 +468,11 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) { login := cfg.RPCUser + ":" + cfg.RPCPass auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) rpc := rpcServer{ - authsha: fastsha256.Sum256([]byte(auth)), - server: s, - workState: newWorkState(), - quit: make(chan int), + authsha: fastsha256.Sum256([]byte(auth)), + server: s, + statusLines: make(map[int]string), + workState: newWorkState(), + quit: make(chan int), } rpc.ntfnMgr = newWsNotificationManager(&rpc) @@ -484,6 +547,33 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { return } + // Unfortunately, the http server doesn't provide the ability to + // change the read deadline for the new connection and having one breaks + // long polling. However, not having a read deadline on the initial + // connection would mean clients can connect and idle forever. Thus, + // hijack the connecton from the HTTP server, clear the read deadline, + // and handle writing the response manually. + hj, ok := w.(http.Hijacker) + if !ok { + errMsg := "webserver doesn't support hijacking" + rpcsLog.Warnf(errMsg) + errCode := http.StatusInternalServerError + http.Error(w, strconv.FormatInt(int64(errCode), 10)+" "+errMsg, + errCode) + return + } + conn, buf, err := hj.Hijack() + if err != nil { + rpcsLog.Warnf("Failed to hijack HTTP connection: %v", err) + errCode := http.StatusInternalServerError + http.Error(w, strconv.FormatInt(int64(errCode), 10)+" "+ + err.Error(), errCode) + return + } + defer conn.Close() + defer buf.Flush() + conn.SetReadDeadline(timeZeroVal) + var reply btcjson.Reply cmd, jsonErr := parseCmd(body) if cmd != nil { @@ -495,14 +585,30 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { if jsonErr != nil { reply.Error = jsonErr } else { - reply = standardCmdReply(cmd, s) + // Setup a close notifier. Since the connection is hijacked, + // the CloseNotifer on the ResponseWriter is not available. + closeChan := make(chan struct{}, 1) + go func() { + _, err := conn.Read(make([]byte, 1)) + if err != nil { + close(closeChan) + } + }() + + reply = standardCmdReply(cmd, s, closeChan) } rpcsLog.Tracef("reply: %v", reply) - msg, err := btcjson.MarshallAndSend(reply, w) + err = s.writeHTTPResponseHeaders(r, w.Header(), http.StatusOK, buf) if err != nil { - rpcsLog.Errorf(msg) + rpcsLog.Error(err) + return + } + + msg, err := btcjson.MarshallAndSend(reply, buf) + if err != nil { + rpcsLog.Error(err) return } rpcsLog.Tracef(msg) @@ -510,19 +616,19 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { // handleUnimplemented is a temporary handler for commands that we should // support but do not. -func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return nil, btcjson.ErrUnimplemented } // handleAskWallet is the handler for commands that we do recognise as valid // but that we can not answer correctly since it involves wallet state. // These commands will be implemented in btcwallet. -func handleAskWallet(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleAskWallet(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return nil, btcjson.ErrNoWallet } // handleAddNode handles addnode commands. -func handleAddNode(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleAddNode(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.AddNodeCmd) addr := normalizeAddress(c.Addr, activeNetParams.DefaultPort) @@ -564,7 +670,7 @@ func messageToHex(msg btcwire.Message) (string, error) { } // handleCreateRawTransaction handles createrawtransaction commands. -func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.CreateRawTransactionCmd) // Add all transaction inputs to a new transaction after performing @@ -650,7 +756,7 @@ func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, err } // handleDebugLevel handles debuglevel commands. -func handleDebugLevel(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleDebugLevel(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.DebugLevelCmd) // Special show command to list supported subsystems. @@ -781,7 +887,7 @@ func createTxRawResult(net *btcnet.Params, txSha string, mtx *btcwire.MsgTx, } // handleDecodeRawTransaction handles decoderawtransaction commands. -func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.DecodeRawTransactionCmd) // Deserialize the transaction. @@ -828,7 +934,7 @@ func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, err } // handleDecodeScript handles decodescript commands. -func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.DecodeScriptCmd) // Convert the hex script to bytes. @@ -876,7 +982,7 @@ func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetAddedNodeInfo handles getaddednodeinfo commands. -func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetAddedNodeInfoCmd) // Retrieve a list of persistent (added) peers from the bitcoin server @@ -959,7 +1065,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) } // handleGetBestBlock implements the getbestblock command. -func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { // All other "get block" commands give either the height, the // hash, or both but require the block SHA. This gets both for // the best block. @@ -976,7 +1082,7 @@ func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetBestBlockHash implements the getbestblockhash command. -func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { sha, _, err := s.server.db.NewestSha() if err != nil { rpcsLog.Errorf("Error getting newest sha: %v", err) @@ -987,7 +1093,7 @@ func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) } // handleGetBlock implements the getblock command. -func handleGetBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetBlockCmd) sha, err := btcwire.NewShaHashFromStr(c.Hash) if err != nil { @@ -1083,7 +1189,7 @@ func handleGetBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetBlockCount implements the getblockcount command. -func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { _, maxidx, err := s.server.db.NewestSha() if err != nil { rpcsLog.Errorf("Error getting newest sha: %v", err) @@ -1094,7 +1200,7 @@ func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetBlockHash implements the getblockhash command. -func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetBlockHashCmd) sha, err := s.server.db.FetchBlockShaByHeight(c.Index) if err != nil { @@ -1106,17 +1212,17 @@ func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetConnectionCount implements the getconnectioncount command. -func handleGetConnectionCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetConnectionCount(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return s.server.ConnectedCount(), nil } // handleGetCurrentNet implements the getcurrentnet command. -func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return s.server.netParams.Net, nil } // handleGetDifficulty implements the getdifficulty command. -func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { sha, _, err := s.server.db.NewestSha() if err != nil { rpcsLog.Errorf("Error getting sha: %v", err) @@ -1131,18 +1237,18 @@ func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetGenerate implements the getgenerate command. -func handleGetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetGenerate(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return s.server.cpuMiner.IsMining(), nil } // handleGetHashesPerSec implements the gethashespersec command. -func handleGetHashesPerSec(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetHashesPerSec(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return int64(s.server.cpuMiner.HashesPerSecond()), nil } // handleGetInfo implements the getinfo command. We only return the fields // that are not related to wallet functionality. -func handleGetInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { // We require the current block height and sha. sha, height, err := s.server.db.NewestSha() if err != nil { @@ -1172,7 +1278,7 @@ func handleGetInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { // handleGetMiningInfo implements the getmininginfo command. We only return the // fields that are not related to wallet functionality. -func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { sha, height, err := s.server.db.NewestSha() if err != nil { rpcsLog.Errorf("Error getting sha: %v", err) @@ -1201,7 +1307,8 @@ func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { Message: err.Error(), } } - networkHashesPerSecIface, err := handleGetNetworkHashPS(s, gnhpsCmd) + networkHashesPerSecIface, err := handleGetNetworkHashPS(s, gnhpsCmd, + closeChan) if err != nil { // This is already a btcjson.Error from the handler. return nil, err @@ -1230,7 +1337,7 @@ func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetNetTotals implements the getnettotals command. -func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { totalBytesRecv, totalBytesSent := s.server.NetTotals() reply := &btcjson.GetNetTotalsResult{ TotalBytesRecv: totalBytesRecv, @@ -1241,7 +1348,7 @@ func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetNetworkHashPS implements the getnetworkhashps command. -func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetNetworkHashPSCmd) _, newestHeight, err := s.server.db.NewestSha() @@ -1329,12 +1436,12 @@ func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) } // handleGetPeerInfo implements the getpeerinfo command. -func handleGetPeerInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetPeerInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { return s.server.PeerInfo(), nil } // handleGetRawMempool implements the getrawmempool command. -func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetRawMempoolCmd) descs := s.server.txMemPool.TxDescs() @@ -1376,7 +1483,7 @@ func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleGetRawTransaction implements the getrawtransaction command. -func handleGetRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetRawTransactionCmd) // Convert the provided transaction hash hex to a ShaHash. @@ -1773,7 +1880,7 @@ func handleGetWorkSubmission(s *rpcServer, hexData string) (interface{}, error) } // handleGetWork implements the getwork command. -func handleGetWork(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleGetWork(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetWorkCmd) // Respond with an error if there are no addresses to pay the created @@ -1836,7 +1943,7 @@ func getHelpText(cmdName string) (string, error) { } // handleHelp implements the help command. -func handleHelp(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleHelp(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { help := cmd.(*btcjson.HelpCmd) // if no args we give a list of all known commands @@ -1867,7 +1974,7 @@ func handleHelp(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handlePing implements the ping command. -func handlePing(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handlePing(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { // Ask server to ping \o_ nonce, err := btcwire.RandomUint64() if err != nil { @@ -1880,7 +1987,7 @@ func handlePing(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleSendRawTransaction implements the sendrawtransaction command. -func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.SendRawTransactionCmd) // Deserialize and send off to tx relay serializedTx, err := hex.DecodeString(c.HexTx) @@ -1929,7 +2036,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error } // handleSetGenerate implements the setgenerate command. -func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.SetGenerateCmd) // Disable generation regardless of the provided generate flag if the @@ -1961,13 +2068,13 @@ func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { } // handleStop implements the stop command. -func handleStop(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleStop(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { s.server.Stop() return "btcd stopping.", nil } // handleSubmitBlock implements the submitblock command. -func handleSubmitBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleSubmitBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.SubmitBlockCmd) // Deserialize and send off to block processor. serializedBlock, err := hex.DecodeString(c.HexBlock) @@ -2044,7 +2151,7 @@ func verifyChain(db btcdb.Db, level, depth int32) error { return nil } -func handleVerifyChain(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { +func handleVerifyChain(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.VerifyChainCmd) err := verifyChain(s.server.db, c.CheckLevel, c.CheckDepth) @@ -2072,7 +2179,7 @@ func parseCmd(b []byte) (btcjson.Cmd, *btcjson.Error) { // standardCmdReply checks that a parsed command is a standard // Bitcoin JSON-RPC command and runs the proper handler to reply to the // command. -func standardCmdReply(cmd btcjson.Cmd, s *rpcServer) (reply btcjson.Reply) { +func standardCmdReply(cmd btcjson.Cmd, s *rpcServer, closeChan <-chan struct{}) (reply btcjson.Reply) { id := cmd.Id() reply.Id = &id @@ -2094,7 +2201,7 @@ func standardCmdReply(cmd btcjson.Cmd, s *rpcServer) (reply btcjson.Reply) { return reply handled: - result, err := handler(s, cmd) + result, err := handler(s, cmd, closeChan) if err != nil { jsonErr, ok := err.(btcjson.Error) if !ok { diff --git a/rpcwebsocket.go b/rpcwebsocket.go index cc591d3e..2e6485fc 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -1015,7 +1015,7 @@ func (c *wsClient) handleMessage(msg []byte) { if !ok { // No websocket-specific handler so handle like a legacy // RPC connection. - response := standardCmdReply(cmd, c.server) + response := standardCmdReply(cmd, c.server, nil) reply, err := json.Marshal(response) if err != nil { rpcsLog.Errorf("Failed to marshal reply for <%s> "+