Introduce thread-safe PeerMap

This commit is contained in:
Jimmy Zelinskie 2014-08-04 06:15:08 -04:00
parent d610b4ebb0
commit 3cb286fb40
5 changed files with 180 additions and 138 deletions

View file

@ -374,8 +374,8 @@ func loadPrivateTestData(tkr *tracker.Tracker) error {
torrent := &models.Torrent{ torrent := &models.Torrent{
ID: 1, ID: 1,
Infohash: infoHash, Infohash: infoHash,
Seeders: models.PeerMap{}, Seeders: models.NewPeerMap(),
Leechers: models.PeerMap{}, Leechers: models.NewPeerMap(),
} }
return conn.PutTorrent(torrent) return conn.PutTorrent(torrent)

View file

@ -111,8 +111,8 @@ func filesDict(torrents []*models.Torrent) bencode.Dict {
func torrentDict(torrent *models.Torrent) bencode.Dict { func torrentDict(torrent *models.Torrent) bencode.Dict {
return bencode.Dict{ return bencode.Dict{
"complete": len(torrent.Seeders), "complete": torrent.Seeders.Len(),
"incomplete": len(torrent.Leechers), "incomplete": torrent.Leechers.Len(),
"downloaded": torrent.Snatches, "downloaded": torrent.Snatches,
} }
} }

View file

@ -5,8 +5,6 @@
package tracker package tracker
import ( import (
"net"
"github.com/chihaya/chihaya/stats" "github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models" "github.com/chihaya/chihaya/tracker/models"
) )
@ -39,8 +37,8 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled {
torrent = &models.Torrent{ torrent = &models.Torrent{
Infohash: ann.Infohash, Infohash: ann.Infohash,
Seeders: models.PeerMap{}, Seeders: models.NewPeerMap(),
Leechers: models.PeerMap{}, Leechers: models.NewPeerMap(),
} }
err = conn.PutTorrent(torrent) err = conn.PutTorrent(torrent)
@ -91,12 +89,12 @@ func newAnnounceDelta(ann *models.Announce, t *models.Torrent) *models.AnnounceD
var oldUp, oldDown, rawDeltaUp, rawDeltaDown uint64 var oldUp, oldDown, rawDeltaUp, rawDeltaDown uint64
switch { switch {
case t.InSeederPool(ann.Peer): case t.Seeders.Contains(ann.Peer.Key()):
oldPeer := t.Seeders[ann.Peer.Key()] oldPeer, _ := t.Seeders.LookUp(ann.Peer.Key())
oldUp = oldPeer.Uploaded oldUp = oldPeer.Uploaded
oldDown = oldPeer.Downloaded oldDown = oldPeer.Downloaded
case t.InLeecherPool(ann.Peer): case t.Leechers.Contains(ann.Peer.Key()):
oldPeer := t.Leechers[ann.Peer.Key()] oldPeer, _ := t.Leechers.LookUp(ann.Peer.Key())
oldUp = oldPeer.Uploaded oldUp = oldPeer.Uploaded
oldDown = oldPeer.Downloaded oldDown = oldPeer.Downloaded
} }
@ -153,13 +151,13 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool,
p, t := ann.Peer, ann.Torrent p, t := ann.Peer, ann.Torrent
switch { switch {
case t.InSeederPool(p): case t.Seeders.Contains(p.Key()):
err = c.PutSeeder(t.Infohash, p) err = c.PutSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
case t.InLeecherPool(p): case t.Leechers.Contains(p.Key()):
err = c.PutLeecher(t.Infohash, p) err = c.PutLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
@ -226,14 +224,14 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
case ann.Event == "stopped" || ann.Event == "paused": case ann.Event == "stopped" || ann.Event == "paused":
// updateSwarm checks if the peer is active on the torrent, // updateSwarm checks if the peer is active on the torrent,
// so one of these branches must be followed. // so one of these branches must be followed.
if t.InSeederPool(p) { if t.Seeders.Contains(p.Key()) {
err = c.DeleteSeeder(t.Infohash, p) err = c.DeleteSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6())
} else if t.InLeecherPool(p) { } else if t.Leechers.Contains(p.Key()) {
err = c.DeleteLeecher(t.Infohash, p) err = c.DeleteLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
@ -242,10 +240,10 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
} }
case ann.Event == "completed": case ann.Event == "completed":
_, v4seed := t.Seeders[models.NewPeerKey(p.ID, false)] v4seed := t.Seeders.Contains(models.NewPeerKey(p.ID, false))
_, v6seed := t.Seeders[models.NewPeerKey(p.ID, true)] v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true))
if t.InLeecherPool(p) { if t.Leechers.Contains(p.Key()) {
err = leecherFinished(c, t, p) err = leecherFinished(c, t, p)
} else { } else {
err = models.ErrBadRequest err = models.ErrBadRequest
@ -257,7 +255,7 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
snatched = true snatched = true
} }
case t.InLeecherPool(p) && ann.Left == 0: case t.Leechers.Contains(p.Key()) && ann.Left == 0:
// A leecher completed but the event was never received. // A leecher completed but the event was never received.
err = leecherFinished(c, t, p) err = leecherFinished(c, t, p)
} }
@ -278,8 +276,8 @@ func leecherFinished(c Conn, t *models.Torrent, p *models.Peer) error {
} }
func newAnnounceResponse(ann *models.Announce) *models.AnnounceResponse { func newAnnounceResponse(ann *models.Announce) *models.AnnounceResponse {
seedCount := len(ann.Torrent.Seeders) seedCount := ann.Torrent.Seeders.Len()
leechCount := len(ann.Torrent.Leechers) leechCount := ann.Torrent.Leechers.Len()
res := &models.AnnounceResponse{ res := &models.AnnounceResponse{
Complete: seedCount, Complete: seedCount,
@ -303,85 +301,10 @@ func getPeers(ann *models.Announce) (ipv4s, ipv6s models.PeerList) {
if ann.Left == 0 { if ann.Left == 0 {
// If they're seeding, give them only leechers. // If they're seeding, give them only leechers.
return appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Leechers, ann.NumWant) return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant)
} }
// If they're leeching, prioritize giving them seeders. // If they're leeching, prioritize giving them seeders.
ipv4s, ipv6s = appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Seeders, ann.NumWant) ipv4s, ipv6s = ann.Torrent.Seeders.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant)
return appendPeers(ipv4s, ipv6s, ann, ann.Torrent.Leechers, ann.NumWant-len(ipv4s)-len(ipv6s)) return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant-len(ipv4s)-len(ipv6s))
}
// appendPeers implements the logic of adding peers to the IPv4 or IPv6 lists.
func appendPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, peers models.PeerMap, wanted int) (models.PeerList, models.PeerList) {
if ann.Config.PreferredSubnet {
return appendSubnetPeers(ipv4s, ipv6s, ann, peers, wanted)
}
count := 0
for _, peer := range peers {
if count >= wanted {
break
} else if peersEquivalent(&peer, ann.Peer) {
continue
}
if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer)
count++
} else if peer.HasIPv4() {
ipv4s = append(ipv4s, peer)
count++
}
}
return ipv4s, ipv6s
}
// appendSubnetPeers is an alternative version of appendPeers used when the
// config variable PreferredSubnet is enabled.
func appendSubnetPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, peers models.PeerMap, wanted int) (models.PeerList, models.PeerList) {
var subnetIPv4 net.IPNet
var subnetIPv6 net.IPNet
if ann.HasIPv4() {
subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)}
}
if ann.HasIPv6() {
subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)}
}
// Iterate over the peers twice: first add only peers in the same subnet and
// if we still need more peers grab ones that haven't already been added.
count := 0
for _, checkInSubnet := range [2]bool{true, false} {
for _, peer := range peers {
if count >= wanted {
break
}
inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP)
inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP)
if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) {
continue
}
if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer)
count++
} else if peer.HasIPv4() {
ipv4s = append(ipv4s, peer)
count++
}
}
}
return ipv4s, ipv6s
}
// peersEquivalent checks if two peers represent the same entity.
func peersEquivalent(a, b *models.Peer) bool {
return a.ID == b.ID || a.UserID != 0 && a.UserID == b.UserID
} }

