Introduce PeerKey

PeerKeys are used to prevent overwriting of peers which want to announce
for both IPv4 and IPv6.
This commit is contained in:
Jimmy Zelinskie 2014-08-01 11:21:57 -04:00
parent c438b877ba
commit e52e4d5f1d
5 changed files with 107 additions and 89 deletions

View file

@ -139,11 +139,12 @@ func (s *Stats) RecordEvent(event int) {
s.events <- event s.events <- event
} }
func (s *Stats) RecordPeerEvent(event int, ipv6 bool) { func (s *Stats) RecordPeerEvent(event int, ipv string) {
if ipv6 { switch ipv {
s.ipv6PeerEvents <- event case "ipv4":
} else {
s.ipv4PeerEvents <- event s.ipv4PeerEvents <- event
case "ipv6":
s.ipv6PeerEvents <- event
} }
} }
@ -265,8 +266,8 @@ func RecordEvent(event int) {
} }
// RecordPeerEvent broadcasts a peer event to the default stats queue. // RecordPeerEvent broadcasts a peer event to the default stats queue.
func RecordPeerEvent(event int, ipv6 bool) { func RecordPeerEvent(event int, ipv string) {
DefaultStats.RecordPeerEvent(event, ipv6) DefaultStats.RecordPeerEvent(event, ipv)
} }
// RecordTiming broadcasts a timing event to the default stats queue. // RecordTiming broadcasts a timing event to the default stats queue.

View file

