diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 71474c5..2ed1192 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -192,15 +192,14 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) { if chainInfo.Pruned { prunedBlockDispatcher, err = NewPrunedBlockDispatcher( &PrunedBlockDispatcherConfig{ - ChainParams: cfg.ChainParams, - NumTargetPeers: cfg.PrunedModeMaxPeers, - Dial: cfg.Dialer, - GetPeers: client.GetPeerInfo, - PeerReadyTimeout: defaultPeerReadyTimeout, - RefreshPeersTicker: ticker.New( - defaultRefreshPeersInterval, - ), - MaxRequestInvs: wire.MaxInvPerMsg, + ChainParams: cfg.ChainParams, + NumTargetPeers: cfg.PrunedModeMaxPeers, + Dial: cfg.Dialer, + GetPeers: client.GetPeerInfo, + GetNodeAddresses: client.GetNodeAddresses, + PeerReadyTimeout: defaultPeerReadyTimeout, + RefreshPeersTicker: ticker.New(defaultRefreshPeersInterval), + MaxRequestInvs: wire.MaxInvPerMsg, }, ) if err != nil { diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index 1631e2e..aec8d02 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -99,6 +99,11 @@ type PrunedBlockDispatcherConfig struct { // GetPeers retrieves the active set of peers known to the backend node. GetPeers func() ([]btcjson.GetPeerInfoResult, error) + // GetNodeAddresses returns random reachable addresses known to the + // backend node. An optional number of addresses to return can be + // provided, otherwise 8 are returned by default. + GetNodeAddresses func(*int32) ([]btcjson.GetNodeAddressesResult, error) + // PeerReadyTimeout is the amount of time we'll wait for a query peer to // be ready to receive incoming block requests. Peers cannot respond to // requests until the version exchange is completed upon connection @@ -256,7 +261,7 @@ func (d *PrunedBlockDispatcher) pollPeers() { // If we do, attempt to establish connections until // we've reached our target number. if err := d.connectToPeers(); err != nil { - log.Warnf("Unable to establish peer "+ + log.Warnf("Failed to establish peer "+ "connections: %v", err) continue } @@ -277,90 +282,150 @@ func (d *PrunedBlockDispatcher) connectToPeers() error { if err != nil { return err } - peers, err = filterPeers(peers) + addrs, err := filterPeers(peers) if err != nil { return err } - rand.Shuffle(len(peers), func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] + rand.Shuffle(len(addrs), func(i, j int) { + addrs[i], addrs[j] = addrs[j], addrs[i] }) - // For each unbanned peer we don't already have a connection to, try to - // establish one, and if successful, notify the peer. - for _, peer := range peers { - d.peerMtx.Lock() - _, isBanned := d.bannedPeers[peer.Addr] - _, isConnected := d.currentPeers[peer.Addr] - d.peerMtx.Unlock() - if isBanned || isConnected { - continue - } - - queryPeer, err := d.newQueryPeer(peer) + for _, addr := range addrs { + needMore, err := d.connectToPeer(addr) if err != nil { - return fmt.Errorf("unable to configure query peer %v: "+ - "%v", peer.Addr, err) - } - if err := d.connectToPeer(queryPeer); err != nil { - log.Debugf("Failed connecting to peer %v: %v", - peer.Addr, err) + log.Debugf("Failed connecting to peer %v: %v", addr, err) continue } - - select { - case d.peersConnected <- queryPeer: - case <-d.quit: - return errors.New("shutting down") + if !needMore { + return nil } + } - // If the new peer helped us reach our target number, we're done - // and can exit. - d.peerMtx.Lock() - d.currentPeers[queryPeer.Addr()] = queryPeer.Peer - numPeers := len(d.currentPeers) - d.peerMtx.Unlock() - if numPeers == d.cfg.NumTargetPeers { - break + // We still need more addresses so we'll also invoke the + // `getnodeaddresses` RPC to receive random reachable addresses. We'll + // also filter out any that do not meet our requirements. The nil + // argument will return a default number of addresses, which is + // currently 8. We don't care how many addresses are returned as long as + // 1 is returned, since this will be polled regularly if needed. + nodeAddrs, err := d.cfg.GetNodeAddresses(nil) + if err != nil { + return err + } + addrs = filterNodeAddrs(nodeAddrs) + for _, addr := range addrs { + if _, err := d.connectToPeer(addr); err != nil { + log.Debugf("Failed connecting to peer %v: %v", addr, err) } } return nil } +// connectToPeer attempts to establish a connection to the given peer and waits +// up to PeerReadyTimeout for the version exchange to complete so that we can +// begin sending it our queries. +func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) { + // Prevent connections to peers we've already connected to or we've + // banned. + d.peerMtx.Lock() + _, isBanned := d.bannedPeers[addr] + _, isConnected := d.currentPeers[addr] + d.peerMtx.Unlock() + if isBanned || isConnected { + return true, nil + } + + peer, err := d.newQueryPeer(addr) + if err != nil { + return true, fmt.Errorf("unable to configure query peer %v: "+ + "%v", addr, err) + } + + // Establish the connection and wait for the protocol negotiation to + // complete. + conn, err := d.cfg.Dial(addr) + if err != nil { + return true, err + } + peer.AssociateConnection(conn) + + select { + case <-peer.ready: + case <-time.After(d.cfg.PeerReadyTimeout): + peer.Disconnect() + return true, errors.New("timed out waiting for protocol negotiation") + case <-d.quit: + return false, errors.New("shutting down") + } + + // Remove the peer once it has disconnected. + peer.signalUponDisconnect(func() { + d.peerMtx.Lock() + delete(d.currentPeers, peer.Addr()) + d.peerMtx.Unlock() + }) + + d.peerMtx.Lock() + d.currentPeers[addr] = peer.Peer + numPeers := len(d.currentPeers) + d.peerMtx.Unlock() + + // Notify the new peer connection to our workManager. + select { + case d.peersConnected <- peer: + case <-d.quit: + return false, errors.New("shutting down") + } + + // Request more peer connections if we haven't reached our target number + // with the new peer. + return numPeers < d.cfg.NumTargetPeers, nil +} + // filterPeers filters out any peers which cannot handle arbitrary witness block // requests, i.e., any peer which is not considered a segwit-enabled // "full-node". -func filterPeers(peers []btcjson.GetPeerInfoResult) ( - []btcjson.GetPeerInfoResult, error) { - - var eligible []btcjson.GetPeerInfoResult +func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) { + var eligible []string for _, peer := range peers { rawServices, err := hex.DecodeString(peer.Services) if err != nil { return nil, err } services := wire.ServiceFlag(binary.BigEndian.Uint64(rawServices)) - - // Skip nodes that cannot serve full block witness data. - if services&requiredServices != requiredServices { + if !satisfiesRequiredServices(services) { continue } - // Skip pruned nodes. - if services&prunedNodeService == prunedNodeService { - continue - } - - eligible = append(eligible, peer) + eligible = append(eligible, peer.Addr) } - return eligible, nil } +// filterNodeAddrs filters out any peers which cannot handle arbitrary witness +// block requests, i.e., any peer which is not considered a segwit-enabled +// "full-node". +func filterNodeAddrs(nodeAddrs []btcjson.GetNodeAddressesResult) []string { + var eligible []string + for _, nodeAddr := range nodeAddrs { + services := wire.ServiceFlag(nodeAddr.Services) + if !satisfiesRequiredServices(services) { + continue + } + eligible = append(eligible, nodeAddr.Address) + } + return eligible +} + +// satisfiesRequiredServices determines whether the services signaled by a peer +// satisfy our requirements for retrieving pruned blocks from them. +func satisfiesRequiredServices(services wire.ServiceFlag) bool { + return services&requiredServices == requiredServices && + services&prunedNodeService != prunedNodeService +} + // newQueryPeer creates a new peer instance configured to relay any received // messages to the internal workManager. -func (d *PrunedBlockDispatcher) newQueryPeer( - peerInfo btcjson.GetPeerInfoResult) (*queryPeer, error) { - +func (d *PrunedBlockDispatcher) newQueryPeer(addr string) (*queryPeer, error) { ready := make(chan struct{}) msgsRecvd := make(chan wire.Message) @@ -409,7 +474,7 @@ func (d *PrunedBlockDispatcher) newQueryPeer( }, AllowSelfConns: true, } - p, err := peer.NewOutboundPeer(cfg, peerInfo.Addr) + p, err := peer.NewOutboundPeer(cfg, addr) if err != nil { return nil, err } @@ -422,35 +487,6 @@ func (d *PrunedBlockDispatcher) newQueryPeer( }, nil } -// connectToPeer attempts to establish a connection to the given peer and waits -// up to PeerReadyTimeout for the version exchange to complete so that we can -// begin sending it our queries. -func (d *PrunedBlockDispatcher) connectToPeer(peer *queryPeer) error { - conn, err := d.cfg.Dial(peer.Addr()) - if err != nil { - return err - } - peer.AssociateConnection(conn) - - select { - case <-peer.ready: - case <-time.After(d.cfg.PeerReadyTimeout): - peer.Disconnect() - return errors.New("timed out waiting for protocol negotiation") - case <-d.quit: - return errors.New("shutting down") - } - - // Remove the peer once it has disconnected. - peer.signalUponDisconnect(func() { - d.peerMtx.Lock() - delete(d.currentPeers, peer.Addr()) - d.peerMtx.Unlock() - }) - - return nil -} - // banPeer bans a peer by disconnecting them and ensuring we don't reconnect. func (d *PrunedBlockDispatcher) banPeer(peer string) { d.peerMtx.Lock() diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go index 06cd7c8..8b319d7 100644 --- a/chain/pruned_block_dispatcher_test.go +++ b/chain/pruned_block_dispatcher_test.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "fmt" "net" - "os" "sync" "sync/atomic" "testing" @@ -16,18 +15,10 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) -func init() { - b := btclog.NewBackend(os.Stdout) - l := b.Logger("") - l.SetLevel(btclog.LevelTrace) - UseLogger(l) -} - var ( addrCounter int32 // Increased atomically. @@ -49,12 +40,13 @@ type prunedBlockDispatcherHarness struct { hashes []*chainhash.Hash blocks map[chainhash.Hash]*wire.MsgBlock - peerMtx sync.Mutex - peers map[string]*peer.Peer - localConns map[string]net.Conn // Connections to peers. - remoteConns map[string]net.Conn // Connections from peers. + peerMtx sync.Mutex + peers map[string]*peer.Peer + fallbackAddrs map[string]*peer.Peer + localConns map[string]net.Conn // Connections to peers. + remoteConns map[string]net.Conn // Connections from peers. - dialedPeer chan struct{} + dialedPeer chan string queriedPeer chan struct{} blocksQueried map[chainhash.Hash]int @@ -68,22 +60,25 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks, h := &prunedBlockDispatcherHarness{ t: t, + dispatcher: &PrunedBlockDispatcher{}, peers: make(map[string]*peer.Peer, numPeers), + fallbackAddrs: make(map[string]*peer.Peer, numPeers), localConns: make(map[string]net.Conn, numPeers), remoteConns: make(map[string]net.Conn, numPeers), - dialedPeer: make(chan struct{}), + dialedPeer: make(chan string), queriedPeer: make(chan struct{}), blocksQueried: make(map[chainhash.Hash]int), + shouldReply: 0, } h.hashes, h.blocks = genBlockChain(numBlocks) for i := uint32(0); i < numPeers; i++ { - h.addPeer() + h.addPeer(false) } dial := func(addr string) (net.Conn, error) { go func() { - h.dialedPeer <- struct{}{} + h.dialedPeer <- addr }() h.peerMtx.Lock() @@ -98,7 +93,12 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks, return nil, fmt.Errorf("remote conn %v not found", addr) } - h.peers[addr].AssociateConnection(remoteConn) + if p, ok := h.peers[addr]; ok { + p.AssociateConnection(remoteConn) + } + if p, ok := h.fallbackAddrs[addr]; ok { + p.AssociateConnection(remoteConn) + } return localConn, nil } @@ -126,6 +126,22 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks, return res, nil }, + GetNodeAddresses: func(*int32) ([]btcjson.GetNodeAddressesResult, error) { + h.peerMtx.Lock() + defer h.peerMtx.Unlock() + + res := make( + []btcjson.GetNodeAddressesResult, 0, + len(h.fallbackAddrs), + ) + for addr, peer := range h.fallbackAddrs { + res = append(res, btcjson.GetNodeAddressesResult{ + Services: uint64(peer.Services()), + Address: addr, + }) + } + return res, nil + }, PeerReadyTimeout: time.Hour, RefreshPeersTicker: ticker.NewForce(time.Hour), AllowSelfPeerConns: true, @@ -175,20 +191,24 @@ func (h *prunedBlockDispatcherHarness) stop() { // addPeer adds a new random peer available for use by the // PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) addPeer() string { +func (h *prunedBlockDispatcherHarness) addPeer(fallback bool) string { addr := nextAddr() h.peerMtx.Lock() defer h.peerMtx.Unlock() - h.resetPeer(addr) + h.resetPeer(addr, fallback) return addr } // resetPeer resets the internal peer connection state allowing the // PrunedBlockDispatcher to establish a mock connection to it. -func (h *prunedBlockDispatcherHarness) resetPeer(addr string) { - h.peers[addr] = h.newPeer() +func (h *prunedBlockDispatcherHarness) resetPeer(addr string, fallback bool) { + if fallback { + h.fallbackAddrs[addr] = h.newPeer() + } else { + h.peers[addr] = h.newPeer() + } // Establish a mock connection between us and each peer. inConn, outConn := pipe( @@ -280,7 +300,7 @@ func (h *prunedBlockDispatcherHarness) refreshPeers() { } // disconnectPeer simulates a peer disconnecting from the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string) { +func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string, fallback bool) { h.t.Helper() h.peerMtx.Lock() @@ -303,7 +323,7 @@ func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string) { }, time.Second, 200*time.Millisecond) // Reset the peer connection state to allow connections to them again. - h.resetPeer(addr) + h.resetPeer(addr, fallback) } // assertPeerDialed asserts that a connection was made to the given peer. @@ -317,6 +337,18 @@ func (h *prunedBlockDispatcherHarness) assertPeerDialed() { } } +// assertPeerDialedWithAddr asserts that a connection was made to the given peer. +func (h *prunedBlockDispatcherHarness) assertPeerDialedWithAddr(addr string) { + h.t.Helper() + + select { + case dialedAddr := <-h.dialedPeer: + require.Equal(h.t, addr, dialedAddr) + case <-time.After(5 * time.Second): + h.t.Fatalf("expected peer to be dialed") + } +} + // assertPeerQueried asserts that query was sent to the given peer. func (h *prunedBlockDispatcherHarness) assertPeerQueried() { h.t.Helper() @@ -494,7 +526,7 @@ func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) { // We should see one query per block. for i := 0; i < numBlocks; i++ { h.assertPeerQueried() - h.assertPeerReplied(blockChans[i], errChans[i], i == numBlocks-1) + h.assertPeerReplied(blockChans[i], errChans[i], true) } } @@ -523,13 +555,13 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { // We'll disable replies for now, as we'll want to test the disconnect // case. h.disablePeerReplies() - peer := h.addPeer() + peer := h.addPeer(false) h.refreshPeers() - h.assertPeerDialed() + h.assertPeerDialedWithAddr(peer) h.assertPeerQueried() // Disconnect our peer and re-enable replies. - h.disconnectPeer(peer) + h.disconnectPeer(peer, false) h.enablePeerReplies() h.assertNoReply(blockChan, errChan) @@ -539,11 +571,11 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { h.assertPeerDialed() h.assertPeerQueried() - // Refresh our peers again. We can afford to have one more query peer, - // but there isn't another one available. We also shouldn't dial the one - // we're currently connected to again. + // Add a fallback addresses and force refresh our peers again. We can + // afford to have one more query peer, so a connection should be made. + fallbackPeer := h.addPeer(true) h.refreshPeers() - h.assertNoPeerDialed() + h.assertPeerDialedWithAddr(fallbackPeer) // Now that we know we've connected to the peer, we should be able to // receive their response. @@ -578,7 +610,7 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { // Signal to our peers to send valid replies and add a new peer. h.enablePeerReplies() - _ = h.addPeer() + _ = h.addPeer(false) // Force a refresh, which should cause our new peer to be dialed and // queried. We expect them to send a valid block and fulfill our