diff --git a/mempool.go b/mempool.go index 83fe1905..3309ef2a 100644 --- a/mempool.go +++ b/mempool.go @@ -912,7 +912,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe // Notify websocket clients about mempool transactions. if mp.server.rpcServer != nil { go func() { - mp.server.rpcServer.ntfnMgr.NotifyForTxOuts(tx, nil) + mp.server.rpcServer.ntfnMgr.NotifyForTx(tx, nil) if isNew { mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx) diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 15c3b7cf..1b31e855 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -13,8 +13,8 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" - "github.com/conformal/btcdb" "github.com/conformal/btcjson" "github.com/conformal/btcscript" "github.com/conformal/btcutil" @@ -312,16 +312,11 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { } } -// AddSpentRequest requests an notification when the passed outpoint is -// confirmed spent (contained in a block connected to the main chain) for the -// passed websocket client. The request is automatically removed once the -// notification has been sent. +// addSpentRequest is the internal function which implements the public +// AddSpentRequest. See the comment for AddSpentRequest for more details. // -// This function is safe for concurrent access. -func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { - m.Lock() - defer m.Unlock() - +// This function MUST be called with the notification manager lock held. +func (m *wsNotificationManager) addSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { // Track the request in the client as well so it can be quickly be // removed on disconnect. wsc.spentRequests[*op] = struct{}{} @@ -336,6 +331,19 @@ func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPo cmap[wsc.quit] = wsc } +// AddSpentRequest requests an notification when the passed outpoint is +// confirmed spent (contained in a block connected to the main chain) for the +// passed websocket client. The request is automatically removed once the +// notification has been sent. +// +// This function is safe for concurrent access. +func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + m.Lock() + defer m.Unlock() + + m.addSpentRequest(wsc, op) +} + // removeSpentRequest is the internal function which implements the public // RemoveSpentRequest. See the comment for RemoveSpentRequest for more details. // @@ -372,8 +380,18 @@ func (m *wsNotificationManager) RemoveSpentRequest(wsc *wsClient, op *btcwire.Ou m.removeSpentRequest(wsc, op) } -// notifyForTxOuts is the internal function which implements the public -// NotifyForTxOuts. See the comment for NotifyForTxOuts for more details. +// txHexString returns the serialized transaction encoded in hexadecimal. +func txHexString(tx *btcutil.Tx) string { + var buf bytes.Buffer + // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. + tx.MsgTx().Serialize(&buf) + return hex.EncodeToString(buf.Bytes()) +} + +// notifyForTxOuts examines each transaction output, notifying interested +// websocket clients of the transaction if an output spends to a watched +// address. A spent notification request is automatically registered for +// the client for each matching output. // // This function MUST be called with the notification manager lock held. func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { @@ -382,9 +400,11 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B return } - for i, txout := range tx.MsgTx().TxOut { + txHex := "" + wscNotified := make(map[chan bool]bool) + for i, txOut := range tx.MsgTx().TxOut { _, addrs, _, err := btcscript.ExtractPkScriptAddrs( - txout.PkScript, m.server.server.btcnet) + txOut.PkScript, m.server.server.btcnet) if err != nil { continue } @@ -396,95 +416,85 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B continue } - ntfn := &btcws.ProcessedTxNtfn{ - Receiver: encodedAddr, - TxID: tx.Sha().String(), - TxOutIndex: uint32(i), - Amount: txout.Value, - PkScript: hex.EncodeToString(txout.PkScript), - // TODO(jrick): hardcoding unspent is WRONG and needs - // to be either calculated from other block txs, or dropped. - Spent: false, - } - - if block != nil { - blkhash, err := block.Sha() - if err != nil { - rpcsLog.Error("Error getting block sha; dropping Tx notification") - break - } - ntfn.BlockHeight = int32(block.Height()) - ntfn.BlockHash = blkhash.String() - ntfn.BlockIndex = tx.Index() - ntfn.BlockTime = block.MsgBlock().Header.Timestamp.Unix() - } else { - ntfn.BlockHeight = -1 - ntfn.BlockIndex = -1 + if txHex == "" { + txHex = txHexString(tx) } + ntfn := btcws.NewRecvTxNtfn(txHex, blockDetails(block, tx.Index())) marshalledJSON, err := json.Marshal(ntfn) if err != nil { rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) + continue } - for _, wsc := range cmap { - wsc.QueueNotification(marshalledJSON) + op := btcwire.NewOutPoint(tx.Sha(), uint32(i)) + for wscQuit, wsc := range cmap { + m.addSpentRequest(wsc, op) + + if !wscNotified[wscQuit] { + wscNotified[wscQuit] = true + wsc.QueueNotification(marshalledJSON) + } } } } } -// NotifyForTxOuts examines the outputs of the passed transaction and sends a -// notification to any websocket clients that are interested in an address the -// transaction pays to. +// NotifyForTx examines the inputs and outputs of the passed transaction, +// notifying websocket clients of outputs spending to a watched address +// and inputs spending a watched outpoint. // // This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { +func (m *wsNotificationManager) NotifyForTx(tx *btcutil.Tx, block *btcutil.Block) { m.Lock() defer m.Unlock() + m.notifyForTxIns(tx, block) m.notifyForTxOuts(tx, block) } -// newSpentNotification returns a new marshalled spent notification with the -// passed parameters. -func newSpentNotification(prevOut *btcwire.OutPoint, spender *btcutil.Tx) []byte { - // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. - var serializedTx bytes.Buffer - spender.MsgTx().Serialize(&serializedTx) - txHex := hex.EncodeToString(serializedTx.Bytes()) - - // Create and marsh the notification. - ntfn := btcws.NewTxSpentNtfn(prevOut.Hash.String(), int(prevOut.Index), - txHex) - marshalledJSON, err := json.Marshal(ntfn) - if err != nil { - rpcsLog.Errorf("Failed to marshal spent notification: %v", err) - return nil - } - return marshalledJSON +// newRedeemingTxNotification returns a new marshalled redeemingtx notification +// with the passed parameters. +func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) { + // Create and marshal the notification. + ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index)) + return json.Marshal(ntfn) } -// notifySpent examines the inputs of the passed transaction and sends -// interested websocket clients a notification. +// notifyForTxIns examines the inputs of the passed transaction and sends +// interested websocket clients a redeemingtx notification if any inputs +// spend a watched output. If block is non-nil, any matching spent +// requests are removed. // // This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) notifySpent(tx *btcutil.Tx) { +func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Block) { // Nothing to do if nobody is listening for spent notifications. if len(m.spentNotifications) == 0 { return } + txHex := "" + wscNotified := make(map[chan bool]bool) for _, txIn := range tx.MsgTx().TxIn { prevOut := &txIn.PreviousOutpoint if cmap, ok := m.spentNotifications[*prevOut]; ok { - marshalledJSON := newSpentNotification(prevOut, tx) - if marshalledJSON == nil { + if txHex == "" { + txHex = txHexString(tx) + } + marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block) + if err != nil { + rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", err) continue } - for _, wsc := range cmap { - wsc.QueueNotification(marshalledJSON) - m.removeSpentRequest(wsc, prevOut) + for wscQuit, wsc := range cmap { + if block != nil { + m.removeSpentRequest(wsc, prevOut) + } + + if !wscNotified[wscQuit] { + wscNotified[wscQuit] = true + wsc.QueueNotification(marshalledJSON) + } } } } @@ -505,11 +515,24 @@ func (m *wsNotificationManager) NotifyBlockTXs(block *btcutil.Block) { } for _, tx := range block.Transactions() { - m.notifySpent(tx) + m.notifyForTxIns(tx, block) m.notifyForTxOuts(tx, block) } } +func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails { + if block == nil { + return nil + } + blockSha, _ := block.Sha() // never errors + return &btcws.BlockDetails{ + Height: int32(block.Height()), + Hash: blockSha.String(), + Index: txIndex, + Time: block.MsgBlock().Header.Timestamp.Unix(), + } +} + // AddAddrRequest requests notifications to the passed websocket client when // a transaction pays to the passed address. // @@ -1086,20 +1109,29 @@ func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) { c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan} } +// ErrClientQuit describes the error where a client send is not processed due +// to the client having already been disconnected or dropped. +var ErrClientQuit = errors.New("client quit") + // QueueMessage queues the passed notification to be sent to the websocket // client. This function, as the name implies, is only intended for // notifications since it has additional logic to prevent other subsystems, such // as the memory pool and block manager, from blocking even when the send // channel is full. -func (c *wsClient) QueueNotification(marshalledJSON []byte) { +// +// If the client is in the process of shutting down, this function returns +// ErrClientQuit. This is intended to be checked by long-running notification +// handlers to stop processing if there is no more work needed to be done. +func (c *wsClient) QueueNotification(marshalledJSON []byte) error { // Don't queue the message if in the process of shutting down. select { case <-c.quit: - return + return ErrClientQuit default: } c.ntfnChan <- marshalledJSON + return nil } // Disconnect disconnects the websocket client. @@ -1239,73 +1271,82 @@ func handleNotifyNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson. // rescanBlock rescans all transactions in a single block. This is a helper // function for handleRescan. -func rescanBlock(wsc *wsClient, cmd *btcws.RescanCmd, blk *btcutil.Block) { - db := wsc.server.server.db +func rescanBlock(wsc *wsClient, cmd *btcws.RescanCmd, blk *btcutil.Block, + unspent map[btcwire.OutPoint]struct{}) { for _, tx := range blk.Transactions() { - var txReply *btcdb.TxListReply - txouts: - for txOutIdx, txout := range tx.MsgTx().TxOut { - _, addrs, _, err := btcscript.ExtractPkScriptAddrs( - txout.PkScript, wsc.server.server.btcnet) - if err != nil { - continue txouts + // Hexadecimal representation of this tx. Only created if + // needed, and reused for later notifications if already made. + var txHex string + + // All inputs and outputs must be iterated through to correctly + // modify the unspent map, however, just a single notification + // for any matching transaction inputs or outputs should be + // created and sent. + spentNotified := false + recvNotified := false + + for _, txin := range tx.MsgTx().TxIn { + if _, ok := unspent[txin.PreviousOutpoint]; ok { + delete(unspent, txin.PreviousOutpoint) + + if spentNotified { + continue + } + + if txHex == "" { + txHex = txHexString(tx) + } + marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), blk) + if err != nil { + rpcsLog.Errorf("Failed to marshal redeemingtx notification: %v", err) + continue + } + + err = wsc.QueueNotification(marshalledJSON) + // Stop the rescan early if the websocket client + // disconnected. + if err == ErrClientQuit { + return + } + spentNotified = true } + } + + for txOutIdx, txout := range tx.MsgTx().TxOut { + _, addrs, _, _ := btcscript.ExtractPkScriptAddrs( + txout.PkScript, wsc.server.server.btcnet) for _, addr := range addrs { encodedAddr := addr.EncodeAddress() if _, ok := cmd.Addresses[encodedAddr]; !ok { continue } - // TODO(jrick): This lookup is expensive and can be avoided - // if the wallet is sent the previous outpoints for all inputs - // of the tx, so any can removed from the utxo set (since - // they are, as of this tx, now spent). - if txReply == nil { - txReplyList, err := db.FetchTxBySha(tx.Sha()) - if err != nil { - rpcsLog.Errorf("Tx Sha %v not found by db", tx.Sha()) - continue txouts - } - for i := range txReplyList { - if txReplyList[i].Height == blk.Height() { - txReply = txReplyList[i] - break - } - } + unspent[*btcwire.NewOutPoint(tx.Sha(), uint32(txOutIdx))] = struct{}{} + + if recvNotified { + continue } - // Sha never errors. - blksha, _ := blk.Sha() - - ntfn := &btcws.ProcessedTxNtfn{ - Receiver: encodedAddr, - Amount: txout.Value, - TxID: tx.Sha().String(), - TxOutIndex: uint32(txOutIdx), - PkScript: hex.EncodeToString(txout.PkScript), - BlockHash: blksha.String(), - BlockHeight: int32(blk.Height()), - BlockIndex: tx.Index(), - BlockTime: blk.MsgBlock().Header.Timestamp.Unix(), - Spent: txReply.TxSpent[txOutIdx], + if txHex == "" { + txHex = txHexString(tx) } + ntfn := btcws.NewRecvTxNtfn(txHex, blockDetails(blk, tx.Index())) + marshalledJSON, err := json.Marshal(ntfn) if err != nil { - rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) + rpcsLog.Errorf("Failed to marshal recvtx notification: %v", err) return } + err = wsc.QueueNotification(marshalledJSON) // Stop the rescan early if the websocket client // disconnected. - select { - case <-wsc.quit: + if err == ErrClientQuit { return - - default: - wsc.SendMessage(marshalledJSON, nil) } + recvNotified = true } } } @@ -1328,6 +1369,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) minBlock := int64(cmd.BeginBlock) maxBlock := int64(cmd.EndBlock) + unspent := make(map[btcwire.OutPoint]struct{}) // FetchHeightRange may not return a complete list of block shas for // the given range, so fetch range as many times as necessary. @@ -1357,7 +1399,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) blk.Height()) return nil, nil default: - rescanBlock(wsc, cmd, blk) + rescanBlock(wsc, cmd, blk, unspent) } }