View file

@ -88,7 +88,7 @@ func (c *Conn) DeleteLeecher(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Leechers, p.Key()) t.Leechers.Delete(p.Key())
return nil return nil
} }
@ -101,7 +101,7 @@ func (c *Conn) DeleteSeeder(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Seeders, p.Key()) t.Seeders.Delete(p.Key())
return nil return nil
} }
@ -114,7 +114,7 @@ func (c *Conn) PutLeecher(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Leechers[p.Key()] = *p t.Leechers.Put(*p)
return nil return nil
} }
@ -127,7 +127,7 @@ func (c *Conn) PutSeeder(infohash string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Seeders[p.Key()] = *p t.Seeders.Put(*p)
return nil return nil
} }
@ -228,19 +228,8 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err
continue // Torrent deleted since keys were computed. continue // Torrent deleted since keys were computed.
} }
for key, peer := range torrent.Seeders { torrent.Seeders.Purge(unixtime)
if peer.LastAnnounce <= unixtime { torrent.Leechers.Purge(unixtime)
delete(torrent.Seeders, key)
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())
}
}
for key, peer := range torrent.Leechers {
if peer.LastAnnounce <= unixtime {
delete(torrent.Leechers, key)
stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6())
}
}
peers := torrent.PeerCount() peers := torrent.PeerCount()
c.torrentsM.Unlock() c.torrentsM.Unlock()

View file

