Only keep one IP in the Peer type

This commit is contained in:
Justin Li 2014-08-01 12:37:35 -04:00
parent dfeda26c16
commit b628b934ac
6 changed files with 129 additions and 118 deletions

View file

@ -70,12 +70,12 @@ func compactPeers(ipv6 bool, peers models.PeerList) []byte {
if ipv6 { if ipv6 {
for _, peer := range peers { for _, peer := range peers {
compactPeers.Write(peer.IPv6) compactPeers.Write(peer.IP)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
} }
} else { } else {
for _, peer := range peers { for _, peer := range peers {
compactPeers.Write(peer.IPv4) compactPeers.Write(peer.IP)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
} }
} }
@ -94,16 +94,8 @@ func peersList(ipv4s, ipv6s models.PeerList) (peers []bencode.Dict) {
} }
func peerDict(peer *models.Peer, ipv6 bool) bencode.Dict { func peerDict(peer *models.Peer, ipv6 bool) bencode.Dict {
var ip string
if ipv6 {
ip = peer.IPv6.String()
} else {
ip = peer.IPv4.String()
}
return bencode.Dict{ return bencode.Dict{
"ip": ip, "ip": peer.IP.String(),
"peer id": peer.ID, "peer id": peer.ID,
"port": peer.Port, "port": peer.Port,
} }

View file

@ -141,12 +141,11 @@ func (s *Stats) RecordEvent(event int) {
s.events <- event s.events <- event
} }
func (s *Stats) RecordPeerEvent(event int, ipv string) { func (s *Stats) RecordPeerEvent(event int, ipv6 bool) {
switch ipv { if ipv6 {
case "ipv4":
s.ipv4PeerEvents <- event
case "ipv6":
s.ipv6PeerEvents <- event s.ipv6PeerEvents <- event
} else {
s.ipv4PeerEvents <- event
} }
} }
@ -268,8 +267,8 @@ func RecordEvent(event int) {
} }
// RecordPeerEvent broadcasts a peer event to the default stats queue. // RecordPeerEvent broadcasts a peer event to the default stats queue.
func RecordPeerEvent(event int, ipv string) { func RecordPeerEvent(event int, ipv6 bool) {
DefaultStats.RecordPeerEvent(event, ipv) DefaultStats.RecordPeerEvent(event, ipv6)
} }
// RecordTiming broadcasts a timing event to the default stats queue. // RecordTiming broadcasts a timing event to the default stats queue.

View file

