diff --git a/peer/peer.go b/peer/peer.go index 0d389ef2..d07efe9b 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -43,9 +43,9 @@ const ( // inventory cache. maxKnownInventory = 1000 - // pingTimeout is the duration since we last sent a message requiring a - // reply before we will ping a host. - pingTimeout = 2 * time.Minute + // pingInterval is the interval of time to wait in between sending ping + // messages. + pingInterval = 2 * time.Minute // negotiateTimeout is the duration of inactivity before we timeout a // peer that hasn't completed the initial version negotiation. @@ -1582,39 +1582,24 @@ cleanup: // goroutine. It uses a buffered channel to serialize output messages while // allowing the sender to continue running asynchronously. func (p *Peer) outHandler() { - pingTimer := time.AfterFunc(pingTimeout, func() { - nonce, err := wire.RandomUint64() - if err != nil { - log.Errorf("Not sending ping on timeout to %s: %v", - p, err) - return - } - p.QueueMessage(wire.NewMsgPing(nonce), nil) - }) + // pingTicker is used to periodically send pings to the remote peer. + pingTicker := time.NewTicker(pingInterval) + defer pingTicker.Stop() + out: for { select { case msg := <-p.sendQueue: - // Reset the ping timer for messages that expect a - // reply since we only want to send pings when we would - // otherwise not receive a reply from the peer. The - // getblocks and inv messages are specifically not - // counted here since there is no guarantee they will - // result in a reply. - reset := true switch m := msg.msg.(type) { case *wire.MsgVersion: - // Expects a verack message. Also set the flag - // which indicates the version has been sent. + // Set the flag which indicates the version has + // been sent. p.flagsMtx.Lock() p.versionSent = true p.flagsMtx.Unlock() - case *wire.MsgGetAddr: - // Expects an addr message. - case *wire.MsgPing: - // Expects a pong message in later protocol + // Only expects a pong message in later protocol // versions. Also set up statistics. if p.ProtocolVersion() > wire.BIP0031Version { p.statsMtx.Lock() @@ -1622,26 +1607,8 @@ out: p.lastPingTime = time.Now() p.statsMtx.Unlock() } - - case *wire.MsgMemPool: - // Expects an inv message. - - case *wire.MsgGetData: - // Expects a block, tx, or notfound message. - - case *wire.MsgGetHeaders: - // Expects a headers message. - - default: - // Not one of the above, no sure reply. - // We want to ping if nothing else - // interesting happens. - reset = false } - if reset { - pingTimer.Reset(pingTimeout) - } p.writeMessage(msg.msg) p.statsMtx.Lock() p.lastSend = time.Now() @@ -1651,13 +1618,19 @@ out: } p.sendDoneQueue <- struct{}{} + case <-pingTicker.C: + nonce, err := wire.RandomUint64() + if err != nil { + log.Errorf("Not sending ping to %s: %v", p, err) + continue + } + p.QueueMessage(wire.NewMsgPing(nonce), nil) + case <-p.quit: break out } } - pingTimer.Stop() - <-p.queueQuit // Drain any wait channels before we go away so we don't leave something