diff --git a/infrastructure.go b/infrastructure.go index af0af9d8..fd007113 100644 --- a/infrastructure.go +++ b/infrastructure.go @@ -14,6 +14,7 @@ import ( "errors" "fmt" "github.com/conformal/btcjson" + "github.com/conformal/btcws" "github.com/conformal/go-socks" "github.com/gorilla/websocket" "net" @@ -127,8 +128,9 @@ type Client struct { requestMap map[uint64]*list.Element requestList *list.List - // Notification handlers. + // Notifications. ntfnHandlers *NotificationHandlers + ntfnState *notificationState // Networking infrastructure. sendChan chan []byte @@ -193,6 +195,42 @@ func (c *Client) removeAllRequests() { c.requestList.Init() } +// trackRegisteredNtfns examines the passed command to see if it is one of +// the notification commands and updates the notification state that is used +// to automatically re-establish registered notifications on reconnects. +func (c *Client) trackRegisteredNtfns(cmd btcjson.Cmd) { + // Nothing to do if the caller is not interested in notifications. + if c.ntfnHandlers == nil { + return + } + + c.ntfnState.Lock() + defer c.ntfnState.Unlock() + + switch bcmd := cmd.(type) { + case *btcws.NotifyBlocksCmd: + c.ntfnState.notifyBlocks = true + + case *btcws.NotifyNewTransactionsCmd: + if bcmd.Verbose { + c.ntfnState.notifyNewTxVerbose = true + } else { + c.ntfnState.notifyNewTx = true + + } + + case *btcws.NotifySpentCmd: + for _, op := range bcmd.OutPoints { + c.ntfnState.notifySpent[op] = struct{}{} + } + + case *btcws.NotifyReceivedCmd: + for _, addr := range bcmd.Addresses { + c.ntfnState.notifyReceived[addr] = struct{}{} + } + } +} + // handleMessage is the main handler for incoming requests. It enforces // authentication, parses the incoming json, looks up and executes handlers // (including pass through for standard RPC commands), sends the appropriate @@ -259,6 +297,13 @@ func (c *Client) handleMessage(msg []byte) { request.responseChan <- &futureResult{reply: nil, err: err} return } + + // Since the command was successful, examine it to see if it's a + // notification, and if is, add it to the notification state so it + // can automatically be re-established on reconnect. + c.trackRegisteredNtfns(request.cmd) + + // Deliver the reply. request.responseChan <- &futureResult{reply: &reply, err: nil} } @@ -341,10 +386,86 @@ func (c *Client) sendMessage(marshalledJSON []byte) { c.sendChan <- marshalledJSON } +// reregisterNtfns creates and sends commands needed to re-establish the current +// notification state associated with the client. It should only be called on +// on reconnect by the resendCmds function. +func (c *Client) reregisterNtfns() error { + // Nothing to do if the caller is not interested in notifications. + if c.ntfnHandlers == nil { + return nil + } + + // In order to avoid holding the lock on the notification state for the + // entire time of the potentially long running RPCs issued below, make a + // copy of it and work from that. + // + // Also, other commands will be running concurrently which could modify + // the notification state (while not under the lock of course) which + // also register it with the remote RPC server, so this prevents double + // registrations. + stateCopy := c.ntfnState.Copy() + + // Reregister notifyblocks if needed. + if stateCopy.notifyBlocks { + log.Debugf("Reregistering [notifyblocks]") + if err := c.NotifyBlocks(); err != nil { + return err + } + } + + // Reregister notifynewtransactions if needed. + if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose { + log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)", + stateCopy.notifyNewTxVerbose) + err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose) + if err != nil { + return err + } + } + + // Reregister the combination of all previously registered notifyspent + // outpoints in one command if needed. + nslen := len(stateCopy.notifySpent) + if nslen > 0 { + outpoints := make([]btcws.OutPoint, 0, nslen) + for op := range stateCopy.notifySpent { + outpoints = append(outpoints, op) + } + log.Debugf("Reregistering [notifyspent] outpoints: %v", outpoints) + if err := c.notifySpentInternal(outpoints).Receive(); err != nil { + return err + } + } + + // Reregister the combination of all previously registered + // notifyreceived addresses in one command if needed. + nrlen := len(stateCopy.notifyReceived) + if nrlen > 0 { + addresses := make([]string, 0, nrlen) + for addr := range stateCopy.notifyReceived { + addresses = append(addresses, addr) + } + log.Debugf("Reregistering [notifyreceived] addresses: %v", addresses) + if err := c.notifyReceivedInternal(addresses).Receive(); err != nil { + return err + } + } + + return nil +} + // resendCmds resends any commands that had not completed when the client -// disconnected. It is intended to be called once the client has reconnected -// as a separate goroutine. +// disconnected. It is intended to be called once the client has reconnected as +// a separate goroutine. func (c *Client) resendCmds() { + // Set the notification state back up. If anything goes wrong, + // disconnect the client. + if err := c.reregisterNtfns(); err != nil { + log.Warnf("Unable to re-establish notification state: %v", err) + c.Disconnect() + return + } + // Since it's possible to block on send and more commands might be // added by the caller while resending, make a copy of all of the // commands that need to be resent now and work from the copy. This @@ -928,6 +1049,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error requestMap: make(map[uint64]*list.Element), requestList: list.New(), ntfnHandlers: ntfnHandlers, + ntfnState: newNotificationState(), sendChan: make(chan []byte, sendBufferSize), sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), disconnect: make(chan struct{}), diff --git a/notify.go b/notify.go index a3fe3b6f..fa112e7d 100644 --- a/notify.go +++ b/notify.go @@ -12,6 +12,7 @@ import ( "github.com/conformal/btcutil" "github.com/conformal/btcwire" "github.com/conformal/btcws" + "sync" ) var ( @@ -23,6 +24,46 @@ var ( "supported when running in HTTP POST mode") ) +// notificationState is used to track the current state of successfuly +// registered notification so the state can be automatically re-established on +// reconnect. +type notificationState struct { + sync.Mutex + notifyBlocks bool + notifyNewTx bool + notifyNewTxVerbose bool + notifyReceived map[string]struct{} + notifySpent map[btcws.OutPoint]struct{} +} + +// Copy returns a deep copy of the receiver. +// +// This function is safe for concurrent access. +func (s *notificationState) Copy() *notificationState { + s.Lock() + defer s.Unlock() + + stateCopy := *s + stateCopy.notifyReceived = make(map[string]struct{}) + for addr := range s.notifyReceived { + stateCopy.notifyReceived[addr] = struct{}{} + } + stateCopy.notifySpent = make(map[btcws.OutPoint]struct{}) + for op := range s.notifySpent { + stateCopy.notifySpent[op] = struct{}{} + } + + return &stateCopy +} + +// newNotificationState returns a new notification state ready to be populated. +func newNotificationState() *notificationState { + return ¬ificationState{ + notifyReceived: make(map[string]struct{}), + notifySpent: make(map[btcws.OutPoint]struct{}), + } +} + // newNilFutureResult returns a new future result channel that already has the // result waiting on the channel with the reply set to nil. This is useful // to ignore things such as notifications when the caller didn't specify any @@ -371,6 +412,27 @@ func (r FutureNotifySpentResult) Receive() error { return nil } +// notifySpentInternal is the same as notifySpentAsync except it accepts +// the converted outpoints as a parameter so the client can more efficiently +// recreate the previous notification state on reconnect. +func (c *Client) notifySpentInternal(outpoints []btcws.OutPoint) FutureNotifySpentResult { + // Not supported in HTTP POST mode. + if c.config.HttpPostMode { + return newFutureError(ErrNotificationsNotSupported) + } + + // Ignore the notification if the client is not interested in + // notifications. + if c.ntfnHandlers == nil { + return newNilFutureResult() + } + + id := c.NextID() + cmd := btcws.NewNotifySpentCmd(id, outpoints) + + return c.sendCmd(cmd) +} + // NotifySpentAsync returns an instance of a type that can be used to get the // result of the RPC at some future time by invoking the Receive function on // the returned instance. @@ -487,6 +549,28 @@ func (r FutureNotifyReceivedResult) Receive() error { return nil } +// notifyReceivedInternal is the same as notifyReceivedAsync except it accepts +// the converted addresses as a parameter so the client can more efficiently +// recreate the previous notification state on reconnect. +func (c *Client) notifyReceivedInternal(addresses []string) FutureNotifyReceivedResult { + // Not supported in HTTP POST mode. + if c.config.HttpPostMode { + return newFutureError(ErrNotificationsNotSupported) + } + + // Ignore the notification if the client is not interested in + // notifications. + if c.ntfnHandlers == nil { + return newNilFutureResult() + } + + // Convert addresses to strings. + id := c.NextID() + cmd := btcws.NewNotifyReceivedCmd(id, addresses) + + return c.sendCmd(cmd) +} + // NotifyReceivedAsync returns an instance of a type that can be used to get the // result of the RPC at some future time by invoking the Receive function on // the returned instance.