From e52e4d5f1d27aa27fb9d012cbc16e1b1cba4eca0 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 1 Aug 2014 11:21:57 -0400 Subject: [PATCH] Introduce PeerKey PeerKeys are used to prevent overwriting of peers which want to announce for both IPv4 and IPv6. --- stats/stats.go | 13 ++--- tracker/announce.go | 104 +++++++++++++++++++++++++-------------- tracker/conn.go | 8 +-- tracker/memory/conn.go | 46 ++++------------- tracker/models/models.go | 25 +++++++--- 5 files changed, 107 insertions(+), 89 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 9a074e2..51ef1a9 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -139,11 +139,12 @@ func (s *Stats) RecordEvent(event int) { s.events <- event } -func (s *Stats) RecordPeerEvent(event int, ipv6 bool) { - if ipv6 { - s.ipv6PeerEvents <- event - } else { +func (s *Stats) RecordPeerEvent(event int, ipv string) { + switch ipv { + case "ipv4": s.ipv4PeerEvents <- event + case "ipv6": + s.ipv6PeerEvents <- event } } @@ -265,8 +266,8 @@ func RecordEvent(event int) { } // RecordPeerEvent broadcasts a peer event to the default stats queue. -func RecordPeerEvent(event int, ipv6 bool) { - DefaultStats.RecordPeerEvent(event, ipv6) +func RecordPeerEvent(event int, ipv string) { + DefaultStats.RecordPeerEvent(event, ipv) } // RecordTiming broadcasts a timing event to the default stats queue. diff --git a/tracker/announce.go b/tracker/announce.go index ec21676..98c04f5 100644 --- a/tracker/announce.go +++ b/tracker/announce.go @@ -56,15 +56,35 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { peer := models.NewPeer(ann, user, torrent) - created, err := updateSwarm(conn, ann, peer, torrent) - if err != nil { - return err + var createdIPv4, createdIPv6 bool + if peer.HasIPv4() { + createdIPv4, err = updateSwarm(conn, ann, peer, torrent, "ipv4") + if err != nil { + return err + } } + if peer.HasIPv6() { + createdIPv6, err = updateSwarm(conn, ann, peer, torrent, "ipv6") + if err != nil { + return err + } + } + created := createdIPv4 || createdIPv6 - snatched, err := handleEvent(conn, ann, peer, user, torrent) - if err != nil { - return err + var snatchedIPv4, snatchedIPv6 bool + if peer.HasIPv4() { + snatchedIPv4, err = handleEvent(conn, ann, peer, user, torrent, "ipv4") + if err != nil { + return err + } } + if peer.HasIPv6() { + snatchedIPv6, err = handleEvent(conn, ann, peer, user, torrent, "ipv6") + if err != nil { + return err + } + } + snatched := snatchedIPv4 || snatchedIPv6 if tkr.cfg.PrivateEnabled { delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched) @@ -83,23 +103,23 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { } // updateSwarm handles the changes to a torrent's swarm given an announce. -func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) { +func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent, ipv string) (created bool, err error) { c.TouchTorrent(t.Infohash) switch { - case t.InSeederPool(p): - err = c.PutSeeder(t.Infohash, p) + case t.InSeederPool(p.ID, ipv): + err = c.PutSeeder(t.Infohash, ipv, p) if err != nil { return } - t.Seeders[p.ID] = *p + t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p - case t.InLeecherPool(p): - err = c.PutLeecher(t.Infohash, p) + case t.InLeecherPool(p.ID, ipv): + err = c.PutLeecher(t.Infohash, ipv, p) if err != nil { return } - t.Leechers[p.ID] = *p + t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p default: if ann.Event != "" && ann.Event != "started" { @@ -108,20 +128,20 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent } if ann.Left == 0 { - err = c.PutSeeder(t.Infohash, p) + err = c.PutSeeder(t.Infohash, ipv, p) if err != nil { return } - t.Seeders[p.ID] = *p - stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6()) + t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p + stats.RecordPeerEvent(stats.NewSeed, ipv) } else { - err = c.PutLeecher(t.Infohash, p) + err = c.PutLeecher(t.Infohash, ipv, p) if err != nil { return } - t.Leechers[p.ID] = *p - stats.RecordPeerEvent(stats.NewLeech, p.HasIPv6()) + t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p + stats.RecordPeerEvent(stats.NewLeech, ipv) } created = true } @@ -131,26 +151,28 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent // handleEvent checks to see whether an announce has an event and if it does, // properly handles that event. -func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) { +func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent, ipv string) (snatched bool, err error) { + peerkey := models.NewPeerKey(p.ID, ipv) + switch { case ann.Event == "stopped" || ann.Event == "paused": // updateSwarm checks if the peer is active on the torrent, // so one of these branches must be followed. - if t.InSeederPool(p) { - err = c.DeleteSeeder(t.Infohash, p.ID) + if t.InSeederPool(p.ID, ipv) { + err = c.DeleteSeeder(t.Infohash, peerkey) if err != nil { return } - delete(t.Seeders, p.ID) - stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) + delete(t.Seeders, models.NewPeerKey(p.ID, ipv)) + stats.RecordPeerEvent(stats.DeletedSeed, ipv) - } else if t.InLeecherPool(p) { - err = c.DeleteLeecher(t.Infohash, p.ID) + } else if t.InLeecherPool(p.ID, ipv) { + err = c.DeleteLeecher(t.Infohash, peerkey) if err != nil { return } - delete(t.Leechers, p.ID) - stats.RecordPeerEvent(stats.DeletedLeech, p.HasIPv6()) + delete(t.Leechers, models.NewPeerKey(p.ID, ipv)) + stats.RecordPeerEvent(stats.DeletedLeech, ipv) } case ann.Event == "completed": @@ -168,32 +190,40 @@ func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t u.Snatches++ } - if t.InLeecherPool(p) { - err = leecherFinished(c, t.Infohash, p) + if t.InLeecherPool(p.ID, ipv) { + err = leecherFinished(c, t, p, ipv) } else { err = models.ErrBadRequest } - snatched = true - case t.InLeecherPool(p) && ann.Left == 0: + // If one of the dual-stacked peers is already a seeder, they have already + // snatched. + if !(t.InSeederPool(p.ID, "ipv4") || t.InSeederPool(p.ID, "ipv6")) { + snatched = true + } + + case t.InLeecherPool(p.ID, ipv) && ann.Left == 0: // A leecher completed but the event was never received. - err = leecherFinished(c, t.Infohash, p) + err = leecherFinished(c, t, p, ipv) } return } // leecherFinished moves a peer from the leeching pool to the seeder pool. -func leecherFinished(c Conn, infohash string, p *models.Peer) error { - if err := c.DeleteLeecher(infohash, p.ID); err != nil { +func leecherFinished(c Conn, t *models.Torrent, p *models.Peer, ipv string) error { + peerkey := models.NewPeerKey(p.ID, ipv) + if err := c.DeleteLeecher(t.Infohash, peerkey); err != nil { return err } + delete(t.Leechers, peerkey) - if err := c.PutSeeder(infohash, p); err != nil { + if err := c.PutSeeder(t.Infohash, ipv, p); err != nil { return err } + t.Seeders[peerkey] = *p - stats.RecordPeerEvent(stats.Completed, p.HasIPv6()) + stats.RecordPeerEvent(stats.Completed, ipv) return nil } diff --git a/tracker/conn.go b/tracker/conn.go index 083c98f..e492d58 100644 --- a/tracker/conn.go +++ b/tracker/conn.go @@ -64,11 +64,11 @@ type Conn interface { DeleteTorrent(infohash string) error IncrementTorrentSnatches(infohash string) error - PutLeecher(infohash string, p *models.Peer) error - DeleteLeecher(infohash, peerID string) error + PutLeecher(infohash, ipv string, p *models.Peer) error + DeleteLeecher(infohash string, pk models.PeerKey) error - PutSeeder(infohash string, p *models.Peer) error - DeleteSeeder(infohash, peerID string) error + PutSeeder(infohash, ipv string, p *models.Peer) error + DeleteSeeder(infohash string, pk models.PeerKey) error PurgeInactiveTorrent(infohash string) error PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error diff --git a/tracker/memory/conn.go b/tracker/memory/conn.go index 53ca725..ea7b30a 100644 --- a/tracker/memory/conn.go +++ b/tracker/memory/conn.go @@ -93,7 +93,7 @@ func (c *Conn) TouchTorrent(infohash string) error { return nil } -func (c *Conn) AddLeecher(infohash string, p *models.Peer) error { +func (c *Conn) DeleteLeecher(infohash string, pk models.PeerKey) error { c.torrentsM.Lock() defer c.torrentsM.Unlock() @@ -101,12 +101,12 @@ func (c *Conn) AddLeecher(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - t.Leechers[p.ID] = *p + delete(t.Leechers, pk) return nil } -func (c *Conn) AddSeeder(infohash string, p *models.Peer) error { +func (c *Conn) DeleteSeeder(infohash string, pk models.PeerKey) error { c.torrentsM.Lock() defer c.torrentsM.Unlock() @@ -114,12 +114,12 @@ func (c *Conn) AddSeeder(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - t.Seeders[p.ID] = *p + delete(t.Seeders, pk) return nil } -func (c *Conn) DeleteLeecher(infohash, peerkey string) error { +func (c *Conn) PutLeecher(infohash, ipv string, p *models.Peer) error { c.torrentsM.Lock() defer c.torrentsM.Unlock() @@ -127,12 +127,12 @@ func (c *Conn) DeleteLeecher(infohash, peerkey string) error { if !ok { return models.ErrTorrentDNE } - delete(t.Leechers, peerkey) + t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p return nil } -func (c *Conn) DeleteSeeder(infohash, peerkey string) error { +func (c *Conn) PutSeeder(infohash, ipv string, p *models.Peer) error { c.torrentsM.Lock() defer c.torrentsM.Unlock() @@ -140,33 +140,7 @@ func (c *Conn) DeleteSeeder(infohash, peerkey string) error { if !ok { return models.ErrTorrentDNE } - delete(t.Seeders, peerkey) - - return nil -} - -func (c *Conn) PutLeecher(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Leechers[p.ID] = *p - - return nil -} - -func (c *Conn) PutSeeder(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Seeders[p.ID] = *p + t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p return nil } @@ -270,14 +244,14 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err for key, peer := range torrent.Seeders { if peer.LastAnnounce <= unixtime { delete(torrent.Seeders, key) - stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) + stats.RecordPeerEvent(stats.ReapedSeed, string(key[:4])) } } for key, peer := range torrent.Leechers { if peer.LastAnnounce <= unixtime { delete(torrent.Leechers, key) - stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) + stats.RecordPeerEvent(stats.ReapedLeech, string(key[:4])) } } diff --git a/tracker/models/models.go b/tracker/models/models.go index eaed0d9..363ae32 100644 --- a/tracker/models/models.go +++ b/tracker/models/models.go @@ -6,6 +6,7 @@ package models import ( "net" + "strings" "time" "github.com/chihaya/chihaya/config" @@ -57,9 +58,21 @@ type Peer struct { } type PeerList []Peer +type PeerKey string -// PeerMap is a map from PeerIDs to Peers. -type PeerMap map[string]Peer +func NewPeerKey(peerID, ipv string) (pk PeerKey) { + switch strings.ToLower(ipv) { + case "ipv4": + pk = PeerKey("IPv4" + peerID) + case "ipv6": + pk = PeerKey("IPv6" + peerID) + } + + return pk +} + +// PeerMap is a map from PeerKeys to Peers. +type PeerMap map[PeerKey]Peer // NewPeer returns the Peer representation of an Announce. When provided nil // for the announce parameter, it panics. When provided nil for the user or @@ -117,14 +130,14 @@ type Torrent struct { } // InSeederPool returns true if a peer is within a Torrent's map of seeders. -func (t *Torrent) InSeederPool(p *Peer) (exists bool) { - _, exists = t.Seeders[p.ID] +func (t *Torrent) InSeederPool(peerID, ipv string) (exists bool) { + _, exists = t.Seeders[NewPeerKey(peerID, ipv)] return } // InLeecherPool returns true if a peer is within a Torrent's map of leechers. -func (t *Torrent) InLeecherPool(p *Peer) (exists bool) { - _, exists = t.Leechers[p.ID] +func (t *Torrent) InLeecherPool(peerID, ipv string) (exists bool) { + _, exists = t.Leechers[NewPeerKey(peerID, ipv)] return }