diff --git a/config.go b/config.go index 5a04d8c1..7dff0200 100644 --- a/config.go +++ b/config.go @@ -40,6 +40,7 @@ const ( defaultBanThreshold = 100 defaultMaxRPCClients = 10 defaultMaxRPCWebsockets = 25 + defaultMaxRPCConcurrentReqs = 20 defaultVerifyEnabled = false defaultDbType = "ffldb" defaultFreeTxRelayLimit = 15.0 @@ -82,73 +83,74 @@ func minUint32(a, b uint32) uint32 { // // See loadConfig for details on the configuration load process. type config struct { - ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` - ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"` - DataDir string `short:"b" long:"datadir" description:"Directory to store data"` - LogDir string `long:"logdir" description:"Directory to log output."` - AddPeers []string `short:"a" long:"addpeer" description:"Add a peer to connect with at startup"` - ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"` - DisableListen bool `long:"nolisten" description:"Disable listening for incoming connections -- NOTE: Listening is automatically disabled if the --connect or --proxy options are used without also specifying listen interfaces via --listen"` - Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 8333, testnet: 18333)"` - MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"` - DisableBanning bool `long:"nobanning" description:"Disable banning of misbehaving peers"` - BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"` - BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."` - RPCUser string `short:"u" long:"rpcuser" description:"Username for RPC connections"` - RPCPass string `short:"P" long:"rpcpass" default-mask:"-" description:"Password for RPC connections"` - RPCLimitUser string `long:"rpclimituser" description:"Username for limited RPC connections"` - RPCLimitPass string `long:"rpclimitpass" default-mask:"-" description:"Password for limited RPC connections"` - RPCListeners []string `long:"rpclisten" description:"Add an interface/port to listen for RPC connections (default port: 8334, testnet: 18334)"` - RPCCert string `long:"rpccert" description:"File containing the certificate file"` - RPCKey string `long:"rpckey" description:"File containing the certificate key"` - RPCMaxClients int `long:"rpcmaxclients" description:"Max number of RPC clients for standard connections"` - RPCMaxWebsockets int `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"` - DisableRPC bool `long:"norpc" description:"Disable built-in RPC server -- NOTE: The RPC server is disabled by default if no rpcuser/rpcpass or rpclimituser/rpclimitpass is specified"` - DisableTLS bool `long:"notls" description:"Disable TLS for the RPC server -- NOTE: This is only allowed if the RPC server is bound to localhost"` - DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"` - ExternalIPs []string `long:"externalip" description:"Add an ip to the list of local addresses we claim to listen on to peers"` - Proxy string `long:"proxy" description:"Connect via SOCKS5 proxy (eg. 127.0.0.1:9050)"` - ProxyUser string `long:"proxyuser" description:"Username for proxy server"` - ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` - OnionProxy string `long:"onion" description:"Connect to tor hidden services via SOCKS5 proxy (eg. 127.0.0.1:9050)"` - OnionProxyUser string `long:"onionuser" description:"Username for onion proxy server"` - OnionProxyPass string `long:"onionpass" default-mask:"-" description:"Password for onion proxy server"` - NoOnion bool `long:"noonion" description:"Disable connecting to tor hidden services"` - TorIsolation bool `long:"torisolation" description:"Enable Tor stream isolation by randomizing user credentials for each connection."` - TestNet3 bool `long:"testnet" description:"Use the test network"` - RegressionTest bool `long:"regtest" description:"Use the regression test network"` - SimNet bool `long:"simnet" description:"Use the simulation test network"` - DisableCheckpoints bool `long:"nocheckpoints" description:"Disable built-in checkpoints. Don't do this unless you know what you're doing."` - DbType string `long:"dbtype" description:"Database backend to use for the Block Chain"` - Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"` - CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"` - DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify =,=,... to set the log level for individual subsystems -- Use show to list available subsystems"` - Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"` - MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in BTC/kB to be considered a non-zero fee."` - FreeTxRelayLimit float64 `long:"limitfreerelay" description:"Limit relay of transactions with no transaction fee to the given amount in thousands of bytes per minute"` - NoRelayPriority bool `long:"norelaypriority" description:"Do not require free or low-fee transactions to have high priority for relaying"` - MaxOrphanTxs int `long:"maxorphantx" description:"Max number of orphan transactions to keep in memory"` - Generate bool `long:"generate" description:"Generate (mine) bitcoins using the CPU"` - MiningAddrs []string `long:"miningaddr" description:"Add the specified payment address to the list of addresses to use for generated blocks -- At least one address is required if the generate option is set"` - BlockMinSize uint32 `long:"blockminsize" description:"Mininum block size in bytes to be used when creating a block"` - BlockMaxSize uint32 `long:"blockmaxsize" description:"Maximum block size in bytes to be used when creating a block"` - BlockPrioritySize uint32 `long:"blockprioritysize" description:"Size in bytes for high-priority/low-fee transactions when creating a block"` - GetWorkKeys []string `long:"getworkkey" description:"DEPRECATED -- Use the --miningaddr option instead"` - NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support"` - SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache"` - BlocksOnly bool `long:"blocksonly" description:"Do not accept transactions from remote peers."` - TxIndex bool `long:"txindex" description:"Maintain a full hash-based transaction index which makes all transactions available via the getrawtransaction RPC"` - DropTxIndex bool `long:"droptxindex" description:"Deletes the hash-based transaction index from the database on start up and then exits."` - AddrIndex bool `long:"addrindex" description:"Maintain a full address-based transaction index which makes the searchrawtransactions RPC available"` - DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` - RelayNonStd bool `long:"relaynonstd" description:"Relay non-standard transactions regardless of the default settings for the active network."` - RejectNonStd bool `long:"rejectnonstd" description:"Reject non-standard transactions regardless of the default settings for the active network."` - onionlookup func(string) ([]net.IP, error) - lookup func(string) ([]net.IP, error) - oniondial func(string, string) (net.Conn, error) - dial func(string, string) (net.Conn, error) - miningAddrs []btcutil.Address - minRelayTxFee btcutil.Amount + ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` + ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"` + DataDir string `short:"b" long:"datadir" description:"Directory to store data"` + LogDir string `long:"logdir" description:"Directory to log output."` + AddPeers []string `short:"a" long:"addpeer" description:"Add a peer to connect with at startup"` + ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"` + DisableListen bool `long:"nolisten" description:"Disable listening for incoming connections -- NOTE: Listening is automatically disabled if the --connect or --proxy options are used without also specifying listen interfaces via --listen"` + Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 8333, testnet: 18333)"` + MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"` + DisableBanning bool `long:"nobanning" description:"Disable banning of misbehaving peers"` + BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"` + BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."` + RPCUser string `short:"u" long:"rpcuser" description:"Username for RPC connections"` + RPCPass string `short:"P" long:"rpcpass" default-mask:"-" description:"Password for RPC connections"` + RPCLimitUser string `long:"rpclimituser" description:"Username for limited RPC connections"` + RPCLimitPass string `long:"rpclimitpass" default-mask:"-" description:"Password for limited RPC connections"` + RPCListeners []string `long:"rpclisten" description:"Add an interface/port to listen for RPC connections (default port: 8334, testnet: 18334)"` + RPCCert string `long:"rpccert" description:"File containing the certificate file"` + RPCKey string `long:"rpckey" description:"File containing the certificate key"` + RPCMaxClients int `long:"rpcmaxclients" description:"Max number of RPC clients for standard connections"` + RPCMaxWebsockets int `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"` + RPCMaxConcurrentReqs int `long:"rpcmaxconcurrentreqs" description:"Max number of concurrent RPC requests that may be processed concurrently"` + DisableRPC bool `long:"norpc" description:"Disable built-in RPC server -- NOTE: The RPC server is disabled by default if no rpcuser/rpcpass or rpclimituser/rpclimitpass is specified"` + DisableTLS bool `long:"notls" description:"Disable TLS for the RPC server -- NOTE: This is only allowed if the RPC server is bound to localhost"` + DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"` + ExternalIPs []string `long:"externalip" description:"Add an ip to the list of local addresses we claim to listen on to peers"` + Proxy string `long:"proxy" description:"Connect via SOCKS5 proxy (eg. 127.0.0.1:9050)"` + ProxyUser string `long:"proxyuser" description:"Username for proxy server"` + ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` + OnionProxy string `long:"onion" description:"Connect to tor hidden services via SOCKS5 proxy (eg. 127.0.0.1:9050)"` + OnionProxyUser string `long:"onionuser" description:"Username for onion proxy server"` + OnionProxyPass string `long:"onionpass" default-mask:"-" description:"Password for onion proxy server"` + NoOnion bool `long:"noonion" description:"Disable connecting to tor hidden services"` + TorIsolation bool `long:"torisolation" description:"Enable Tor stream isolation by randomizing user credentials for each connection."` + TestNet3 bool `long:"testnet" description:"Use the test network"` + RegressionTest bool `long:"regtest" description:"Use the regression test network"` + SimNet bool `long:"simnet" description:"Use the simulation test network"` + DisableCheckpoints bool `long:"nocheckpoints" description:"Disable built-in checkpoints. Don't do this unless you know what you're doing."` + DbType string `long:"dbtype" description:"Database backend to use for the Block Chain"` + Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"` + CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"` + DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify =,=,... to set the log level for individual subsystems -- Use show to list available subsystems"` + Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"` + MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in BTC/kB to be considered a non-zero fee."` + FreeTxRelayLimit float64 `long:"limitfreerelay" description:"Limit relay of transactions with no transaction fee to the given amount in thousands of bytes per minute"` + NoRelayPriority bool `long:"norelaypriority" description:"Do not require free or low-fee transactions to have high priority for relaying"` + MaxOrphanTxs int `long:"maxorphantx" description:"Max number of orphan transactions to keep in memory"` + Generate bool `long:"generate" description:"Generate (mine) bitcoins using the CPU"` + MiningAddrs []string `long:"miningaddr" description:"Add the specified payment address to the list of addresses to use for generated blocks -- At least one address is required if the generate option is set"` + BlockMinSize uint32 `long:"blockminsize" description:"Mininum block size in bytes to be used when creating a block"` + BlockMaxSize uint32 `long:"blockmaxsize" description:"Maximum block size in bytes to be used when creating a block"` + BlockPrioritySize uint32 `long:"blockprioritysize" description:"Size in bytes for high-priority/low-fee transactions when creating a block"` + GetWorkKeys []string `long:"getworkkey" description:"DEPRECATED -- Use the --miningaddr option instead"` + NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support"` + SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache"` + BlocksOnly bool `long:"blocksonly" description:"Do not accept transactions from remote peers."` + TxIndex bool `long:"txindex" description:"Maintain a full hash-based transaction index which makes all transactions available via the getrawtransaction RPC"` + DropTxIndex bool `long:"droptxindex" description:"Deletes the hash-based transaction index from the database on start up and then exits."` + AddrIndex bool `long:"addrindex" description:"Maintain a full address-based transaction index which makes the searchrawtransactions RPC available"` + DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` + RelayNonStd bool `long:"relaynonstd" description:"Relay non-standard transactions regardless of the default settings for the active network."` + RejectNonStd bool `long:"rejectnonstd" description:"Reject non-standard transactions regardless of the default settings for the active network."` + onionlookup func(string) ([]net.IP, error) + lookup func(string) ([]net.IP, error) + oniondial func(string, string) (net.Conn, error) + dial func(string, string) (net.Conn, error) + miningAddrs []btcutil.Address + minRelayTxFee btcutil.Amount } // serviceOptions defines the configuration options for btcd as a service on @@ -334,28 +336,29 @@ func newConfigParser(cfg *config, so *serviceOptions, options flags.Options) *fl func loadConfig() (*config, []string, error) { // Default config. cfg := config{ - ConfigFile: defaultConfigFile, - DebugLevel: defaultLogLevel, - MaxPeers: defaultMaxPeers, - BanDuration: defaultBanDuration, - BanThreshold: defaultBanThreshold, - RPCMaxClients: defaultMaxRPCClients, - RPCMaxWebsockets: defaultMaxRPCWebsockets, - DataDir: defaultDataDir, - LogDir: defaultLogDir, - DbType: defaultDbType, - RPCKey: defaultRPCKeyFile, - RPCCert: defaultRPCCertFile, - MinRelayTxFee: mempool.DefaultMinRelayTxFee.ToBTC(), - FreeTxRelayLimit: defaultFreeTxRelayLimit, - BlockMinSize: defaultBlockMinSize, - BlockMaxSize: defaultBlockMaxSize, - BlockPrioritySize: mempool.DefaultBlockPrioritySize, - MaxOrphanTxs: defaultMaxOrphanTransactions, - SigCacheMaxSize: defaultSigCacheMaxSize, - Generate: defaultGenerate, - TxIndex: defaultTxIndex, - AddrIndex: defaultAddrIndex, + ConfigFile: defaultConfigFile, + DebugLevel: defaultLogLevel, + MaxPeers: defaultMaxPeers, + BanDuration: defaultBanDuration, + BanThreshold: defaultBanThreshold, + RPCMaxClients: defaultMaxRPCClients, + RPCMaxWebsockets: defaultMaxRPCWebsockets, + RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs, + DataDir: defaultDataDir, + LogDir: defaultLogDir, + DbType: defaultDbType, + RPCKey: defaultRPCKeyFile, + RPCCert: defaultRPCCertFile, + MinRelayTxFee: mempool.DefaultMinRelayTxFee.ToBTC(), + FreeTxRelayLimit: defaultFreeTxRelayLimit, + BlockMinSize: defaultBlockMinSize, + BlockMaxSize: defaultBlockMaxSize, + BlockPrioritySize: mempool.DefaultBlockPrioritySize, + MaxOrphanTxs: defaultMaxOrphanTransactions, + SigCacheMaxSize: defaultSigCacheMaxSize, + Generate: defaultGenerate, + TxIndex: defaultTxIndex, + AddrIndex: defaultAddrIndex, } // Service options which are only added on Windows. @@ -634,6 +637,15 @@ func loadConfig() (*config, []string, error) { } } + if cfg.RPCMaxConcurrentReqs < 0 { + str := "%s: The rpcmaxwebsocketconcurrentrequests option may " + + "not be less than 0 -- parsed [%d]" + err := fmt.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + // Validate the the minrelaytxfee. cfg.minRelayTxFee, err = btcutil.NewAmount(cfg.MinRelayTxFee) if err != nil { diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 95fcfe65..76cbe84c 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -39,6 +39,15 @@ const ( websocketSendBufferSize = 50 ) +type semaphore chan struct{} + +func makeSemaphore(n int) semaphore { + return make(chan struct{}, n) +} + +func (s semaphore) acquire() { s <- struct{}{} } +func (s semaphore) release() { <-s } + // timeZeroVal is simply the zero value for a time.Time and is used to avoid // creating multiple instances. var timeZeroVal time.Time @@ -65,14 +74,6 @@ var wsHandlersBeforeInit = map[string]wsCommandHandler{ "rescan": handleRescan, } -// wsAsyncHandlers holds the websocket commands which should be run -// asynchronously to the main input handler goroutine. This allows long-running -// operations to run concurrently (and one at a time) while still responding -// to the majority of normal requests which can be answered quickly. -var wsAsyncHandlers = map[string]struct{}{ - "rescan": {}, -} - // WebsocketHandler handles a new websocket client by creating a new wsClient, // starting it, and blocking until the connection closes. Since it blocks, it // must be run in a separate goroutine. It should be invoked from the websocket @@ -898,178 +899,11 @@ type wsClient struct { spentRequests map[wire.OutPoint]struct{} // Networking infrastructure. - asyncStarted bool - asyncChan chan *parsedRPCCmd - ntfnChan chan []byte - sendChan chan wsResponse - quit chan struct{} - wg sync.WaitGroup -} - -// handleMessage is the main handler for incoming requests. It enforces -// authentication, parses the incoming json, looks up and executes handlers -// (including pass through for standard RPC commands), and sends the appropriate -// response. It also detects commands which are marked as long-running and -// sends them off to the asyncHander for processing. -func (c *wsClient) handleMessage(msg []byte) { - if !c.authenticated { - // Disconnect immediately if the provided command fails to - // parse when the client is not already authenticated. - var request btcjson.Request - if err := json.Unmarshal(msg, &request); err != nil { - c.Disconnect() - return - } - parsedCmd := parseCmd(&request) - if parsedCmd.err != nil { - c.Disconnect() - return - } - - // Disconnect immediately if the first command is not - // authenticate when not already authenticated. - authCmd, ok := parsedCmd.cmd.(*btcjson.AuthenticateCmd) - if !ok { - rpcsLog.Warnf("Unauthenticated websocket message " + - "received") - c.Disconnect() - return - } - - // Check credentials. - login := authCmd.Username + ":" + authCmd.Passphrase - auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) - authSha := fastsha256.Sum256([]byte(auth)) - cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) - limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:]) - if cmp != 1 && limitcmp != 1 { - rpcsLog.Warnf("Auth failure.") - c.Disconnect() - return - } - c.authenticated = true - c.isAdmin = cmp == 1 - - // Marshal and send response. - reply, err := createMarshalledReply(parsedCmd.id, nil, nil) - if err != nil { - rpcsLog.Errorf("Failed to marshal authenticate reply: "+ - "%v", err.Error()) - return - } - c.SendMessage(reply, nil) - return - } - - // Attempt to parse the raw message into a JSON-RPC request. - var request btcjson.Request - if err := json.Unmarshal(msg, &request); err != nil { - jsonErr := &btcjson.RPCError{ - Code: btcjson.ErrRPCParse.Code, - Message: "Failed to parse request: " + err.Error(), - } - - // Marshal and send response. - reply, err := createMarshalledReply(nil, nil, jsonErr) - if err != nil { - rpcsLog.Errorf("Failed to marshal parse failure "+ - "reply: %v", err) - return - } - c.SendMessage(reply, nil) - return - } - // Requests with no ID (notifications) must not have a response per the - // JSON-RPC spec. - if request.ID == nil { - return - } - - // Check if the user is limited and disconnect client if unauthorized - if !c.isAdmin { - if _, ok := rpcLimited[request.Method]; !ok { - jsonErr := &btcjson.RPCError{ - Code: btcjson.ErrRPCInvalidParams.Code, - Message: "limited user not authorized for this method", - } - // Marshal and send response. - reply, err := createMarshalledReply(request.ID, nil, jsonErr) - if err != nil { - rpcsLog.Errorf("Failed to marshal parse failure "+ - "reply: %v", err) - return - } - c.SendMessage(reply, nil) - return - } - } - - // Attempt to parse the JSON-RPC request into a known concrete command. - cmd := parseCmd(&request) - if cmd.err != nil { - // Marshal and send response. - reply, err := createMarshalledReply(cmd.id, nil, cmd.err) - if err != nil { - rpcsLog.Errorf("Failed to marshal parse failure "+ - "reply: %v", err) - return - } - c.SendMessage(reply, nil) - return - } - rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr) - - // Disconnect if already authenticated and another authenticate command - // is received. - if _, ok := cmd.cmd.(*btcjson.AuthenticateCmd); ok { - rpcsLog.Warnf("Websocket client %s is already authenticated", - c.addr) - c.Disconnect() - return - } - - // When the command is marked as a long-running command, send it off - // to the asyncHander goroutine for processing. - if _, ok := wsAsyncHandlers[cmd.method]; ok { - // Start up the async goroutine for handling long-running - // requests asynchonrously if needed. - if !c.asyncStarted { - rpcsLog.Tracef("Starting async handler for %s", c.addr) - c.wg.Add(1) - go c.asyncHandler() - c.asyncStarted = true - } - c.asyncChan <- cmd - return - } - - // Lookup the websocket extension for the command and if it doesn't - // exist fallback to handling the command as a standard command. - wsHandler, ok := wsHandlers[cmd.method] - if !ok { - // No websocket-specific handler so handle like a legacy - // RPC connection. - result, jsonErr := c.server.standardCmdResult(cmd, nil) - reply, err := createMarshalledReply(cmd.id, result, jsonErr) - if err != nil { - rpcsLog.Errorf("Failed to marshal reply for <%s> "+ - "command: %v", cmd.method, err) - return - } - - c.SendMessage(reply, nil) - return - } - - // Invoke the handler and marshal and send response. - result, jsonErr := wsHandler(c, cmd.cmd) - reply, err := createMarshalledReply(cmd.id, result, jsonErr) - if err != nil { - rpcsLog.Errorf("Failed to marshal reply for <%s> command: %v", - cmd.method, err) - return - } - c.SendMessage(reply, nil) + serviceRequestSem semaphore + ntfnChan chan []byte + sendChan chan wsResponse + quit chan struct{} + wg sync.WaitGroup } // inHandler handles all incoming messages for the websocket connection. It @@ -1094,7 +928,138 @@ out: } break out } - c.handleMessage(msg) + + var request btcjson.Request + err = json.Unmarshal(msg, &request) + if err != nil { + if !c.authenticated { + break out + } + + jsonErr := &btcjson.RPCError{ + Code: btcjson.ErrRPCParse.Code, + Message: "Failed to parse request: " + err.Error(), + } + reply, err := createMarshalledReply(nil, nil, jsonErr) + if err != nil { + rpcsLog.Errorf("Failed to marshal parse failure "+ + "reply: %v", err) + continue + } + c.SendMessage(reply, nil) + continue + } + + // Requests with no ID (notifications) must not have a response per the + // JSON-RPC spec. + if request.ID == nil { + if !c.authenticated { + break out + } + continue + } + + cmd := parseCmd(&request) + if cmd.err != nil { + if !c.authenticated { + break out + } + + reply, err := createMarshalledReply(cmd.id, nil, cmd.err) + if err != nil { + rpcsLog.Errorf("Failed to marshal parse failure "+ + "reply: %v", err) + continue + } + c.SendMessage(reply, nil) + continue + } + rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr) + + // Check auth. The client is immediately disconnected if the + // first request of an unauthentiated websocket client is not + // the authenticate request, an authenticate request is received + // when the client is already authenticated, or incorrect + // authentication credentials are provided in the request. + switch authCmd, ok := cmd.cmd.(*btcjson.AuthenticateCmd); { + case c.authenticated && ok: + rpcsLog.Warnf("Websocket client %s is already authenticated", + c.addr) + break out + case !c.authenticated && !ok: + rpcsLog.Warnf("Unauthenticated websocket message " + + "received") + break out + case !c.authenticated: + // Check credentials. + login := authCmd.Username + ":" + authCmd.Passphrase + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) + authSha := fastsha256.Sum256([]byte(auth)) + cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) + limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:]) + if cmp != 1 && limitcmp != 1 { + rpcsLog.Warnf("Auth failure.") + break out + } + c.authenticated = true + c.isAdmin = cmp == 1 + + // Marshal and send response. + reply, err := createMarshalledReply(cmd.id, nil, nil) + if err != nil { + rpcsLog.Errorf("Failed to marshal authenticate reply: "+ + "%v", err.Error()) + continue + } + c.SendMessage(reply, nil) + continue + } + + // Check if the client is using limited RPC credentials and + // error when not authorized to call this RPC. + if !c.isAdmin { + if _, ok := rpcLimited[request.Method]; !ok { + jsonErr := &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParams.Code, + Message: "limited user not authorized for this method", + } + // Marshal and send response. + reply, err := createMarshalledReply(request.ID, nil, jsonErr) + if err != nil { + rpcsLog.Errorf("Failed to marshal parse failure "+ + "reply: %v", err) + continue + } + c.SendMessage(reply, nil) + continue + } + } + + // Asynchronously handle the request. A semaphore is used to + // limit the number of concurrent requests currently being + // serviced. If the semaphore can not be acquired, simply wait + // until a request finished before reading the next RPC request + // from the websocket client. + // + // This could be a little fancier by timing out and erroring + // when it takes too long to service the request, but if that is + // done, the read of the next request should not be blocked by + // this semaphore, otherwise the next request will be read and + // will probably sit here for another few seconds before timing + // out as well. This will cause the total timeout duration for + // later requests to be much longer than the check here would + // imply. + // + // If a timeout is added, the semaphore acquiring should be + // moved inside of the new goroutine with a select statement + // that also reads a time.After channel. This will unblock the + // read of the next request from the websocket client and allow + // many requests to be waited on concurrently. + c.serviceRequestSem.acquire() + go func() { + c.serviceRequest(cmd) + c.serviceRequestSem.release() + }() } // Ensure the connection is closed. @@ -1103,6 +1068,32 @@ out: rpcsLog.Tracef("Websocket client input handler done for %s", c.addr) } +// serviceRequest services a parsed RPC request by looking up and executing the +// appropiate RPC handler. The response is marshalled and sent to the websocket +// client. +func (c *wsClient) serviceRequest(r *parsedRPCCmd) { + var ( + result interface{} + err error + ) + + // Lookup the websocket extension for the command and if it doesn't + // exist fallback to handling the command as a standard command. + wsHandler, ok := wsHandlers[r.method] + if ok { + result, err = wsHandler(c, r.cmd) + } else { + result, err = c.server.standardCmdResult(r, nil) + } + reply, err := createMarshalledReply(r.id, result, err) + if err != nil { + rpcsLog.Errorf("Failed to marshal reply for <%s> "+ + "command: %v", r.method, err) + return + } + c.SendMessage(reply, nil) +} + // notificationQueueHandler handles the queuing of outgoing notifications for // the websocket client. This runs as a muxer for various sources of input to // ensure that queuing up notifications to be sent will not block. Otherwise, @@ -1218,96 +1209,6 @@ cleanup: rpcsLog.Tracef("Websocket client output handler done for %s", c.addr) } -// asyncHandler handles all long-running requests such as rescans which are -// not run directly in the inHandler routine unlike most requests. This allows -// normal quick requests to continue to be processed and responded to even while -// lengthy operations are underway. Only one long-running operation is -// permitted at a time, so multiple long-running requests are queued and -// serialized. It must be run as a goroutine. Also, this goroutine is not -// started until/if the first long-running request is made. -func (c *wsClient) asyncHandler() { - asyncHandlerDoneChan := make(chan struct{}, 1) // nonblocking sync - pendingCmds := list.New() - waiting := false - - // runHandler runs the handler for the passed command and sends the - // reply. - runHandler := func(parsedCmd *parsedRPCCmd) { - wsHandler, ok := wsHandlers[parsedCmd.method] - if !ok { - rpcsLog.Warnf("No handler for command <%s>", - parsedCmd.method) - return - } - - // Invoke the handler and marshal and send response. - result, jsonErr := wsHandler(c, parsedCmd.cmd) - reply, err := createMarshalledReply(parsedCmd.id, result, - jsonErr) - if err != nil { - rpcsLog.Errorf("Failed to marshal reply for <%s> "+ - "command: %v", parsedCmd.method, err) - return - } - c.SendMessage(reply, nil) - } - -out: - for { - select { - case cmd := <-c.asyncChan: - if !waiting { - c.wg.Add(1) - go func(cmd *parsedRPCCmd) { - runHandler(cmd) - asyncHandlerDoneChan <- struct{}{} - c.wg.Done() - }(cmd) - } else { - pendingCmds.PushBack(cmd) - } - waiting = true - - case <-asyncHandlerDoneChan: - // No longer waiting if there are no more messages in - // the pending messages queue. - next := pendingCmds.Front() - if next == nil { - waiting = false - continue - } - - // Notify the outHandler about the next item to - // asynchronously send. - element := pendingCmds.Remove(next) - c.wg.Add(1) - go func(cmd *parsedRPCCmd) { - runHandler(cmd) - asyncHandlerDoneChan <- struct{}{} - c.wg.Done() - }(element.(*parsedRPCCmd)) - - case <-c.quit: - break out - } - } - - // Drain any wait channels before exiting so nothing is left waiting - // around to send. -cleanup: - for { - select { - case <-c.asyncChan: - case <-asyncHandlerDoneChan: - default: - break cleanup - } - } - - c.wg.Done() - rpcsLog.Tracef("Websocket client async handler done for %s", c.addr) -} - // SendMessage sends the passed json to the websocket client. It is backed // by a buffered channel, so it will not block until the send channel is full. // Note however that QueueNotification must be used for sending async @@ -1406,18 +1307,18 @@ func newWebsocketClient(server *rpcServer, conn *websocket.Conn, } client := &wsClient{ - conn: conn, - addr: remoteAddr, - authenticated: authenticated, - isAdmin: isAdmin, - sessionID: sessionID, - server: server, - addrRequests: make(map[string]struct{}), - spentRequests: make(map[wire.OutPoint]struct{}), - ntfnChan: make(chan []byte, 1), // nonblocking sync - asyncChan: make(chan *parsedRPCCmd, 1), // nonblocking sync - sendChan: make(chan wsResponse, websocketSendBufferSize), - quit: make(chan struct{}), + conn: conn, + addr: remoteAddr, + authenticated: authenticated, + isAdmin: isAdmin, + sessionID: sessionID, + server: server, + addrRequests: make(map[string]struct{}), + spentRequests: make(map[wire.OutPoint]struct{}), + serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs), + ntfnChan: make(chan []byte, 1), // nonblocking sync + sendChan: make(chan wsResponse, websocketSendBufferSize), + quit: make(chan struct{}), } return client, nil }