diff --git a/blockmanager.go b/blockmanager.go index e26a39d0..9f182f82 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -998,8 +998,8 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Notify frontends if r := b.server.rpcServer; r != nil { go func() { - r.NotifyBlockTXs(b.server.db, block) - r.NotifyBlockConnected(block) + r.ntfnMgr.NotifyBlockTXs(block) + r.ntfnMgr.NotifyBlockConnected(block) }() } @@ -1025,7 +1025,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Notify frontends if r := b.server.rpcServer; r != nil { - go r.NotifyBlockDisconnected(block) + go r.ntfnMgr.NotifyBlockDisconnected(block) } } } diff --git a/mempool.go b/mempool.go index ea5bbd81..4988b0cd 100644 --- a/mempool.go +++ b/mempool.go @@ -876,13 +876,15 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash, len(mp.pool)) - // Notify wallets of mempool transactions to wallet addresses. + // Notify websocket clients about mempool transactions. if mp.server.rpcServer != nil { - mp.server.rpcServer.NotifyForTxOuts(tx, nil) + go func() { + mp.server.rpcServer.ntfnMgr.NotifyForTxOuts(tx, nil) - if isNew { - mp.server.rpcServer.NotifyForNewTx(tx) - } + if isNew { + mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx) + } + }() } return nil diff --git a/rpcserver.go b/rpcserver.go index 42f885d7..9a45fd93 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -144,7 +144,7 @@ type rpcServer struct { shutdown int32 server *server authsha [sha256.Size]byte - ws *wsContext + ntfnMgr *wsNotificationManager numClients int numClientsMutex sync.Mutex wg sync.WaitGroup @@ -184,9 +184,9 @@ func (s *rpcServer) Start() { return } jsonRPCRead(w, r, s) - }) + // Websocket endpoint. rpcServeMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { authenticated, err := s.checkAuth(r, false) if err != nil { @@ -195,7 +195,7 @@ func (s *rpcServer) Start() { } wsServer := websocket.Server{ Handler: websocket.Handler(func(ws *websocket.Conn) { - s.walletReqsNotifications(ws, authenticated) + s.WebsocketHandler(ws, r.RemoteAddr, authenticated) }), } wsServer.ServeHTTP(w, r) @@ -296,6 +296,7 @@ func (s *rpcServer) Stop() error { return err } } + s.ntfnMgr.Shutdown() close(s.quit) s.wg.Wait() rpcsLog.Infof("RPC server shutdown complete") @@ -333,9 +334,9 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) { rpc := rpcServer{ authsha: sha256.Sum256([]byte(auth)), server: s, - ws: newWebsocketContext(), quit: make(chan int), } + rpc.ntfnMgr = newWsNotificationManager(&rpc) // check for existence of cert file and key file if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) { diff --git a/rpcwebsocket.go b/rpcwebsocket.go index a04c5ed0..f6d9af20 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -13,7 +13,6 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "errors" "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcjson" @@ -21,332 +20,1145 @@ import ( "github.com/conformal/btcutil" "github.com/conformal/btcwire" "github.com/conformal/btcws" + "io" "sync" "time" ) +const ( + // TODO(davec): This should be a config option. + maxWebsocketClients = 10 + + // websocketSendBufferSize is the number of elements the send channel + // can queue before blocking. Note that this only applies to requests + // handled directly in the websocket client input handler or the async + // handler since notifications have their own queueing mechanism + // independent of the send channel buffer. + websocketSendBufferSize = 50 +) + +// timeZeroVal is simply the zero value for a time.Time and is used to avoid +// creating multiple instances. var timeZeroVal time.Time -// ErrBadAuth describes an error that was the result of invalid credentials. -var ErrBadAuth = errors.New("invalid credentials") - -type ntfnChan chan btcjson.Cmd - -type handlerChans struct { - n ntfnChan // channel to send notifications - disconnected <-chan struct{} // closed when a client has disconnected. -} - -type wsCommandHandler func(*rpcServer, btcjson.Cmd, handlerChans) (interface{}, *btcjson.Error) +// wsCommandHandler describes a callback function used to handle a specific +// command. +type wsCommandHandler func(*wsClient, btcjson.Cmd) (interface{}, *btcjson.Error) // wsHandlers maps RPC command strings to appropriate websocket handler // functions. var wsHandlers = map[string]wsCommandHandler{ - "getcurrentnet": handleGetCurrentNet, - "getbestblock": handleGetBestBlock, - "notifyblocks": handleNotifyBlocks, - "notifynewtxs": handleNotifyNewTXs, - "notifyallnewtxs": handleNotifyAllNewTXs, - "notifyspent": handleNotifySpent, - "rescan": handleRescan, - "sendrawtransaction": handleWalletSendRawTransaction, + "getbestblock": handleGetBestBlock, + "getcurrentnet": handleGetCurrentNet, + "notifyblocks": handleNotifyBlocks, + "notifyallnewtxs": handleNotifyAllNewTXs, + "notifynewtxs": handleNotifyNewTXs, + "notifyspent": handleNotifySpent, + "rescan": handleRescan, } -// wsContext holds the items the RPC server needs to handle websocket -// connections for wallets. -type wsContext struct { - sync.RWMutex - - // connections holds a map of requests for each wallet using the - // wallet channel as the key. - connections map[ntfnChan]*requestContexts - - // Map of address hash to list of notificationCtx. This is the global - // list we actually use for notifications, we also keep a list in the - // requestContexts to make removal from this list on connection close - // less horrendously expensive. - txNotifications map[string]*list.List - - // Map of outpoint to list of notificationCtx. - spentNotifications map[btcwire.OutPoint]*list.List - - // Map of shas to list of notificationCtx. - minedTxNotifications map[btcwire.ShaHash]*list.List +// wsAsyncHandlers holds the websocket commands which should be run +// asynchronously to the main input handler goroutine. This allows long-running +// operations to run concurrently (and one at a time) while still responding +// to the majority of normal requests which can be answered quickly. +var wsAsyncHandlers = map[string]bool{ + "rescan": true, } -// AddBlockUpdateRequest adds the request context to mark a wallet as -// having requested updates for connected and disconnected blocks. -func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) { - r.Lock() - defer r.Unlock() +// WebsocketHandler handles a new websocket client by creating a new wsClient, +// starting it, and blocking until the connection closes. Since it blocks, it +// must be run in a separate goroutine. It should be invoked from the websocket +// server handler which runs each new connection in a new goroutine thereby +// satisfying the requirement. +func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string, + authenticated bool) { - rc, ok := r.connections[n] - if !ok { + // Clear the read deadline that was set before the websocket hijacked + // the connection. + conn.SetReadDeadline(timeZeroVal) + + // Limit max number of websocket clients. + rpcsLog.Infof("New websocket client %s", remoteAddr) + if s.ntfnMgr.NumClients()+1 > maxWebsocketClients { + rpcsLog.Infof("Max websocket clients exceeded [%d] - "+ + "disconnecting client %s", maxWebsocketClients, + remoteAddr) + conn.Close() return } - rc.blockUpdates = true + + // Create a new websocket client to handle the new websocket connection + // and wait for it to shutdown. Once it has shutdown (and hence + // disconnected), remove it and any notifications it registered for. + client := newWebsocketClient(s, conn, remoteAddr, authenticated) + s.ntfnMgr.AddClient(client) + client.Start() + client.WaitForShutdown() + s.ntfnMgr.RemoveClient(client) + rpcsLog.Infof("Disconnected websocket client %s", remoteAddr) } -// AddAllNewTxRequest adds the request context to mark a wallet as -// having requested updates for all new transactions. -func (r *wsContext) AddAllNewTxRequest(n ntfnChan, verbose bool) { - r.Lock() - defer r.Unlock() +// wsNotificationManager is a connection and notification manager used for +// websockets. It allows websocket clients to register for notifications they +// are interested in. When an event happens elsewhere in the code such as +// transactions being added to the memory pool or block connects/disconnects, +// the notification manager is provided with the relevant details needed to +// figure out which websocket clients need to be notified based on what they +// have registered for and notifies them accordingly. It is also used to keep +// track of all connected websocket clients. +type wsNotificationManager struct { + sync.Mutex - rc, ok := r.connections[n] - if !ok { + // server is the RPC server the notification manager is associated with. + server *rpcServer + + // clients is a map of all currently connected websocket clients. + clients map[chan bool]*wsClient + + // Maps used to hold lists of websocket clients to be notified on + // certain events. Each websocket client also keeps maps for the events + // which have multiple triggers to make removal from these lists on + // connection close less horrendously expensive. + blockNotifications map[chan bool]*wsClient + txNotifications map[chan bool]*wsClient + spentNotifications map[btcwire.OutPoint]map[chan bool]*wsClient + addrNotifications map[string]map[chan bool]*wsClient +} + +// NumClients returns the number of clients actively being served. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NumClients() int { + m.Lock() + defer m.Unlock() + + return len(m.clients) +} + +// AddBlockUpdateRequest requests block update notifications to the passed +// websocket client. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddBlockUpdateRequest(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + // Add the client to the map to notify when block updates are seen. + // Use the quit channel as a unique id for the client since it is quite + // a bit more efficient than using the entire struct. + m.blockNotifications[wsc.quit] = wsc +} + +// RemoveBlockUpdateRequest removes block update notifications for the passed +// websocket client. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) RemoveBlockUpdateRequest(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + // Delete the client from the map to notify when block updates are seen. + // Use the quit channel as a unique id for the client since it is quite + // a bit more efficient than using the entire struct. + delete(m.blockNotifications, wsc.quit) +} + +// NotifyBlockConnected notifies websocket clients that have registered for +// block updates when a block is connected to the main chain. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { + m.Lock() + defer m.Unlock() + + // Nothing to do if there are no websocket clients registered to + // receive notifications that result from a newly connected block. + if len(m.blockNotifications) == 0 { return } - rc.allTxUpdates = true - rc.verboseTxUpdates = verbose -} -// AddTxRequest adds the request context for new transaction notifications. -func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { - r.Lock() - defer r.Unlock() - - rc, ok := r.connections[n] - if !ok { + hash, err := block.Sha() + if err != nil { + rpcsLog.Error("Bad block; connected block notification dropped") return } - rc.txRequests[addr] = struct{}{} - clist, ok := r.txNotifications[addr] - if !ok { - clist = list.New() - r.txNotifications[addr] = clist + // Notify interested websocket clients about the connected block. + ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height())) + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Error("Failed to marshal block connected notification: "+ + "%v", err) + return + } + for _, wsc := range m.blockNotifications { + wsc.QueueNotification(marshalledJSON) } - - clist.PushBack(n) } -func (r *wsContext) removeGlobalTxRequest(n ntfnChan, addr string) { - clist := r.txNotifications[addr] - var enext *list.Element - for e := clist.Front(); e != nil; e = enext { - enext = e.Next() - c := e.Value.(ntfnChan) - if c == n { - clist.Remove(e) - break +// NotifyBlockDisconnected notifies websocket clients that have registered for +// block updates when a block is disconnected from the main chain (due to a +// reorganize). +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { + m.Lock() + defer m.Unlock() + + // Nothing to do if there are no websocket clients registered to + // receive notifications that result from a newly connected block. + if len(m.blockNotifications) == 0 { + return + } + + hash, err := block.Sha() + if err != nil { + rpcsLog.Error("Bad block; disconnected block notification " + + "dropped") + return + } + + // Notify interested websocket clients about the disconnected block. + ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(), + int32(block.Height())) + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Error("Failed to marshal block disconnected "+ + "notification: %v", err) + return + } + for _, wsc := range m.blockNotifications { + wsc.QueueNotification(marshalledJSON) + } +} + +// AddNewTxRequest requests notifications to the passed websocket client when +// new transactions are added to the memory pool. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddNewTxRequest(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + // Add the client to the map to notify when a new transaction is added + // to the memory pool. Use the quit channel as a unique id for the + // client since it is quite a bit more efficient than using the entire + // struct. + m.txNotifications[wsc.quit] = wsc +} + +// RemoveNewTxRequest removes notifications to the passed websocket client when +// new transaction are added to the memory pool. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) RemoveNewTxRequest(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + // Delete the client from the map to notify when a new transaction is + // seen in the memory pool. Use the quit channel as a unique id for the + // client since it is quite a bit more efficient than using the entire + // struct. + delete(m.txNotifications, wsc.quit) +} + +// NotifyForNewTx notifies websocket clients that have registerd for updates +// when a new transaction is added to the memory pool. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { + m.Lock() + defer m.Unlock() + + // Nothing to do if there are no websocket clients registered to + // receive notifications about transactions added to the memory pool. + if len(m.txNotifications) == 0 { + return + } + + txID := tx.Sha().String() + mtx := tx.MsgTx() + + var amount int64 + for _, txOut := range mtx.TxOut { + amount += txOut.Value + } + + ntfn := btcws.NewAllTxNtfn(txID, amount) + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error()) + return + } + + var verboseNtfn *btcws.AllVerboseTxNtfn + var marshalledJSONVerbose []byte + for _, wsc := range m.txNotifications { + if wsc.verboseTxUpdates { + if verboseNtfn == nil { + rawTx, err := createTxRawResult(m.server.server.btcnet, txID, mtx, nil, 0, nil) + if err != nil { + return + } + verboseNtfn = btcws.NewAllVerboseTxNtfn(rawTx) + marshalledJSONVerbose, err = json.Marshal(verboseNtfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal verbose tx notification: %s", err.Error()) + return + } + + } + wsc.QueueNotification(marshalledJSONVerbose) + } else { + wsc.QueueNotification(marshalledJSON) + } + } +} + +// AddSpentRequest requests an notification when the passed outpoint is +// confirmed spent (contained in a block connected to the main chain) for the +// passed websocket client. The request is automatically removed once the +// notification has been sent. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + m.Lock() + defer m.Unlock() + + // Track the request in the client as well so it can be quickly be + // removed on disconnect. + wsc.spentRequests[*op] = struct{}{} + + // Add the client to the list to notify when the outpoint is seen. + // Create the list as needed. + cmap, ok := m.spentNotifications[*op] + if !ok { + cmap = make(map[chan bool]*wsClient) + m.spentNotifications[*op] = cmap + } + cmap[wsc.quit] = wsc +} + +// removeSpentRequest is the internal function which implements the public +// RemoveSpentRequest. See the comment for RemoveSpentRequest for more details. +// +// This function MUST be called with the notification manager lock held. +func (m *wsNotificationManager) removeSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + notifyMap, ok := m.spentNotifications[*op] + if !ok { + rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ + "for websocket client %s", wsc.addr) + return + } + delete(notifyMap, wsc.quit) + + // Remove the map entry altogether if there are no more clients + // interested in it. + if len(notifyMap) == 0 { + delete(m.spentNotifications, *op) + } +} + +// RemoveSpentRequest removes a request from the passed websocket client to be +// notified when the passed outpoint is confirmed spent (contained in a block +// connected to the main chain). +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) RemoveSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + m.Lock() + defer m.Unlock() + + m.removeSpentRequest(wsc, op) +} + +// notifyForTxOuts is the internal function which implements the public +// NotifyForTxOuts. See the comment for NotifyForTxOuts for more details. +// +// This function MUST be called with the notification manager lock held. +func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { + // Nothing to do if nobody is listening for address notifications. + if len(m.addrNotifications) == 0 { + return + } + + for i, txout := range tx.MsgTx().TxOut { + _, addrs, _, err := btcscript.ExtractPkScriptAddrs( + txout.PkScript, m.server.server.btcnet) + if err != nil { + continue + } + + for _, addr := range addrs { + encodedAddr := addr.EncodeAddress() + cmap, ok := m.addrNotifications[encodedAddr] + if !ok { + continue + } + + ntfn := &btcws.ProcessedTxNtfn{ + Receiver: encodedAddr, + TxID: tx.Sha().String(), + TxOutIndex: uint32(i), + Amount: txout.Value, + PkScript: hex.EncodeToString(txout.PkScript), + // TODO(jrick): hardcoding unspent is WRONG and needs + // to be either calculated from other block txs, or dropped. + Spent: false, + } + + if block != nil { + blkhash, err := block.Sha() + if err != nil { + rpcsLog.Error("Error getting block sha; dropping Tx notification") + break + } + ntfn.BlockHeight = int32(block.Height()) + ntfn.BlockHash = blkhash.String() + ntfn.BlockIndex = tx.Index() + ntfn.BlockTime = block.MsgBlock().Header.Timestamp.Unix() + } else { + ntfn.BlockHeight = -1 + ntfn.BlockIndex = -1 + } + + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) + } + + for _, wsc := range cmap { + wsc.QueueNotification(marshalledJSON) + } + } + } +} + +// NotifyForTxOuts examines the outputs of the passed transaction and sends a +// notification to any websocket clients that are interested in an address the +// transaction pays to. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { + m.Lock() + defer m.Unlock() + + m.notifyForTxOuts(tx, block) +} + +// newSpentNotification returns a new marshalled spent notification with the +// passed parameters. +func newSpentNotification(prevOut *btcwire.OutPoint, spender *btcutil.Tx) []byte { + // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. + var serializedTx bytes.Buffer + spender.MsgTx().Serialize(&serializedTx) + txHex := hex.EncodeToString(serializedTx.Bytes()) + + // Create and marsh the notification. + ntfn := btcws.NewTxSpentNtfn(prevOut.Hash.String(), int(prevOut.Index), + txHex) + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal spent notification: %v", err) + return nil + } + return marshalledJSON +} + +// notifySpent examines the inputs of the passed transaction and sends +// interested websocket clients a notification. +// +// This function MUST be called with the notification manager lock held. +func (m *wsNotificationManager) notifySpent(tx *btcutil.Tx) { + // Nothing to do if nobody is listening for spent notifications. + if len(m.spentNotifications) == 0 { + return + } + + for _, txIn := range tx.MsgTx().TxIn { + prevOut := &txIn.PreviousOutpoint + if cmap, ok := m.spentNotifications[*prevOut]; ok { + marshalledJSON := newSpentNotification(prevOut, tx) + if marshalledJSON == nil { + continue + } + for _, wsc := range cmap { + wsc.QueueNotification(marshalledJSON) + m.removeSpentRequest(wsc, prevOut) + } + } + } +} + +// NotifyBlockTXs examines the input and outputs of the passed transaction +// and sends websocket clients notifications they are interested in. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) NotifyBlockTXs(block *btcutil.Block) { + m.Lock() + defer m.Unlock() + + // Nothing to do if there are no websocket clients registered to receive + // notifications about spent outpoints or payments to addresses. + if len(m.spentNotifications) == 0 && len(m.addrNotifications) == 0 { + return + } + + for _, tx := range block.Transactions() { + m.notifySpent(tx) + m.notifyForTxOuts(tx, block) + } +} + +// AddAddrRequest requests notifications to the passed websocket client when +// a transaction pays to the passed address. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddAddrRequest(wsc *wsClient, addr string) { + m.Lock() + defer m.Unlock() + + // Track the request in the client as well so it can be quickly be + // removed on disconnect. + wsc.addrRequests[addr] = struct{}{} + + // Add the client to the list to notify when the outpoint is seen. + // Create the list as needed. + cmap, ok := m.addrNotifications[addr] + if !ok { + cmap = make(map[chan bool]*wsClient) + m.addrNotifications[addr] = cmap + } + cmap[wsc.quit] = wsc +} + +// removeAddrRequest is the internal function which implements the public +// RemoveAddrRequest. See the comment for RemoveAddrRequest for more details. +// +// This function MUST be called with the notification manager lock held. +func (m *wsNotificationManager) removeAddrRequest(wsc *wsClient, addr string) { + notifyMap, ok := m.addrNotifications[addr] + if !ok { + rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ + "<%s> for websocket client %s", addr, wsc.addr) + return + } + + delete(notifyMap, wsc.quit) + + // Remove the map entry altogether if there are no more clients + // interested in it. + if len(notifyMap) == 0 { + delete(m.addrNotifications, addr) + } +} + +// RemoveAddrRequest removes a request from the passed websocket client to be +// notified when a transaction pays to the passed address. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) RemoveAddrRequest(wsc *wsClient, addr string) { + m.Lock() + defer m.Unlock() + + m.removeAddrRequest(wsc, addr) +} + +// AddClient adds the passed websocket client to the notification manager. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddClient(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + m.clients[wsc.quit] = wsc +} + +// RemoveClient removes the passed websocket client and all notifications +// registered for it. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { + m.Lock() + defer m.Unlock() + + // Remove any requests made by the client as well as the client itself. + delete(m.blockNotifications, wsc.quit) + delete(m.txNotifications, wsc.quit) + for k := range wsc.spentRequests { + op := k + m.removeSpentRequest(wsc, &op) + } + for addr := range wsc.addrRequests { + m.removeAddrRequest(wsc, addr) + } + delete(m.clients, wsc.quit) +} + +// Shutdown disconnects all websocket clients the manager knows about. +func (m *wsNotificationManager) Shutdown() { + for _, wsc := range m.clients { + wsc.Disconnect() + } +} + +// newWsNotificationManager returns a new notification manager ready for use. +// See wsNotificationManager for more details. +func newWsNotificationManager(server *rpcServer) *wsNotificationManager { + return &wsNotificationManager{ + server: server, + clients: make(map[chan bool]*wsClient), + blockNotifications: make(map[chan bool]*wsClient), + txNotifications: make(map[chan bool]*wsClient), + spentNotifications: make(map[btcwire.OutPoint]map[chan bool]*wsClient), + addrNotifications: make(map[string]map[chan bool]*wsClient), + } +} + +// wsResponse houses a message to send to the a connected websocket client as +// well as a channel to reply on when the message is sent. +type wsResponse struct { + msg []byte + doneChan chan bool +} + +// createMarshalledReply returns a new marshalled btcjson.Reply given the +// passed parameters. It will automatically convert errors that are not of +// the type *btcjson.Error to the appropriate type as needed. +func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, error) { + var jsonErr *btcjson.Error + if replyErr != nil { + if jErr, ok := replyErr.(*btcjson.Error); !ok { + jsonErr = &btcjson.Error{ + Code: btcjson.ErrInternal.Code, + Message: jErr.Error(), + } } } - if clist.Len() == 0 { - delete(r.txNotifications, addr) + response := btcjson.Reply{ + Id: &id, + Result: result, + Error: jsonErr, } + + marshalledJSON, err := json.Marshal(response) + if err != nil { + return nil, err + } + + return marshalledJSON, nil } -// AddSpentRequest adds a request context for notifications of a spent -// Outpoint. -func (r *wsContext) AddSpentRequest(n ntfnChan, op *btcwire.OutPoint) { - r.Lock() - defer r.Unlock() +// wsClient provides an abstraction for handling a websocket client. The +// overall data flow is split into 3 main goroutines, a possible 4th goroutine +// for long-running operations (only started if request is made), and a +// websocket manager which is used to allow things such as broadcasting +// requested notifications to all connected websocket clients. Inbound +// messages are read via the inHandler goroutine and generally dispatched to +// their own handler. However, certain potentially long-running operations such +// as rescans, are sent to the asyncHander goroutine and are limited to one at a +// time. There are two outbound message types - one for responding to client +// requests and another for async notifications. Responses to client requests +// use SendMessage which employs a buffered channel thereby limiting the number +// of outstanding requests that can be made. Notifications are sent via +// QueueNotification which implements a queue via notificationQueueHandler to +// ensure sending notifications from other subsystems can't block. Ultimately, +// all messages are sent via the outHandler. +type wsClient struct { + // server is the RPC server that is servicing the client. + server *rpcServer - rc, ok := r.connections[n] - if !ok { - return - } - rc.spentRequests[*op] = struct{}{} + // conn is the underlying websocket connection. + conn *websocket.Conn - clist, ok := r.spentNotifications[*op] - if !ok { - clist = list.New() - r.spentNotifications[*op] = clist - } - clist.PushBack(n) -} - -func (r *wsContext) removeGlobalSpentRequest(n ntfnChan, op *btcwire.OutPoint) { - clist := r.spentNotifications[*op] - var enext *list.Element - for e := clist.Front(); e != nil; e = enext { - enext = e.Next() - c := e.Value.(ntfnChan) - if c == n { - clist.Remove(e) - break - } - } - - if clist.Len() == 0 { - delete(r.spentNotifications, *op) - } -} - -// removeSpentRequest removes a request context for notifications of a -// spent Outpoint without grabbing any locks. -func (r *wsContext) removeSpentRequest(n ntfnChan, op *btcwire.OutPoint) { - r.removeGlobalSpentRequest(n, op) - rc, ok := r.connections[n] - if !ok { - return - } - delete(rc.spentRequests, *op) -} - -// AddMinedTxRequest adds request contexts for notifications of a -// mined transaction. -func (r *wsContext) AddMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) { - r.Lock() - defer r.Unlock() - - rc, ok := r.connections[n] - if !ok { - return - } - rc.minedTxRequests[*txID] = struct{}{} - - clist, ok := r.minedTxNotifications[*txID] - if !ok { - clist = list.New() - r.minedTxNotifications[*txID] = clist - } - clist.PushBack(n) -} - -func (r *wsContext) removeGlobalMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) { - clist := r.minedTxNotifications[*txID] - var enext *list.Element - for e := clist.Front(); e != nil; e = enext { - enext = e.Next() - c := e.Value.(ntfnChan) - if c == n { - clist.Remove(e) - break - } - } - - if clist.Len() == 0 { - delete(r.minedTxNotifications, *txID) - } -} - -// RemoveMinedTxRequest removes request contexts for notifications of a -// mined transaction. -func (r *wsContext) RemoveMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) { - r.Lock() - defer r.Unlock() - - r.removeMinedTxRequest(n, txID) -} - -// removeMinedTxRequest removes request contexts for notifications of a -// mined transaction without grabbing any locks. -func (r *wsContext) removeMinedTxRequest(n ntfnChan, txID *btcwire.ShaHash) { - r.removeGlobalMinedTxRequest(n, txID) - rc, ok := r.connections[n] - if !ok { - return - } - delete(rc.minedTxRequests, *txID) -} - -// CloseListeners removes all request contexts for notifications sent -// to a wallet notification channel and closes the channel to stop all -// goroutines currently serving that wallet. -func (r *wsContext) CloseListeners(n ntfnChan) { - r.Lock() - defer r.Unlock() - - delete(r.connections, n) - close(n) -} - -// newWebsocketContext returns a new websocket context that is used -// for handling websocket requests and notifications. -func newWebsocketContext() *wsContext { - return &wsContext{ - connections: make(map[ntfnChan]*requestContexts), - txNotifications: make(map[string]*list.List), - spentNotifications: make(map[btcwire.OutPoint]*list.List), - minedTxNotifications: make(map[btcwire.ShaHash]*list.List), - } -} - -// requestContexts holds all requests for a single wallet connection. -type requestContexts struct { - // disconnecting indicates the websocket is in the process of - // disconnecting. This is used to prevent trying to handle any more - // commands in the interim. - disconnecting bool + // addr is the remote address of the client. + addr string // authenticated specifies whether a client has been authenticated // and therefore is allowed to communicated over the websocket. authenticated bool - // blockUpdates specifies whether a client has requested notifications - // for whenever blocks are connected or disconnected from the main - // chain. - blockUpdates bool - - // allTxUpdates specifies whether a client has requested notifications - // for all new transactions. - allTxUpdates bool - - // verboseTxUpdates specifies whether a client has requested more verbose - // information about all new transactions + // verboseTxUpdates specifies whether a client has requested verbose + // information about all new transactions. verboseTxUpdates bool - // txRequests is a set of addresses a wallet has requested transactions - // updates for. It is maintained here so all requests can be removed - // when a wallet disconnects. - txRequests map[string]struct{} + // addrRequests is a set of addresses the caller has requested to be + // notified about. It is maintained here so all requests can be removed + // when a wallet disconnects. Owned by the notification manager. + addrRequests map[string]struct{} // spentRequests is a set of unspent Outpoints a wallet has requested // notifications for when they are spent by a processed transaction. + // Owned by the notification manager. spentRequests map[btcwire.OutPoint]struct{} - // minedTxRequests holds a set of transaction IDs (tx hashes) of - // transactions created by a wallet. A wallet may request - // notifications of when a tx it created is mined into a block and - // removed from the mempool. Once a tx has been mined into a - // block, wallet may remove the raw transaction from its unmined tx - // pool. - minedTxRequests map[btcwire.ShaHash]struct{} + // Networking infrastructure. + asyncStarted bool + asyncChan chan btcjson.Cmd + ntfnChan chan []byte + sendChan chan wsResponse + quit chan bool + wg sync.WaitGroup } -// respondToAnyCmd checks that a parsed command is a standard or -// extension JSON-RPC command and runs the proper handler to reply to -// the command. Any and all responses are sent to the wallet from -// this function. -func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer, c handlerChans) *btcjson.Reply { +// handleMessage is the main handler for incoming requests. It enforces +// authentication, parses the incoming json, looks up and executes handlers +// (including pass through for standard RPC commands), sends the appropriate +// response. It also detects commands which are marked as long-running and +// sends them off to the asyncHander for processing. +func (c *wsClient) handleMessage(msg string) { + if !c.authenticated { + // Disconnect immediately if the provided command fails to + // parse when the client is not already authenticated. + cmd, jsonErr := parseCmd([]byte(msg)) + if jsonErr != nil { + c.Disconnect() + return + } + + // Disconnect immediately if the first command is not + // authenticate when not already authenticated. + authCmd, ok := cmd.(*btcws.AuthenticateCmd) + if !ok { + rpcsLog.Warnf("Unauthenticated websocket message " + + "received") + c.Disconnect() + return + } + + // Check credentials. + login := authCmd.Username + ":" + authCmd.Passphrase + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) + authSha := sha256.Sum256([]byte(auth)) + cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) + if cmp != 1 { + rpcsLog.Warnf("Auth failure.") + c.Disconnect() + return + } + c.authenticated = true + + // Marshal and send response. + reply, err := createMarshalledReply(authCmd.Id(), nil, nil) + if err != nil { + rpcsLog.Errorf("Failed to marshal authenticate reply: "+ + "%v", err.Error()) + return + } + c.SendMessage(reply, nil) + return + } + + // Attmpt to parse the raw json into a known btcjson.Cmd. + cmd, jsonErr := parseCmd([]byte(msg)) + if jsonErr != nil { + // Use the provided id for errors when a valid JSON-RPC message + // was parsed. Requests with no IDs are ignored. + var id interface{} + if cmd != nil { + id = cmd.Id() + if id == nil { + return + } + } + + // Marshal and send response. + reply, err := createMarshalledReply(id, nil, jsonErr) + if err != nil { + rpcsLog.Errorf("Failed to marshal parse failure "+ + "reply: %v", err) + return + } + c.SendMessage(reply, nil) + return + } + rpcsLog.Debugf("Received command <%s> from %s", cmd.Method(), c.addr) + + // Disconnect if already authenticated and another authenticate command + // is received. + if _, ok := cmd.(*btcws.AuthenticateCmd); ok { + rpcsLog.Warnf("Websocket client %s is already authenticated", + c.addr) + c.Disconnect() + return + } + + // When the command is marked as a long-running command, send it off + // to the asyncHander goroutine for processing. + if _, ok := wsAsyncHandlers[cmd.Method()]; ok { + // Start up the async goroutine for handling long-running + // requests asynchonrously if needed. + if !c.asyncStarted { + rpcsLog.Tracef("Starting async handler for %s", c.addr) + c.wg.Add(1) + go c.asyncHandler() + c.asyncStarted = true + } + c.asyncChan <- cmd + return + } + // Lookup the websocket extension for the command and if it doesn't // exist fallback to handling the command as a standard command. wsHandler, ok := wsHandlers[cmd.Method()] if !ok { // No websocket-specific handler so handle like a legacy // RPC connection. - response := standardCmdReply(cmd, s) - return &response + response := standardCmdReply(cmd, c.server) + reply, err := json.Marshal(response) + if err != nil { + rpcsLog.Errorf("Failed to marshal reply for <%s> "+ + "command: %v", cmd.Method(), err) + + return + } + c.SendMessage(reply, nil) + return } - result, jsonErr := wsHandler(s, cmd, c) - id := cmd.Id() - response := btcjson.Reply{ - Id: &id, - Result: result, - Error: jsonErr, + + // Invoke the handler and marshal and send response. + result, jsonErr := wsHandler(c, cmd) + reply, err := createMarshalledReply(cmd.Id(), result, jsonErr) + if err != nil { + rpcsLog.Errorf("Failed to marshal reply for <%s> command: %v", + cmd.Method(), err) + return } - return &response + c.SendMessage(reply, nil) } -// handleGetCurrentNet implements the getcurrentnet command extension -// for websocket connections. -func handleGetCurrentNet(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - if cfg.TestNet3 { - return btcwire.TestNet3, nil +// inHandler handles all incoming messages for the websocket connection. It +// must be run as a goroutine. +func (c *wsClient) inHandler() { +out: + for { + // Break out of the loop once the quit channel has been closed. + // Use a non-blocking select here so we fall through otherwise. + select { + case <-c.quit: + break out + default: + } + + var msg string + if err := websocket.Message.Receive(c.conn, &msg); err != nil { + // Log the error if it's not due to disconnecting. + if err != io.EOF { + rpcsLog.Errorf("Websocket receive error from "+ + "%s: %v", c.addr, err) + } + break out + } + c.handleMessage(msg) + } + + // Ensure the connection is closed. + c.Disconnect() + c.wg.Done() + rpcsLog.Tracef("Websocket client input handler done for %s", c.addr) +} + +// notificationQueueHandler handles the queueing of outgoing notifications for +// the websocket client. This runs as a muxer for various sources of input to +// ensure that queueing up notifications to be sent will not block. Otherwise, +// slow clients could bog down the other systems (such as the mempool or block +// manager) which are queueing the data. The data is passed on to outHandler to +// actually be written. It must be run as a goroutine. +func (c *wsClient) notificationQueueHandler() { + ntfnSentChan := make(chan bool, 1) // nonblocking sync + + // pendingNtfns is used as a queue for notifications that are ready to + // be sent once there are no outstanding notifications currently being + // sent. The waiting flag is used over simply checking for items in the + // pending list to ensure cleanup knows what has and hasn't been sent + // to the outHandler. Currently no special cleanup is needed, however + // if something like a done channel is added to notifications in the + // future, not knowing what has and hasn't been sent to the outHandler + // (and thus who should respond to the done channel) would be + // problematic without using this approach. + pendingNtfns := list.New() + waiting := false +out: + for { + select { + // This channel is notified when a message is being queued to + // be sent across the network socket. It will either send the + // message immediately if a send is not already in progress, or + // queue the message to be sent once the other pending messages + // are sent. + case msg := <-c.ntfnChan: + if !waiting { + c.SendMessage(msg, ntfnSentChan) + } else { + pendingNtfns.PushBack(msg) + } + waiting = true + + // This channel is notified when a notification has been sent + // across the network socket. + case <-ntfnSentChan: + // No longer waiting if there are no more messages in + // the pending messages queue. + next := pendingNtfns.Front() + if next == nil { + waiting = false + continue + } + + // Notify the outHandler about the next item to + // asynchronously send. + msg := pendingNtfns.Remove(next).([]byte) + c.SendMessage(msg, ntfnSentChan) + + case <-c.quit: + break out + } + } + + // Drain any wait channels before exiting so nothing is left waiting + // around to send. +cleanup: + for { + select { + case <-c.ntfnChan: + case <-ntfnSentChan: + default: + break cleanup + } + } + c.wg.Done() + rpcsLog.Tracef("Websocket client notification queue handler done "+ + "for %s", c.addr) +} + +// outHandler handles all outgoing messages for the websocket connection. It +// must be run as a goroutine. It uses a buffered channel to serialize output +// messages while allowing the sender to continue running asynchronously. It +// must be run as a goroutine. +func (c *wsClient) outHandler() { +out: + for { + // Send any messages ready for send until the quit channel is + // closed. + select { + case r := <-c.sendChan: + err := websocket.Message.Send(c.conn, string(r.msg)) + if err != nil { + c.Disconnect() + break out + } + if r.doneChan != nil { + r.doneChan <- true + } + + case <-c.quit: + break out + } + } + + // Drain any wait channels before exiting so nothing is left waiting + // around to send. +cleanup: + for { + select { + case r := <-c.sendChan: + if r.doneChan != nil { + r.doneChan <- false + } + default: + break cleanup + } + } + c.wg.Done() + rpcsLog.Tracef("Websocket client output handler done for %s", c.addr) +} + +// asyncHandler handles all long-running requests such as rescans which are +// not run directly in the inHandler routine unlike most requests. This allows +// normal quick requests to continue to be processed and responded to even while +// lengthy operations are underway. Only one long-running operation is +// permitted at a time, so multiple long-running requests are queued and +// serialized. It must be run as a goroutine. Also, this goroutine is not +// started until/if the first long-running request is made. +func (c *wsClient) asyncHandler() { + asyncHandlerDoneChan := make(chan bool, 1) // nonblocking sync + pendingCmds := list.New() + waiting := false + + // runHandler runs the handler for the passed command and sends the + // reply. + runHandler := func(cmd btcjson.Cmd) { + wsHandler, ok := wsHandlers[cmd.Method()] + if !ok { + rpcsLog.Warnf("No handler for command <%s>", + cmd.Method()) + return + } + + // Invoke the handler and marshal and send response. + result, jsonErr := wsHandler(c, cmd) + reply, err := createMarshalledReply(cmd.Id(), result, jsonErr) + if err != nil { + rpcsLog.Errorf("Failed to marshal reply for <%s> "+ + "command: %v", cmd.Method(), err) + return + } + c.SendMessage(reply, nil) + } + +out: + for { + select { + case cmd := <-c.asyncChan: + if !waiting { + c.wg.Add(1) + go func(cmd btcjson.Cmd) { + runHandler(cmd) + asyncHandlerDoneChan <- true + c.wg.Done() + }(cmd) + } else { + pendingCmds.PushBack(cmd) + } + waiting = true + + case <-asyncHandlerDoneChan: + // No longer waiting if there are no more messages in + // the pending messages queue. + next := pendingCmds.Front() + if next == nil { + waiting = false + continue + } + + // Notify the outHandler about the next item to + // asynchronously send. + element := pendingCmds.Remove(next) + c.wg.Add(1) + go func(cmd btcjson.Cmd) { + runHandler(cmd) + asyncHandlerDoneChan <- true + c.wg.Done() + }(element.(btcjson.Cmd)) + + case <-c.quit: + break out + } + } + + // Drain any wait channels before exiting so nothing is left waiting + // around to send. +cleanup: + for { + select { + case <-c.asyncChan: + case <-asyncHandlerDoneChan: + default: + break cleanup + } + } + + c.wg.Done() + rpcsLog.Tracef("Websocket client async handler done for %s", c.addr) +} + +// SendMessage sends the passed json to the websocket client. It is backed +// by a buffered channel, so it will not block until the send channel is full. +// Note however that QueueNotification must be used for sending async +// notifications instead of the this function. This approach allows a limit to +// the number of outstanding requests a client can make without preventing or +// blocking on async notifications. +func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) { + // Don't queue the message if in the process of shutting down. + select { + case <-c.quit: + if doneChan != nil { + doneChan <- false + } + return + default: + } + + c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan} +} + +// QueueMessage queues the passed notification to be sent to the websocket +// client. This function, as the name implies, is only intended for +// notifications since it has additional logic to prevent other subsystems, such +// as the memory pool and block manager, from blocking even when the send +// channel is full. +func (c *wsClient) QueueNotification(marshalledJSON []byte) { + // Don't queue the message if in the process of shutting down. + select { + case <-c.quit: + return + default: + } + + c.ntfnChan <- marshalledJSON +} + +// Disconnect disconnects the websocket client. +func (c *wsClient) Disconnect() { + // Don't try to disconnect again if in the process of shutting down. + select { + case <-c.quit: + return + default: + } + + rpcsLog.Tracef("Disconnecting websocket client %s", c.addr) + close(c.quit) + c.conn.Close() +} + +// Start begins processing input and output messages. +func (c *wsClient) Start() { + rpcsLog.Tracef("Starting websocket client %s", c.addr) + + // Start processing input and output. + c.wg.Add(3) + go c.inHandler() + go c.notificationQueueHandler() + go c.outHandler() +} + +// WaitForShutdown blocks until the websocket client goroutines are stopped +// and the connection is closed. +func (c *wsClient) WaitForShutdown() { + c.wg.Wait() +} + +// newWebsocketClient returns a new websocket client given the notification +// manager, websocket connection, remote address, and whether or not the client +// has already been authenticated (via HTTP Basic access authentication). The +// returned client is ready to start. Once started, the client will process +// incoming and outgoing messages in separate goroutines complete with queueing +// and asynchrous handling for long-running operations. +func newWebsocketClient(server *rpcServer, conn *websocket.Conn, + remoteAddr string, authenticated bool) *wsClient { + + return &wsClient{ + conn: conn, + addr: remoteAddr, + authenticated: authenticated, + server: server, + addrRequests: make(map[string]struct{}), + spentRequests: make(map[btcwire.OutPoint]struct{}), + ntfnChan: make(chan []byte, 1), // nonblocking sync + asyncChan: make(chan btcjson.Cmd, 1), // nonblocking sync + sendChan: make(chan wsResponse, websocketSendBufferSize), + quit: make(chan bool), } - return btcwire.MainNet, nil } // handleGetBestBlock implements the getbestblock command extension // for websocket connections. -func handleGetBestBlock(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { +func handleGetBestBlock(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // All other "get block" commands give either the height, the // hash, or both but require the block SHA. This gets both for // the best block. - sha, height, err := s.server.db.NewestSha() + sha, height, err := wsc.server.server.db.NewestSha() if err != nil { return nil, &btcjson.ErrBestBlockHash } @@ -359,16 +1171,47 @@ func handleGetBestBlock(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return result, nil } +// handleGetCurrentNet implements the getcurrentnet command extension +// for websocket connections. +func handleGetCurrentNet(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { + return wsc.server.server.btcnet, nil +} + // handleNotifyBlocks implements the notifyblocks command extension for // websocket connections. -func handleNotifyBlocks(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - s.ws.AddBlockUpdateRequest(c.n) +func handleNotifyBlocks(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { + wsc.server.ntfnMgr.AddBlockUpdateRequest(wsc) + return nil, nil +} + +// handleNotifySpent implements the notifyspent command extension for +// websocket connections. +func handleNotifySpent(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { + cmd, ok := icmd.(*btcws.NotifySpentCmd) + if !ok { + return nil, &btcjson.ErrInternal + } + + wsc.server.ntfnMgr.AddSpentRequest(wsc, cmd.OutPoint) + return nil, nil +} + +// handleNotifyAllNewTXs implements the notifyallnewtxs command extension for +// websocket connections. +func handleNotifyAllNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { + cmd, ok := icmd.(*btcws.NotifyAllNewTXsCmd) + if !ok { + return nil, &btcjson.ErrInternal + } + + wsc.verboseTxUpdates = cmd.Verbose + wsc.server.ntfnMgr.AddNewTxRequest(wsc) return nil, nil } // handleNotifyNewTXs implements the notifynewtxs command extension for // websocket connections. -func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { +func handleNotifyNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { cmd, ok := icmd.(*btcws.NotifyNewTXsCmd) if !ok { return nil, &btcjson.ErrInternal @@ -384,7 +1227,7 @@ func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return nil, &e } - // TODO(jrick) Notifing for non-P2PKH addresses is currently + // TODO(jrick) Notifying for non-P2PKH addresses is currently // unsuported. if _, ok := addr.(*btcutil.AddressPubKeyHash); !ok { e := btcjson.Error{ @@ -394,107 +1237,23 @@ func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return nil, &e } - s.ws.AddTxRequest(c.n, addr.EncodeAddress()) + wsc.server.ntfnMgr.AddAddrRequest(wsc, addr.EncodeAddress()) } return nil, nil } -// handleNotifyAllNewTXs implements the notifyallnewtxs command extension for -// websocket connections. -func handleNotifyAllNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - cmd, ok := icmd.(*btcws.NotifyAllNewTXsCmd) - if !ok { - return nil, &btcjson.ErrInternal - } +// rescanBlock rescans all transactions in a single block. This is a helper +// function for handleRescan. +func rescanBlock(wsc *wsClient, cmd *btcws.RescanCmd, blk *btcutil.Block) { + db := wsc.server.server.db - s.ws.AddAllNewTxRequest(c.n, cmd.Verbose) - return nil, nil -} - -// handleNotifySpent implements the notifyspent command extension for -// websocket connections. -func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - cmd, ok := icmd.(*btcws.NotifySpentCmd) - if !ok { - return nil, &btcjson.ErrInternal - } - - s.ws.AddSpentRequest(c.n, cmd.OutPoint) - - return nil, nil -} - -// handleRescan implements the rescan command extension for websocket -// connections. -func handleRescan(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - cmd, ok := icmd.(*btcws.RescanCmd) - if !ok { - return nil, &btcjson.ErrInternal - } - - if len(cmd.Addresses) == 1 { - rpcsLog.Info("Beginning rescan for 1 address") - } else { - rpcsLog.Infof("Beginning rescan for %v addresses", - len(cmd.Addresses)) - } - - minblock := int64(cmd.BeginBlock) - maxblock := int64(cmd.EndBlock) - - // FetchHeightRange may not return a complete list of block shas for - // the given range, so fetch range as many times as necessary. - for { - blkshalist, err := s.server.db.FetchHeightRange(minblock, maxblock) - if err != nil { - rpcsLog.Errorf("Error looking up block range: %v", err) - return nil, &btcjson.ErrDatabase - } - if len(blkshalist) == 0 { - break - } - - for i := range blkshalist { - blk, err := s.server.db.FetchBlockBySha(&blkshalist[i]) - if err != nil { - rpcsLog.Errorf("Error looking up block sha: %v", err) - return nil, &btcjson.ErrDatabase - } - - // A select statement is used to stop rescans if the - // client requesting the rescan has disconnected. - select { - case <-c.disconnected: - rpcsLog.Debugf("Stopping rescan at height %v for disconnected client", - blk.Height()) - return nil, nil - - default: - rescanBlock(s, cmd, c, blk) - } - } - - if maxblock-minblock > int64(len(blkshalist)) { - minblock += int64(len(blkshalist)) - } else { - break - } - } - - rpcsLog.Info("Finished rescan") - return nil, nil -} - -// rescanBlock rescans all transactions in a single block. This is a -// helper function for handleRescan. -func rescanBlock(s *rpcServer, cmd *btcws.RescanCmd, c handlerChans, blk *btcutil.Block) { for _, tx := range blk.Transactions() { var txReply *btcdb.TxListReply txouts: for txOutIdx, txout := range tx.MsgTx().TxOut { _, addrs, _, err := btcscript.ExtractPkScriptAddrs( - txout.PkScript, s.server.btcnet) + txout.PkScript, wsc.server.server.btcnet) if err != nil { continue txouts } @@ -509,7 +1268,7 @@ func rescanBlock(s *rpcServer, cmd *btcws.RescanCmd, c handlerChans, blk *btcuti // of the tx, so any can removed from the utxo set (since // they are, as of this tx, now spent). if txReply == nil { - txReplyList, err := s.server.db.FetchTxBySha(tx.Sha()) + txReplyList, err := db.FetchTxBySha(tx.Sha()) if err != nil { rpcsLog.Errorf("Tx Sha %v not found by db", tx.Sha()) continue txouts @@ -538,515 +1297,83 @@ func rescanBlock(s *rpcServer, cmd *btcws.RescanCmd, c handlerChans, blk *btcuti BlockTime: blk.MsgBlock().Header.Timestamp.Unix(), Spent: txReply.TxSpent[txOutIdx], } + marshalledJSON, err := json.Marshal(ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) + return + } + // Stop the rescan early if the websocket client + // disconnected. select { - case <-c.disconnected: + case <-wsc.quit: return default: - c.n <- ntfn + wsc.SendMessage(marshalledJSON, nil) } } } } } -// handleWalletSendRawTransaction implements the websocket extended version of -// the sendrawtransaction command. -func handleWalletSendRawTransaction(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { - result, err := handleSendRawTransaction(s, icmd) - // TODO: the standard handlers really should be changed to - // return btcjson.Errors which get used directly in the - // response. Wouldn't need this crap here then. - if err != nil { - if jsonErr, ok := err.(*btcjson.Error); ok { - return result, jsonErr - } - jsonErr := &btcjson.Error{ - Code: btcjson.ErrMisc.Code, - Message: err.Error(), - } - return result, jsonErr - } - - // The result is already guaranteed to be a valid hash string if no - // error was returned above, so it's safe to ignore the error here. - txSha, _ := btcwire.NewShaHashFromStr(result.(string)) - - // Request to be notified when the transaction is mined. - s.ws.AddMinedTxRequest(c.n, txSha) - return result, nil -} - -// websocketAuthenticate checks the authenticate command for valid credentials. -// An error is returned if the credentials are invalid or if the connection is -// already authenticated. -// -// This function MUST be called with the websocket lock held. -func websocketAuthenticate(icmd btcjson.Cmd, rc *requestContexts, authSha []byte) error { - cmd, ok := icmd.(*btcws.AuthenticateCmd) +// handleRescan implements the rescan command extension for websocket +// connections. +func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { + cmd, ok := icmd.(*btcws.RescanCmd) if !ok { - return fmt.Errorf("%s", btcjson.ErrInternal.Message) + return nil, &btcjson.ErrInternal } - // Already authenticated? - if rc.authenticated { - rpcsLog.Warnf("Already authenticated") - return ErrBadAuth - } - - // Check credentials. - login := cmd.Username + ":" + cmd.Passphrase - auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) - calcualtedAuthSha := sha256.Sum256([]byte(auth)) - cmp := subtle.ConstantTimeCompare(calcualtedAuthSha[:], authSha) - if cmp != 1 { - rpcsLog.Warnf("Auth failure.") - return ErrBadAuth - } - - rc.authenticated = true - return nil -} - -// AddWalletListener adds a channel to listen for new messages from a -// wallet. -func (s *rpcServer) AddWalletListener(n ntfnChan, rc *requestContexts) { - s.ws.Lock() - s.ws.connections[n] = rc - s.ws.Unlock() -} - -// RemoveWalletListener removes a wallet listener channel. -func (s *rpcServer) RemoveWalletListener(n ntfnChan) { - s.ws.Lock() - - rc := s.ws.connections[n] - for k := range rc.txRequests { - s.ws.removeGlobalTxRequest(n, k) - } - for k := range rc.spentRequests { - s.ws.removeGlobalSpentRequest(n, &k) - } - for k := range rc.minedTxRequests { - s.ws.removeGlobalMinedTxRequest(n, &k) - } - - delete(s.ws.connections, n) - s.ws.Unlock() -} - -// walletReqsNotifications is the handler function for websocket -// connections from a btcwallet instance. It reads messages from wallet and -// sends back replies, as well as notififying wallets of chain updates. -func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn, authenticated bool) { - // Clear the read deadline that was set before the websocket hijacked - // the connection. - ws.SetReadDeadline(timeZeroVal) - - // Add wallet notification channel so this handler receives btcd chain - // notifications. - n := make(ntfnChan) - rc := &requestContexts{ - authenticated: authenticated, - txRequests: make(map[string]struct{}), - spentRequests: make(map[btcwire.OutPoint]struct{}), - minedTxRequests: make(map[btcwire.ShaHash]struct{}), - } - s.AddWalletListener(n, rc) - defer s.RemoveWalletListener(n) - - // Channel for responses. - r := make(chan *btcjson.Reply) - - // Channels for websocket handlers. - disconnected := make(chan struct{}) - hc := handlerChans{ - n: n, - disconnected: disconnected, - } - - // msgs is a channel for all messages received over the websocket. - msgs := make(chan string) - - // Receive messages from websocket and send across reqs until the - // connection is lost. - go func() { - for { - select { - case <-s.quit: - return - - default: - var m string - if err := websocket.Message.Receive(ws, &m); err != nil { - // Only close disconnected if not closed yet. - select { - case <-disconnected: - // nothing - - default: - close(disconnected) - } - return - } - msgs <- m - } - } - }() - - for { - select { - case <-s.quit: - // Server closed. Closing disconnected signals handlers to stop - // and flushes all channels handlers may write to. - select { - case <-disconnected: - // nothing - - default: - close(disconnected) - } - - case <-disconnected: - for { - select { - case <-msgs: - case <-r: - case <-n: - default: - return - } - } - - case m := <-msgs: - // This function internally spawns a new goroutine to - // the handle request after validating authentication. - // Responses and notifications are read by channels in - // this for-select loop. - if !rc.disconnecting { - err := s.websocketJSONHandler(r, hc, m) - if err == ErrBadAuth { - rc.disconnecting = true - close(disconnected) - ws.Close() - } - } - - case response := <-r: - // Marshal and send response. - mresp, err := json.Marshal(response) - if err != nil { - rpcsLog.Errorf("Error unmarshaling response: %v", err) - continue - } - if err := websocket.Message.Send(ws, string(mresp)); err != nil { - // Only close disconnected if not closed yet. - select { - case <-disconnected: - // nothing - - default: - close(disconnected) - } - return - } - - case ntfn := <-n: - // Marshal and send notification. - mntfn, err := ntfn.MarshalJSON() - if err != nil { - rpcsLog.Errorf("Error unmarshaling notification: %v", err) - continue - } - if err := websocket.Message.Send(ws, string(mntfn)); err != nil { - // Only close disconnected if not closed yet. - select { - case <-disconnected: - // nothing - - default: - close(disconnected) - } - return - } - } - } -} - -// websocketJSONHandler parses and handles a marshalled json message, -// sending the marshalled reply to a wallet notification channel. -func (s *rpcServer) websocketJSONHandler(r chan *btcjson.Reply, c handlerChans, msg string) error { - var resp *btcjson.Reply - - cmd, jsonErr := parseCmd([]byte(msg)) - if jsonErr != nil { - resp = &btcjson.Reply{} - if cmd != nil { - // Unmarshaling at least a valid JSON-RPC message succeeded. - // Use the provided id for errors. Requests with no IDs - // should be ignored. - id := cmd.Id() - if id == nil { - return nil - } - resp.Id = &id - } - resp.Error = jsonErr + numAddrs := len(cmd.Addresses) + if numAddrs == 1 { + rpcsLog.Info("Beginning rescan for 1 address") } else { - // The first command must be the "authenticate" command if the - // connection is not already authenticated. - s.ws.Lock() - rc := s.ws.connections[c.n] - // Note: c.n is guaranteed to be in the s.ws.connections map since it is - // called from the main walletReqNotifications for loop. Any calls - // from that goroutine will always have valid connections since they - // are not removed till after that exits. - if _, ok := cmd.(*btcws.AuthenticateCmd); ok { - // Validate the provided credentials. - err := websocketAuthenticate(cmd, rc, s.authsha[:]) - if err != nil { - s.ws.Unlock() - return err - } - - // Generate an empty response to send for the successful - // authentication. - id := cmd.Id() - resp = &btcjson.Reply{ - Id: &id, - Result: nil, - Error: nil, - } - } else if !rc.authenticated { - rpcsLog.Warnf("Unauthenticated websocket message " + - "received") - s.ws.Unlock() - return ErrBadAuth - } - - s.ws.Unlock() + rpcsLog.Infof("Beginning rescan for %d addresses", numAddrs) } - // Find and run handler in new goroutine. - go func() { - if resp == nil { - resp = respondToAnyCmd(cmd, s, c) - } - select { - case <-c.disconnected: - return + minBlock := int64(cmd.BeginBlock) + maxBlock := int64(cmd.EndBlock) - default: - r <- resp - } - }() - - return nil -} - -// NotifyBlockConnected creates and marshalls a JSON message to notify -// of a new block connected to the main chain. The notification is sent -// to each connected wallet. -func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) { - s.ws.Lock() - defer s.ws.Unlock() - - hash, err := block.Sha() - if err != nil { - rpcsLog.Error("Bad block; connected block notification dropped") - return - } - - // TODO: remove int32 type conversion. - ntfn := btcws.NewBlockConnectedNtfn(hash.String(), int32(block.Height())) - for ntfnChan, rc := range s.ws.connections { - if rc.blockUpdates { - ntfnChan <- ntfn - } - } - - // Inform any interested parties about txs mined in this block. - for _, tx := range block.Transactions() { - if clist, ok := s.ws.minedTxNotifications[*tx.Sha()]; ok { - var enext *list.Element - for e := clist.Front(); e != nil; e = enext { - enext = e.Next() - n := e.Value.(ntfnChan) - // TODO: remove int32 type conversion after - // the int64 -> int32 switch is made. - ntfn := btcws.NewTxMinedNtfn(tx.Sha().String(), - hash.String(), int32(block.Height()), - block.MsgBlock().Header.Timestamp.Unix(), - tx.Index()) - n <- ntfn - s.ws.removeMinedTxRequest(n, tx.Sha()) - } - } - } -} - -// NotifyBlockDisconnected creates and marshals a JSON message to notify -// of a new block disconnected from the main chain. The notification is sent -// to each connected wallet. -func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) { - s.ws.Lock() - defer s.ws.Unlock() - - hash, err := block.Sha() - if err != nil { - rpcsLog.Error("Bad block; connected block notification dropped") - return - } - - // TODO: remove int32 type conversion. - ntfn := btcws.NewBlockDisconnectedNtfn(hash.String(), - int32(block.Height())) - for ntfnChan, rc := range s.ws.connections { - if rc.blockUpdates { - ntfnChan <- ntfn - } - } -} - -// NotifyBlockTXs creates and marshals a JSON message to notify wallets -// of new transactions (with both spent and unspent outputs) for a watched -// address. -func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) { - for _, tx := range block.Transactions() { - s.NewBlockNotifyCheckTxIn(tx) - s.NotifyForTxOuts(tx, block) - } -} - -func notifySpentData(n ntfnChan, txhash *btcwire.ShaHash, index uint32, - spender *btcutil.Tx) { - - var buf bytes.Buffer - // Ignore Serialize's error, as writing to a bytes.buffer - // cannot fail. - spender.MsgTx().Serialize(&buf) - txStr := hex.EncodeToString(buf.Bytes()) - - ntfn := btcws.NewTxSpentNtfn(txhash.String(), int(index), txStr) - n <- ntfn -} - -// NewBlockNotifyCheckTxIn is a helper function to iterate through -// each transaction input of a new block and perform any checks and -// notify listening frontends when necessary. -func (s *rpcServer) NewBlockNotifyCheckTxIn(tx *btcutil.Tx) { - s.ws.Lock() - defer s.ws.Unlock() - - for _, txin := range tx.MsgTx().TxIn { - if clist, ok := s.ws.spentNotifications[txin.PreviousOutpoint]; ok { - var enext *list.Element - for e := clist.Front(); e != nil; e = enext { - enext = e.Next() - n := e.Value.(ntfnChan) - notifySpentData(n, &txin.PreviousOutpoint.Hash, - txin.PreviousOutpoint.Index, tx) - s.ws.removeSpentRequest(n, &txin.PreviousOutpoint) - } - } - } -} - -// NotifyForTxOuts iterates through all outputs of a tx, performing any -// necessary notifications for wallets. If a non-nil block is passed, -// additional block information is passed with the notifications. -func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { - s.ws.Lock() - defer s.ws.Unlock() - - // Nothing to do if nobody is listening for transaction notifications. - if len(s.ws.txNotifications) == 0 { - return - } - - for i, txout := range tx.MsgTx().TxOut { - _, addrs, _, err := btcscript.ExtractPkScriptAddrs( - txout.PkScript, s.server.btcnet) + // FetchHeightRange may not return a complete list of block shas for + // the given range, so fetch range as many times as necessary. + db := wsc.server.server.db + for { + hashList, err := db.FetchHeightRange(minBlock, maxBlock) if err != nil { - continue + rpcsLog.Errorf("Error looking up block range: %v", err) + return nil, &btcjson.ErrDatabase + } + if len(hashList) == 0 { + break } - for _, addr := range addrs { - // Only support pay-to-pubkey-hash right now. - if _, ok := addr.(*btcutil.AddressPubKeyHash); !ok { - continue + for i := range hashList { + blk, err := db.FetchBlockBySha(&hashList[i]) + if err != nil { + rpcsLog.Errorf("Error looking up block sha: %v", err) + return nil, &btcjson.ErrDatabase } - encodedAddr := addr.EncodeAddress() - if idlist, ok := s.ws.txNotifications[encodedAddr]; ok { - for e := idlist.Front(); e != nil; e = e.Next() { - n := e.Value.(ntfnChan) - - ntfn := &btcws.ProcessedTxNtfn{ - Receiver: encodedAddr, - TxID: tx.Sha().String(), - TxOutIndex: uint32(i), - Amount: txout.Value, - PkScript: hex.EncodeToString(txout.PkScript), - // TODO(jrick): hardcoding unspent is WRONG and needs - // to be either calculated from other block txs, or dropped. - Spent: false, - } - - if block != nil { - blkhash, err := block.Sha() - if err != nil { - rpcsLog.Error("Error getting block sha; dropping Tx notification") - break - } - ntfn.BlockHeight = int32(block.Height()) - ntfn.BlockHash = blkhash.String() - ntfn.BlockIndex = tx.Index() - ntfn.BlockTime = block.MsgBlock().Header.Timestamp.Unix() - } else { - ntfn.BlockHeight = -1 - ntfn.BlockIndex = -1 - } - - n <- ntfn - } - } - } - } -} - -// NotifyForNewTx sends delivers the new tx to any client that has -// registered for all new TX. -func (s *rpcServer) NotifyForNewTx(tx *btcutil.Tx) { - s.ws.Lock() - defer s.ws.Unlock() - - txId := tx.Sha().String() - mtx := tx.MsgTx() - - var amount int64 - for _, txOut := range mtx.TxOut { - amount += txOut.Value - } - - ntfn := btcws.NewAllTxNtfn(txId, amount) - var verboseNtfn *btcws.AllVerboseTxNtfn - - for ntfnChan, rc := range s.ws.connections { - if rc.allTxUpdates { - if rc.verboseTxUpdates { - if verboseNtfn == nil { - rawTx, err := createTxRawResult(s.server.btcnet, txId, mtx, nil, 0, nil) - if err != nil { - return - } - verboseNtfn = btcws.NewAllVerboseTxNtfn(rawTx) - } - ntfnChan <- verboseNtfn - } else { - ntfnChan <- ntfn + // A select statement is used to stop rescans if the + // client requesting the rescan has disconnected. + select { + case <-wsc.quit: + rpcsLog.Debugf("Stopped rescan at height %v for disconnected client", + blk.Height()) + return nil, nil + default: + rescanBlock(wsc, cmd, blk) } } + + if maxBlock-minBlock > int64(len(hashList)) { + minBlock += int64(len(hashList)) + } else { + break + } } + + rpcsLog.Info("Finished rescan") + return nil, nil }