@ -56,15 +56,35 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
peer := models.NewPeer(ann, user, torrent) peer := models.NewPeer(ann, user, torrent)
created, err := updateSwarm(conn, ann, peer, torrent) var createdIPv4, createdIPv6 bool
if err != nil { if peer.HasIPv4() {
return err createdIPv4, err = updateSwarm(conn, ann, peer, torrent, "ipv4")
if err != nil {
return err
}
} }
if peer.HasIPv6() {
createdIPv6, err = updateSwarm(conn, ann, peer, torrent, "ipv6")
if err != nil {
return err
}
}
created := createdIPv4 || createdIPv6
snatched, err := handleEvent(conn, ann, peer, user, torrent) var snatchedIPv4, snatchedIPv6 bool
if err != nil { if peer.HasIPv4() {
return err snatchedIPv4, err = handleEvent(conn, ann, peer, user, torrent, "ipv4")
if err != nil {
return err
}
} }
if peer.HasIPv6() {
snatchedIPv6, err = handleEvent(conn, ann, peer, user, torrent, "ipv6")
if err != nil {
return err
}
}
snatched := snatchedIPv4 || snatchedIPv6
if tkr.cfg.PrivateEnabled { if tkr.cfg.PrivateEnabled {
delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched) delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched)
@ -83,23 +103,23 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
} }
// updateSwarm handles the changes to a torrent's swarm given an announce. // updateSwarm handles the changes to a torrent's swarm given an announce.
func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) { func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent, ipv string) (created bool, err error) {
c.TouchTorrent(t.Infohash) c.TouchTorrent(t.Infohash)
switch { switch {
case t.InSeederPool(p): case t.InSeederPool(p.ID, ipv):
err = c.PutSeeder(t.Infohash, p) err = c.PutSeeder(t.Infohash, ipv, p)
if err != nil { if err != nil {
return return
} }
t.Seeders[p.ID] = *p t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p
case t.InLeecherPool(p): case t.InLeecherPool(p.ID, ipv):
err = c.PutLeecher(t.Infohash, p) err = c.PutLeecher(t.Infohash, ipv, p)
if err != nil { if err != nil {
return return
} }
t.Leechers[p.ID] = *p t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p
default: default:
if ann.Event != "" && ann.Event != "started" { if ann.Event != "" && ann.Event != "started" {
@ -108,20 +128,20 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
} }
if ann.Left == 0 { if ann.Left == 0 {
err = c.PutSeeder(t.Infohash, p) err = c.PutSeeder(t.Infohash, ipv, p)
if err != nil { if err != nil {
return return
} }
t.Seeders[p.ID] = *p t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p
stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6()) stats.RecordPeerEvent(stats.NewSeed, ipv)
} else { } else {
err = c.PutLeecher(t.Infohash, p) err = c.PutLeecher(t.Infohash, ipv, p)
if err != nil { if err != nil {
return return
} }
t.Leechers[p.ID] = *p t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p
stats.RecordPeerEvent(stats.NewLeech, p.HasIPv6()) stats.RecordPeerEvent(stats.NewLeech, ipv)
} }
created = true created = true
} }
@ -131,26 +151,28 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
// handleEvent checks to see whether an announce has an event and if it does, // handleEvent checks to see whether an announce has an event and if it does,
// properly handles that event. // properly handles that event.
func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) { func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent, ipv string) (snatched bool, err error) {
peerkey := models.NewPeerKey(p.ID, ipv)
switch { switch {
case ann.Event == "stopped" || ann.Event == "paused": case ann.Event == "stopped" || ann.Event == "paused":
// updateSwarm checks if the peer is active on the torrent, // updateSwarm checks if the peer is active on the torrent,
// so one of these branches must be followed. // so one of these branches must be followed.
if t.InSeederPool(p) { if t.InSeederPool(p.ID, ipv) {
err = c.DeleteSeeder(t.Infohash, p.ID) err = c.DeleteSeeder(t.Infohash, peerkey)
if err != nil { if err != nil {
return return
} }
delete(t.Seeders, p.ID) delete(t.Seeders, models.NewPeerKey(p.ID, ipv))
stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) stats.RecordPeerEvent(stats.DeletedSeed, ipv)
} else if t.InLeecherPool(p) { } else if t.InLeecherPool(p.ID, ipv) {
err = c.DeleteLeecher(t.Infohash, p.ID) err = c.DeleteLeecher(t.Infohash, peerkey)
if err != nil { if err != nil {
return return
} }
delete(t.Leechers, p.ID) delete(t.Leechers, models.NewPeerKey(p.ID, ipv))
stats.RecordPeerEvent(stats.DeletedLeech, p.HasIPv6()) stats.RecordPeerEvent(stats.DeletedLeech, ipv)
} }
case ann.Event == "completed": case ann.Event == "completed":
@ -168,32 +190,40 @@ func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t
u.Snatches++ u.Snatches++
} }
if t.InLeecherPool(p) { if t.InLeecherPool(p.ID, ipv) {
err = leecherFinished(c, t.Infohash, p) err = leecherFinished(c, t, p, ipv)
} else { } else {
err = models.ErrBadRequest err = models.ErrBadRequest
} }
snatched = true
case t.InLeecherPool(p) && ann.Left == 0: // If one of the dual-stacked peers is already a seeder, they have already
// snatched.
if !(t.InSeederPool(p.ID, "ipv4") || t.InSeederPool(p.ID, "ipv6")) {
snatched = true
}
case t.InLeecherPool(p.ID, ipv) && ann.Left == 0:
// A leecher completed but the event was never received. // A leecher completed but the event was never received.
err = leecherFinished(c, t.Infohash, p) err = leecherFinished(c, t, p, ipv)
} }
return return
} }
// leecherFinished moves a peer from the leeching pool to the seeder pool. // leecherFinished moves a peer from the leeching pool to the seeder pool.
func leecherFinished(c Conn, infohash string, p *models.Peer) error { func leecherFinished(c Conn, t *models.Torrent, p *models.Peer, ipv string) error {
if err := c.DeleteLeecher(infohash, p.ID); err != nil { peerkey := models.NewPeerKey(p.ID, ipv)
if err := c.DeleteLeecher(t.Infohash, peerkey); err != nil {
return err return err
} }
delete(t.Leechers, peerkey)
if err := c.PutSeeder(infohash, p); err != nil { if err := c.PutSeeder(t.Infohash, ipv, p); err != nil {
return err return err
} }
t.Seeders[peerkey] = *p
stats.RecordPeerEvent(stats.Completed, p.HasIPv6()) stats.RecordPeerEvent(stats.Completed, ipv)
return nil return nil
} }

View file