@ -6,9 +6,11 @@ package models
import ( import (
"net" "net"
"sync"
"time" "time"
"github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/stats"
) )
var ( var (
@ -55,6 +57,18 @@ type Peer struct {
LastAnnounce int64 `json:"last_announce"` LastAnnounce int64 `json:"last_announce"`
} }
func (p *Peer) HasIPv4() bool {
return !p.HasIPv6()
}
func (p *Peer) HasIPv6() bool {
return len(p.IP) == net.IPv6len
}
func (p *Peer) Key() PeerKey {
return NewPeerKey(p.ID, p.HasIPv6())
}
type PeerList []Peer type PeerList []Peer
type PeerKey string type PeerKey string
@ -67,18 +81,146 @@ func NewPeerKey(peerID string, ipv6 bool) PeerKey {
} }
// PeerMap is a map from PeerKeys to Peers. // PeerMap is a map from PeerKeys to Peers.
type PeerMap map[PeerKey]Peer type PeerMap struct {
peers map[PeerKey]Peer
func (p *Peer) HasIPv4() bool { sync.RWMutex
return !p.HasIPv6()
} }
func (p *Peer) HasIPv6() bool { func NewPeerMap() PeerMap {
return len(p.IP) == net.IPv6len return PeerMap{
peers: make(map[PeerKey]Peer),
}
} }
func (p *Peer) Key() PeerKey { func (pm *PeerMap) Contains(pk PeerKey) (exists bool) {
return NewPeerKey(p.ID, p.HasIPv6()) pm.RLock()
defer pm.RUnlock()
_, exists = pm.peers[pk]
return
}
func (pm *PeerMap) LookUp(pk PeerKey) (peer Peer, exists bool) {
pm.RLock()
defer pm.RUnlock()
peer, exists = pm.peers[pk]
return
}
func (pm *PeerMap) Put(p Peer) {
pm.Lock()
defer pm.Unlock()
pm.peers[p.Key()] = p
}
func (pm *PeerMap) Delete(pk PeerKey) {
pm.Lock()
defer pm.Unlock()
delete(pm.peers, pk)
}
func (pm *PeerMap) Len() int {
pm.RLock()
defer pm.RUnlock()
return len(pm.peers)
}
func (pm *PeerMap) Purge(unixtime int64) {
pm.Lock()
defer pm.Unlock()
for key, peer := range pm.peers {
if peer.LastAnnounce <= unixtime {
delete(pm.peers, key)
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())
}
}
}
// AppendPeers implements the logic of adding peers to given IPv4 or IPv6 lists.
func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) {
if ann.Config.PreferredSubnet {
return pm.AppendSubnetPeers(ipv4s, ipv6s, ann, wanted)
}
pm.Lock()
defer pm.Unlock()
count := 0
for _, peer := range pm.peers {
if count >= wanted {
break
} else if peersEquivalent(&peer, ann.Peer) {
continue
}
if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer)
count++
} else if peer.HasIPv4() {
ipv4s = append(ipv4s, peer)
count++
}
}
return ipv4s, ipv6s
}
// peersEquivalent checks if two peers represent the same entity.
func peersEquivalent(a, b *Peer) bool {
return a.ID == b.ID || a.UserID != 0 && a.UserID == b.UserID
}
// AppendSubnetPeers is an alternative version of appendPeers used when the
// config variable PreferredSubnet is enabled.
func (pm *PeerMap) AppendSubnetPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) {
var subnetIPv4 net.IPNet
var subnetIPv6 net.IPNet
if ann.HasIPv4() {
subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)}
}
if ann.HasIPv6() {
subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)}
}
pm.Lock()
defer pm.Unlock()
// Iterate over the peers twice: first add only peers in the same subnet and
// if we still need more peers grab ones that haven't already been added.
count := 0
for _, checkInSubnet := range [2]bool{true, false} {
for _, peer := range pm.peers {
if count >= wanted {
break
}
inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP)
inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP)
if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) {
continue
}
if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer)
count++
} else if peer.HasIPv4() {
ipv4s = append(ipv4s, peer)
count++
}
}
}
return ipv4s, ipv6s
} }
// Torrent is a swarm for a given torrent file. // Torrent is a swarm for a given torrent file.
@ -95,21 +237,9 @@ type Torrent struct {
LastAction int64 `json:"last_action"` LastAction int64 `json:"last_action"`
} }
// InSeederPool returns true if a peer is within a Torrent's map of seeders.
func (t *Torrent) InSeederPool(p *Peer) (exists bool) {
_, exists = t.Seeders[p.Key()]
return
}
// InLeecherPool returns true if a peer is within a Torrent's map of leechers.
func (t *Torrent) InLeecherPool(p *Peer) (exists bool) {
_, exists = t.Leechers[p.Key()]
return
}
// PeerCount returns the total number of peers connected on this Torrent. // PeerCount returns the total number of peers connected on this Torrent.
func (t *Torrent) PeerCount() int { func (t *Torrent) PeerCount() int {
return len(t.Seeders) + len(t.Leechers) return t.Seeders.Len() + t.Leechers.Len()
} }
// User is a registered user for private trackers. // User is a registered user for private trackers.