peer: Consolidate Connect, Disconnect, Start, Shutdown public methods.

This commit does not change functionality. It makes the creation of inbound and outbound peers more homogeneous. As a result the Start method of peer was removed as it was found not to be necessary. This is the first of several pull requests/commits designed to make the peer public API and internals less complex.
This commit is contained in:
Jonathan Gillham 2016-02-03 14:24:28 +00:00
parent ae00fff14a
commit 73d353247c
5 changed files with 69 additions and 88 deletions

View file

@ -1,4 +1,4 @@
// Copyright (c) 2015 The btcsuite developers // Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
@ -66,15 +66,12 @@ This provides high flexibility for things such as connecting via proxies, acting
as a proxy, creating bridge peers, choosing whether to listen for inbound peers, as a proxy, creating bridge peers, choosing whether to listen for inbound peers,
etc. etc.
For outgoing peers, the NewOutboundPeer function must be used to specify the NewOutboundPeer and NewInboundPeer functions must be followed by calling Connect
configuration followed by invoking Connect with the net.Conn instance. This with a net.Conn instance to the peer. This will start all async I/O goroutines
will start all async I/O goroutines and initiate the initial negotiation and initiate the protocol negotiation process. Once finished with the peer call
process. Once that has been completed, the peer is fully functional. Disconnect to disconnect from the peer and clean up all resources.
WaitForDisconnect can be used to block until peer disconnection and resource
For inbound peers, the NewInboundPeer function must be used to specify the cleanup has completed.
configuration and net.Conn instance followed by invoking Start. This will start
all async I/O goroutines and listen for the initial negotiation process. Once
that has been completed, the peer is fully functional.
Callbacks Callbacks

View file

@ -1,4 +1,4 @@
// Copyright (c) 2015 The btcsuite developers // Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
@ -38,9 +38,9 @@ func mockRemotePeer() error {
} }
// Create and start the inbound peer. // Create and start the inbound peer.
p := peer.NewInboundPeer(peerCfg, conn) p := peer.NewInboundPeer(peerCfg)
if err := p.Start(); err != nil { if err := p.Connect(conn); err != nil {
fmt.Printf("Start: error %v\n", err) fmt.Printf("Connect: error %v\n", err)
return return
} }
}() }()
@ -105,8 +105,9 @@ func Example_newOutboundPeer() {
fmt.Printf("Example_peerConnection: verack timeout") fmt.Printf("Example_peerConnection: verack timeout")
} }
// Shutdown the peer. // Disconnect the peer.
p.Shutdown() p.Disconnect()
p.WaitForDisconnect()
// Output: // Output:
// outbound: received version // outbound: received version

View file

