mirror of
https://github.com/LBRYFoundation/lbcd.git
synced 2025-08-23 17:47:24 +00:00
peer: Ping on interval instead of delayed timer.
This commit modifies the ping logic in the peer to ping on an interval regardless of what other messages are being sent versus the previous method of delaying the ping each time a message that is expected to receive data is sent. This helps improve the ping statistics and simplifies its logic.
This commit is contained in:
parent
250228c32f
commit
f1bd2f8d6e
1 changed files with 18 additions and 45 deletions
63
peer/peer.go
63
peer/peer.go
|
@ -43,9 +43,9 @@ const (
|
||||||
// inventory cache.
|
// inventory cache.
|
||||||
maxKnownInventory = 1000
|
maxKnownInventory = 1000
|
||||||
|
|
||||||
// pingTimeout is the duration since we last sent a message requiring a
|
// pingInterval is the interval of time to wait in between sending ping
|
||||||
// reply before we will ping a host.
|
// messages.
|
||||||
pingTimeout = 2 * time.Minute
|
pingInterval = 2 * time.Minute
|
||||||
|
|
||||||
// negotiateTimeout is the duration of inactivity before we timeout a
|
// negotiateTimeout is the duration of inactivity before we timeout a
|
||||||
// peer that hasn't completed the initial version negotiation.
|
// peer that hasn't completed the initial version negotiation.
|
||||||
|
@ -1582,39 +1582,24 @@ cleanup:
|
||||||
// goroutine. It uses a buffered channel to serialize output messages while
|
// goroutine. It uses a buffered channel to serialize output messages while
|
||||||
// allowing the sender to continue running asynchronously.
|
// allowing the sender to continue running asynchronously.
|
||||||
func (p *Peer) outHandler() {
|
func (p *Peer) outHandler() {
|
||||||
pingTimer := time.AfterFunc(pingTimeout, func() {
|
// pingTicker is used to periodically send pings to the remote peer.
|
||||||
nonce, err := wire.RandomUint64()
|
pingTicker := time.NewTicker(pingInterval)
|
||||||
if err != nil {
|
defer pingTicker.Stop()
|
||||||
log.Errorf("Not sending ping on timeout to %s: %v",
|
|
||||||
p, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.QueueMessage(wire.NewMsgPing(nonce), nil)
|
|
||||||
})
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-p.sendQueue:
|
case msg := <-p.sendQueue:
|
||||||
// Reset the ping timer for messages that expect a
|
|
||||||
// reply since we only want to send pings when we would
|
|
||||||
// otherwise not receive a reply from the peer. The
|
|
||||||
// getblocks and inv messages are specifically not
|
|
||||||
// counted here since there is no guarantee they will
|
|
||||||
// result in a reply.
|
|
||||||
reset := true
|
|
||||||
switch m := msg.msg.(type) {
|
switch m := msg.msg.(type) {
|
||||||
case *wire.MsgVersion:
|
case *wire.MsgVersion:
|
||||||
// Expects a verack message. Also set the flag
|
// Set the flag which indicates the version has
|
||||||
// which indicates the version has been sent.
|
// been sent.
|
||||||
p.flagsMtx.Lock()
|
p.flagsMtx.Lock()
|
||||||
p.versionSent = true
|
p.versionSent = true
|
||||||
p.flagsMtx.Unlock()
|
p.flagsMtx.Unlock()
|
||||||
|
|
||||||
case *wire.MsgGetAddr:
|
|
||||||
// Expects an addr message.
|
|
||||||
|
|
||||||
case *wire.MsgPing:
|
case *wire.MsgPing:
|
||||||
// Expects a pong message in later protocol
|
// Only expects a pong message in later protocol
|
||||||
// versions. Also set up statistics.
|
// versions. Also set up statistics.
|
||||||
if p.ProtocolVersion() > wire.BIP0031Version {
|
if p.ProtocolVersion() > wire.BIP0031Version {
|
||||||
p.statsMtx.Lock()
|
p.statsMtx.Lock()
|
||||||
|
@ -1622,26 +1607,8 @@ out:
|
||||||
p.lastPingTime = time.Now()
|
p.lastPingTime = time.Now()
|
||||||
p.statsMtx.Unlock()
|
p.statsMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
case *wire.MsgMemPool:
|
|
||||||
// Expects an inv message.
|
|
||||||
|
|
||||||
case *wire.MsgGetData:
|
|
||||||
// Expects a block, tx, or notfound message.
|
|
||||||
|
|
||||||
case *wire.MsgGetHeaders:
|
|
||||||
// Expects a headers message.
|
|
||||||
|
|
||||||
default:
|
|
||||||
// Not one of the above, no sure reply.
|
|
||||||
// We want to ping if nothing else
|
|
||||||
// interesting happens.
|
|
||||||
reset = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if reset {
|
|
||||||
pingTimer.Reset(pingTimeout)
|
|
||||||
}
|
|
||||||
p.writeMessage(msg.msg)
|
p.writeMessage(msg.msg)
|
||||||
p.statsMtx.Lock()
|
p.statsMtx.Lock()
|
||||||
p.lastSend = time.Now()
|
p.lastSend = time.Now()
|
||||||
|
@ -1651,13 +1618,19 @@ out:
|
||||||
}
|
}
|
||||||
p.sendDoneQueue <- struct{}{}
|
p.sendDoneQueue <- struct{}{}
|
||||||
|
|
||||||
|
case <-pingTicker.C:
|
||||||
|
nonce, err := wire.RandomUint64()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Not sending ping to %s: %v", p, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
p.QueueMessage(wire.NewMsgPing(nonce), nil)
|
||||||
|
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pingTimer.Stop()
|
|
||||||
|
|
||||||
<-p.queueQuit
|
<-p.queueQuit
|
||||||
|
|
||||||
// Drain any wait channels before we go away so we don't leave something
|
// Drain any wait channels before we go away so we don't leave something
|
||||||
|
|
Loading…
Add table
Reference in a new issue