@ -64,11 +64,11 @@ type Conn interface {
DeleteTorrent(infohash string) error DeleteTorrent(infohash string) error
IncrementTorrentSnatches(infohash string) error IncrementTorrentSnatches(infohash string) error
PutLeecher(infohash string, p *models.Peer) error PutLeecher(infohash, ipv string, p *models.Peer) error
DeleteLeecher(infohash, peerID string) error DeleteLeecher(infohash string, pk models.PeerKey) error
PutSeeder(infohash string, p *models.Peer) error PutSeeder(infohash, ipv string, p *models.Peer) error
DeleteSeeder(infohash, peerID string) error DeleteSeeder(infohash string, pk models.PeerKey) error
PurgeInactiveTorrent(infohash string) error PurgeInactiveTorrent(infohash string) error
PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error

View file

@ -93,7 +93,7 @@ func (c *Conn) TouchTorrent(infohash string) error {
return nil return nil
} }
func (c *Conn) AddLeecher(infohash string, p *models.Peer) error { func (c *Conn) DeleteLeecher(infohash string, pk models.PeerKey) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -101,12 +101,12 @@ func (c *Conn) AddLeecher(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Leechers[p.ID] = *p delete(t.Leechers, pk)
return nil return nil
} }
func (c *Conn) AddSeeder(infohash string, p *models.Peer) error { func (c *Conn) DeleteSeeder(infohash string, pk models.PeerKey) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -114,12 +114,12 @@ func (c *Conn) AddSeeder(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Seeders[p.ID] = *p delete(t.Seeders, pk)
return nil return nil
} }
func (c *Conn) DeleteLeecher(infohash, peerkey string) error { func (c *Conn) PutLeecher(infohash, ipv string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -127,12 +127,12 @@ func (c *Conn) DeleteLeecher(infohash, peerkey string) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Leechers, peerkey) t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p
return nil return nil
} }
func (c *Conn) DeleteSeeder(infohash, peerkey string) error { func (c *Conn) PutSeeder(infohash, ipv string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -140,33 +140,7 @@ func (c *Conn) DeleteSeeder(infohash, peerkey string) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Seeders, peerkey) t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p
return nil
}
func (c *Conn) PutLeecher(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Leechers[p.ID] = *p
return nil
}
func (c *Conn) PutSeeder(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Seeders[p.ID] = *p
return nil return nil
} }
@ -270,14 +244,14 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err
for key, peer := range torrent.Seeders { for key, peer := range torrent.Seeders {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Seeders, key) delete(torrent.Seeders, key)
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) stats.RecordPeerEvent(stats.ReapedSeed, string(key[:4]))
} }
} }
for key, peer := range torrent.Leechers { for key, peer := range torrent.Leechers {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Leechers, key) delete(torrent.Leechers, key)
stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) stats.RecordPeerEvent(stats.ReapedLeech, string(key[:4]))
} }
} }

View file

@ -6,6 +6,7 @@ package models
import ( import (
"net" "net"
"strings"
"time" "time"
"github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/config"
@ -57,9 +58,21 @@ type Peer struct {
} }
type PeerList []Peer type PeerList []Peer
type PeerKey string
// PeerMap is a map from PeerIDs to Peers. func NewPeerKey(peerID, ipv string) (pk PeerKey) {
type PeerMap map[string]Peer switch strings.ToLower(ipv) {
case "ipv4":
pk = PeerKey("IPv4" + peerID)
case "ipv6":
pk = PeerKey("IPv6" + peerID)
}
return pk
}
// PeerMap is a map from PeerKeys to Peers.
type PeerMap map[PeerKey]Peer
// NewPeer returns the Peer representation of an Announce. When provided nil // NewPeer returns the Peer representation of an Announce. When provided nil
// for the announce parameter, it panics. When provided nil for the user or // for the announce parameter, it panics. When provided nil for the user or
@ -117,14 +130,14 @@ type Torrent struct {
} }
// InSeederPool returns true if a peer is within a Torrent's map of seeders. // InSeederPool returns true if a peer is within a Torrent's map of seeders.
func (t *Torrent) InSeederPool(p *Peer) (exists bool) { func (t *Torrent) InSeederPool(peerID, ipv string) (exists bool) {
_, exists = t.Seeders[p.ID] _, exists = t.Seeders[NewPeerKey(peerID, ipv)]
return return
} }
// InLeecherPool returns true if a peer is within a Torrent's map of leechers. // InLeecherPool returns true if a peer is within a Torrent's map of leechers.
func (t *Torrent) InLeecherPool(p *Peer) (exists bool) { func (t *Torrent) InLeecherPool(peerID, ipv string) (exists bool) {
_, exists = t.Leechers[p.ID] _, exists = t.Leechers[NewPeerKey(peerID, ipv)]
return return
} }