From 2091ac0936f41dc1c68616fdc6631af5b16457b6 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 25 Jun 2018 17:24:23 -0700 Subject: [PATCH] chain: use ConcurrentQueue within BitcoindClient to handle event notifications --- chain/bitcoind_client.go | 131 +++++++-------------------------------- 1 file changed, 22 insertions(+), 109 deletions(-) diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go index 0e10ec1..4876883 100644 --- a/chain/bitcoind_client.go +++ b/chain/bitcoind_client.go @@ -32,9 +32,8 @@ type BitcoindClient struct { zmqConnect string zmqPollInterval time.Duration - enqueueNotification chan interface{} - dequeueNotification chan interface{} - currentBlock chan *waddrmgr.BlockStamp + notificationQueue *ConcurrentQueue + currentBlock chan *waddrmgr.BlockStamp clientMtx sync.RWMutex rescanUpdate chan interface{} @@ -73,19 +72,17 @@ func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, DisableTLS: true, HTTPPostMode: true, }, - chainParams: chainParams, - zmqConnect: zmqConnect, - zmqPollInterval: zmqPollInterval, - enqueueNotification: make(chan interface{}), - dequeueNotification: make(chan interface{}), - currentBlock: make(chan *waddrmgr.BlockStamp), - rescanUpdate: make(chan interface{}), - watchOutPoints: make(map[wire.OutPoint]struct{}), - watchAddrs: make(map[string]struct{}), - watchTxIDs: make(map[chainhash.Hash]struct{}), - quit: make(chan struct{}), - memPool: make(map[chainhash.Hash]struct{}), - memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), + chainParams: chainParams, + zmqConnect: zmqConnect, + zmqPollInterval: zmqPollInterval, + currentBlock: make(chan *waddrmgr.BlockStamp), + rescanUpdate: make(chan interface{}), + watchOutPoints: make(map[wire.OutPoint]struct{}), + watchAddrs: make(map[string]struct{}), + watchTxIDs: make(map[chainhash.Hash]struct{}), + quit: make(chan struct{}), + memPool: make(map[chainhash.Hash]struct{}), + memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), } rpcClient, err := rpcclient.New(client.connConfig, nil) if err != nil { @@ -389,7 +386,6 @@ func (c *BitcoindClient) Start() error { c.quitMtx.Unlock() c.wg.Add(2) - go c.handler() go c.socketHandler(zmqClient) return nil } @@ -403,10 +399,7 @@ func (c *BitcoindClient) Stop() { default: close(c.quit) c.client.Shutdown() - - if !c.started { - close(c.dequeueNotification) - } + c.notificationQueue.Stop() } c.quitMtx.Unlock() } @@ -423,7 +416,7 @@ func (c *BitcoindClient) WaitForShutdown() { // may abort for running out memory, as unread notifications are queued for // later reads. func (c *BitcoindClient) Notifications() <-chan interface{} { - return c.dequeueNotification + return c.notificationQueue.ChanOut() } // SetStartTime is a non-interface method to set the birthday of the wallet @@ -452,7 +445,7 @@ func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { func (c *BitcoindClient) onClientConnect() { select { - case c.enqueueNotification <- ClientConnected{}: + case c.notificationQueue.ChanIn() <- ClientConnected{}: case <-c.quit: } } @@ -460,7 +453,7 @@ func (c *BitcoindClient) onClientConnect() { func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { if c.notifying() { select { - case c.enqueueNotification <- BlockConnected{ + case c.notificationQueue.ChanIn() <- BlockConnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, @@ -476,7 +469,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32, header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { if c.notifying() { select { - case c.enqueueNotification <- FilteredBlockConnected{ + case c.notificationQueue.ChanIn() <- FilteredBlockConnected{ Block: &wtxmgr.BlockMeta{ Block: wtxmgr.Block{ Hash: header.BlockHash(), @@ -494,7 +487,7 @@ func (c *BitcoindClient) onFilteredBlockConnected(height int32, func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { if c.notifying() { select { - case c.enqueueNotification <- BlockDisconnected{ + case c.notificationQueue.ChanIn() <- BlockDisconnected{ Block: wtxmgr.Block{ Hash: *hash, Height: height, @@ -516,14 +509,14 @@ func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord, } select { - case c.enqueueNotification <- RelevantTx{rec, blk}: + case c.notificationQueue.ChanIn() <- RelevantTx{rec, blk}: case <-c.quit: } } func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) { select { - case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanProgress{hash, height, blkTime}: case <-c.quit: } } @@ -531,7 +524,7 @@ func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, bl func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) { log.Infof("Rescan finished at %d (%s)", height, hash) select { - case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}: + case c.notificationQueue.ChanIn() <- &RescanFinished{hash, height, blkTime}: case <-c.quit: } @@ -1177,83 +1170,3 @@ func (c *BitcoindClient) filterTx(tx *wire.MsgTx, return notifyTx, rec, nil } - -// handler maintains a queue of notifications and the current state (best -// block) of the chain. -func (c *BitcoindClient) handler() { - hash, height, err := c.GetBestBlock() - if err != nil { - log.Errorf("Failed to receive best block from chain server: %v", err) - c.Stop() - c.wg.Done() - return - } - - bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} - - // TODO: Rather than leaving this as an unbounded queue for all types of - // notifications, try dropping ones where a later enqueued notification - // can fully invalidate one waiting to be processed. For example, - // blockconnected notifications for greater block heights can remove the - // need to process earlier blockconnected notifications still waiting - // here. - - // TODO(aakselrod): Factor this logic out so it can be reused for each - // chain back end, rather than copying it. - - var notifications []interface{} - enqueue := c.enqueueNotification - var dequeue chan interface{} - var next interface{} -out: - for { - select { - case n, ok := <-enqueue: - if !ok { - // If no notifications are queued for handling, - // the queue is finished. - if len(notifications) == 0 { - break out - } - // nil channel so no more reads can occur. - enqueue = nil - continue - } - if len(notifications) == 0 { - next = n - dequeue = c.dequeueNotification - } - notifications = append(notifications, n) - - case dequeue <- next: - if n, ok := next.(BlockConnected); ok { - bs = &waddrmgr.BlockStamp{ - Height: n.Height, - Hash: n.Hash, - } - } - - notifications[0] = nil - notifications = notifications[1:] - if len(notifications) != 0 { - next = notifications[0] - } else { - // If no more notifications can be enqueued, the - // queue is finished. - if enqueue == nil { - break out - } - dequeue = nil - } - - case c.currentBlock <- bs: - - case <-c.quit: - break out - } - } - - c.Stop() - close(c.dequeueNotification) - c.wg.Done() -}