mirror of
https://github.com/LBRYFoundation/lbcwallet.git
synced 2025-09-10 12:39:47 +00:00
Merge pull request #632 from wpaulino/bitcoind-block-zmq-read
chain: explicitly close connections in BitcoindConn's Stop method
This commit is contained in:
commit
79761f5121
3 changed files with 65 additions and 57 deletions
|
@ -3,6 +3,7 @@ package chain
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -33,17 +34,13 @@ type BitcoindConn struct {
|
||||||
// client is the RPC client to the bitcoind node.
|
// client is the RPC client to the bitcoind node.
|
||||||
client *rpcclient.Client
|
client *rpcclient.Client
|
||||||
|
|
||||||
// zmqBlockHost is the host listening for ZMQ connections that will be
|
// zmqBlockConn is the ZMQ connection we'll use to read raw block
|
||||||
// responsible for delivering raw transaction events.
|
// events.
|
||||||
zmqBlockHost string
|
zmqBlockConn *gozmq.Conn
|
||||||
|
|
||||||
// zmqTxHost is the host listening for ZMQ connections that will be
|
// zmqTxConn is the ZMQ connection we'll use to read raw transaction
|
||||||
// responsible for delivering raw transaction events.
|
// events.
|
||||||
zmqTxHost string
|
zmqTxConn *gozmq.Conn
|
||||||
|
|
||||||
// zmqPollInterval is the interval at which we'll attempt to retrieve an
|
|
||||||
// event from the ZMQ connection.
|
|
||||||
zmqPollInterval time.Duration
|
|
||||||
|
|
||||||
// rescanClients is the set of active bitcoind rescan clients to which
|
// rescanClients is the set of active bitcoind rescan clients to which
|
||||||
// ZMQ event notfications will be sent to.
|
// ZMQ event notfications will be sent to.
|
||||||
|
@ -55,10 +52,9 @@ type BitcoindConn struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBitcoindConn creates a client connection to the node described by the host
|
// NewBitcoindConn creates a client connection to the node described by the host
|
||||||
// string. The connection is not established immediately, but must be done using
|
// string. The ZMQ connections are established immediately to ensure liveness.
|
||||||
// the Start method. If the remote node does not operate on the same bitcoin
|
// If the remote node does not operate on the same bitcoin network as described
|
||||||
// network as described by the passed chain parameters, the connection will be
|
// by the passed chain parameters, the connection will be disconnected.
|
||||||
// disconnected.
|
|
||||||
func NewBitcoindConn(chainParams *chaincfg.Params,
|
func NewBitcoindConn(chainParams *chaincfg.Params,
|
||||||
host, user, pass, zmqBlockHost, zmqTxHost string,
|
host, user, pass, zmqBlockHost, zmqTxHost string,
|
||||||
zmqPollInterval time.Duration) (*BitcoindConn, error) {
|
zmqPollInterval time.Duration) (*BitcoindConn, error) {
|
||||||
|
@ -78,12 +74,32 @@ func NewBitcoindConn(chainParams *chaincfg.Params,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Establish two different ZMQ connections to bitcoind to retrieve block
|
||||||
|
// and transaction event notifications. We'll use two as a separation of
|
||||||
|
// concern to ensure one type of event isn't dropped from the connection
|
||||||
|
// queue due to another type of event filling it up.
|
||||||
|
zmqBlockConn, err := gozmq.Subscribe(
|
||||||
|
zmqBlockHost, []string{"rawblock"}, zmqPollInterval,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to subscribe for zmq block "+
|
||||||
|
"events: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
zmqTxConn, err := gozmq.Subscribe(
|
||||||
|
zmqTxHost, []string{"rawtx"}, zmqPollInterval,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
zmqBlockConn.Close()
|
||||||
|
return nil, fmt.Errorf("unable to subscribe for zmq tx "+
|
||||||
|
"events: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
conn := &BitcoindConn{
|
conn := &BitcoindConn{
|
||||||
chainParams: chainParams,
|
chainParams: chainParams,
|
||||||
client: client,
|
client: client,
|
||||||
zmqBlockHost: zmqBlockHost,
|
zmqBlockConn: zmqBlockConn,
|
||||||
zmqTxHost: zmqTxHost,
|
zmqTxConn: zmqTxConn,
|
||||||
zmqPollInterval: zmqPollInterval,
|
|
||||||
rescanClients: make(map[uint64]*BitcoindClient),
|
rescanClients: make(map[uint64]*BitcoindClient),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
@ -104,40 +120,16 @@ func (c *BitcoindConn) Start() error {
|
||||||
// Verify that the node is running on the expected network.
|
// Verify that the node is running on the expected network.
|
||||||
net, err := c.getCurrentNet()
|
net, err := c.getCurrentNet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.client.Disconnect()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if net != c.chainParams.Net {
|
if net != c.chainParams.Net {
|
||||||
c.client.Disconnect()
|
|
||||||
return fmt.Errorf("expected network %v, got %v",
|
return fmt.Errorf("expected network %v, got %v",
|
||||||
c.chainParams.Net, net)
|
c.chainParams.Net, net)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Establish two different ZMQ connections to bitcoind to retrieve block
|
|
||||||
// and transaction event notifications. We'll use two as a separation of
|
|
||||||
// concern to ensure one type of event isn't dropped from the connection
|
|
||||||
// queue due to another type of event filling it up.
|
|
||||||
zmqBlockConn, err := gozmq.Subscribe(
|
|
||||||
c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
c.client.Disconnect()
|
|
||||||
return fmt.Errorf("unable to subscribe for zmq block events: "+
|
|
||||||
"%v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
zmqTxConn, err := gozmq.Subscribe(
|
|
||||||
c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
c.client.Disconnect()
|
|
||||||
return fmt.Errorf("unable to subscribe for zmq tx events: %v",
|
|
||||||
err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.wg.Add(2)
|
c.wg.Add(2)
|
||||||
go c.blockEventHandler(zmqBlockConn)
|
go c.blockEventHandler()
|
||||||
go c.txEventHandler(zmqTxConn)
|
go c.txEventHandler()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -155,6 +147,8 @@ func (c *BitcoindConn) Stop() {
|
||||||
|
|
||||||
close(c.quit)
|
close(c.quit)
|
||||||
c.client.Shutdown()
|
c.client.Shutdown()
|
||||||
|
c.zmqBlockConn.Close()
|
||||||
|
c.zmqTxConn.Close()
|
||||||
|
|
||||||
c.client.WaitForShutdown()
|
c.client.WaitForShutdown()
|
||||||
c.wg.Wait()
|
c.wg.Wait()
|
||||||
|
@ -164,12 +158,11 @@ func (c *BitcoindConn) Stop() {
|
||||||
// forwards them along to the current rescan clients.
|
// forwards them along to the current rescan clients.
|
||||||
//
|
//
|
||||||
// NOTE: This must be run as a goroutine.
|
// NOTE: This must be run as a goroutine.
|
||||||
func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
|
func (c *BitcoindConn) blockEventHandler() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
log.Info("Started listening for bitcoind block notifications via ZMQ "+
|
log.Info("Started listening for bitcoind block notifications via ZMQ "+
|
||||||
"on", c.zmqBlockHost)
|
"on", c.zmqBlockConn.RemoteAddr())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Before attempting to read from the ZMQ socket, we'll make
|
// Before attempting to read from the ZMQ socket, we'll make
|
||||||
|
@ -181,8 +174,14 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll an event from the ZMQ socket.
|
// Poll an event from the ZMQ socket.
|
||||||
msgBytes, err := conn.Receive()
|
msgBytes, err := c.zmqBlockConn.Receive()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// EOF should only be returned if the connection was
|
||||||
|
// explicitly closed, so we can exit at this point.
|
||||||
|
if err == io.EOF {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// It's possible that the connection to the socket
|
// It's possible that the connection to the socket
|
||||||
// continuously times out, so we'll prevent logging this
|
// continuously times out, so we'll prevent logging this
|
||||||
// error to prevent spamming the logs.
|
// error to prevent spamming the logs.
|
||||||
|
@ -240,12 +239,11 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
|
||||||
// them along to the current rescan clients.
|
// them along to the current rescan clients.
|
||||||
//
|
//
|
||||||
// NOTE: This must be run as a goroutine.
|
// NOTE: This must be run as a goroutine.
|
||||||
func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
|
func (c *BitcoindConn) txEventHandler() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
log.Info("Started listening for bitcoind transaction notifications "+
|
log.Info("Started listening for bitcoind transaction notifications "+
|
||||||
"via ZMQ on", c.zmqTxHost)
|
"via ZMQ on", c.zmqTxConn.RemoteAddr())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Before attempting to read from the ZMQ socket, we'll make
|
// Before attempting to read from the ZMQ socket, we'll make
|
||||||
|
@ -257,8 +255,14 @@ func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Poll an event from the ZMQ socket.
|
// Poll an event from the ZMQ socket.
|
||||||
msgBytes, err := conn.Receive()
|
msgBytes, err := c.zmqTxConn.Receive()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// EOF should only be returned if the connection was
|
||||||
|
// explicitly closed, so we can exit at this point.
|
||||||
|
if err == io.EOF {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// It's possible that the connection to the socket
|
// It's possible that the connection to the socket
|
||||||
// continuously times out, so we'll prevent logging this
|
// continuously times out, so we'll prevent logging this
|
||||||
// error to prevent spamming the logs.
|
// error to prevent spamming the logs.
|
||||||
|
|
3
go.mod
3
go.mod
|
@ -13,8 +13,9 @@ require (
|
||||||
github.com/jrick/logrotate v1.0.0
|
github.com/jrick/logrotate v1.0.0
|
||||||
github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect
|
github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect
|
||||||
github.com/kr/pretty v0.1.0 // indirect
|
github.com/kr/pretty v0.1.0 // indirect
|
||||||
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885
|
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d
|
||||||
github.com/lightninglabs/neutrino v0.0.0-20190313035638-e1ad4c33fb18
|
github.com/lightninglabs/neutrino v0.0.0-20190313035638-e1ad4c33fb18
|
||||||
|
go.etcd.io/bbolt v1.3.3 // indirect
|
||||||
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67
|
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67
|
||||||
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
|
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
|
||||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
|
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
|
||||||
|
|
5
go.sum
5
go.sum
|
@ -66,8 +66,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
|
||||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 h1:fTLuPUkaKIIV0+gA1IxiBDvDxtF8tzpSF6N6NfFGmsU=
|
|
||||||
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885/go.mod h1:KUh15naRlx/TmUMFS/p4JJrCrE6F7RGF7rsnvuu45E4=
|
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885/go.mod h1:KUh15naRlx/TmUMFS/p4JJrCrE6F7RGF7rsnvuu45E4=
|
||||||
|
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE=
|
||||||
|
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
|
||||||
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130 h1:6sZc23+5VbEz2uiHxW12xvS4JYZ3hhgkE5qxzwxaXzg=
|
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130 h1:6sZc23+5VbEz2uiHxW12xvS4JYZ3hhgkE5qxzwxaXzg=
|
||||||
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130/go.mod h1:KJq43Fu9ceitbJsSXMILcT4mGDNI/crKmPIkDOZXFyM=
|
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130/go.mod h1:KJq43Fu9ceitbJsSXMILcT4mGDNI/crKmPIkDOZXFyM=
|
||||||
github.com/lightninglabs/neutrino v0.0.0-20190213031021-ae4583a89cfb h1:Bwqgn9JXHo7I19lb4zTH2Xb0bfHgNuAJugQE7s00xqA=
|
github.com/lightninglabs/neutrino v0.0.0-20190213031021-ae4583a89cfb h1:Bwqgn9JXHo7I19lb4zTH2Xb0bfHgNuAJugQE7s00xqA=
|
||||||
|
@ -90,6 +91,8 @@ go.etcd.io/bbolt v1.3.0 h1:oY10fI923Q5pVCVt1GBTZMn8LHo5M+RCInFpeMnV4QI=
|
||||||
go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
||||||
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
|
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
|
||||||
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
||||||
|
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
|
||||||
|
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
||||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=
|
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=
|
||||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||||
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
|
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
|
||||||
|
|
Loading…
Add table
Reference in a new issue