diff --git a/peer/peer.go b/peer/peer.go index d07efe9b..fecb0f15 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -27,11 +27,6 @@ const ( // MaxProtocolVersion is the max protocol version the peer supports. MaxProtocolVersion = 70011 - // BlockStallTimeout is the number of seconds we will wait for a - // "block" response after we send out a "getdata" for an announced - // block before we deem the peer inactive, and disconnect it. - BlockStallTimeout = 5 * time.Second - // outputBufferSize is the number of elements the output channels use. outputBufferSize = 50 @@ -54,6 +49,16 @@ const ( // idleTimeout is the duration of inactivity before we time out a peer. idleTimeout = 5 * time.Minute + // stallTickInterval is the interval of time between each check for + // stalled peers. + stallTickInterval = 15 * time.Second + + // stallResponseTimeout is the base maximum amount of time messages that + // expect a response will wait before disconnecting the peer for + // stalling. The deadlines are adjusted for callback running times and + // only checked on each stall tick interval. + stallResponseTimeout = 30 * time.Second + // trickleTimeout is the duration of the ticker which trickles down the // inventory to a peer. trickleTimeout = 10 * time.Second @@ -83,10 +88,10 @@ var ( // during peer initialization is ignored. Execution of multiple message // listeners occurs serially, so one callback blocks the excution of the next. // -// NOTE: Unless otherwise documented, these listeners must NOT directly call -// any blocking calls (such as WaitForShutdown) on the peer instance since the -// input handler goroutine blocks until the callback has completed. Doing so -// will result in a deadlock situation. +// NOTE: Unless otherwise documented, these listeners must NOT directly call any +// blocking calls (such as WaitForShutdown) on the peer instance since the input +// handler goroutine blocks until the callback has completed. Doing so will +// result in a deadlock. 
type MessageListeners struct { // OnGetAddr is invoked when a peer receives a getaddr bitcoin message. OnGetAddr func(p *Peer, msg *wire.MsgGetAddr) @@ -291,6 +296,32 @@ type outMsg struct { doneChan chan struct{} } +// stallControlCmd represents the command of a stall control message. +type stallControlCmd uint8 + +// Constants for the command of a stall control message. +const ( + // sccSendMessage indicates a message is being sent to the remote peer. + sccSendMessage stallControlCmd = iota + + // sccReceiveMessage indicates a message has been received from the + // remote peer. + sccReceiveMessage + + // sccHandlerStart indicates a callback handler is about to be invoked. + sccHandlerStart + + // sccHandlerDone indicates a callback handler has completed. + sccHandlerDone +) + +// stallControlMsg is used to signal the stall handler about specific events +// so it can properly detect and handle stalled remote peers. +type stallControlMsg struct { + command stallControlCmd + message wire.Message +} + // stats is the collection of stats related to a peer. type stats struct { statsMtx sync.RWMutex // protects all statistics below here. @@ -396,12 +427,16 @@ type Peer struct { prevGetHdrsMtx sync.Mutex prevGetHdrsBegin *wire.ShaHash prevGetHdrsStop *wire.ShaHash - outputQueue chan outMsg - sendQueue chan outMsg - sendDoneQueue chan struct{} - outputInvChan chan *wire.InvVect - queueQuit chan struct{} - quit chan struct{} + + stallControl chan stallControlMsg + outputQueue chan outMsg + sendQueue chan outMsg + sendDoneQueue chan struct{} + outputInvChan chan *wire.InvVect + inQuit chan struct{} + queueQuit chan struct{} + outQuit chan struct{} + quit chan struct{} stats } @@ -1226,6 +1261,194 @@ func (p *Peer) shouldHandleReadError(err error) bool { return true } +// maybeAddDeadline potentially adds a deadline for the appropriate expected +// response for the passed wire protocol command to the pending responses map. 
+func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) { + // Setup a deadline for each message being sent that expects a response. + deadline := time.Now().Add(stallResponseTimeout) + switch msgCmd { + case wire.CmdVersion: + // Expects a verack message. + pendingResponses[wire.CmdVerAck] = deadline + + case wire.CmdGetAddr: + // Expects an addr message. + pendingResponses[wire.CmdAddr] = deadline + + case wire.CmdPing: + // Expects a pong message in later protocol versions. + if p.ProtocolVersion() > wire.BIP0031Version { + pendingResponses[wire.CmdPong] = deadline + } + + case wire.CmdMemPool: + // Expects an inv message. + pendingResponses[wire.CmdInv] = deadline + + case wire.CmdGetBlocks: + // Expects an inv message. + pendingResponses[wire.CmdInv] = deadline + + case wire.CmdGetData: + // Expects a block, tx, or notfound message. + pendingResponses[wire.CmdBlock] = deadline + pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdNotFound] = deadline + + case wire.CmdGetHeaders: + // Expects a headers message. Use a longer deadline since it + // can take a while for the remote peer to load all of the + // headers. + deadline = time.Now().Add(stallResponseTimeout * 3) + pendingResponses[wire.CmdHeaders] = deadline + } +} + +// stallHandler handles stall detection for the peer. This entails keeping +// track of expected responses and assigning them deadlines while accounting for +// the time spent in callbacks. It must be run as a goroutine. +func (p *Peer) stallHandler() { + // These variables are used to adjust the deadline times forward by the + // time it takes callbacks to execute. This is done because new + // messages aren't read until the previous one is finished processing + // (which includes callbacks), so the deadline for receiving a response + // for a given message must account for the processing time as well. 
+ var handlerActive bool + var handlersStartTime time.Time + var deadlineOffset time.Duration + + // pendingResponses tracks the expected response deadline times. + pendingResponses := make(map[string]time.Time) + + // stallTicker is used to periodically check pending responses that have + // exceeded the expected deadline and disconnect the peer due to + // stalling. + stallTicker := time.NewTicker(stallTickInterval) + defer stallTicker.Stop() + + // ioStopped is used to detect when both the input and output handler + // goroutines are done. + var ioStopped bool +out: + for { + select { + case msg := <-p.stallControl: + switch msg.command { + case sccSendMessage: + // Add a deadline for the expected response + // message if needed. + p.maybeAddDeadline(pendingResponses, + msg.message.Command()) + + case sccReceiveMessage: + // Remove received messages from the expected + // response map. Since certain commands expect + // one of a group of responses, remove everything + // in the expected group accordingly. + switch msgCmd := msg.message.Command(); msgCmd { + case wire.CmdBlock: + fallthrough + case wire.CmdTx: + fallthrough + case wire.CmdNotFound: + delete(pendingResponses, wire.CmdBlock) + delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdNotFound) + + default: + delete(pendingResponses, msgCmd) + } + + case sccHandlerStart: + // Warn on unbalanced callback signalling. + if handlerActive { + log.Warn("Received handler start " + + "control command while a " + + "handler is already active") + continue + } + + handlerActive = true + handlersStartTime = time.Now() + + case sccHandlerDone: + // Warn on unbalanced callback signalling. + if !handlerActive { + log.Warn("Received handler done " + + "control command when a " + + "handler is not already active") + continue + } + + // Extend active deadlines by the time it took + // to execute the callback. 
+ duration := time.Now().Sub(handlersStartTime) + deadlineOffset += duration + handlerActive = false + + default: + log.Warnf("Unsupported message command %v", + msg.command) + } + + case <-stallTicker.C: + // Calculate the offset to apply to the deadline based + // on how long the handlers have taken to execute since + // the last tick. + now := time.Now() + offset := deadlineOffset + if handlerActive { + offset += now.Sub(handlersStartTime) + } + + // Disconnect the peer if any of the pending responses + // don't arrive by their adjusted deadline. + for command, deadline := range pendingResponses { + if now.Before(deadline.Add(offset)) { + continue + } + + log.Debugf("Peer %s appears to be stalled or "+ + "misbehaving, %s timeout -- "+ + "disconnecting", p, command) + p.Disconnect() + break + } + + // Reset the deadline offset for the next tick. + deadlineOffset = 0 + + case <-p.inQuit: + // The stall handler can exit once both the input and + // output handler goroutines are done. + if ioStopped { + break out + } + ioStopped = true + + case <-p.outQuit: + // The stall handler can exit once both the input and + // output handler goroutines are done. + if ioStopped { + break out + } + ioStopped = true + } + } + + // Drain any wait channels before going away so there is nothing left + // waiting on this goroutine. +cleanup: + for { + select { + case <-p.stallControl: + default: + break cleanup + } + } + log.Tracef("Peer stall handler done for %s", p) +} + // inHandler handles all incoming messages for the peer. It must be run as a // goroutine. func (p *Peer) inHandler() { @@ -1242,6 +1465,7 @@ func (p *Peer) inHandler() { } p.Disconnect() }) + out: for atomic.LoadInt32(&p.disconnect) == 0 { // Read a message and stop the idle timer as soon as the read @@ -1286,6 +1510,7 @@ out: p.statsMtx.Lock() p.lastRecv = time.Now() p.statsMtx.Unlock() + p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} // Ensure version message comes first. 
if vmsg, ok := rmsg.(*wire.MsgVersion); !ok && !p.VersionKnown() { @@ -1300,6 +1525,7 @@ out: } // Handle each supported message type. + p.stallControl <- stallControlMsg{sccHandlerStart, rmsg} switch msg := rmsg.(type) { case *wire.MsgVersion: p.handleVersionMsg(msg) @@ -1437,6 +1663,7 @@ out: log.Debugf("Received unhandled message of type %v:", rmsg.Command()) } + p.stallControl <- stallControlMsg{sccHandlerDone, rmsg} // A message was received so reset the idle timer. idleTimer.Reset(idleTimeout) @@ -1448,6 +1675,7 @@ out: // Ensure connection is closed. p.Disconnect() + close(p.inQuit) log.Tracef("Peer input handler done for %s", p) } @@ -1609,6 +1837,7 @@ out: } } + p.stallControl <- stallControlMsg{sccSendMessage, msg.msg} p.writeMessage(msg.msg) p.statsMtx.Lock() p.lastSend = time.Now() @@ -1649,6 +1878,7 @@ cleanup: break cleanup } } + close(p.outQuit) log.Tracef("Peer output handler done for %s", p) } @@ -1751,6 +1981,7 @@ func (p *Peer) Start() error { } // Start processing input and output. + go p.stallHandler() go p.inHandler() go p.queueHandler() go p.outHandler() @@ -1790,11 +2021,14 @@ func newPeerBase(cfg *Config, inbound bool) *Peer { p := Peer{ inbound: inbound, knownInventory: NewMruInventoryMap(maxKnownInventory), + stallControl: make(chan stallControlMsg, 1), // nonblocking sync outputQueue: make(chan outMsg, outputBufferSize), sendQueue: make(chan outMsg, 1), // nonblocking sync sendDoneQueue: make(chan struct{}, 1), // nonblocking sync outputInvChan: make(chan *wire.InvVect, outputBufferSize), + inQuit: make(chan struct{}), queueQuit: make(chan struct{}), + outQuit: make(chan struct{}), quit: make(chan struct{}), stats: stats{}, cfg: *cfg, // Copy so caller can't mutate.