@ -54,36 +54,36 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
return err return err
} }
peer := models.NewPeer(ann, user, torrent) var createdIPv4, createdIPv6, snatchedIPv4, snatchedIPv6 bool
peer, peerv4, peerv6 := models.NewPeer(ann, user, torrent)
var createdIPv4, createdIPv6 bool if peerv4 != nil {
if peer.HasIPv4() { createdIPv4, err = updateSwarm(conn, ann, peerv4, torrent)
createdIPv4, err = updateSwarm(conn, ann, peer, torrent, "ipv4")
if err != nil { if err != nil {
return err return err
} }
} }
if peer.HasIPv6() { if peerv6 != nil {
createdIPv6, err = updateSwarm(conn, ann, peer, torrent, "ipv6") createdIPv6, err = updateSwarm(conn, ann, peerv6, torrent)
if err != nil { if err != nil {
return err return err
} }
} }
if peerv4 != nil {
snatchedIPv4, err = handleEvent(conn, ann, peerv4, user, torrent)
if err != nil {
return err
}
}
if peerv6 != nil {
snatchedIPv6, err = handleEvent(conn, ann, peerv6, user, torrent)
if err != nil {
return err
}
}
created := createdIPv4 || createdIPv6 created := createdIPv4 || createdIPv6
var snatchedIPv4, snatchedIPv6 bool
if peer.HasIPv4() {
snatchedIPv4, err = handleEvent(conn, ann, peer, user, torrent, "ipv4")
if err != nil {
return err
}
}
if peer.HasIPv6() {
snatchedIPv6, err = handleEvent(conn, ann, peer, user, torrent, "ipv6")
if err != nil {
return err
}
}
snatched := snatchedIPv4 || snatchedIPv6 snatched := snatchedIPv4 || snatchedIPv6
if tkr.cfg.PrivateEnabled { if tkr.cfg.PrivateEnabled {
@ -103,23 +103,23 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
} }
// updateSwarm handles the changes to a torrent's swarm given an announce. // updateSwarm handles the changes to a torrent's swarm given an announce.
func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent, ipv string) (created bool, err error) { func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) {
c.TouchTorrent(t.Infohash) c.TouchTorrent(t.Infohash)
switch { switch {
case t.InSeederPool(p.ID, ipv): case t.InSeederPool(p):
err = c.PutSeeder(t.Infohash, ipv, p) err = c.PutSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p t.Seeders[p.Key()] = *p
case t.InLeecherPool(p.ID, ipv): case t.InLeecherPool(p):
err = c.PutLeecher(t.Infohash, ipv, p) err = c.PutLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p t.Leechers[p.Key()] = *p
default: default:
if ann.Event != "" && ann.Event != "started" { if ann.Event != "" && ann.Event != "started" {
@ -128,20 +128,20 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
} }
if ann.Left == 0 { if ann.Left == 0 {
err = c.PutSeeder(t.Infohash, ipv, p) err = c.PutSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p t.Seeders[p.Key()] = *p
stats.RecordPeerEvent(stats.NewSeed, ipv) stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6())
} else { } else {
err = c.PutLeecher(t.Infohash, ipv, p) err = c.PutLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p t.Leechers[p.Key()] = *p
stats.RecordPeerEvent(stats.NewLeech, ipv) stats.RecordPeerEvent(stats.NewLeech, p.HasIPv6())
} }
created = true created = true
} }
@ -151,28 +151,26 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
// handleEvent checks to see whether an announce has an event and if it does, // handleEvent checks to see whether an announce has an event and if it does,
// properly handles that event. // properly handles that event.
func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent, ipv string) (snatched bool, err error) { func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) {
peerkey := models.NewPeerKey(p.ID, ipv)
switch { switch {
case ann.Event == "stopped" || ann.Event == "paused": case ann.Event == "stopped" || ann.Event == "paused":
// updateSwarm checks if the peer is active on the torrent, // updateSwarm checks if the peer is active on the torrent,
// so one of these branches must be followed. // so one of these branches must be followed.
if t.InSeederPool(p.ID, ipv) { if t.InSeederPool(p) {
err = c.DeleteSeeder(t.Infohash, peerkey) err = c.DeleteSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
delete(t.Seeders, models.NewPeerKey(p.ID, ipv)) delete(t.Seeders, p.Key())
stats.RecordPeerEvent(stats.DeletedSeed, ipv) stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6())
} else if t.InLeecherPool(p.ID, ipv) { } else if t.InLeecherPool(p) {
err = c.DeleteLeecher(t.Infohash, peerkey) err = c.DeleteLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
delete(t.Leechers, models.NewPeerKey(p.ID, ipv)) delete(t.Leechers, p.Key())
stats.RecordPeerEvent(stats.DeletedLeech, ipv) stats.RecordPeerEvent(stats.DeletedLeech, p.HasIPv6())
} }
case ann.Event == "completed": case ann.Event == "completed":
@ -190,40 +188,41 @@ func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t
u.Snatches++ u.Snatches++
} }
if t.InLeecherPool(p.ID, ipv) { if t.InLeecherPool(p) {
err = leecherFinished(c, t, p, ipv) err = leecherFinished(c, t, p)
} else { } else {
err = models.ErrBadRequest err = models.ErrBadRequest
} }
// If one of the dual-stacked peers is already a seeder, they have already // If one of the dual-stacked peers is already a seeder, they have already
// snatched. // snatched.
if !(t.InSeederPool(p.ID, "ipv4") || t.InSeederPool(p.ID, "ipv6")) { _, v4seed := t.Seeders[models.NewPeerKey(p.ID, false)]
_, v6seed := t.Seeders[models.NewPeerKey(p.ID, true)]
if !(v4seed || v6seed) {
snatched = true snatched = true
} }
case t.InLeecherPool(p.ID, ipv) && ann.Left == 0: case t.InLeecherPool(p) && ann.Left == 0:
// A leecher completed but the event was never received. // A leecher completed but the event was never received.
err = leecherFinished(c, t, p, ipv) err = leecherFinished(c, t, p)
} }
return return
} }
// leecherFinished moves a peer from the leeching pool to the seeder pool. // leecherFinished moves a peer from the leeching pool to the seeder pool.
func leecherFinished(c Conn, t *models.Torrent, p *models.Peer, ipv string) error { func leecherFinished(c Conn, t *models.Torrent, p *models.Peer) error {
peerkey := models.NewPeerKey(p.ID, ipv) if err := c.DeleteLeecher(t.Infohash, p); err != nil {
if err := c.DeleteLeecher(t.Infohash, peerkey); err != nil {
return err return err
} }
delete(t.Leechers, peerkey) delete(t.Leechers, p.Key())
if err := c.PutSeeder(t.Infohash, ipv, p); err != nil { if err := c.PutSeeder(t.Infohash, p); err != nil {
return err return err
} }
t.Seeders[peerkey] = *p t.Seeders[p.Key()] = *p
stats.RecordPeerEvent(stats.Completed, ipv) stats.RecordPeerEvent(stats.Completed, p.HasIPv6())
return nil return nil
} }
@ -276,7 +275,7 @@ func appendPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, announcer *
continue continue
} }
if announcer.HasIPv6() && peer.HasIPv6() { if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer) ipv6s = append(ipv6s, peer)
count++ count++
} else if peer.HasIPv4() { } else if peer.HasIPv4() {
@ -294,12 +293,12 @@ func appendSubnetPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, annou
var subnetIPv4 net.IPNet var subnetIPv4 net.IPNet
var subnetIPv6 net.IPNet var subnetIPv6 net.IPNet
if announcer.HasIPv4() { if ann.HasIPv4() {
subnetIPv4 = net.IPNet{announcer.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)} subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)}
} }
if announcer.HasIPv6() { if ann.HasIPv6() {
subnetIPv6 = net.IPNet{announcer.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)} subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)}
} }
// Iterate over the peers twice: first add only peers in the same subnet and // Iterate over the peers twice: first add only peers in the same subnet and
@ -311,14 +310,14 @@ func appendSubnetPeers(ipv4s, ipv6s models.PeerList, ann *models.Announce, annou
break break
} }
inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IPv4) inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP)
inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IPv6) inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP)
if peersEquivalent(&peer, announcer) || checkInSubnet != (inSubnet4 || inSubnet6) { if peersEquivalent(&peer, announcer) || checkInSubnet != (inSubnet4 || inSubnet6) {
continue continue
} }
if announcer.HasIPv6() && peer.HasIPv6() { if ann.HasIPv6() && peer.HasIPv6() {
ipv6s = append(ipv6s, peer) ipv6s = append(ipv6s, peer)
count++ count++
} else if peer.HasIPv4() { } else if peer.HasIPv4() {

View file

@ -64,11 +64,11 @@ type Conn interface {
DeleteTorrent(infohash string) error DeleteTorrent(infohash string) error
IncrementTorrentSnatches(infohash string) error IncrementTorrentSnatches(infohash string) error
PutLeecher(infohash, ipv string, p *models.Peer) error PutLeecher(infohash string, p *models.Peer) error
DeleteLeecher(infohash string, pk models.PeerKey) error DeleteLeecher(infohash string, p *models.Peer) error
PutSeeder(infohash, ipv string, p *models.Peer) error PutSeeder(infohash string, p *models.Peer) error
DeleteSeeder(infohash string, pk models.PeerKey) error DeleteSeeder(infohash string, p *models.Peer) error
PurgeInactiveTorrent(infohash string) error PurgeInactiveTorrent(infohash string) error
PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error

View file

@ -93,7 +93,7 @@ func (c *Conn) TouchTorrent(infohash string) error {
return nil return nil
} }
func (c *Conn) DeleteLeecher(infohash string, pk models.PeerKey) error { func (c *Conn) DeleteLeecher(infohash string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -101,12 +101,12 @@ func (c *Conn) DeleteLeecher(infohash string, pk models.PeerKey) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Leechers, pk) delete(t.Leechers, p.Key())
return nil return nil
} }
func (c *Conn) DeleteSeeder(infohash string, pk models.PeerKey) error { func (c *Conn) DeleteSeeder(infohash string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -114,12 +114,12 @@ func (c *Conn) DeleteSeeder(infohash string, pk models.PeerKey) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
delete(t.Seeders, pk) delete(t.Seeders, p.Key())
return nil return nil
} }
func (c *Conn) PutLeecher(infohash, ipv string, p *models.Peer) error { func (c *Conn) PutLeecher(infohash string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -127,12 +127,12 @@ func (c *Conn) PutLeecher(infohash, ipv string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Leechers[models.NewPeerKey(p.ID, ipv)] = *p t.Leechers[p.Key()] = *p
return nil return nil
} }
func (c *Conn) PutSeeder(infohash, ipv string, p *models.Peer) error { func (c *Conn) PutSeeder(infohash string, p *models.Peer) error {
c.torrentsM.Lock() c.torrentsM.Lock()
defer c.torrentsM.Unlock() defer c.torrentsM.Unlock()
@ -140,7 +140,7 @@ func (c *Conn) PutSeeder(infohash, ipv string, p *models.Peer) error {
if !ok { if !ok {
return models.ErrTorrentDNE return models.ErrTorrentDNE
} }
t.Seeders[models.NewPeerKey(p.ID, ipv)] = *p t.Seeders[p.Key()] = *p
return nil return nil
} }
@ -244,14 +244,14 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err
for key, peer := range torrent.Seeders { for key, peer := range torrent.Seeders {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Seeders, key) delete(torrent.Seeders, key)
stats.RecordPeerEvent(stats.ReapedSeed, string(key[:4])) stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())
} }
} }
for key, peer := range torrent.Leechers { for key, peer := range torrent.Leechers {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Leechers, key) delete(torrent.Leechers, key)
stats.RecordPeerEvent(stats.ReapedLeech, string(key[:4])) stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6())
} }
} }

