From 8c2b9ae06e122a3077b1b11c784a2599fc9f4364 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 6 Nov 2013 11:20:36 -0500 Subject: [PATCH] Begin using btcws. This change adds support for the unmarshaling custom commands sent by btcwallet and supported in the btcws package. --- rpcserver.go | 427 +++++++++++++++++---------------------------------- 1 file changed, 139 insertions(+), 288 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 6c08b838..f4dd1fae 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -18,6 +18,7 @@ import ( "github.com/conformal/btcscript" "github.com/conformal/btcutil" "github.com/conformal/btcwire" + "github.com/conformal/btcws" "math/big" "net" "net/http" @@ -304,8 +305,8 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { log.Debugf(msg) } +// TODO(jrick): Remove the wallet notification chan. type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error) -type wsCommandHandler func(*rpcServer, *btcjson.Message, chan []byte, chan *btcjson.Reply) error var handlers = map[string]commandHandler{ "addnode": handleAddNode, @@ -326,11 +327,13 @@ var handlers = map[string]commandHandler{ "stop": handleStop, } +type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte) error + var wsHandlers = map[string]wsCommandHandler{ "getcurrentnet": handleGetCurrentNet, "getbestblock": handleGetBestBlock, "rescan": handleRescan, - "notifynewtxs": handleNotifyNewTxs, + "notifynewtxs": handleNotifyNewTXs, "notifyspent": handleNotifySpent, } @@ -734,8 +737,11 @@ func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply // handleGetCurrentNet implements the getcurrentnet command extension // for websocket connections. -func handleGetCurrentNet(s *rpcServer, message *btcjson.Message, - walletNotification chan []byte, replychan chan *btcjson.Reply) error { +func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) error { + + id := cmd.Id() + reply := &btcjson.Reply{Id: &id} var net btcwire.BitcoinNet if cfg.TestNet3 { @@ -744,130 +750,60 @@ func handleGetCurrentNet(s *rpcServer, message *btcjson.Message, net = btcwire.MainNet } - rawReply := &btcjson.Reply{ - Result: float64(net), - Id: &message.Id, - } - replychan <- rawReply + reply.Result = float64(net) + mreply, _ := json.Marshal(reply) + walletNotification <- mreply return nil } // handleGetBestBlock implements the getbestblock command extension // for websocket connections. -func handleGetBestBlock(s *rpcServer, message *btcjson.Message, - walletNotification chan []byte, replychan chan *btcjson.Reply) error { +func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) error { + + id := cmd.Id() + reply := &btcjson.Reply{Id: &id} // All other "get block" commands give either the height, the // hash, or both but require the block SHA. This gets both for // the best block. sha, height, err := s.server.db.NewestSha() if err != nil { - log.Errorf("RPCS: Error getting newest block: %v", err) - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrBestBlockHash, - Id: &message.Id, - } - replychan <- rawReply - return err + return btcjson.ErrBestBlockHash } - rawReply := &btcjson.Reply{ - Result: map[string]interface{}{ - "hash": sha.String(), - "height": height, - }, - Id: &message.Id, + + reply.Result = map[string]interface{}{ + "hash": sha.String(), + "height": height, } - replychan <- rawReply + mreply, _ := json.Marshal(reply) + walletNotification <- mreply return nil } // handleRescan implements the rescan command extension for websocket // connections. -func handleRescan(s *rpcServer, message *btcjson.Message, - walletNotification chan []byte, replychan chan *btcjson.Reply) error { +func handleRescan(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) error { - minblock, maxblock := int64(0), btcdb.AllShas - params, ok := message.Params.([]interface{}) - if !ok || len(params) < 2 { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - fminblock, ok := params[0].(float64) + id := cmd.Id() + reply := &btcjson.Reply{Id: &id} + + rescanCmd, ok := cmd.(*btcws.RescanCmd) if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - minblock = int64(fminblock) - iaddrs, ok := params[1].([]interface{}) - if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - - // addrHashes holds a set of string-ified address hashes. - addrHashes := make(map[string]bool, len(iaddrs)) - for i := range iaddrs { - addr, ok := iaddrs[i].(string) - if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - - addrhash, _, err := btcutil.DecodeAddress(addr) - if err != nil { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - - addrHashes[string(addrhash)] = true - } - - if len(params) > 2 { - fmaxblock, ok := params[2].(float64) - if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - maxblock = int64(fmaxblock) + return btcjson.ErrInternal } log.Debugf("RPCS: Begining rescan") + minblock := int64(rescanCmd.BeginBlock) + maxblock := int64(rescanCmd.EndBlock) + // FetchHeightRange may not return a complete list of block shas for // the given range, so fetch range as many times as necessary. for { - blkshalist, err := s.server.db.FetchHeightRange(minblock, maxblock) + blkshalist, err := s.server.db.FetchHeightRange(minblock, + maxblock) if err != nil { return err } @@ -899,33 +835,30 @@ func handleRescan(s *rpcServer, message *btcjson.Message, log.Errorf("Error encoding address: %v", err) return err } - if ok := addrHashes[string(txaddrhash)]; ok { - reply := &btcjson.Reply{ - Result: struct { - Sender string `json:"sender"` - Receiver string `json:"receiver"` - BlockHash string `json:"blockhash"` - Height int64 `json:"height"` - TxHash string `json:"txhash"` - Index uint32 `json:"index"` - Amount int64 `json:"amount"` - PkScript string `json:"pkscript"` - Spent bool `json:"spent"` - }{ - Sender: "Unknown", // TODO(jrick) - Receiver: txaddr, - BlockHash: blkshalist[i].String(), - Height: blk.Height(), - TxHash: txReply.Sha.String(), - Index: uint32(txOutIdx), - Amount: txout.Value, - PkScript: btcutil.Base58Encode(txout.PkScript), - Spent: txReply.TxSpent[txOutIdx], - }, - Error: nil, - Id: &message.Id, + if _, ok := rescanCmd.Addresses[txaddr]; ok { + reply.Result = struct { + Sender string `json:"sender"` + Receiver string `json:"receiver"` + BlockHash string `json:"blockhash"` + Height int64 `json:"height"` + TxHash string `json:"txhash"` + Index uint32 `json:"index"` + Amount int64 `json:"amount"` + PkScript string `json:"pkscript"` + Spent bool `json:"spent"` + }{ + Sender: "Unknown", // TODO(jrick) + Receiver: txaddr, + BlockHash: blkshalist[i].String(), + Height: blk.Height(), + TxHash: txReply.Sha.String(), + Index: uint32(txOutIdx), + Amount: txout.Value, + PkScript: btcutil.Base58Encode(txout.PkScript), + Spent: txReply.TxSpent[txOutIdx], } - replychan <- reply + mreply, _ := json.Marshal(reply) + walletNotification <- mreply } } } @@ -938,175 +871,118 @@ func handleRescan(s *rpcServer, message *btcjson.Message, } } - rawReply := &btcjson.Reply{ - Result: nil, - Error: nil, - Id: &message.Id, - } - replychan <- rawReply + mreply, _ := json.Marshal(reply) + walletNotification <- mreply log.Debug("RPCS: Finished rescan") return nil } -// handleNotifyNewTxs implements the notifynewtxs command extension for +// handleNotifyNewTXs implements the notifynewtxs command extension for // websocket connections. -func handleNotifyNewTxs(s *rpcServer, message *btcjson.Message, - walletNotification chan []byte, replychan chan *btcjson.Reply) error { +func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) error { - params, ok := message.Params.([]interface{}) - if !ok || len(params) != 1 { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - addr, ok := params[0].(string) + id := cmd.Id() + reply := &btcjson.Reply{Id: &id} + + notifyCmd, ok := cmd.(*btcws.NotifyNewTXsCmd) if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField + return btcjson.ErrInternal } - addrhash, _, err := btcutil.DecodeAddress(addr) - if err != nil { - jsonError := btcjson.Error{ - Code: btcjson.ErrInvalidParams.Code, - Message: "Cannot decode address", - } - rawReply := &btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - s.ws.requests.AddTxRequest(walletNotification, string(addrhash), message.Id) - rawReply := &btcjson.Reply{ - Result: nil, - Error: nil, - Id: &message.Id, + for _, addr := range notifyCmd.Addresses { + hash, _, err := btcutil.DecodeAddress(addr) + if err != nil { + return fmt.Errorf("cannot decode address: %v", err) + } + s.ws.requests.AddTxRequest(walletNotification, + string(hash[:]), id) } - replychan <- rawReply + + mreply, _ := json.Marshal(reply) + walletNotification <- mreply return nil } // handleNotifySpent implements the notifyspent command extension for // websocket connections. -func handleNotifySpent(s *rpcServer, message *btcjson.Message, - walletNotification chan []byte, replychan chan *btcjson.Reply) error { +func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, + walletNotification chan []byte) error { - params, ok := message.Params.([]interface{}) - if !ok || len(params) != 2 { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - hashBE, ok1 := params[0].(string) - index, ok2 := params[1].(float64) - if !ok1 || !ok2 { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrInvalidParams, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - hash, err := btcwire.NewShaHashFromStr(hashBE) - if err != nil { - jsonError := btcjson.Error{ - Code: btcjson.ErrInvalidParams.Code, - Message: "Hash string cannot be parsed.", - } - rawReply := &btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - replychan <- rawReply - return ErrBadParamsField - } - op := btcwire.NewOutPoint(hash, uint32(index)) - s.ws.requests.AddSpentRequest(walletNotification, op, message.Id) + id := cmd.Id() + reply := &btcjson.Reply{Id: &id} - rawReply := &btcjson.Reply{ - Result: nil, - Error: nil, - Id: &message.Id, + notifyCmd, ok := cmd.(*btcws.NotifySpentCmd) + if !ok { + return btcjson.ErrInternal } - replychan <- rawReply + + s.ws.requests.AddSpentRequest(walletNotification, notifyCmd.OutPoint, id) + + mreply, _ := json.Marshal(reply) + walletNotification <- mreply return nil } -func jsonWSRead(walletNotification chan []byte, replychan chan *btcjson.Reply, body []byte, s *rpcServer) error { - var message btcjson.Message - err := json.Unmarshal(body, &message) +func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error { + var reply btcjson.Reply + + cmd, err := btcjson.ParseMarshaledCmd(body) if err != nil { - reply := btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrParse, - Id: nil, + if cmd != nil { + // Unmarshaling a valid JSON-RPC message succeeded. Use + // the provided id for errors. + id := cmd.Id() + reply.Id = &id } - log.Tracef("RPCS: reply: %v", reply) - - replychan <- &reply - return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err) + jsonErr, ok := err.(btcjson.Error) + if !ok { + jsonErr = btcjson.Error{ + Code: btcjson.ErrMisc.Code, + Message: err.Error(), + } + } + reply.Error = &jsonErr + mreply, _ := json.Marshal(reply) + walletNotification <- mreply + return err } - log.Tracef("RPCS: received: %v", message) - defer func() { - close(replychan) - }() + id := cmd.Id() + reply.Id = &id - wsHandler, ok := wsHandlers[message.Method] + wsHandler, ok := wsHandlers[cmd.Method()] if !ok { - rawReply := &btcjson.Reply{ - Result: nil, - Error: &btcjson.ErrMethodNotFound, - Id: &message.Id, - } - replychan <- rawReply + reply.Error = &btcjson.ErrMethodNotFound + mreply, _ := json.Marshal(reply) + walletNotification <- mreply return btcjson.ErrMethodNotFound } - if err := wsHandler(s, &message, walletNotification, replychan); err != nil { - if jsonErr, ok := err.(btcjson.Error); ok { - rawReply := &btcjson.Reply{ - Error: &jsonErr, - Id: &message.Id, - } - replychan <- rawReply - err = errors.New(jsonErr.Message) - } else { - // In the case where we did not have a btcjson - // error to begin with, make a new one to send, - // but this really should not happen. - rawJSONError := btcjson.Error{ - Code: btcjson.ErrInternal.Code, - Message: err.Error(), - } - rawReply := &btcjson.Reply{ - Error: &rawJSONError, - Id: &message.Id, - } - replychan <- rawReply + if err := wsHandler(s, cmd, walletNotification); err != nil { + jsonErr, ok := err.(btcjson.Error) + if ok { + reply.Error = &jsonErr + mreply, _ := json.Marshal(reply) + walletNotification <- mreply + return errors.New(jsonErr.Message) } + + // In the case where we did not have a btcjson + // error to begin with, make a new one to send, + // but this really should not happen. + jsonErr = btcjson.Error{ + Code: btcjson.ErrInternal.Code, + Message: err.Error(), + } + reply.Error = &jsonErr + mreply, _ := json.Marshal(reply) + walletNotification <- mreply + return err } - return err + return nil } // getDifficultyRatio returns the proof-of-work difficulty as a multiple of the @@ -1237,33 +1113,8 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b } // Try websocket extensions - replychan := make(chan *btcjson.Reply) - go func() { - for { - select { - case reply, ok := <-replychan: - if !ok { - // no more replies expected. - return - } - if reply == nil { - continue - } - log.Tracef("[RPCS] reply: %v", *reply) - replyBytes, err := json.Marshal(reply) - if err != nil { - log.Errorf("RPCS: Error Marshalling reply: %v", err) - return - } - walletNotification <- replyBytes - - case <-s.quit: - return - } - } - }() s.wg.Add(1) - err = jsonWSRead(walletNotification, replychan, msg, s) + err = jsonWSRead(msg, s, walletNotification) s.wg.Done() }