From 3cb286fb40116317f96bcd56bf8aeddf6ec8bec2 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Mon, 4 Aug 2014 06:15:08 -0400 Subject: [PATCH] Introduce thread-safe PeerMap --- http/announce_test.go | 4 +- http/writer.go | 4 +- tracker/announce.go | 115 +++++--------------------- tracker/memory/conn.go | 23 ++---- tracker/models/models.go | 172 ++++++++++++++++++++++++++++++++++----- 5 files changed, 180 insertions(+), 138 deletions(-) diff --git a/http/announce_test.go b/http/announce_test.go index 8b89610..5495965 100644 --- a/http/announce_test.go +++ b/http/announce_test.go @@ -374,8 +374,8 @@ func loadPrivateTestData(tkr *tracker.Tracker) error { torrent := &models.Torrent{ ID: 1, Infohash: infoHash, - Seeders: models.PeerMap{}, - Leechers: models.PeerMap{}, + Seeders: models.NewPeerMap(), + Leechers: models.NewPeerMap(), } return conn.PutTorrent(torrent) diff --git a/http/writer.go b/http/writer.go index 7e1f39b..5b34b17 100644 --- a/http/writer.go +++ b/http/writer.go @@ -111,8 +111,8 @@ func filesDict(torrents []*models.Torrent) bencode.Dict { func torrentDict(torrent *models.Torrent) bencode.Dict { return bencode.Dict{ - "complete": len(torrent.Seeders), - "incomplete": len(torrent.Leechers), + "complete": torrent.Seeders.Len(), + "incomplete": torrent.Leechers.Len(), "downloaded": torrent.Snatches, } } diff --git a/tracker/announce.go b/tracker/announce.go index f4e04ce..4976b72 100644 --- a/tracker/announce.go +++ b/tracker/announce.go @@ -5,8 +5,6 @@ package tracker import ( - "net" - "github.com/chihaya/chihaya/stats" "github.com/chihaya/chihaya/tracker/models" ) @@ -39,8 +37,8 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { torrent = &models.Torrent{ Infohash: ann.Infohash, - Seeders: models.PeerMap{}, - Leechers: models.PeerMap{}, + Seeders: models.NewPeerMap(), + Leechers: models.NewPeerMap(), } err = conn.PutTorrent(torrent) @@ -91,12 +89,12 @@ func newAnnounceDelta(ann *models.Announce, t *models.Torrent) *models.AnnounceD var oldUp, oldDown, rawDeltaUp, rawDeltaDown uint64 switch { - case t.InSeederPool(ann.Peer): - oldPeer := t.Seeders[ann.Peer.Key()] + case t.Seeders.Contains(ann.Peer.Key()): + oldPeer, _ := t.Seeders.LookUp(ann.Peer.Key()) oldUp = oldPeer.Uploaded oldDown = oldPeer.Downloaded - case t.InLeecherPool(ann.Peer): - oldPeer := t.Leechers[ann.Peer.Key()] + case t.Leechers.Contains(ann.Peer.Key()): + oldPeer, _ := t.Leechers.LookUp(ann.Peer.Key()) oldUp = oldPeer.Uploaded oldDown = oldPeer.Downloaded } @@ -153,13 +151,13 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool, p, t := ann.Peer, ann.Torrent switch { - case t.InSeederPool(p): + case t.Seeders.Contains(p.Key()): err = c.PutSeeder(t.Infohash, p) if err != nil { return } - case t.InLeecherPool(p): + case t.Leechers.Contains(p.Key()): err = c.PutLeecher(t.Infohash, p) if err != nil { return @@ -226,14 +224,14 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo case ann.Event == "stopped" || ann.Event == "paused": // updateSwarm checks if the peer is active on the torrent, // so one of these branches must be followed. - if t.InSeederPool(p) { + if t.Seeders.Contains(p.Key()) { err = c.DeleteSeeder(t.Infohash, p) if err != nil { return } stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) - } else if t.InLeecherPool(p) { + } else if t.Leechers.Contains(p.Key()) { err = c.DeleteLeecher(t.Infohash, p) if err != nil { return @@ -242,10 +240,10 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo } case ann.Event == "completed": - _, v4seed := t.Seeders[models.NewPeerKey(p.ID, false)] - _, v6seed := t.Seeders[models.NewPeerKey(p.ID, true)] + v4seed := t.Seeders.Contains(models.NewPeerKey(p.ID, false)) + v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true)) - if t.InLeecherPool(p) { + if t.Leechers.Contains(p.Key()) { err = leecherFinished(c, t, p) } else { err = models.ErrBadRequest @@ -257,7 +255,7 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo snatched = true } - case t.InLeecherPool(p) && ann.Left == 0: + case t.Leechers.Contains(p.Key()) && ann.Left == 0: // A leecher completed but the event was never received. err = leecherFinished(c, t, p) } @@ -278,8 +276,8 @@ func leecherFinished(c Conn, t *models.Torrent, p *models.Peer) error { } func newAnnounceResponse(ann *models.Announce) *models.AnnounceResponse { - seedCount := len(ann.Torrent.Seeders) - leechCount := len(ann.Torrent.Leechers) + seedCount := ann.Torrent.Seeders.Len() + leechCount := ann.Torrent.Leechers.Len() res := &models.AnnounceResponse{ Complete: seedCount, @@ -303,85 +301,10 @@ func getPeers(ann *models.Announce) (ipv4s, ipv6s models.PeerList) { if ann.Left == 0 { // If they're seeding, give them only leechers. - return appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Leechers, ann.NumWant) + return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant) } // If they're leeching, prioritize giving them seeders. - ipv4s, ipv6s = appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Seeders, ann.NumWant) - return appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Leechers, ann.NumWant-len(ipv4s)-len(ipv6s)) -} - -// appendPeers implements the logic of adding peers to the IPv4 or IPv6 lists. -func appendPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, peers models.PeerMap, wanted int) (models.PeerList, models.PeerList) { - if ann.Config.PreferredSubnet { - return appendSubnetPeers(ipv4s, ipv6s, ann, peers, wanted) - } - - count := 0 - - for _, peer := range peers { - if count >= wanted { - break - } else if peersEquivalent(&peer, ann.Peer) { - continue - } - - if ann.HasIPv6() && peer.HasIPv6() { - ipv6s = append(ipv6s, peer) - count++ - } else if peer.HasIPv4() { - ipv4s = append(ipv4s, peer) - count++ - } - } - - return ipv4s, ipv6s -} - -// appendSubnetPeers is an alternative version of appendPeers used when the -// config variable PreferredSubnet is enabled. -func appendSubnetPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, peers models.PeerMap, wanted int) (models.PeerList, models.PeerList) { - var subnetIPv4 net.IPNet - var subnetIPv6 net.IPNet - - if ann.HasIPv4() { - subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)} - } - - if ann.HasIPv6() { - subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)} - } - - // Iterate over the peers twice: first add only peers in the same subnet and - // if we still need more peers grab ones that haven't already been added. - count := 0 - for _, checkInSubnet := range [2]bool{true, false} { - for _, peer := range peers { - if count >= wanted { - break - } - - inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP) - inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP) - - if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) { - continue - } - - if ann.HasIPv6() && peer.HasIPv6() { - ipv6s = append(ipv6s, peer) - count++ - } else if peer.HasIPv4() { - ipv4s = append(ipv4s, peer) - count++ - } - } - } - - return ipv4s, ipv6s -} - -// peersEquivalent checks if two peers represent the same entity. -func peersEquivalent(a, b *models.Peer) bool { - return a.ID == b.ID || a.UserID != 0 && a.UserID == b.UserID + ipv4s, ipv6s = ann.Torrent.Seeders.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant) + return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant-len(ipv4s)-len(ipv6s)) } diff --git a/tracker/memory/conn.go b/tracker/memory/conn.go index a598782..229c87a 100644 --- a/tracker/memory/conn.go +++ b/tracker/memory/conn.go @@ -88,7 +88,7 @@ func (c *Conn) DeleteLeecher(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - delete(t.Leechers, p.Key()) + t.Leechers.Delete(p.Key()) return nil } @@ -101,7 +101,7 @@ func (c *Conn) DeleteSeeder(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - delete(t.Seeders, p.Key()) + t.Seeders.Delete(p.Key()) return nil } @@ -114,7 +114,7 @@ func (c *Conn) PutLeecher(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - t.Leechers[p.Key()] = *p + t.Leechers.Put(*p) return nil } @@ -127,7 +127,7 @@ func (c *Conn) PutSeeder(infohash string, p *models.Peer) error { if !ok { return models.ErrTorrentDNE } - t.Seeders[p.Key()] = *p + t.Seeders.Put(*p) return nil } @@ -228,19 +228,8 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err continue // Torrent deleted since keys were computed. } - for key, peer := range torrent.Seeders { - if peer.LastAnnounce <= unixtime { - delete(torrent.Seeders, key) - stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) - } - } - - for key, peer := range torrent.Leechers { - if peer.LastAnnounce <= unixtime { - delete(torrent.Leechers, key) - stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) - } - } + torrent.Seeders.Purge(unixtime) + torrent.Leechers.Purge(unixtime) peers := torrent.PeerCount() c.torrentsM.Unlock() diff --git a/tracker/models/models.go b/tracker/models/models.go index 6794640..7d0d805 100644 --- a/tracker/models/models.go +++ b/tracker/models/models.go @@ -6,9 +6,11 @@ package models import ( "net" + "sync" "time" "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/stats" ) var ( @@ -55,6 +57,18 @@ type Peer struct { LastAnnounce int64 `json:"last_announce"` } +func (p *Peer) HasIPv4() bool { + return !p.HasIPv6() +} + +func (p *Peer) HasIPv6() bool { + return len(p.IP) == net.IPv6len +} + +func (p *Peer) Key() PeerKey { + return NewPeerKey(p.ID, p.HasIPv6()) +} + type PeerList []Peer type PeerKey string @@ -67,18 +81,146 @@ func NewPeerKey(peerID string, ipv6 bool) PeerKey { } // PeerMap is a map from PeerKeys to Peers. -type PeerMap map[PeerKey]Peer - -func (p *Peer) HasIPv4() bool { - return !p.HasIPv6() +type PeerMap struct { + peers map[PeerKey]Peer + sync.RWMutex } -func (p *Peer) HasIPv6() bool { - return len(p.IP) == net.IPv6len +func NewPeerMap() PeerMap { + return PeerMap{ + peers: make(map[PeerKey]Peer), + } } -func (p *Peer) Key() PeerKey { - return NewPeerKey(p.ID, p.HasIPv6()) +func (pm *PeerMap) Contains(pk PeerKey) (exists bool) { + pm.RLock() + defer pm.RUnlock() + + _, exists = pm.peers[pk] + + return +} + +func (pm *PeerMap) LookUp(pk PeerKey) (peer Peer, exists bool) { + pm.RLock() + defer pm.RUnlock() + + peer, exists = pm.peers[pk] + + return +} + +func (pm *PeerMap) Put(p Peer) { + pm.Lock() + defer pm.Unlock() + + pm.peers[p.Key()] = p +} + +func (pm *PeerMap) Delete(pk PeerKey) { + pm.Lock() + defer pm.Unlock() + + delete(pm.peers, pk) +} + +func (pm *PeerMap) Len() int { + pm.RLock() + defer pm.RUnlock() + + return len(pm.peers) +} + +func (pm *PeerMap) Purge(unixtime int64) { + pm.Lock() + defer pm.Unlock() + + for key, peer := range pm.peers { + if peer.LastAnnounce <= unixtime { + delete(pm.peers, key) + stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) + } + } +} + +// AppendPeers implements the logic of adding peers to given IPv4 or IPv6 lists. +func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) { + if ann.Config.PreferredSubnet { + return pm.AppendSubnetPeers(ipv4s, ipv6s, ann, wanted) + } + + pm.Lock() + defer pm.Unlock() + + count := 0 + for _, peer := range pm.peers { + if count >= wanted { + break + } else if peersEquivalent(&peer, ann.Peer) { + continue + } + + if ann.HasIPv6() && peer.HasIPv6() { + ipv6s = append(ipv6s, peer) + count++ + } else if peer.HasIPv4() { + ipv4s = append(ipv4s, peer) + count++ + } + } + + return ipv4s, ipv6s +} + +// peersEquivalent checks if two peers represent the same entity. +func peersEquivalent(a, b *Peer) bool { + return a.ID == b.ID || a.UserID != 0 && a.UserID == b.UserID +} + +// AppendSubnetPeers is an alternative version of appendPeers used when the +// config variable PreferredSubnet is enabled. +func (pm *PeerMap) AppendSubnetPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) { + var subnetIPv4 net.IPNet + var subnetIPv6 net.IPNet + + if ann.HasIPv4() { + subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)} + } + + if ann.HasIPv6() { + subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)} + } + + pm.Lock() + defer pm.Unlock() + + // Iterate over the peers twice: first add only peers in the same subnet and + // if we still need more peers grab ones that haven't already been added. + count := 0 + for _, checkInSubnet := range [2]bool{true, false} { + for _, peer := range pm.peers { + if count >= wanted { + break + } + + inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP) + inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP) + + if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) { + continue + } + + if ann.HasIPv6() && peer.HasIPv6() { + ipv6s = append(ipv6s, peer) + count++ + } else if peer.HasIPv4() { + ipv4s = append(ipv4s, peer) + count++ + } + } + } + + return ipv4s, ipv6s } // Torrent is a swarm for a given torrent file. @@ -95,21 +237,9 @@ type Torrent struct { LastAction int64 `json:"last_action"` } -// InSeederPool returns true if a peer is within a Torrent's map of seeders. -func (t *Torrent) InSeederPool(p *Peer) (exists bool) { - _, exists = t.Seeders[p.Key()] - return -} - -// InLeecherPool returns true if a peer is within a Torrent's map of leechers. -func (t *Torrent) InLeecherPool(p *Peer) (exists bool) { - _, exists = t.Leechers[p.Key()] - return -} - // PeerCount returns the total number of peers connected on this Torrent. func (t *Torrent) PeerCount() int { - return len(t.Seeders) + len(t.Leechers) + return t.Seeders.Len() + t.Leechers.Len() } // User is a registered user for private trackers.