View file

@ -6,7 +6,6 @@ package models
import ( import (
"net" "net"
"strings"
"time" "time"
"github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/config"
@ -47,8 +46,7 @@ type Peer struct {
UserID uint64 `json:"user_id"` UserID uint64 `json:"user_id"`
TorrentID uint64 `json:"torrent_id"` TorrentID uint64 `json:"torrent_id"`
IPv4 net.IP `json:"ipv4,omitempty"` IP net.IP `json:"ip,omitempty"`
IPv6 net.IP `json:"ipv6,omitempty"`
Port uint64 `json:"port"` Port uint64 `json:"port"`
Uploaded uint64 `json:"uploaded"` Uploaded uint64 `json:"uploaded"`
@ -60,15 +58,12 @@ type Peer struct {
type PeerList []Peer type PeerList []Peer
type PeerKey string type PeerKey string
func NewPeerKey(peerID, ipv string) (pk PeerKey) { func NewPeerKey(peerID string, ipv6 bool) PeerKey {
switch strings.ToLower(ipv) { if ipv6 {
case "ipv4": return PeerKey("6:" + peerID)
pk = PeerKey("IPv4" + peerID) } else {
case "ipv6": return PeerKey("4:" + peerID)
pk = PeerKey("IPv6" + peerID)
} }
return pk
} }
// PeerMap is a map from PeerKeys to Peers. // PeerMap is a map from PeerKeys to Peers.
@ -78,9 +73,9 @@ type PeerMap map[PeerKey]Peer
// for the announce parameter, it panics. When provided nil for the user or // for the announce parameter, it panics. When provided nil for the user or
// torrent parameter, it returns a Peer{UserID: 0} or Peer{TorrentID: 0} // torrent parameter, it returns a Peer{UserID: 0} or Peer{TorrentID: 0}
// respectively. // respectively.
func NewPeer(a *Announce, u *User, t *Torrent) *Peer { func NewPeer(a *Announce, u *User, t *Torrent) (peer *Peer, v4 *Peer, v6 *Peer) {
if a == nil { if a == nil {
panic("tracker: announce cannot equal nil") panic("models: announce cannot equal nil")
} }
var userID uint64 var userID uint64
@ -93,26 +88,44 @@ func NewPeer(a *Announce, u *User, t *Torrent) *Peer {
torrentID = t.ID torrentID = t.ID
} }
return &Peer{ peer = &Peer{
ID: a.PeerID, ID: a.PeerID,
UserID: userID, UserID: userID,
TorrentID: torrentID, TorrentID: torrentID,
IPv4: a.IPv4,
IPv6: a.IPv6,
Port: a.Port, Port: a.Port,
Uploaded: a.Uploaded, Uploaded: a.Uploaded,
Downloaded: a.Downloaded, Downloaded: a.Downloaded,
Left: a.Left, Left: a.Left,
LastAnnounce: time.Now().Unix(), LastAnnounce: time.Now().Unix(),
} }
if a.IPv4 != nil && a.IPv6 != nil {
v4 = peer
v4.IP = a.IPv4
v6 = &*peer
v6.IP = a.IPv6
} else if a.IPv4 != nil {
v4 = peer
v4.IP = a.IPv4
} else if a.IPv6 != nil {
v6 = peer
v6.IP = a.IPv6
} else {
panic("models: announce must have an IP")
}
return
} }
func (p *Peer) HasIPv4() bool { func (p *Peer) HasIPv4() bool {
return p.IPv4 != nil return !p.HasIPv6()
} }
func (p *Peer) HasIPv6() bool { func (p *Peer) HasIPv6() bool {
return p.IPv6 != nil return len(p.IP) == net.IPv6len
}
func (p *Peer) Key() PeerKey {
return NewPeerKey(p.ID, p.HasIPv6())
} }
// Torrent is a swarm for a given torrent file. // Torrent is a swarm for a given torrent file.
@ -130,14 +143,14 @@ type Torrent struct {
} }
// InSeederPool returns true if a peer is within a Torrent's map of seeders. // InSeederPool returns true if a peer is within a Torrent's map of seeders.
func (t *Torrent) InSeederPool(peerID, ipv string) (exists bool) { func (t *Torrent) InSeederPool(p *Peer) (exists bool) {
_, exists = t.Seeders[NewPeerKey(peerID, ipv)] _, exists = t.Seeders[p.Key()]
return return
} }
// InLeecherPool returns true if a peer is within a Torrent's map of leechers. // InLeecherPool returns true if a peer is within a Torrent's map of leechers.
func (t *Torrent) InLeecherPool(peerID, ipv string) (exists bool) { func (t *Torrent) InLeecherPool(p *Peer) (exists bool) {
_, exists = t.Leechers[NewPeerKey(peerID, ipv)] _, exists = t.Leechers[p.Key()]
return return
} }
@ -191,6 +204,14 @@ func (a Announce) ClientID() (clientID string) {
return return
} }
func (a Announce) HasIPv4() bool {
return a.IPv4 != nil
}
func (a Announce) HasIPv6() bool {
return a.IPv6 != nil
}
// AnnounceDelta contains the changes to a Peer's state. These changes are // AnnounceDelta contains the changes to a Peer's state. These changes are
// recorded by the backend driver. // recorded by the backend driver.
type AnnounceDelta struct { type AnnounceDelta struct {