@ -387,8 +387,7 @@ type HostToNetAddrFunc func(host string, port uint16,
// of specific types that typically require common special handling are // of specific types that typically require common special handling are
// provided as a convenience. // provided as a convenience.
type Peer struct { type Peer struct {
// The following variables must only be used atomically // The following variables must only be used atomically.
started int32
connected int32 connected int32
disconnect int32 disconnect int32
bytesReceived uint64 bytesReceived uint64
@ -1943,11 +1942,14 @@ func (p *Peer) Connect(conn net.Conn) error {
return nil return nil
} }
if p.inbound {
p.addr = conn.RemoteAddr().String()
}
p.conn = conn p.conn = conn
p.timeConnected = time.Now() p.timeConnected = time.Now()
atomic.AddInt32(&p.connected, 1) atomic.AddInt32(&p.connected, 1)
return p.Start() return p.start()
} }
// Connected returns whether or not the peer is currently connected. // Connected returns whether or not the peer is currently connected.
@ -1975,18 +1977,12 @@ func (p *Peer) Disconnect() {
// Start begins processing input and output messages. It also sends the initial // Start begins processing input and output messages. It also sends the initial
// version message for outbound connections to start the negotiation process. // version message for outbound connections to start the negotiation process.
func (p *Peer) Start() error { func (p *Peer) start() error {
// Already started?
if atomic.AddInt32(&p.started, 1) != 1 {
return nil
}
log.Tracef("Starting peer %s", p) log.Tracef("Starting peer %s", p)
// Send an initial version message if this is an outbound connection. // Send an initial version message if this is an outbound connection.
if !p.inbound { if !p.inbound {
err := p.pushVersionMsg() if err := p.pushVersionMsg(); err != nil {
if err != nil {
log.Errorf("Can't send outbound version message %v", err) log.Errorf("Can't send outbound version message %v", err)
p.Disconnect() p.Disconnect()
return err return err
@ -2002,16 +1998,11 @@ func (p *Peer) Start() error {
return nil return nil
} }
// Shutdown gracefully shuts down the peer by disconnecting it. // WaitForDisconnect waits until the peer has completely disconnected and all
func (p *Peer) Shutdown() { // resources are cleaned up. This will happen if either the local or remote
log.Tracef("Shutdown peer %s", p) // side has been disconnected or the peer is forcibly disconnected via
p.Disconnect() // Disconnect.
} func (p *Peer) WaitForDisconnect() {
// WaitForShutdown waits until the peer has completely shutdown. This will
// happen if either the local or remote side has been disconnected or the peer
// is forcibly shutdown via Shutdown.
func (p *Peer) WaitForShutdown() {
<-p.quit <-p.quit
} }
@ -2052,13 +2043,8 @@ func newPeerBase(cfg *Config, inbound bool) *Peer {
// NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
// processing incoming and outgoing messages. // processing incoming and outgoing messages.
func NewInboundPeer(cfg *Config, conn net.Conn) *Peer { func NewInboundPeer(cfg *Config) *Peer {
p := newPeerBase(cfg, true) return newPeerBase(cfg, true)
p.conn = conn
p.addr = conn.RemoteAddr().String()
p.timeConnected = time.Now()
atomic.AddInt32(&p.connected, 1)
return p
} }
// NewOutboundPeer returns a new outbound bitcoin peer. // NewOutboundPeer returns a new outbound bitcoin peer.

View file

@ -1,4 +1,4 @@
// Copyright (c) 2015 The btcsuite developers // Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
@ -244,9 +244,8 @@ func TestPeerConnection(t *testing.T) {
&conn{raddr: "10.0.0.1:8333"}, &conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"}, &conn{raddr: "10.0.0.2:8333"},
) )
inPeer := peer.NewInboundPeer(peerCfg, inConn) inPeer := peer.NewInboundPeer(peerCfg)
err := inPeer.Start() if err := inPeer.Connect(inConn); err != nil {
if err != nil {
return nil, nil, err return nil, nil, err
} }
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333") outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
@ -256,6 +255,7 @@ func TestPeerConnection(t *testing.T) {
if err := outPeer.Connect(outConn); err != nil { if err := outPeer.Connect(outConn); err != nil {
return nil, nil, err return nil, nil, err
} }
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
select { select {
case <-verack: case <-verack:
@ -273,9 +273,8 @@ func TestPeerConnection(t *testing.T) {
&conn{raddr: "10.0.0.1:8333", proxy: true}, &conn{raddr: "10.0.0.1:8333", proxy: true},
&conn{raddr: "10.0.0.2:8333"}, &conn{raddr: "10.0.0.2:8333"},
) )
inPeer := peer.NewInboundPeer(peerCfg, inConn) inPeer := peer.NewInboundPeer(peerCfg)
err := inPeer.Start() if err := inPeer.Connect(inConn); err != nil {
if err != nil {
return nil, nil, err return nil, nil, err
} }
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333") outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
@ -306,8 +305,8 @@ func TestPeerConnection(t *testing.T) {
testPeer(t, inPeer, wantStats) testPeer(t, inPeer, wantStats)
testPeer(t, outPeer, wantStats) testPeer(t, outPeer, wantStats)
inPeer.Shutdown() inPeer.Disconnect()
outPeer.Shutdown() outPeer.Disconnect()
} }
} }
@ -390,9 +389,8 @@ func TestPeerListeners(t *testing.T) {
&conn{raddr: "10.0.0.1:8333"}, &conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"}, &conn{raddr: "10.0.0.2:8333"},
) )
inPeer := peer.NewInboundPeer(peerCfg, inConn) inPeer := peer.NewInboundPeer(peerCfg)
err := inPeer.Start() if err := inPeer.Connect(inConn); err != nil {
if err != nil {
t.Errorf("TestPeerListeners: unexpected err %v\n", err) t.Errorf("TestPeerListeners: unexpected err %v\n", err)
return return
} }
@ -513,8 +511,8 @@ func TestPeerListeners(t *testing.T) {
return return
} }
} }
inPeer.Shutdown() inPeer.Disconnect()
outPeer.Shutdown() outPeer.Disconnect()
} }
// TestOutboundPeer tests that the outbound peer works as expected. // TestOutboundPeer tests that the outbound peer works as expected.
@ -542,22 +540,17 @@ func TestOutboundPeer(t *testing.T) {
return return
} }
// Test Connect err
wantErr := errBlockNotFound wantErr := errBlockNotFound
if err := p.Connect(c); err != wantErr { if err := p.Connect(c); err != wantErr {
t.Errorf("Connect: expected err %v, got %v\n", wantErr, err) t.Errorf("Connect: expected err %v, got %v\n", wantErr, err)
return return
} }
// Test already connected
// Test already connected.
if err := p.Connect(c); err != nil { if err := p.Connect(c); err != nil {
t.Errorf("Connect: unexpected err %v\n", err) t.Errorf("Connect: unexpected err %v\n", err)
return return
} }
// Test already started
if err := p.Start(); err != nil {
t.Errorf("Start: unexpected err %v\n", err)
return
}
// Test Queue Inv // Test Queue Inv
fakeBlockHash := &wire.ShaHash{0x00, 0x01} fakeBlockHash := &wire.ShaHash{0x00, 0x01}
@ -572,7 +565,7 @@ func TestOutboundPeer(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
p.QueueMessage(fakeMsg, done) p.QueueMessage(fakeMsg, done)
<-done <-done
p.Shutdown() p.Disconnect()
// Test NewestBlock // Test NewestBlock
var newestBlock = func() (*wire.ShaHash, int32, error) { var newestBlock = func() (*wire.ShaHash, int32, error) {
@ -612,7 +605,7 @@ func TestOutboundPeer(t *testing.T) {
// Test Queue Inv after connection // Test Queue Inv after connection
p1.QueueInventory(fakeInv) p1.QueueInventory(fakeInv)
p1.Shutdown() p1.Disconnect()
// Test regression // Test regression
peerCfg.ChainParams = &chaincfg.RegressionNetParams peerCfg.ChainParams = &chaincfg.RegressionNetParams
@ -657,7 +650,7 @@ func TestOutboundPeer(t *testing.T) {
p2.QueueMessage(wire.NewMsgGetData(), nil) p2.QueueMessage(wire.NewMsgGetData(), nil)
p2.QueueMessage(wire.NewMsgGetHeaders(), nil) p2.QueueMessage(wire.NewMsgGetHeaders(), nil)
p2.Shutdown() p2.Disconnect()
} }
func init() { func init() {

View file

@ -1038,9 +1038,8 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
// Ignore new peers if we're shutting down. // Ignore new peers if we're shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 { if atomic.LoadInt32(&s.shutdown) != 0 {
srvrLog.Infof("New peer %s ignored - server is shutting "+ srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
"down", sp) sp.Disconnect()
sp.Shutdown()
return false return false
} }
@ -1048,14 +1047,14 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
host, _, err := net.SplitHostPort(sp.Addr()) host, _, err := net.SplitHostPort(sp.Addr())
if err != nil { if err != nil {
srvrLog.Debugf("can't split hostport %v", err) srvrLog.Debugf("can't split hostport %v", err)
sp.Shutdown() sp.Disconnect()
return false return false
} }
if banEnd, ok := state.banned[host]; ok { if banEnd, ok := state.banned[host]; ok {
if time.Now().Before(banEnd) { if time.Now().Before(banEnd) {
srvrLog.Debugf("Peer %s is banned for another %v - "+ srvrLog.Debugf("Peer %s is banned for another %v - disconnecting",
"disconnecting", host, banEnd.Sub(time.Now())) host, banEnd.Sub(time.Now()))
sp.Shutdown() sp.Disconnect()
return false return false
} }
@ -1070,16 +1069,16 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
if state.OutboundCount() >= state.maxOutboundPeers { if state.OutboundCount() >= state.maxOutboundPeers {
srvrLog.Infof("Max outbound peers reached [%d] - disconnecting "+ srvrLog.Infof("Max outbound peers reached [%d] - disconnecting "+
"peer %s", state.maxOutboundPeers, sp) "peer %s", state.maxOutboundPeers, sp)
sp.Shutdown() sp.Disconnect()
return false return false
} }
} }
// Limit max number of total peers. // Limit max number of total peers.
if state.Count() >= cfg.MaxPeers { if state.Count() >= cfg.MaxPeers {
srvrLog.Infof("Max peers reached [%d] - disconnecting "+ srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
"peer %s", cfg.MaxPeers, sp) cfg.MaxPeers, sp)
sp.Shutdown() sp.Disconnect()
// TODO(oga) how to handle permanent peers here? // TODO(oga) how to handle permanent peers here?
// they should be rescheduled. // they should be rescheduled.
return false return false
@ -1415,15 +1414,19 @@ func (s *server) listenHandler(listener net.Listener) {
if err != nil { if err != nil {
// Only log the error if we're not forcibly shutting down. // Only log the error if we're not forcibly shutting down.
if atomic.LoadInt32(&s.shutdown) == 0 { if atomic.LoadInt32(&s.shutdown) == 0 {
srvrLog.Errorf("can't accept connection: %v", srvrLog.Errorf("Can't accept connection: %v", err)
err)
} }
continue continue
} }
sp := newServerPeer(s, false) sp := newServerPeer(s, false)
sp.Peer = peer.NewInboundPeer(newPeerConfig(sp), conn) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
sp.Start()
go s.peerDoneHandler(sp) go s.peerDoneHandler(sp)
if err := sp.Connect(conn); err != nil {
if atomic.LoadInt32(&s.shutdown) == 0 {
srvrLog.Errorf("Can't accept connection: %v", err)
}
continue
}
} }
s.wg.Done() s.wg.Done()
srvrLog.Tracef("Listener handler done for %s", listener.Addr()) srvrLog.Tracef("Listener handler done for %s", listener.Addr())
@ -1502,7 +1505,7 @@ func (s *server) peerConnHandler(sp *serverPeer) {
// peerDoneHandler handles peer disconnects by notifiying the server that it's // peerDoneHandler handles peer disconnects by notifiying the server that it's
// done. // done.
func (s *server) peerDoneHandler(sp *serverPeer) { func (s *server) peerDoneHandler(sp *serverPeer) {
sp.WaitForShutdown() sp.WaitForDisconnect()
s.donePeers <- sp s.donePeers <- sp
// Only tell block manager we are gone if we ever told it we existed. // Only tell block manager we are gone if we ever told it we existed.
@ -1639,11 +1642,11 @@ out:
case qmsg := <-s.query: case qmsg := <-s.query:
s.handleQuery(state, qmsg) s.handleQuery(state, qmsg)
// Shutdown the peer handler.
case <-s.quit: case <-s.quit:
// Shutdown peers. // Disconnect all peers on server shutdown.
state.forAllPeers(func(sp *serverPeer) { state.forAllPeers(func(sp *serverPeer) {
sp.Shutdown() srvrLog.Tracef("Shutdown peer %s", sp.Peer)
sp.Disconnect()
}) })
break out break out
} }
@ -1660,7 +1663,8 @@ out:
if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 || if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 ||
atomic.LoadInt32(&s.shutdown) != 0 { atomic.LoadInt32(&s.shutdown) != 0 {
state.forPendingPeers(func(sp *serverPeer) { state.forPendingPeers(func(sp *serverPeer) {
sp.Shutdown() srvrLog.Tracef("Shutdown peer %s", sp.Peer)
sp.Disconnect()
}) })
continue continue
} }