Begin using btcws.

This change adds support for the unmarshaling custom commands sent by
btcwallet and supported in the btcws package.
This commit is contained in:
Josh Rickmar 2013-11-06 11:20:36 -05:00
parent b72f0c6474
commit 8c2b9ae06e

View file

@ -18,6 +18,7 @@ import (
"github.com/conformal/btcscript" "github.com/conformal/btcscript"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"github.com/conformal/btcws"
"math/big" "math/big"
"net" "net"
"net/http" "net/http"
@ -304,8 +305,8 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
log.Debugf(msg) log.Debugf(msg)
} }
// TODO(jrick): Remove the wallet notification chan.
type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error) type commandHandler func(*rpcServer, btcjson.Cmd, chan []byte) (interface{}, error)
type wsCommandHandler func(*rpcServer, *btcjson.Message, chan []byte, chan *btcjson.Reply) error
var handlers = map[string]commandHandler{ var handlers = map[string]commandHandler{
"addnode": handleAddNode, "addnode": handleAddNode,
@ -326,11 +327,13 @@ var handlers = map[string]commandHandler{
"stop": handleStop, "stop": handleStop,
} }
type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte) error
var wsHandlers = map[string]wsCommandHandler{ var wsHandlers = map[string]wsCommandHandler{
"getcurrentnet": handleGetCurrentNet, "getcurrentnet": handleGetCurrentNet,
"getbestblock": handleGetBestBlock, "getbestblock": handleGetBestBlock,
"rescan": handleRescan, "rescan": handleRescan,
"notifynewtxs": handleNotifyNewTxs, "notifynewtxs": handleNotifyNewTXs,
"notifyspent": handleNotifySpent, "notifyspent": handleNotifySpent,
} }
@ -734,8 +737,11 @@ func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply
// handleGetCurrentNet implements the getcurrentnet command extension // handleGetCurrentNet implements the getcurrentnet command extension
// for websocket connections. // for websocket connections.
func handleGetCurrentNet(s *rpcServer, message *btcjson.Message, func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, replychan chan *btcjson.Reply) error { walletNotification chan []byte) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
var net btcwire.BitcoinNet var net btcwire.BitcoinNet
if cfg.TestNet3 { if cfg.TestNet3 {
@ -744,130 +750,60 @@ func handleGetCurrentNet(s *rpcServer, message *btcjson.Message,
net = btcwire.MainNet net = btcwire.MainNet
} }
rawReply := &btcjson.Reply{ reply.Result = float64(net)
Result: float64(net), mreply, _ := json.Marshal(reply)
Id: &message.Id, walletNotification <- mreply
}
replychan <- rawReply
return nil return nil
} }
// handleGetBestBlock implements the getbestblock command extension // handleGetBestBlock implements the getbestblock command extension
// for websocket connections. // for websocket connections.
func handleGetBestBlock(s *rpcServer, message *btcjson.Message, func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, replychan chan *btcjson.Reply) error { walletNotification chan []byte) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
// All other "get block" commands give either the height, the // All other "get block" commands give either the height, the
// hash, or both but require the block SHA. This gets both for // hash, or both but require the block SHA. This gets both for
// the best block. // the best block.
sha, height, err := s.server.db.NewestSha() sha, height, err := s.server.db.NewestSha()
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting newest block: %v", err) return btcjson.ErrBestBlockHash
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrBestBlockHash,
Id: &message.Id,
} }
replychan <- rawReply
return err reply.Result = map[string]interface{}{
}
rawReply := &btcjson.Reply{
Result: map[string]interface{}{
"hash": sha.String(), "hash": sha.String(),
"height": height, "height": height,
},
Id: &message.Id,
} }
replychan <- rawReply mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return nil return nil
} }
// handleRescan implements the rescan command extension for websocket // handleRescan implements the rescan command extension for websocket
// connections. // connections.
func handleRescan(s *rpcServer, message *btcjson.Message, func handleRescan(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, replychan chan *btcjson.Reply) error { walletNotification chan []byte) error {
minblock, maxblock := int64(0), btcdb.AllShas id := cmd.Id()
params, ok := message.Params.([]interface{}) reply := &btcjson.Reply{Id: &id}
if !ok || len(params) < 2 {
rawReply := &btcjson.Reply{ rescanCmd, ok := cmd.(*btcws.RescanCmd)
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
fminblock, ok := params[0].(float64)
if !ok { if !ok {
rawReply := &btcjson.Reply{ return btcjson.ErrInternal
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
minblock = int64(fminblock)
iaddrs, ok := params[1].([]interface{})
if !ok {
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
// addrHashes holds a set of string-ified address hashes.
addrHashes := make(map[string]bool, len(iaddrs))
for i := range iaddrs {
addr, ok := iaddrs[i].(string)
if !ok {
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
addrhash, _, err := btcutil.DecodeAddress(addr)
if err != nil {
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
addrHashes[string(addrhash)] = true
}
if len(params) > 2 {
fmaxblock, ok := params[2].(float64)
if !ok {
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
maxblock = int64(fmaxblock)
} }
log.Debugf("RPCS: Begining rescan") log.Debugf("RPCS: Begining rescan")
minblock := int64(rescanCmd.BeginBlock)
maxblock := int64(rescanCmd.EndBlock)
// FetchHeightRange may not return a complete list of block shas for // FetchHeightRange may not return a complete list of block shas for
// the given range, so fetch range as many times as necessary. // the given range, so fetch range as many times as necessary.
for { for {
blkshalist, err := s.server.db.FetchHeightRange(minblock, maxblock) blkshalist, err := s.server.db.FetchHeightRange(minblock,
maxblock)
if err != nil { if err != nil {
return err return err
} }
@ -899,9 +835,8 @@ func handleRescan(s *rpcServer, message *btcjson.Message,
log.Errorf("Error encoding address: %v", err) log.Errorf("Error encoding address: %v", err)
return err return err
} }
if ok := addrHashes[string(txaddrhash)]; ok { if _, ok := rescanCmd.Addresses[txaddr]; ok {
reply := &btcjson.Reply{ reply.Result = struct {
Result: struct {
Sender string `json:"sender"` Sender string `json:"sender"`
Receiver string `json:"receiver"` Receiver string `json:"receiver"`
BlockHash string `json:"blockhash"` BlockHash string `json:"blockhash"`
@ -921,11 +856,9 @@ func handleRescan(s *rpcServer, message *btcjson.Message,
Amount: txout.Value, Amount: txout.Value,
PkScript: btcutil.Base58Encode(txout.PkScript), PkScript: btcutil.Base58Encode(txout.PkScript),
Spent: txReply.TxSpent[txOutIdx], Spent: txReply.TxSpent[txOutIdx],
},
Error: nil,
Id: &message.Id,
} }
replychan <- reply mreply, _ := json.Marshal(reply)
walletNotification <- mreply
} }
} }
} }
@ -938,175 +871,118 @@ func handleRescan(s *rpcServer, message *btcjson.Message,
} }
} }
rawReply := &btcjson.Reply{ mreply, _ := json.Marshal(reply)
Result: nil, walletNotification <- mreply
Error: nil,
Id: &message.Id,
}
replychan <- rawReply
log.Debug("RPCS: Finished rescan") log.Debug("RPCS: Finished rescan")
return nil return nil
} }
// handleNotifyNewTxs implements the notifynewtxs command extension for // handleNotifyNewTXs implements the notifynewtxs command extension for
// websocket connections. // websocket connections.
func handleNotifyNewTxs(s *rpcServer, message *btcjson.Message, func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, replychan chan *btcjson.Reply) error { walletNotification chan []byte) error {
params, ok := message.Params.([]interface{}) id := cmd.Id()
if !ok || len(params) != 1 { reply := &btcjson.Reply{Id: &id}
rawReply := &btcjson.Reply{
Result: nil, notifyCmd, ok := cmd.(*btcws.NotifyNewTXsCmd)
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
addr, ok := params[0].(string)
if !ok { if !ok {
rawReply := &btcjson.Reply{ return btcjson.ErrInternal
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
} }
replychan <- rawReply
return ErrBadParamsField
}
addrhash, _, err := btcutil.DecodeAddress(addr)
if err != nil {
jsonError := btcjson.Error{
Code: btcjson.ErrInvalidParams.Code,
Message: "Cannot decode address",
}
rawReply := &btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
s.ws.requests.AddTxRequest(walletNotification, string(addrhash), message.Id)
rawReply := &btcjson.Reply{ for _, addr := range notifyCmd.Addresses {
Result: nil, hash, _, err := btcutil.DecodeAddress(addr)
Error: nil, if err != nil {
Id: &message.Id, return fmt.Errorf("cannot decode address: %v", err)
} }
replychan <- rawReply s.ws.requests.AddTxRequest(walletNotification,
string(hash[:]), id)
}
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return nil return nil
} }
// handleNotifySpent implements the notifyspent command extension for // handleNotifySpent implements the notifyspent command extension for
// websocket connections. // websocket connections.
func handleNotifySpent(s *rpcServer, message *btcjson.Message, func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, replychan chan *btcjson.Reply) error { walletNotification chan []byte) error {
params, ok := message.Params.([]interface{}) id := cmd.Id()
if !ok || len(params) != 2 { reply := &btcjson.Reply{Id: &id}
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
hashBE, ok1 := params[0].(string)
index, ok2 := params[1].(float64)
if !ok1 || !ok2 {
rawReply := &btcjson.Reply{
Result: nil,
Error: &btcjson.ErrInvalidParams,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
hash, err := btcwire.NewShaHashFromStr(hashBE)
if err != nil {
jsonError := btcjson.Error{
Code: btcjson.ErrInvalidParams.Code,
Message: "Hash string cannot be parsed.",
}
rawReply := &btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
replychan <- rawReply
return ErrBadParamsField
}
op := btcwire.NewOutPoint(hash, uint32(index))
s.ws.requests.AddSpentRequest(walletNotification, op, message.Id)
rawReply := &btcjson.Reply{ notifyCmd, ok := cmd.(*btcws.NotifySpentCmd)
Result: nil, if !ok {
Error: nil, return btcjson.ErrInternal
Id: &message.Id,
} }
replychan <- rawReply
s.ws.requests.AddSpentRequest(walletNotification, notifyCmd.OutPoint, id)
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return nil return nil
} }
func jsonWSRead(walletNotification chan []byte, replychan chan *btcjson.Reply, body []byte, s *rpcServer) error { func jsonWSRead(body []byte, s *rpcServer, walletNotification chan []byte) error {
var message btcjson.Message var reply btcjson.Reply
err := json.Unmarshal(body, &message)
cmd, err := btcjson.ParseMarshaledCmd(body)
if err != nil { if err != nil {
reply := btcjson.Reply{ if cmd != nil {
Result: nil, // Unmarshaling a valid JSON-RPC message succeeded. Use
Error: &btcjson.ErrParse, // the provided id for errors.
Id: nil, id := cmd.Id()
reply.Id = &id
} }
log.Tracef("RPCS: reply: %v", reply) jsonErr, ok := err.(btcjson.Error)
replychan <- &reply
return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err)
}
log.Tracef("RPCS: received: %v", message)
defer func() {
close(replychan)
}()
wsHandler, ok := wsHandlers[message.Method]
if !ok { if !ok {
rawReply := &btcjson.Reply{ jsonErr = btcjson.Error{
Result: nil, Code: btcjson.ErrMisc.Code,
Error: &btcjson.ErrMethodNotFound, Message: err.Error(),
Id: &message.Id,
} }
replychan <- rawReply }
reply.Error = &jsonErr
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return err
}
id := cmd.Id()
reply.Id = &id
wsHandler, ok := wsHandlers[cmd.Method()]
if !ok {
reply.Error = &btcjson.ErrMethodNotFound
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return btcjson.ErrMethodNotFound return btcjson.ErrMethodNotFound
} }
if err := wsHandler(s, &message, walletNotification, replychan); err != nil { if err := wsHandler(s, cmd, walletNotification); err != nil {
if jsonErr, ok := err.(btcjson.Error); ok { jsonErr, ok := err.(btcjson.Error)
rawReply := &btcjson.Reply{ if ok {
Error: &jsonErr, reply.Error = &jsonErr
Id: &message.Id, mreply, _ := json.Marshal(reply)
walletNotification <- mreply
return errors.New(jsonErr.Message)
} }
replychan <- rawReply
err = errors.New(jsonErr.Message)
} else {
// In the case where we did not have a btcjson // In the case where we did not have a btcjson
// error to begin with, make a new one to send, // error to begin with, make a new one to send,
// but this really should not happen. // but this really should not happen.
rawJSONError := btcjson.Error{ jsonErr = btcjson.Error{
Code: btcjson.ErrInternal.Code, Code: btcjson.ErrInternal.Code,
Message: err.Error(), Message: err.Error(),
} }
rawReply := &btcjson.Reply{ reply.Error = &jsonErr
Error: &rawJSONError, mreply, _ := json.Marshal(reply)
Id: &message.Id, walletNotification <- mreply
}
replychan <- rawReply
}
}
return err return err
}
return nil
} }
// getDifficultyRatio returns the proof-of-work difficulty as a multiple of the // getDifficultyRatio returns the proof-of-work difficulty as a multiple of the
@ -1237,33 +1113,8 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
} }
// Try websocket extensions // Try websocket extensions
replychan := make(chan *btcjson.Reply)
go func() {
for {
select {
case reply, ok := <-replychan:
if !ok {
// no more replies expected.
return
}
if reply == nil {
continue
}
log.Tracef("[RPCS] reply: %v", *reply)
replyBytes, err := json.Marshal(reply)
if err != nil {
log.Errorf("RPCS: Error Marshalling reply: %v", err)
return
}
walletNotification <- replyBytes
case <-s.quit:
return
}
}
}()
s.wg.Add(1) s.wg.Add(1)
err = jsonWSRead(walletNotification, replychan, msg, s) err = jsonWSRead(msg, s, walletNotification)
s.wg.Done() s.wg.Done()
} }