From 9382ac482e95285e9306d159f9cdd89a9e3defd0 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sat, 23 Nov 2013 23:48:38 -0500 Subject: [PATCH] removed all references to "transaction" --- config/config.go | 100 ++- config/example.json | 4 +- server/announce.go | 598 +++++++-------- server/scrape.go | 10 +- server/server.go | 4 +- storage/tracker/redis/conn_test.go | 563 ++++++++++++++ storage/tracker/redis/redis.go | 894 +++++++++++----------- storage/tracker/redis/redis_bench_test.go | 430 +++++------ storage/tracker/redis/redis_test.go | 240 +++--- storage/tracker/redis/tx_test.go | 563 -------------- 10 files changed, 1698 insertions(+), 1708 deletions(-) create mode 100644 storage/tracker/redis/conn_test.go delete mode 100644 storage/tracker/redis/tx_test.go diff --git a/config/config.go b/config/config.go index c5907da..dde644b 100644 --- a/config/config.go +++ b/config/config.go @@ -6,85 +6,83 @@ package config import ( - "encoding/json" - "io" - "os" - "time" + "encoding/json" + "io" + "os" + "time" ) type Duration struct { - time.Duration + time.Duration } func (d *Duration) MarshalJSON() ([]byte, error) { - return json.Marshal(d.String()) + return json.Marshal(d.String()) } func (d *Duration) UnmarshalJSON(b []byte) error { - var str string - err := json.Unmarshal(b, &str) - d.Duration, err = time.ParseDuration(str) - return err + var str string + err := json.Unmarshal(b, &str) + d.Duration, err = time.ParseDuration(str) + return err } // DataStore represents the configuration used to connect to a data store. type DataStore struct { - Driver string `json:"driver"` - Network string `json:"network` - Host string `json:"host"` - Port string `json:"port"` - Username string `json:"user"` - Password string `json:"pass"` - Schema string `json:"schema,omitempty"` - Encoding string `json:"encoding,omitempty"` - Prefix string `json:"prefix,omitempty"` + Driver string `json:"driver"` + Network string `json:"network` + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"user"` + Password string `json:"pass"` + Schema string `json:"schema,omitempty"` + Encoding string `json:"encoding,omitempty"` + Prefix string `json:"prefix,omitempty"` - MaxIdleConns int `json:"max_idle_conns,omitempty"` - IdleTimeout *Duration `json:"idle_timeout,omitempty"` + MaxIdleConns int `json:"max_idle_conns,omitempty"` + IdleTimeout *Duration `json:"idle_timeout,omitempty"` } // Config represents a configuration for a server.Server. type Config struct { - Addr string `json:"addr"` - PubAddr string `json:"pub_addr"` - Cache DataStore `json:"cache"` - Storage DataStore `json:"storage"` + Addr string `json:"addr"` + PubAddr string `json:"pub_addr"` + Cache DataStore `json:"cache"` + Storage DataStore `json:"storage"` - Private bool `json:"private"` - Freeleech bool `json:"freeleech"` - Slots bool `json:"slots"` + Private bool `json:"private"` + Freeleech bool `json:"freeleech"` + Slots bool `json:"slots"` - Announce Duration `json:"announce"` - MinAnnounce Duration `json:"min_announce"` - ReadTimeout Duration `json:"read_timeout"` - DefaultNumWant int `json:"default_num_want"` - - TxRetries int `json:"tx_retries"` + Announce Duration `json:"announce"` + MinAnnounce Duration `json:"min_announce"` + ReadTimeout Duration `json:"read_timeout"` + DefaultNumWant int `json:"default_num_want"` } // Open is a shortcut to open a file, read it, and generate a Config. // It supports relative and absolute paths. func Open(path string) (*Config, error) { - expandedPath := os.ExpandEnv(path) - f, err := os.Open(expandedPath) - if err != nil { - return nil, err - } - defer f.Close() + expandedPath := os.ExpandEnv(path) + f, err := os.Open(expandedPath) + if err != nil { + return nil, err + } + defer f.Close() - conf, err := newConfig(f) - if err != nil { - return nil, err - } - return conf, nil + conf, err := newConfig(f) + if err != nil { + return nil, err + } + return conf, nil } // New decodes JSON from a Reader into a Config. func newConfig(raw io.Reader) (*Config, error) { - conf := &Config{} - err := json.NewDecoder(raw).Decode(conf) - if err != nil { - return nil, err - } - return conf, nil + conf := &Config{} + err := json.NewDecoder(raw).Decode(conf) + if err != nil { + return nil, err + } + return conf, nil } diff --git a/config/example.json b/config/example.json index 44a1ba9..aa85f77 100644 --- a/config/example.json +++ b/config/example.json @@ -30,8 +30,6 @@ "announce": "30m", "min_announce": "15m", "read_timeout": "20s", - "default_num_want": 50, - - "tx_retries": 3 + "default_num_want": 50 } diff --git a/server/announce.go b/server/announce.go index c45ca26..42970b4 100644 --- a/server/announce.go +++ b/server/announce.go @@ -5,357 +5,351 @@ package server import ( - "errors" - "log" - "net/http" - "path" - "strconv" - "time" + "errors" + "log" + "net/http" + "path" + "strconv" + "time" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/storage" ) func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { - // Parse the required parameters off of a query - compact, numWant, infohash, peerID, event, ip, port, uploaded, downloaded, left, err := s.validateAnnounceQuery(r) - if err != nil { - fail(err, w, r) - return - } + // Parse the required parameters off of a query + compact, numWant, infohash, peerID, event, ip, port, uploaded, downloaded, left, err := s.validateAnnounceQuery(r) + if err != nil { + fail(err, w, r) + return + } - // Retry failed transactions a specified number of times - for i := 0; i < s.conf.TxRetries; i++ { + // Get a connection to the tracker db + conn, err := s.dbConnPool.Get() + if err != nil { + log.Panicf("server: %s", err) + } - // Start a transaction - tx, err := s.dbConnPool.Get() - if err != nil { - log.Panicf("server: %s", err) - } + // Validate the user's passkey + passkey, _ := path.Split(r.URL.Path) + user, err := validateUser(conn, passkey) + if err != nil { + fail(err, w, r) + return + } - // Validate the user's passkey - passkey, _ := path.Split(r.URL.Path) - user, err := validateUser(tx, passkey) - if err != nil { - fail(err, w, r) - return - } + // Check if the user's client is whitelisted + whitelisted, err := conn.ClientWhitelisted(parsePeerID(peerID)) + if err != nil { + log.Panicf("server: %s", err) + } + if !whitelisted { + fail(errors.New("Your client is not approved"), w, r) + return + } - // Check if the user's client is whitelisted - whitelisted, err := tx.ClientWhitelisted(parsePeerID(peerID)) - if err != nil { - log.Panicf("server: %s", err) - } - if !whitelisted { - fail(errors.New("Your client is not approved"), w, r) - return - } + // Find the specified torrent + torrent, exists, err := conn.FindTorrent(infohash) + if err != nil { + log.Panicf("server: %s", err) + } + if !exists { + fail(errors.New("This torrent does not exist"), w, r) + return + } - // Find the specified torrent - torrent, exists, err := tx.FindTorrent(infohash) - if err != nil { - log.Panicf("server: %s", err) - } - if !exists { - fail(errors.New("This torrent does not exist"), w, r) - return - } + // If the torrent was pruned and the user is seeding, unprune it + if !torrent.Active && left == 0 { + err := conn.MarkActive(torrent) + if err != nil { + log.Panicf("server: %s", err) + } + } - // If the torrent was pruned and the user is seeding, unprune it - if !torrent.Active && left == 0 { - err := tx.MarkActive(torrent) - if err != nil { - log.Panicf("server: %s", err) - } - } + // Create a new peer object from the request + peer := &storage.Peer{ + ID: peerID, + UserID: user.ID, + TorrentID: torrent.ID, + IP: ip, + Port: port, + Uploaded: uploaded, + Downloaded: downloaded, + Left: left, + LastAnnounce: time.Now().Unix(), + } - // Create a new peer object from the request - peer := &storage.Peer{ - ID: peerID, - UserID: user.ID, - TorrentID: torrent.ID, - IP: ip, - Port: port, - Uploaded: uploaded, - Downloaded: downloaded, - Left: left, - LastAnnounce: time.Now().Unix(), - } + // Look for the user in in the pool of seeders and leechers + _, seeder := torrent.Seeders[storage.PeerMapKey(peer)] + _, leecher := torrent.Leechers[storage.PeerMapKey(peer)] - // Look for the user in in the pool of seeders and leechers - _, seeder := torrent.Seeders[storage.PeerMapKey(peer)] - _, leecher := torrent.Leechers[storage.PeerMapKey(peer)] + switch { + // Guarantee that no user is in both pools + case seeder && leecher: + if left == 0 { + err := conn.RemoveLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + leecher = false + } else { + err := conn.RemoveSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + seeder = false + } - switch { - // Guarantee that no user is in both pools - case seeder && leecher: - if left == 0 { - err := tx.RemoveLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - leecher = false - } else { - err := tx.RemoveSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - seeder = false - } + case seeder: + // Update the peer with the stats from the request + err := conn.SetSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } - case seeder: - // Update the peer with the stats from the request - err := tx.SetSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } + case leecher: + // Update the peer with the stats from the request + err := conn.SetLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } - case leecher: - // Update the peer with the stats from the request - err := tx.SetLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } + default: + // Check the user's slots to see if they're allowed to leech + if s.conf.Slots && user.Slots != -1 && left != 0 { + if user.SlotsUsed >= user.Slots { + fail(errors.New("You've run out of download slots."), w, r) + return + } + } - default: - // Check the user's slots to see if they're allowed to leech - if s.conf.Slots && user.Slots != -1 && left != 0 { - if user.SlotsUsed >= user.Slots { - fail(errors.New("You've run out of download slots."), w, r) - return - } - } + if left == 0 { + // Save the peer as a new seeder + err := conn.AddSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } else { + // Save the peer as a new leecher and increment the user's slots + err := conn.IncrementSlots(user) + if err != nil { + log.Panicf("server: %s", err) + } + err = conn.AddLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + } - if left == 0 { - // Save the peer as a new seeder - err := tx.AddSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } else { - // Save the peer as a new leecher and increment the user's slots - err := tx.IncrementSlots(user) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.AddLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } - } + // Handle any events in the request + switch { + case event == "stopped" || event == "paused": + if seeder { + err := conn.RemoveSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + if leecher { + err := conn.RemoveLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = conn.DecrementSlots(user) + if err != nil { + log.Panicf("server: %s", err) + } + } - // Handle any events in the request - switch { - case event == "stopped" || event == "paused": - if seeder { - err := tx.RemoveSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } - if leecher { - err := tx.RemoveLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.DecrementSlots(user) - if err != nil { - log.Panicf("server: %s", err) - } - } + case event == "completed": + err := conn.RecordSnatch(user, torrent) + if err != nil { + log.Panicf("server: %s", err) + } + if leecher { + err := conn.LeecherFinished(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } - case event == "completed": - err := tx.RecordSnatch(user, torrent) - if err != nil { - log.Panicf("server: %s", err) - } - if leecher { - err := tx.LeecherFinished(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } + case leecher && left == 0: + // A leecher completed but the event was never received + err := conn.LeecherFinished(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } - case leecher && left == 0: - // A leecher completed but the event was never received - err := tx.LeecherFinished(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } + if ip != peer.IP || port != peer.Port { + peer.Port = port + peer.IP = ip + } - if ip != peer.IP || port != peer.Port { - peer.Port = port - peer.IP = ip - } + // Generate the response + seedCount := len(torrent.Seeders) + leechCount := len(torrent.Leechers) - // Generate the response - seedCount := len(torrent.Seeders) - leechCount := len(torrent.Leechers) + writeBencoded(w, "d") + writeBencoded(w, "complete") + writeBencoded(w, seedCount) + writeBencoded(w, "incomplete") + writeBencoded(w, leechCount) + writeBencoded(w, "interval") + writeBencoded(w, s.conf.Announce.Duration) + writeBencoded(w, "min interval") + writeBencoded(w, s.conf.MinAnnounce.Duration) - writeBencoded(w, "d") - writeBencoded(w, "complete") - writeBencoded(w, seedCount) - writeBencoded(w, "incomplete") - writeBencoded(w, leechCount) - writeBencoded(w, "interval") - writeBencoded(w, s.conf.Announce.Duration) - writeBencoded(w, "min interval") - writeBencoded(w, s.conf.MinAnnounce.Duration) + if numWant > 0 && event != "stopped" && event != "paused" { + writeBencoded(w, "peers") + var peerCount, count int - if numWant > 0 && event != "stopped" && event != "paused" { - writeBencoded(w, "peers") - var peerCount, count int + if compact { + if left > 0 { + peerCount = minInt(numWant, leechCount) + } else { + peerCount = minInt(numWant, leechCount+seedCount-1) + } + writeBencoded(w, strconv.Itoa(peerCount*6)) + writeBencoded(w, ":") + } else { + writeBencoded(w, "l") + } - if compact { - if left > 0 { - peerCount = minInt(numWant, leechCount) - } else { - peerCount = minInt(numWant, leechCount+seedCount-1) - } - writeBencoded(w, strconv.Itoa(peerCount*6)) - writeBencoded(w, ":") - } else { - writeBencoded(w, "l") - } + if left > 0 { + // If they're seeding, give them only leechers + writeLeechers(w, torrent, count, numWant, compact) + } else { + // If they're leeching, prioritize giving them seeders + writeSeeders(w, torrent, count, numWant, compact) + writeLeechers(w, torrent, count, numWant, compact) + } - if left > 0 { - // If they're seeding, give them only leechers - writeLeechers(w, torrent, count, numWant, compact) - } else { - // If they're leeching, prioritize giving them seeders - writeSeeders(w, torrent, count, numWant, compact) - writeLeechers(w, torrent, count, numWant, compact) - } + if compact && peerCount != count { + log.Panicf("Calculated peer count (%d) != real count (%d)", peerCount, count) + } - if compact && peerCount != count { - log.Panicf("Calculated peer count (%d) != real count (%d)", peerCount, count) - } - - if !compact { - writeBencoded(w, "e") - } - } - writeBencoded(w, "e") - - return - } + if !compact { + writeBencoded(w, "e") + } + } + writeBencoded(w, "e") } func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { - pq, err := parseQuery(r.URL.RawQuery) - if err != nil { - return false, 0, "", "", "", "", 0, 0, 0, 0, err - } + pq, err := parseQuery(r.URL.RawQuery) + if err != nil { + return false, 0, "", "", "", "", 0, 0, 0, 0, err + } - compact = pq.Params["compact"] == "1" - numWant = requestedPeerCount(s.conf.DefaultNumWant, pq) - infohash, _ = pq.Params["info_hash"] - peerID, _ = pq.Params["peer_id"] - event, _ = pq.Params["event"] - ip, _ = requestedIP(r, pq) - port, portErr := pq.getUint64("port") - uploaded, uploadedErr := pq.getUint64("uploaded") - downloaded, downloadedErr := pq.getUint64("downloaded") - left, leftErr := pq.getUint64("left") + compact = pq.Params["compact"] == "1" + numWant = requestedPeerCount(s.conf.DefaultNumWant, pq) + infohash, _ = pq.Params["info_hash"] + peerID, _ = pq.Params["peer_id"] + event, _ = pq.Params["event"] + ip, _ = requestedIP(r, pq) + port, portErr := pq.getUint64("port") + uploaded, uploadedErr := pq.getUint64("uploaded") + downloaded, downloadedErr := pq.getUint64("downloaded") + left, leftErr := pq.getUint64("left") - if infohash == "" || - peerID == "" || - ip == "" || - portErr != nil || - uploadedErr != nil || - downloadedErr != nil || - leftErr != nil { - return false, 0, "", "", "", "", 0, 0, 0, 0, errors.New("Malformed request") - } - return + if infohash == "" || + peerID == "" || + ip == "" || + portErr != nil || + uploadedErr != nil || + downloadedErr != nil || + leftErr != nil { + return false, 0, "", "", "", "", 0, 0, 0, 0, errors.New("Malformed request") + } + return } func requestedPeerCount(fallback int, pq *parsedQuery) int { - if numWantStr, exists := pq.Params["numWant"]; exists { - numWant, err := strconv.Atoi(numWantStr) - if err != nil { - return fallback - } - return numWant - } - return fallback + if numWantStr, exists := pq.Params["numWant"]; exists { + numWant, err := strconv.Atoi(numWantStr) + if err != nil { + return fallback + } + return numWant + } + return fallback } func requestedIP(r *http.Request, pq *parsedQuery) (string, error) { - ip, ok := pq.Params["ip"] - ipv4, okv4 := pq.Params["ipv4"] - xRealIPs, xRealOk := pq.Params["X-Real-Ip"] + ip, ok := pq.Params["ip"] + ipv4, okv4 := pq.Params["ipv4"] + xRealIPs, xRealOk := pq.Params["X-Real-Ip"] - switch { - case ok: - return ip, nil + switch { + case ok: + return ip, nil - case okv4: - return ipv4, nil + case okv4: + return ipv4, nil - case xRealOk && len(xRealIPs) > 0: - return string(xRealIPs[0]), nil + case xRealOk && len(xRealIPs) > 0: + return string(xRealIPs[0]), nil - default: - portIndex := len(r.RemoteAddr) - 1 - for ; portIndex >= 0; portIndex-- { - if r.RemoteAddr[portIndex] == ':' { - break - } - } - if portIndex != -1 { - return r.RemoteAddr[0:portIndex], nil - } - return "", errors.New("Failed to parse IP address") - } + default: + portIndex := len(r.RemoteAddr) - 1 + for ; portIndex >= 0; portIndex-- { + if r.RemoteAddr[portIndex] == ':' { + break + } + } + if portIndex != -1 { + return r.RemoteAddr[0:portIndex], nil + } + return "", errors.New("Failed to parse IP address") + } } func minInt(a, b int) int { - if a < b { - return a - } - return b + if a < b { + return a + } + return b } func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { - for _, seed := range t.Seeders { - if count >= numWant { - break - } - if compact { - // TODO writeBencoded(w, compactAddr) - } else { - writeBencoded(w, "d") - writeBencoded(w, "ip") - writeBencoded(w, seed.IP) - writeBencoded(w, "peer id") - writeBencoded(w, seed.ID) - writeBencoded(w, "port") - writeBencoded(w, seed.Port) - writeBencoded(w, "e") - } - count++ - } + for _, seed := range t.Seeders { + if count >= numWant { + break + } + if compact { + // TODO writeBencoded(w, compactAddr) + } else { + writeBencoded(w, "d") + writeBencoded(w, "ip") + writeBencoded(w, seed.IP) + writeBencoded(w, "peer id") + writeBencoded(w, seed.ID) + writeBencoded(w, "port") + writeBencoded(w, seed.Port) + writeBencoded(w, "e") + } + count++ + } } func writeLeechers(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { - for _, leech := range t.Leechers { - if count >= numWant { - break - } - if compact { - // TODO writeBencoded(w, compactAddr) - } else { - writeBencoded(w, "d") - writeBencoded(w, "ip") - writeBencoded(w, leech.IP) - writeBencoded(w, "peer id") - writeBencoded(w, leech.ID) - writeBencoded(w, "port") - writeBencoded(w, leech.Port) - writeBencoded(w, "e") - } - count++ - } + for _, leech := range t.Leechers { + if count >= numWant { + break + } + if compact { + // TODO writeBencoded(w, compactAddr) + } else { + writeBencoded(w, "d") + writeBencoded(w, "ip") + writeBencoded(w, leech.IP) + writeBencoded(w, "peer id") + writeBencoded(w, leech.ID) + writeBencoded(w, "port") + writeBencoded(w, leech.Port) + writeBencoded(w, "e") + } + count++ + } } diff --git a/server/scrape.go b/server/scrape.go index 0efd8a5..abdb376 100644 --- a/server/scrape.go +++ b/server/scrape.go @@ -22,15 +22,15 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { return } - // Start a transaction - tx, err := s.dbConnPool.Get() + // Get a connection to the tracker db + conn, err := s.dbConnPool.Get() if err != nil { log.Fatal(err) } // Find and validate the user passkey, _ := path.Split(r.URL.Path) - _, err = validateUser(tx, passkey) + _, err = validateUser(conn, passkey) if err != nil { fail(err, w, r) return @@ -40,7 +40,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { writeBencoded(w, "files") if pq.Infohashes != nil { for _, infohash := range pq.Infohashes { - torrent, exists, err := tx.FindTorrent(infohash) + torrent, exists, err := conn.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } @@ -50,7 +50,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { } } } else if infohash, exists := pq.Params["info_hash"]; exists { - torrent, exists, err := tx.FindTorrent(infohash) + torrent, exists, err := conn.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } diff --git a/server/server.go b/server/server.go index e54c3ae..b41cba2 100644 --- a/server/server.go +++ b/server/server.go @@ -117,13 +117,13 @@ func fail(err error, w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } -func validateUser(tx tracker.Conn, dir string) (*storage.User, error) { +func validateUser(conn tracker.Conn, dir string) (*storage.User, error) { if len(dir) != 34 { return nil, errors.New("Passkey is invalid") } passkey := dir[1:33] - user, exists, err := tx.FindUser(passkey) + user, exists, err := conn.FindUser(passkey) if err != nil { log.Panicf("server: %s", err) } diff --git a/storage/tracker/redis/conn_test.go b/storage/tracker/redis/conn_test.go new file mode 100644 index 0000000..2893c6f --- /dev/null +++ b/storage/tracker/redis/conn_test.go @@ -0,0 +1,563 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package redis + +import ( + "math/rand" + "os" + "reflect" + "testing" + "time" + + "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/storage/tracker" +) + +func createTestConn() tracker.Conn { + testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) + panicOnErr(err) + conf := &testConfig.Cache + + testPool, err := tracker.Open(conf) + panicOnErr(err) + + newConn, err := testPool.Get() + panicOnErr(err) + + return newConn +} + +func TestFindUserSuccess(t *testing.T) { + conn := createTestConn() + testUser := createTestUser() + + panicOnErr(conn.AddUser(testUser)) + foundUser, found, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + if !found { + t.Error("user not found", testUser) + } + if *foundUser != *testUser { + t.Error("found user mismatch", *foundUser, testUser) + } + // Cleanup + panicOnErr(conn.RemoveUser(testUser)) +} + +func TestFindUserFail(t *testing.T) { + conn := createTestConn() + testUser := createTestUser() + + foundUser, found, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + t.Error("user found", foundUser) + } +} + +func TestRemoveUser(t *testing.T) { + conn := createTestConn() + testUser := createTestUser() + + panicOnErr(conn.AddUser(testUser)) + err := conn.RemoveUser(testUser) + panicOnErr(err) + foundUser, found, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + t.Error("removed user found", foundUser) + } +} + +func TestFindTorrentSuccess(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if !found { + t.Error("torrent not found", testTorrent) + } + if !reflect.DeepEqual(foundTorrent, testTorrent) { + t.Error("found torrent mismatch", foundTorrent, testTorrent) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestFindTorrentFail(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + t.Error("torrent found", foundTorrent) + } +} + +func TestRemoveTorrent(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + + panicOnErr(conn.RemoveTorrent(testTorrent)) + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + t.Error("removed torrent found", foundTorrent) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestClientWhitelistSuccess(t *testing.T) { + conn := createTestConn() + testPeerID := "-lt0D30-" + + panicOnErr(conn.WhitelistClient(testPeerID)) + found, err := conn.ClientWhitelisted(testPeerID) + panicOnErr(err) + if !found { + t.Error("peerID not found", testPeerID) + } + // Cleanup + panicOnErr(conn.UnWhitelistClient(testPeerID)) +} + +func TestClientWhitelistFail(t *testing.T) { + conn := createTestConn() + testPeerID2 := "TIX0192" + + found, err := conn.ClientWhitelisted(testPeerID2) + panicOnErr(err) + if found { + t.Error("peerID found", testPeerID2) + } +} + +func TestRecordSnatch(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + testUser := createTestUser() + panicOnErr(conn.AddTorrent(testTorrent)) + panicOnErr(conn.AddUser(testUser)) + + userSnatches := testUser.Snatches + torrentSnatches := testTorrent.Snatches + + panicOnErr(conn.RecordSnatch(testUser, testTorrent)) + + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundUser, _, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + + if testUser.Snatches != userSnatches+1 { + t.Error("snatch not recorded to local user", testUser.Snatches, userSnatches+1) + } + if testTorrent.Snatches != torrentSnatches+1 { + t.Error("snatch not recorded to local torrent") + } + if foundUser.Snatches != userSnatches+1 { + t.Error("snatch not recorded to cached user", foundUser.Snatches, userSnatches+1) + } + if foundTorrent.Snatches != torrentSnatches+1 { + t.Error("snatch not recorded to cached torrent") + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) + panicOnErr(conn.RemoveUser(testUser)) +} + +func TestMarkActive(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + testTorrent.Active = false + panicOnErr(conn.AddTorrent(testTorrent)) + + panicOnErr(conn.MarkActive(testTorrent)) + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + + if foundTorrent.Active != true { + t.Error("cached torrent not activated") + } + if testTorrent.Active != true { + t.Error("cached torrent not activated") + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestClientWhitelistRemove(t *testing.T) { + conn := createTestConn() + testPeerID := "-lt0D30-" + panicOnErr(conn.WhitelistClient(testPeerID)) + panicOnErr(conn.UnWhitelistClient(testPeerID)) + + found, err := conn.ClientWhitelisted(testPeerID) + panicOnErr(err) + if found { + t.Error("removed peerID found", testPeerID) + } +} + +func TestAddSeeder(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, found := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if found && foundSeeder != *testSeeder { + t.Error("seeder not added to cache", testSeeder) + } + foundSeeder, found = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if found && foundSeeder != *testSeeder { + t.Error("seeder not added to local", testSeeder) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestAddLeecher(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to cache", testLeecher) + } + foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to local", testLeecher) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestRemoveSeeder(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + + panicOnErr(conn.RemoveSeeder(testTorrent, testSeeder)) + foundSeeder, found := testTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if found || foundSeeder == *testSeeder { + t.Error("seeder not removed from local", foundSeeder) + } + + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, found = foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if found || foundSeeder == *testSeeder { + t.Error("seeder not removed from cache", foundSeeder, *testSeeder) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestRemoveLeecher(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + + panicOnErr(conn.RemoveLeecher(testTorrent, testLeecher)) + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found || foundLeecher == *testLeecher { + t.Error("leecher not removed from cache", foundLeecher, *testLeecher) + } + foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found || foundLeecher == *testLeecher { + t.Error("leecher not removed from local", foundLeecher, *testLeecher) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestSetSeeder(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(conn.SetSeeder(testTorrent, testSeeder)) + + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in cache", foundSeeder, *testSeeder) + } + foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in local", foundSeeder, *testSeeder) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestSetLeecher(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testLeecher.Uploaded += uint64(r.Int63()) + + panicOnErr(conn.SetLeecher(testTorrent, testLeecher)) + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, _ := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if foundLeecher != *testLeecher { + t.Error("leecher not updated in cache", testLeecher) + } + foundLeecher, _ = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if foundLeecher != *testLeecher { + t.Error("leecher not updated in local", testLeecher) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestIncrementSlots(t *testing.T) { + conn := createTestConn() + testUser := createTestUser() + panicOnErr(conn.AddUser(testUser)) + numSlots := testUser.Slots + + panicOnErr(conn.IncrementSlots(testUser)) + foundUser, _, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + + if foundUser.Slots != numSlots+1 { + t.Error("cached slots not incremented") + } + if testUser.Slots != numSlots+1 { + t.Error("local slots not incremented") + } + // Cleanup + panicOnErr(conn.RemoveUser(testUser)) +} + +func TestDecrementSlots(t *testing.T) { + conn := createTestConn() + testUser := createTestUser() + panicOnErr(conn.AddUser(testUser)) + numSlots := testUser.Slots + + panicOnErr(conn.DecrementSlots(testUser)) + foundUser, _, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + + if foundUser.Slots != numSlots-1 { + t.Error("cached slots not incremented") + } + if testUser.Slots != numSlots-1 { + t.Error("local slots not incremented") + } + // Cleanup + panicOnErr(conn.RemoveUser(testUser)) +} + +func TestLeecherFinished(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + + panicOnErr(conn.LeecherFinished(testTorrent, testLeecher)) + + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testLeecher)] + if foundSeeder != *testLeecher { + t.Error("seeder not added to cache", foundSeeder, *testLeecher) + } + foundSeeder, _ = foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if foundSeeder == *testLeecher { + t.Error("leecher not removed from cache", testLeecher) + } + foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testLeecher)] + if foundSeeder != *testLeecher { + t.Error("seeder not added to local", testLeecher) + } + foundSeeder, _ = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if foundSeeder == *testLeecher { + t.Error("leecher not removed from local", testLeecher) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +// Add, update, verify remove +func TestUpdatePeer(t *testing.T) { + conn := createTestConn() + testTorrent := createTestTorrent() + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddTorrent(testTorrent)) + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + // Update a seeder, set it, then check to make sure it updated + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(conn.SetSeeder(testTorrent, testSeeder)) + + panicOnErr(conn.RemoveSeeder(testTorrent, testSeeder)) + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if seeder, exists := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)]; exists { + t.Error("seeder not removed from cache", seeder) + } + if seeder, exists := testTorrent.Seeders[storage.PeerMapKey(testSeeder)]; exists { + t.Error("seeder not removed from local", seeder) + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestParallelFindUser(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + conn := createTestConn() + testUserSuccess := createTestUser() + testUserFail := createTestUser() + panicOnErr(conn.AddUser(testUserSuccess)) + + for i := 0; i < 10; i++ { + foundUser, found, err := conn.FindUser(testUserFail.Passkey) + panicOnErr(err) + if found { + t.Error("user found", foundUser) + } + foundUser, found, err = conn.FindUser(testUserSuccess.Passkey) + panicOnErr(err) + if !found { + t.Error("user not found", testUserSuccess) + } + if *foundUser != *testUserSuccess { + t.Error("found user mismatch", *foundUser, testUserSuccess) + } + } + // Cleanup + panicOnErr(conn.RemoveUser(testUserSuccess)) +} + +func TestParallelFindTorrent(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + conn := createTestConn() + testTorrentSuccess := createTestTorrent() + testTorrentFail := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrentSuccess)) + + for i := 0; i < 10; i++ { + foundTorrent, found, err := conn.FindTorrent(testTorrentSuccess.Infohash) + panicOnErr(err) + if !found { + t.Error("torrent not found", testTorrentSuccess) + } + if !reflect.DeepEqual(foundTorrent, testTorrentSuccess) { + t.Error("found torrent mismatch", foundTorrent, testTorrentSuccess) + } + foundTorrent, found, err = conn.FindTorrent(testTorrentFail.Infohash) + panicOnErr(err) + if found { + t.Error("torrent found", foundTorrent) + } + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrentSuccess)) +} + +func TestParallelSetSeeder(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 10; i++ { + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(conn.SetSeeder(testTorrent, testSeeder)) + + foundTorrent, _, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in cache", foundSeeder, *testSeeder) + } + foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in local", foundSeeder, *testSeeder) + } + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} + +func TestParallelAddLeecher(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + + for i := 0; i < 10; i++ { + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to cache", testLeecher) + } + foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to local", testLeecher) + } + } + // Cleanup + panicOnErr(conn.RemoveTorrent(testTorrent)) +} diff --git a/storage/tracker/redis/redis.go b/storage/tracker/redis/redis.go index e4bd5b8..d96500f 100644 --- a/storage/tracker/redis/redis.go +++ b/storage/tracker/redis/redis.go @@ -24,92 +24,92 @@ package redis import ( - "errors" - "strconv" - "time" + "errors" + "strconv" + "time" - "github.com/garyburd/redigo/redis" + "github.com/garyburd/redigo/redis" - "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/storage" - "github.com/pushrax/chihaya/storage/tracker" + "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/storage/tracker" ) var ( - ErrCreateUser = errors.New("redis: Incorrect reply length for user") - ErrCreateTorrent = errors.New("redis: Incorrect reply length for torrent") - ErrCreatePeer = errors.New("redis: Incorrect reply length for peer") - ErrMarkActive = errors.New("redis: Torrent doesn't exist") + ErrCreateUser = errors.New("redis: Incorrect reply length for user") + ErrCreateTorrent = errors.New("redis: Incorrect reply length for torrent") + ErrCreatePeer = errors.New("redis: Incorrect reply length for peer") + ErrMarkActive = errors.New("redis: Torrent doesn't exist") - SeedersPrefix = "seeders:" - LeechersPrefix = "leechers:" - TorrentPrefix = "torrent:" - UserPrefix = "user:" - PeerPrefix = "peer:" + SeedersPrefix = "seeders:" + LeechersPrefix = "leechers:" + TorrentPrefix = "torrent:" + UserPrefix = "user:" + PeerPrefix = "peer:" ) type driver struct{} // New creates and returns a tracker.Pool. func (d *driver) New(conf *config.DataStore) tracker.Pool { - return &Pool{ - conf: conf, - pool: redis.Pool{ - MaxIdle: conf.MaxIdleConns, - IdleTimeout: conf.IdleTimeout.Duration, - Dial: makeDialFunc(conf), - TestOnBorrow: testOnBorrow, - }, - } + return &Pool{ + conf: conf, + pool: redis.Pool{ + MaxIdle: conf.MaxIdleConns, + IdleTimeout: conf.IdleTimeout.Duration, + Dial: makeDialFunc(conf), + TestOnBorrow: testOnBorrow, + }, + } } // makeDialFunc configures and returns a new redis.Dial struct using the specified configuration. func makeDialFunc(conf *config.DataStore) func() (redis.Conn, error) { - return func() (conn redis.Conn, err error) { - conn, err = redis.Dial(conf.Network, conf.Host+":"+conf.Port) - if err != nil { - return nil, err - } - return conn, nil - } + return func() (conn redis.Conn, err error) { + conn, err = redis.Dial(conf.Network, conf.Host+":"+conf.Port) + if err != nil { + return nil, err + } + return conn, nil + } } // testOnBorrow pings the Redis instance func testOnBorrow(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err + _, err := c.Do("PING") + return err } type Pool struct { - conf *config.DataStore - pool redis.Pool + conf *config.DataStore + pool redis.Pool } func (p *Pool) Close() error { - return p.pool.Close() + return p.pool.Close() } func (p *Pool) Get() (tracker.Conn, error) { - retTx := &Tx{ - conf: p.conf, - done: false, - Conn: p.pool.Get(), - } - return retTx, nil + newConn := &Conn{ + conf: p.conf, + done: false, + Conn: p.pool.Get(), + } + return newConn, nil } -type Tx struct { - conf *config.DataStore - done bool - redis.Conn +type Conn struct { + conf *config.DataStore + done bool + redis.Conn } -func (tx *Tx) close() { - if tx.done { - panic("redis: transaction closed twice") - } - tx.done = true - tx.Conn.Close() +func (conn *Conn) close() { + if conn.done { + panic("redis: connection closed twice") + } + conn.done = true + conn.Conn.Close() } // createUser takes a string slice of length 14 and returns a pointer to a new @@ -120,33 +120,33 @@ func (tx *Tx) close() { // If the field value string cannot be converted to the correct type, // createUser will return a nil user and the conversion error. func createUser(userVals []string) (*storage.User, error) { - if len(userVals) != 14 { - return nil, ErrCreateUser - } - var user storage.User - var err error - for index, userString := range userVals { - switch userString { - case "id": - user.ID, err = strconv.ParseUint(userVals[index+1], 10, 64) - case "passkey": - user.Passkey = userVals[index+1] - case "up_multiplier": - user.UpMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) - case "down_multiplier": - user.DownMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) - case "slots": - user.Slots, err = strconv.ParseInt(userVals[index+1], 10, 64) - case "slots_used": - user.SlotsUsed, err = strconv.ParseInt(userVals[index+1], 10, 64) - case "snatches": - user.Snatches, err = strconv.ParseUint(userVals[index+1], 10, 64) - } - if err != nil { - return nil, err - } - } - return &user, nil + if len(userVals) != 14 { + return nil, ErrCreateUser + } + var user storage.User + var err error + for index, userString := range userVals { + switch userString { + case "id": + user.ID, err = strconv.ParseUint(userVals[index+1], 10, 64) + case "passkey": + user.Passkey = userVals[index+1] + case "up_multiplier": + user.UpMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) + case "down_multiplier": + user.DownMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) + case "slots": + user.Slots, err = strconv.ParseInt(userVals[index+1], 10, 64) + case "slots_used": + user.SlotsUsed, err = strconv.ParseInt(userVals[index+1], 10, 64) + case "snatches": + user.Snatches, err = strconv.ParseUint(userVals[index+1], 10, 64) + } + if err != nil { + return nil, err + } + } + return &user, nil } // createTorrent takes a string slice of length 14 and returns a pointer to a new storage.Torrent @@ -158,75 +158,75 @@ func createUser(userVals []string) (*storage.User, error) { // If the field values cannot be converted to the correct type, // createTorrent will return a nil user and the conversion error. // After converting the torrent fields, the seeders and leechers are populated by redis.getPeers -func (tx *Tx) createTorrent(torrentVals []string) (*storage.Torrent, error) { - if len(torrentVals) != 14 { - return nil, ErrCreateTorrent - } - var torrent storage.Torrent - var err error - for index, torrentString := range torrentVals { - switch torrentString { - case "id": - torrent.ID, err = strconv.ParseUint(torrentVals[index+1], 10, 64) - case "infohash": - torrent.Infohash = torrentVals[index+1] - case "active": - torrent.Active, err = strconv.ParseBool(torrentVals[index+1]) - case "snatches": - torrent.Snatches, err = strconv.ParseUint(torrentVals[index+1], 10, 32) - case "up_multiplier": - torrent.UpMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) - case "down_multiplier": - torrent.DownMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) - case "last_action": - torrent.LastAction, err = strconv.ParseInt(torrentVals[index+1], 10, 64) - } - if err != nil { - return nil, err - } - } - torrent.Seeders, err = tx.getPeers(torrent.ID, SeedersPrefix) - if err != nil { - return nil, err - } - torrent.Leechers, err = tx.getPeers(torrent.ID, LeechersPrefix) - if err != nil { - return nil, err - } - return &torrent, nil +func (conn *Conn) createTorrent(torrentVals []string) (*storage.Torrent, error) { + if len(torrentVals) != 14 { + return nil, ErrCreateTorrent + } + var torrent storage.Torrent + var err error + for index, torrentString := range torrentVals { + switch torrentString { + case "id": + torrent.ID, err = strconv.ParseUint(torrentVals[index+1], 10, 64) + case "infohash": + torrent.Infohash = torrentVals[index+1] + case "active": + torrent.Active, err = strconv.ParseBool(torrentVals[index+1]) + case "snatches": + torrent.Snatches, err = strconv.ParseUint(torrentVals[index+1], 10, 32) + case "up_multiplier": + torrent.UpMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) + case "down_multiplier": + torrent.DownMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) + case "last_action": + torrent.LastAction, err = strconv.ParseInt(torrentVals[index+1], 10, 64) + } + if err != nil { + return nil, err + } + } + torrent.Seeders, err = conn.getPeers(torrent.ID, SeedersPrefix) + if err != nil { + return nil, err + } + torrent.Leechers, err = conn.getPeers(torrent.ID, LeechersPrefix) + if err != nil { + return nil, err + } + return &torrent, nil } // setPeer writes or overwrites peer information, stored as a Redis hash. // The hash fields names are the same as the JSON tags on the storage.Peer struct. -func (tx *Tx) setPeer(peer *storage.Peer) error { - hashKey := tx.conf.Prefix + getPeerHashKey(peer) - _, err := tx.Do("HMSET", hashKey, - "id", peer.ID, - "user_id", peer.UserID, - "torrent_id", peer.TorrentID, - "ip", peer.IP, - "port", peer.Port, - "uploaded", peer.Uploaded, - "downloaded", peer.Downloaded, - "left", peer.Left, - "last_announce", peer.LastAnnounce) +func (conn *Conn) setPeer(peer *storage.Peer) error { + hashKey := conn.conf.Prefix + getPeerHashKey(peer) + _, err := conn.Do("HMSET", hashKey, + "id", peer.ID, + "user_id", peer.UserID, + "torrent_id", peer.TorrentID, + "ip", peer.IP, + "port", peer.Port, + "uploaded", peer.Uploaded, + "downloaded", peer.Downloaded, + "left", peer.Left, + "last_announce", peer.LastAnnounce) - return err + return err } // removePeer removes the given peer from the specified peer set (seeder or leecher), // and removes the peer information. // This function calls multiple redis commands, it's not internally atomic. // This function will not return an error if the peer to remove doesn't exist. -func (tx *Tx) removePeer(peer *storage.Peer, peerTypePrefix string) error { - setKey := tx.conf.Prefix + getPeerSetKey(peerTypePrefix, peer) - _, err := tx.Do("SREM", setKey, getPeerHashKey(peer)) - if err != nil { - return err - } - hashKey := tx.conf.Prefix + getPeerHashKey(peer) - _, err = tx.Do("DEL", hashKey) - return nil +func (conn *Conn) removePeer(peer *storage.Peer, peerTypePrefix string) error { + setKey := conn.conf.Prefix + getPeerSetKey(peerTypePrefix, peer) + _, err := conn.Do("SREM", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + hashKey := conn.conf.Prefix + getPeerHashKey(peer) + _, err = conn.Do("DEL", hashKey) + return nil } // removePeers removes all peers from specified peer set (seeders or leechers), @@ -234,21 +234,21 @@ func (tx *Tx) removePeer(peer *storage.Peer, peerTypePrefix string) error { // This function will not return an error if the peer to remove doesn't exist. // This function will only delete the peer set if all the individual peer deletions were successful // This function calls multiple redis commands, it's not internally atomic. -func (tx *Tx) removePeers(torrentID uint64, peers map[string]storage.Peer, peerTypePrefix string) error { - for _, peer := range peers { - hashKey := tx.conf.Prefix + getPeerHashKey(&peer) - _, err := tx.Do("DEL", hashKey) - if err != nil { - return err - } - delete(peers, storage.PeerMapKey(&peer)) - } - setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) - _, err := tx.Do("DEL", setKey) - if err != nil { - return err - } - return nil +func (conn *Conn) removePeers(torrentID uint64, peers map[string]storage.Peer, peerTypePrefix string) error { + for _, peer := range peers { + hashKey := conn.conf.Prefix + getPeerHashKey(&peer) + _, err := conn.Do("DEL", hashKey) + if err != nil { + return err + } + delete(peers, storage.PeerMapKey(&peer)) + } + setKey := conn.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) + _, err := conn.Do("DEL", setKey) + if err != nil { + return err + } + return nil } // getPeerHashKey returns a string with the peer.ID, encoded peer.UserID, and encoded peer.TorrentID, @@ -256,28 +256,28 @@ func (tx *Tx) removePeers(torrentID uint64, peers map[string]storage.Peer, peerT // This key corresponds to a Redis hash type with fields containing a peer's data. // The peer hashkey relies on the combination of peerID, userID, and torrentID being unique. func getPeerHashKey(peer *storage.Peer) string { - return peer.ID + ":" + strconv.FormatUint(peer.UserID, 36) + ":" + strconv.FormatUint(peer.TorrentID, 36) + return peer.ID + ":" + strconv.FormatUint(peer.UserID, 36) + ":" + strconv.FormatUint(peer.TorrentID, 36) } // getPeerSetKey returns a string that is the peer's encoded torrentID appended to the typePrefix // This key corresponds to a torrent's pool of leechers or seeders func getPeerSetKey(typePrefix string, peer *storage.Peer) string { - return typePrefix + strconv.FormatUint(peer.TorrentID, 36) + return typePrefix + strconv.FormatUint(peer.TorrentID, 36) } // addPeers adds each peer's key to the specified peer set and saves the peer's information. // This function will not return an error if the peer already exists in the set. // This function calls multiple redis commands, it's not internally atomic. -func (tx *Tx) addPeers(peers map[string]storage.Peer, peerTypePrefix string) error { - for _, peer := range peers { - setKey := tx.conf.Prefix + getPeerSetKey(peerTypePrefix, &peer) - _, err := tx.Do("SADD", setKey, getPeerHashKey(&peer)) - if err != nil { - return err - } - tx.setPeer(&peer) - } - return nil +func (conn *Conn) addPeers(peers map[string]storage.Peer, peerTypePrefix string) error { + for _, peer := range peers { + setKey := conn.conf.Prefix + getPeerSetKey(peerTypePrefix, &peer) + _, err := conn.Do("SADD", setKey, getPeerHashKey(&peer)) + if err != nil { + return err + } + conn.setPeer(&peer) + } + return nil } // createPeer takes a slice of length 9 and returns a pointer to a new storage.Peer or an error. @@ -287,403 +287,403 @@ func (tx *Tx) addPeers(peers map[string]storage.Peer, peerTypePrefix string) err // If the field value string cannot be converted to the correct type, // the function will return a nil peer and the conversion error. func createPeer(peerVals []string) (*storage.Peer, error) { - if len(peerVals) != 18 { - return nil, ErrCreatePeer - } - var peer storage.Peer - var err error - for index, peerString := range peerVals { - switch peerString { - case "id": - peer.ID = peerVals[index+1] - case "user_id": - peer.UserID, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "torrent_id": - peer.TorrentID, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "ip": - peer.IP = peerVals[index+1] - case "port": - peer.Port, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "uploaded": - peer.Uploaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "downloaded": - peer.Downloaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "left": - peer.Left, err = strconv.ParseUint(peerVals[index+1], 10, 64) - case "last_announce": - peer.LastAnnounce, err = strconv.ParseInt(peerVals[index+1], 10, 64) - } - if err != nil { - return nil, err - } - } - return &peer, nil + if len(peerVals) != 18 { + return nil, ErrCreatePeer + } + var peer storage.Peer + var err error + for index, peerString := range peerVals { + switch peerString { + case "id": + peer.ID = peerVals[index+1] + case "user_id": + peer.UserID, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "torrent_id": + peer.TorrentID, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "ip": + peer.IP = peerVals[index+1] + case "port": + peer.Port, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "uploaded": + peer.Uploaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "downloaded": + peer.Downloaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "left": + peer.Left, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "last_announce": + peer.LastAnnounce, err = strconv.ParseInt(peerVals[index+1], 10, 64) + } + if err != nil { + return nil, err + } + } + return &peer, nil } // getPeers returns a map of peers from a specified torrent's peer set(seeders or leechers). // This is a multiple action command, it's not internally atomic. -func (tx *Tx) getPeers(torrentID uint64, peerTypePrefix string) (peers map[string]storage.Peer, err error) { - peers = make(map[string]storage.Peer) - setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) - peerStrings, err := redis.Strings(tx.Do("SMEMBERS", setKey)) - if err != nil { - return nil, err - } - // Keys map to peer objects stored in hashes - for _, peerHashKey := range peerStrings { - hashKey := tx.conf.Prefix + peerHashKey - peerVals, err := redis.Strings(tx.Do("HGETALL", hashKey)) - if err != nil { - return nil, err - } - if len(peerVals) == 0 { - continue - } - peer, err := createPeer(peerVals) - if err != nil { - return nil, err - } - peers[storage.PeerMapKey(peer)] = *peer - } - return +func (conn *Conn) getPeers(torrentID uint64, peerTypePrefix string) (peers map[string]storage.Peer, err error) { + peers = make(map[string]storage.Peer) + setKey := conn.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) + peerStrings, err := redis.Strings(conn.Do("SMEMBERS", setKey)) + if err != nil { + return nil, err + } + // Keys map to peer objects stored in hashes + for _, peerHashKey := range peerStrings { + hashKey := conn.conf.Prefix + peerHashKey + peerVals, err := redis.Strings(conn.Do("HGETALL", hashKey)) + if err != nil { + return nil, err + } + if len(peerVals) == 0 { + continue + } + peer, err := createPeer(peerVals) + if err != nil { + return nil, err + } + peers[storage.PeerMapKey(peer)] = *peer + } + return } // AddTorrent writes/overwrites torrent information and saves peers from both peer sets. // The hash fields names are the same as the JSON tags on the storage.Torrent struct. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) AddTorrent(t *storage.Torrent) error { - hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash - _, err := tx.Do("HMSET", hashkey, - "id", t.ID, - "infohash", t.Infohash, - "active", t.Active, - "snatches", t.Snatches, - "up_multiplier", t.UpMultiplier, - "down_multiplier", t.DownMultiplier, - "last_action", t.LastAction) - if err != nil { - return err - } +func (conn *Conn) AddTorrent(t *storage.Torrent) error { + hashkey := conn.conf.Prefix + TorrentPrefix + t.Infohash + _, err := conn.Do("HMSET", hashkey, + "id", t.ID, + "infohash", t.Infohash, + "active", t.Active, + "snatches", t.Snatches, + "up_multiplier", t.UpMultiplier, + "down_multiplier", t.DownMultiplier, + "last_action", t.LastAction) + if err != nil { + return err + } - err = tx.addPeers(t.Seeders, SeedersPrefix) - if err != nil { - return err - } - err = tx.addPeers(t.Leechers, LeechersPrefix) - if err != nil { - return err - } - return nil + err = conn.addPeers(t.Seeders, SeedersPrefix) + if err != nil { + return err + } + err = conn.addPeers(t.Leechers, LeechersPrefix) + if err != nil { + return err + } + return nil } // RemoveTorrent deletes the torrent's Redis hash and then deletes all peers. // This function will not return an error if the torrent has already been removed. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) RemoveTorrent(t *storage.Torrent) error { - hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash - _, err := tx.Do("DEL", hashkey) - if err != nil { - return err - } - // Remove seeders and leechers as well - err = tx.removePeers(t.ID, t.Seeders, SeedersPrefix) - if err != nil { - return err - } - err = tx.removePeers(t.ID, t.Leechers, LeechersPrefix) - if err != nil { - return err - } - return nil +func (conn *Conn) RemoveTorrent(t *storage.Torrent) error { + hashkey := conn.conf.Prefix + TorrentPrefix + t.Infohash + _, err := conn.Do("DEL", hashkey) + if err != nil { + return err + } + // Remove seeders and leechers as well + err = conn.removePeers(t.ID, t.Seeders, SeedersPrefix) + if err != nil { + return err + } + err = conn.removePeers(t.ID, t.Leechers, LeechersPrefix) + if err != nil { + return err + } + return nil } // AddUser writes/overwrites user information to a Redis hash. // The hash fields names are the same as the JSON tags on the storage.user struct. -func (tx *Tx) AddUser(u *storage.User) error { - hashkey := tx.conf.Prefix + UserPrefix + u.Passkey - _, err := tx.Do("HMSET", hashkey, - "id", u.ID, - "passkey", u.Passkey, - "up_multiplier", u.UpMultiplier, - "down_multiplier", u.DownMultiplier, - "slots", u.Slots, - "slots_used", u.SlotsUsed, - "snatches", u.Snatches) - if err != nil { - return err - } - return nil +func (conn *Conn) AddUser(u *storage.User) error { + hashkey := conn.conf.Prefix + UserPrefix + u.Passkey + _, err := conn.Do("HMSET", hashkey, + "id", u.ID, + "passkey", u.Passkey, + "up_multiplier", u.UpMultiplier, + "down_multiplier", u.DownMultiplier, + "slots", u.Slots, + "slots_used", u.SlotsUsed, + "snatches", u.Snatches) + if err != nil { + return err + } + return nil } // RemoveUser removes the user's hash from Redis. // This function does not return an error if the user doesn't exist. -func (tx *Tx) RemoveUser(u *storage.User) error { - hashkey := tx.conf.Prefix + UserPrefix + u.Passkey - _, err := tx.Do("DEL", hashkey) - if err != nil { - return err - } - return nil +func (conn *Conn) RemoveUser(u *storage.User) error { + hashkey := conn.conf.Prefix + UserPrefix + u.Passkey + _, err := conn.Do("DEL", hashkey) + if err != nil { + return err + } + return nil } // FindUser returns a pointer to a new user struct and true if the user exists, // or nil and false if the user doesn't exist. // This function does not return an error if the torrent doesn't exist. -func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) { - hashkey := tx.conf.Prefix + UserPrefix + passkey - // Consider using HGETALL instead of HVALS here for robustness - userStrings, err := redis.Strings(tx.Do("HGETALL", hashkey)) - if err != nil { - return nil, false, err - } else if len(userStrings) == 0 { - return nil, false, nil - } - foundUser, err := createUser(userStrings) - if err != nil { - return nil, false, err - } - return foundUser, true, nil +func (conn *Conn) FindUser(passkey string) (*storage.User, bool, error) { + hashkey := conn.conf.Prefix + UserPrefix + passkey + // Consider using HGETALL instead of HVALS here for robustness + userStrings, err := redis.Strings(conn.Do("HGETALL", hashkey)) + if err != nil { + return nil, false, err + } else if len(userStrings) == 0 { + return nil, false, nil + } + foundUser, err := createUser(userStrings) + if err != nil { + return nil, false, err + } + return foundUser, true, nil } // FindTorrent returns a pointer to a new torrent struct and true if the torrent exists, // or nil and false if the torrent doesn't exist. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) { - hashkey := tx.conf.Prefix + TorrentPrefix + infohash - torrentStrings, err := redis.Strings(tx.Do("HGETALL", hashkey)) - if err != nil { - return nil, false, err - } else if len(torrentStrings) == 0 { - return nil, false, nil - } +func (conn *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) { + hashkey := conn.conf.Prefix + TorrentPrefix + infohash + torrentStrings, err := redis.Strings(conn.Do("HGETALL", hashkey)) + if err != nil { + return nil, false, err + } else if len(torrentStrings) == 0 { + return nil, false, nil + } - foundTorrent, err := tx.createTorrent(torrentStrings) - if err != nil { - return nil, false, err - } - return foundTorrent, true, nil + foundTorrent, err := conn.createTorrent(torrentStrings) + if err != nil { + return nil, false, err + } + return foundTorrent, true, nil } // ClientWhitelisted returns true if the ClientID exists in the Client set. // This function does not parse the client ID from the peer ID. // The clientID must match exactly to a member of the set. -func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { - key := tx.conf.Prefix + "whitelist" - return redis.Bool(tx.Do("SISMEMBER", key, peerID)) +func (conn *Conn) ClientWhitelisted(peerID string) (exists bool, err error) { + key := conn.conf.Prefix + "whitelist" + return redis.Bool(conn.Do("SISMEMBER", key, peerID)) } // WhitelistClient adds a client ID to the client whitelist set. // This function does not return an error if the client ID is already in the set. -func (tx *Tx) WhitelistClient(peerID string) error { - key := tx.conf.Prefix + "whitelist" - _, err := tx.Do("SADD", key, peerID) - return err +func (conn *Conn) WhitelistClient(peerID string) error { + key := conn.conf.Prefix + "whitelist" + _, err := conn.Do("SADD", key, peerID) + return err } // UnWhitelistClient removes a client ID from the client whitelist set // This function does not return an error if the client ID is not in the set. -func (tx *Tx) UnWhitelistClient(peerID string) error { - key := tx.conf.Prefix + "whitelist" - _, err := tx.Do("SREM", key, peerID) - return err +func (conn *Conn) UnWhitelistClient(peerID string) error { + key := conn.conf.Prefix + "whitelist" + _, err := conn.Do("SREM", key, peerID) + return err } // RecordSnatch increments the snatch counter on the torrent and user by one. // This modifies the arguments as well as the hash field in Redis. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) RecordSnatch(user *storage.User, torrent *storage.Torrent) error { +func (conn *Conn) RecordSnatch(user *storage.User, torrent *storage.Torrent) error { - torrentKey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash - snatchCount, err := redis.Int(tx.Do("HINCRBY", torrentKey, "snatches", 1)) - if err != nil { - return err - } - torrent.Snatches = uint64(snatchCount) + torrentKey := conn.conf.Prefix + TorrentPrefix + torrent.Infohash + snatchCount, err := redis.Int(conn.Do("HINCRBY", torrentKey, "snatches", 1)) + if err != nil { + return err + } + torrent.Snatches = uint64(snatchCount) - userKey := tx.conf.Prefix + UserPrefix + user.Passkey - snatchCount, err = redis.Int(tx.Do("HINCRBY", userKey, "snatches", 1)) - if err != nil { - return err - } - user.Snatches = uint64(snatchCount) - return nil + userKey := conn.conf.Prefix + UserPrefix + user.Passkey + snatchCount, err = redis.Int(conn.Do("HINCRBY", userKey, "snatches", 1)) + if err != nil { + return err + } + user.Snatches = uint64(snatchCount) + return nil } // MarkActive sets the active field of the torrent to true. // This modifies the argument as well as the hash field in Redis. // This function will return ErrMarkActive if the torrent does not exist. -func (tx *Tx) MarkActive(torrent *storage.Torrent) error { - hashkey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash - activeExists, err := redis.Int(tx.Do("HSET", hashkey, "active", true)) - if err != nil { - return err - } - torrent.Active = true - // HSET returns 1 if hash didn't exist before - if activeExists == 1 { - return ErrMarkActive - } - return nil +func (conn *Conn) MarkActive(torrent *storage.Torrent) error { + hashkey := conn.conf.Prefix + TorrentPrefix + torrent.Infohash + activeExists, err := redis.Int(conn.Do("HSET", hashkey, "active", true)) + if err != nil { + return err + } + torrent.Active = true + // HSET returns 1 if hash didn't exist before + if activeExists == 1 { + return ErrMarkActive + } + return nil } // MarkInactive sets the active field of the torrent to false. // This modifies the argument as well as the hash field in Redis. // This function will return ErrMarkActive if the torrent does not exist. -func (tx *Tx) MarkInactive(torrent *storage.Torrent) error { - hashkey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash - activeExists, err := redis.Int(tx.Do("HSET", hashkey, "active", false)) - if err != nil { - return err - } - torrent.Active = false - // HSET returns 1 if hash didn't exist before - if activeExists == 1 { - // Clean-up incomplete torrent - _, err = tx.Do("DEL", hashkey) - if err != nil { - return err - } - return ErrMarkActive - } - return nil +func (conn *Conn) MarkInactive(torrent *storage.Torrent) error { + hashkey := conn.conf.Prefix + TorrentPrefix + torrent.Infohash + activeExists, err := redis.Int(conn.Do("HSET", hashkey, "active", false)) + if err != nil { + return err + } + torrent.Active = false + // HSET returns 1 if hash didn't exist before + if activeExists == 1 { + // Clean-up incomplete torrent + _, err = conn.Do("DEL", hashkey) + if err != nil { + return err + } + return ErrMarkActive + } + return nil } // AddLeecher adds a new peer to a torrent's leecher set. // This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. // This function does not return an error if the leecher already exists. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) AddLeecher(torrent *storage.Torrent, peer *storage.Peer) error { - setKey := tx.conf.Prefix + LeechersPrefix + strconv.FormatUint(torrent.ID, 36) - _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) - if err != nil { - return err - } - err = tx.setPeer(peer) - if err != nil { - return err - } - if torrent.Leechers == nil { - torrent.Leechers = make(map[string]storage.Peer) - } - torrent.Leechers[storage.PeerMapKey(peer)] = *peer - return nil +func (conn *Conn) AddLeecher(torrent *storage.Torrent, peer *storage.Peer) error { + setKey := conn.conf.Prefix + LeechersPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := conn.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = conn.setPeer(peer) + if err != nil { + return err + } + if torrent.Leechers == nil { + torrent.Leechers = make(map[string]storage.Peer) + } + torrent.Leechers[storage.PeerMapKey(peer)] = *peer + return nil } // SetLeecher updates a torrent's leecher. // This modifies the torrent argument, as well as the peer's hash in Redis. // Setting assumes that the peer is already a leecher, and only needs to be updated. // This function does not return an error if the leecher does not exist or is not in the torrent's leecher set. -func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error { - err := tx.setPeer(p) - if err != nil { - return err - } - t.Leechers[storage.PeerMapKey(p)] = *p - return nil +func (conn *Conn) SetLeecher(t *storage.Torrent, p *storage.Peer) error { + err := conn.setPeer(p) + if err != nil { + return err + } + t.Leechers[storage.PeerMapKey(p)] = *p + return nil } // RemoveLeecher removes the given peer from a torrent's leecher set. // This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. // This function does not return an error if the peer doesn't exist, or is not in the set. -func (tx *Tx) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error { - err := tx.removePeer(p, LeechersPrefix) - if err != nil { - return err - } - delete(t.Leechers, storage.PeerMapKey(p)) - return nil +func (conn *Conn) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error { + err := conn.removePeer(p, LeechersPrefix) + if err != nil { + return err + } + delete(t.Leechers, storage.PeerMapKey(p)) + return nil } // LeecherFinished moves a peer's hashkey from a torrent's leecher set to the seeder set and updates the peer. // This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. // This function does not return an error if the peer doesn't exist or is not in the torrent's leecher set. -func (tx *Tx) LeecherFinished(torrent *storage.Torrent, peer *storage.Peer) error { - torrentIdKey := strconv.FormatUint(torrent.ID, 36) - seederSetKey := tx.conf.Prefix + SeedersPrefix + torrentIdKey - leecherSetKey := tx.conf.Prefix + LeechersPrefix + torrentIdKey +func (conn *Conn) LeecherFinished(torrent *storage.Torrent, peer *storage.Peer) error { + torrentIdKey := strconv.FormatUint(torrent.ID, 36) + seederSetKey := conn.conf.Prefix + SeedersPrefix + torrentIdKey + leecherSetKey := conn.conf.Prefix + LeechersPrefix + torrentIdKey - _, err := tx.Do("SMOVE", leecherSetKey, seederSetKey, getPeerHashKey(peer)) - if err != nil { - return err - } - torrent.Seeders[storage.PeerMapKey(peer)] = *peer - delete(torrent.Leechers, storage.PeerMapKey(peer)) + _, err := conn.Do("SMOVE", leecherSetKey, seederSetKey, getPeerHashKey(peer)) + if err != nil { + return err + } + torrent.Seeders[storage.PeerMapKey(peer)] = *peer + delete(torrent.Leechers, storage.PeerMapKey(peer)) - err = tx.setPeer(peer) - return err + err = conn.setPeer(peer) + return err } // AddSeeder adds a new peer to a torrent's seeder set. // This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. // This function does not return an error if the seeder already exists. // This is a multiple action command, it's not internally atomic. -func (tx *Tx) AddSeeder(torrent *storage.Torrent, peer *storage.Peer) error { - setKey := tx.conf.Prefix + SeedersPrefix + strconv.FormatUint(torrent.ID, 36) - _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) - if err != nil { - return err - } - err = tx.setPeer(peer) - if err != nil { - return err - } - if torrent.Seeders == nil { - torrent.Seeders = make(map[string]storage.Peer) - } - torrent.Seeders[storage.PeerMapKey(peer)] = *peer - return nil +func (conn *Conn) AddSeeder(torrent *storage.Torrent, peer *storage.Peer) error { + setKey := conn.conf.Prefix + SeedersPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := conn.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = conn.setPeer(peer) + if err != nil { + return err + } + if torrent.Seeders == nil { + torrent.Seeders = make(map[string]storage.Peer) + } + torrent.Seeders[storage.PeerMapKey(peer)] = *peer + return nil } // SetSeeder updates a torrent's seeder. // This modifies the torrent argument, as well as the peer's hash in Redis. // Setting assumes that the peer is already a seeder, and only needs to be updated. // This function does not return an error if the seeder does not exist or is not in the torrent's seeder set. -func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error { - err := tx.setPeer(p) - if err != nil { - return err - } - t.Seeders[storage.PeerMapKey(p)] = *p - return nil +func (conn *Conn) SetSeeder(t *storage.Torrent, p *storage.Peer) error { + err := conn.setPeer(p) + if err != nil { + return err + } + t.Seeders[storage.PeerMapKey(p)] = *p + return nil } // RemoveSeeder removes the given peer from a torrent's seeder set. // This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. // This function does not return an error if the peer doesn't exist, or is not in the set. -func (tx *Tx) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error { - err := tx.removePeer(p, SeedersPrefix) - if err != nil { - return err - } - delete(t.Seeders, storage.PeerMapKey(p)) - return nil +func (conn *Conn) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error { + err := conn.removePeer(p, SeedersPrefix) + if err != nil { + return err + } + delete(t.Seeders, storage.PeerMapKey(p)) + return nil } // IncrementSlots increment a user's Slots by one. // This function modifies the argument as well as the hash field in Redis. -func (tx *Tx) IncrementSlots(u *storage.User) error { - hashkey := tx.conf.Prefix + UserPrefix + u.Passkey - slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, "slots", 1)) - if err != nil { - return err - } - u.Slots = int64(slotCount) - return nil +func (conn *Conn) IncrementSlots(u *storage.User) error { + hashkey := conn.conf.Prefix + UserPrefix + u.Passkey + slotCount, err := redis.Int(conn.Do("HINCRBY", hashkey, "slots", 1)) + if err != nil { + return err + } + u.Slots = int64(slotCount) + return nil } // IncrementSlots increment a user's Slots by one. // This function modifies the argument as well as the hash field in Redis. -func (tx *Tx) DecrementSlots(u *storage.User) error { - hashkey := tx.conf.Prefix + UserPrefix + u.Passkey - slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, "slots", -1)) - if err != nil { - return err - } - u.Slots = int64(slotCount) - return nil +func (conn *Conn) DecrementSlots(u *storage.User) error { + hashkey := conn.conf.Prefix + UserPrefix + u.Passkey + slotCount, err := redis.Int(conn.Do("HINCRBY", hashkey, "slots", -1)) + if err != nil { + return err + } + u.Slots = int64(slotCount) + return nil } // init registers the redis driver func init() { - tracker.Register("redis", &driver{}) + tracker.Register("redis", &driver{}) } diff --git a/storage/tracker/redis/redis_bench_test.go b/storage/tracker/redis/redis_bench_test.go index 74aa054..530b466 100644 --- a/storage/tracker/redis/redis_bench_test.go +++ b/storage/tracker/redis/redis_bench_test.go @@ -5,284 +5,284 @@ package redis import ( - "math/rand" - "testing" - "time" + "math/rand" + "testing" + "time" ) func BenchmarkSuccessfulFindUser(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testUser := createTestUser() - panicOnErr(tx.AddUser(testUser)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testUser := createTestUser() + panicOnErr(conn.AddUser(testUser)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { + for bCount := 0; bCount < b.N; bCount++ { - foundUser, found, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - if !found { - b.Error("user not found", testUser) - } - if *foundUser != *testUser { - b.Error("found user mismatch", *foundUser, testUser) - } - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveUser(testUser)) - b.StartTimer() + foundUser, found, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + if !found { + b.Error("user not found", testUser) + } + if *foundUser != *testUser { + b.Error("found user mismatch", *foundUser, testUser) + } + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveUser(testUser)) + b.StartTimer() } func BenchmarkFailedFindUser(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testUser := createTestUser() - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testUser := createTestUser() + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { + for bCount := 0; bCount < b.N; bCount++ { - _, found, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - if found { - b.Error("user not found", testUser) - } - } + _, found, err := conn.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + b.Error("user not found", testUser) + } + } } func BenchmarkSuccessfulFindTorrent(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - b.StartTimer() + panicOnErr(conn.AddTorrent(testTorrent)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if !found { - b.Error("torrent not found", testTorrent) - } - // Incomplete comparison as maps make struct not nativly comparable - if foundTorrent.Infohash != testTorrent.Infohash { - b.Error("found torrent mismatch", foundTorrent, testTorrent) - } - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if !found { + b.Error("torrent not found", testTorrent) + } + // Incomplete comparison as maps make struct not nativly comparable + if foundTorrent.Infohash != testTorrent.Infohash { + b.Error("found torrent mismatch", foundTorrent, testTorrent) + } + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } func BenchmarkFailFindTorrent(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if found { - b.Error("torrent found", foundTorrent) - } - } + for bCount := 0; bCount < b.N; bCount++ { + foundTorrent, found, err := conn.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + b.Error("torrent found", foundTorrent) + } + } } func BenchmarkSuccessfulClientWhitelisted(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testPeerID := "-lt0D30-" - panicOnErr(tx.WhitelistClient(testPeerID)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testPeerID := "-lt0D30-" + panicOnErr(conn.WhitelistClient(testPeerID)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - found, err := tx.ClientWhitelisted(testPeerID) - panicOnErr(err) - if !found { - b.Error("peerID not found", testPeerID) - } - } - // Cleanup - b.StopTimer() - panicOnErr(tx.UnWhitelistClient(testPeerID)) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + found, err := conn.ClientWhitelisted(testPeerID) + panicOnErr(err) + if !found { + b.Error("peerID not found", testPeerID) + } + } + // Cleanup + b.StopTimer() + panicOnErr(conn.UnWhitelistClient(testPeerID)) + b.StartTimer() } func BenchmarkFailClientWhitelisted(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testPeerID2 := "TIX0192" - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testPeerID2 := "TIX0192" + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - found, err := tx.ClientWhitelisted(testPeerID2) - panicOnErr(err) - if found { - b.Error("peerID found", testPeerID2) - } - } + for bCount := 0; bCount < b.N; bCount++ { + found, err := conn.ClientWhitelisted(testPeerID2) + panicOnErr(err) + if found { + b.Error("peerID found", testPeerID2) + } + } } func BenchmarkRecordSnatch(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - testUser := createTestUser() - panicOnErr(tx.AddTorrent(testTorrent)) - panicOnErr(tx.AddUser(testUser)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + testUser := createTestUser() + panicOnErr(conn.AddTorrent(testTorrent)) + panicOnErr(conn.AddUser(testUser)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - panicOnErr(tx.RecordSnatch(testUser, testTorrent)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - panicOnErr(tx.RemoveUser(testUser)) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(conn.RecordSnatch(testUser, testTorrent)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + panicOnErr(conn.RemoveUser(testUser)) + b.StartTimer() } func BenchmarkMarkActive(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - testTorrent.Active = false - panicOnErr(tx.AddTorrent(testTorrent)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + testTorrent.Active = false + panicOnErr(conn.AddTorrent(testTorrent)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - panicOnErr(tx.MarkActive(testTorrent)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(conn.MarkActive(testTorrent)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } func BenchmarkAddSeeder(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + b.StartTimer() - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } func BenchmarkRemoveSeeder(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - tx.AddSeeder(testTorrent, testSeeder) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + conn.AddSeeder(testTorrent, testSeeder) + b.StartTimer() - panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + panicOnErr(conn.RemoveSeeder(testTorrent, testSeeder)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } func BenchmarkSetSeeder(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - r := rand.New(rand.NewSource(time.Now().UnixNano())) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddSeeder(testTorrent, testSeeder)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - testSeeder.Uploaded += uint64(r.Int63()) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testSeeder.Uploaded += uint64(r.Int63()) + b.StartTimer() - tx.SetSeeder(testTorrent, testSeeder) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + conn.SetSeeder(testTorrent, testSeeder) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } func BenchmarkIncrementSlots(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testUser := createTestUser() - panicOnErr(tx.AddUser(testUser)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testUser := createTestUser() + panicOnErr(conn.AddUser(testUser)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - panicOnErr(tx.IncrementSlots(testUser)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveUser(testUser)) - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(conn.IncrementSlots(testUser)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveUser(testUser)) + b.StartTimer() } func BenchmarkLeecherFinished(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - testLeecher.Left = 0 - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + b.StartTimer() - panicOnErr(tx.LeecherFinished(testTorrent, testLeecher)) - } - // Cleanup - b.StopTimer() - panicOnErr(tx.RemoveTorrent(testTorrent)) - b.StartTimer() + panicOnErr(conn.LeecherFinished(testTorrent, testLeecher)) + } + // Cleanup + b.StopTimer() + panicOnErr(conn.RemoveTorrent(testTorrent)) + b.StartTimer() } // This is a comparision to the Leecher finished function func BenchmarkRemoveLeecherAddSeeder(b *testing.B) { - b.StopTimer() - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - b.StartTimer() + b.StopTimer() + conn := createTestConn() + testTorrent := createTestTorrent() + panicOnErr(conn.AddTorrent(testTorrent)) + b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - testLeecher.Left = 0 - b.StartTimer() + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(conn.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + b.StartTimer() - panicOnErr(tx.RemoveLeecher(testTorrent, testLeecher)) - panicOnErr(tx.AddSeeder(testTorrent, testLeecher)) - } - // Cleanup - b.StopTimer() - tx.RemoveTorrent(testTorrent) - b.StartTimer() + panicOnErr(conn.RemoveLeecher(testTorrent, testLeecher)) + panicOnErr(conn.AddSeeder(testTorrent, testLeecher)) + } + // Cleanup + b.StopTimer() + conn.RemoveTorrent(testTorrent) + b.StartTimer() } diff --git a/storage/tracker/redis/redis_test.go b/storage/tracker/redis/redis_test.go index 03ba681..9ae7d2b 100644 --- a/storage/tracker/redis/redis_test.go +++ b/storage/tracker/redis/redis_test.go @@ -5,180 +5,180 @@ package redis import ( - "crypto/rand" - "fmt" - "io" - "os" - "strconv" - "testing" + "crypto/rand" + "fmt" + "io" + "os" + "strconv" + "testing" - "github.com/garyburd/redigo/redis" + "github.com/garyburd/redigo/redis" - "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/storage" ) var ( - testTorrentIDChannel chan uint64 - testUserIDChannel chan uint64 - testPeerIDChannel chan int + testTorrentIDChannel chan uint64 + testUserIDChannel chan uint64 + testPeerIDChannel chan int ) func init() { - testTorrentIDChannel = make(chan uint64, 100) - testUserIDChannel = make(chan uint64, 100) - testPeerIDChannel = make(chan int, 100) - // Sync access to ID counter with buffered global channels - go func() { - for i := 0; ; i++ { - testTorrentIDChannel <- uint64(i) - } - }() - go func() { - for i := 0; ; i++ { - testUserIDChannel <- uint64(i) - } - }() - go func() { - for i := 0; ; i++ { - testPeerIDChannel <- i - } - }() + testTorrentIDChannel = make(chan uint64, 100) + testUserIDChannel = make(chan uint64, 100) + testPeerIDChannel = make(chan int, 100) + // Sync access to ID counter with buffered global channels + go func() { + for i := 0; ; i++ { + testTorrentIDChannel <- uint64(i) + } + }() + go func() { + for i := 0; ; i++ { + testUserIDChannel <- uint64(i) + } + }() + go func() { + for i := 0; ; i++ { + testPeerIDChannel <- i + } + }() } func createTestTorrentID() uint64 { - return <-testTorrentIDChannel + return <-testTorrentIDChannel } func createTestUserID() uint64 { - return <-testUserIDChannel + return <-testUserIDChannel } func createTestPeerID() string { - return "-testPeerID-" + strconv.Itoa(<-testPeerIDChannel) + return "-testPeerID-" + strconv.Itoa(<-testPeerIDChannel) } func createTestInfohash() string { - uuid := make([]byte, 40) - n, err := io.ReadFull(rand.Reader, uuid) - if n != len(uuid) || err != nil { - panic(err) - } - return string(uuid) + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) } func createTestPasskey() string { - uuid := make([]byte, 40) - n, err := io.ReadFull(rand.Reader, uuid) - if n != len(uuid) || err != nil { - panic(err) - } - return string(uuid) + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) } func panicOnErr(err error) { - if err != nil { - fmt.Println(err) - panic(err) - } + if err != nil { + fmt.Println(err) + panic(err) + } } -func createTestRedisTx() *Tx { - testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) - conf := &testConfig.Cache - panicOnErr(err) +func createTestRedisConn() *Conn { + testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) + conf := &testConfig.Cache + panicOnErr(err) - testPool := &Pool{ - conf: conf, - pool: redis.Pool{ - MaxIdle: conf.MaxIdleConns, - IdleTimeout: conf.IdleTimeout.Duration, - Dial: makeDialFunc(conf), - TestOnBorrow: testOnBorrow, - }, - } + testPool := &Pool{ + conf: conf, + pool: redis.Pool{ + MaxIdle: conf.MaxIdleConns, + IdleTimeout: conf.IdleTimeout.Duration, + Dial: makeDialFunc(conf), + TestOnBorrow: testOnBorrow, + }, + } - txObj := &Tx{ - conf: testPool.conf, - done: false, - Conn: testPool.pool.Get(), - } - panicOnErr(err) + newConn := &Conn{ + conf: testPool.conf, + done: false, + Conn: testPool.pool.Get(), + } + panicOnErr(err) - // Test connection before returning - _, err = txObj.Do("PING") - panicOnErr(err) - return txObj + // Test connection before returning + _, err = newConn.Do("PING") + panicOnErr(err) + return newConn } func createTestUser() *storage.User { - return &storage.User{ID: createTestUserID(), Passkey: createTestPasskey(), - UpMultiplier: 1.01, DownMultiplier: 1.0, Slots: 4, SlotsUsed: 2, Snatches: 7} + return &storage.User{ID: createTestUserID(), Passkey: createTestPasskey(), + UpMultiplier: 1.01, DownMultiplier: 1.0, Slots: 4, SlotsUsed: 2, Snatches: 7} } func createTestPeer(userID uint64, torrentID uint64) *storage.Peer { - return &storage.Peer{ID: createTestPeerID(), UserID: userID, TorrentID: torrentID, - IP: "127.0.0.1", Port: 6889, Uploaded: 1024, Downloaded: 3000, Left: 4200, LastAnnounce: 11} + return &storage.Peer{ID: createTestPeerID(), UserID: userID, TorrentID: torrentID, + IP: "127.0.0.1", Port: 6889, Uploaded: 1024, Downloaded: 3000, Left: 4200, LastAnnounce: 11} } func createTestPeers(torrentID uint64, num int) map[string]storage.Peer { - testPeers := make(map[string]storage.Peer) - for i := 0; i < num; i++ { - tempPeer := createTestPeer(createTestUserID(), torrentID) - testPeers[storage.PeerMapKey(tempPeer)] = *tempPeer - } - return testPeers + testPeers := make(map[string]storage.Peer) + for i := 0; i < num; i++ { + tempPeer := createTestPeer(createTestUserID(), torrentID) + testPeers[storage.PeerMapKey(tempPeer)] = *tempPeer + } + return testPeers } func createTestTorrent() *storage.Torrent { - torrentInfohash := createTestInfohash() - torrentID := createTestTorrentID() + torrentInfohash := createTestInfohash() + torrentID := createTestTorrentID() - testSeeders := createTestPeers(torrentID, 4) - testLeechers := createTestPeers(torrentID, 2) + testSeeders := createTestPeers(torrentID, 4) + testLeechers := createTestPeers(torrentID, 2) - testTorrent := storage.Torrent{ID: torrentID, Infohash: torrentInfohash, Active: true, - Seeders: testSeeders, Leechers: testLeechers, Snatches: 11, UpMultiplier: 1.0, DownMultiplier: 1.0, LastAction: 0} - return &testTorrent + testTorrent := storage.Torrent{ID: torrentID, Infohash: torrentInfohash, Active: true, + Seeders: testSeeders, Leechers: testLeechers, Snatches: 11, UpMultiplier: 1.0, DownMultiplier: 1.0, LastAction: 0} + return &testTorrent } func TestValidPeers(t *testing.T) { - testTx := createTestRedisTx() - testTorrentID := createTestTorrentID() - testPeers := createTestPeers(testTorrentID, 3) + testConn := createTestRedisConn() + testTorrentID := createTestTorrentID() + testPeers := createTestPeers(testTorrentID, 3) - panicOnErr(testTx.addPeers(testPeers, "test:")) - peerMap, err := testTx.getPeers(testTorrentID, "test:") - panicOnErr(err) - if len(peerMap) != len(testPeers) { - t.Error("Num Peers not equal ", len(peerMap), len(testPeers)) - } - panicOnErr(testTx.removePeers(testTorrentID, testPeers, "test:")) + panicOnErr(testConn.addPeers(testPeers, "test:")) + peerMap, err := testConn.getPeers(testTorrentID, "test:") + panicOnErr(err) + if len(peerMap) != len(testPeers) { + t.Error("Num Peers not equal ", len(peerMap), len(testPeers)) + } + panicOnErr(testConn.removePeers(testTorrentID, testPeers, "test:")) } func TestInvalidPeers(t *testing.T) { - testTx := createTestRedisTx() - testTorrentID := createTestTorrentID() - testPeers := createTestPeers(testTorrentID, 3) - tempPeer := createTestPeer(createTestUserID(), testTorrentID) - testPeers[storage.PeerMapKey(tempPeer)] = *tempPeer + testConn := createTestRedisConn() + testTorrentID := createTestTorrentID() + testPeers := createTestPeers(testTorrentID, 3) + tempPeer := createTestPeer(createTestUserID(), testTorrentID) + testPeers[storage.PeerMapKey(tempPeer)] = *tempPeer - panicOnErr(testTx.addPeers(testPeers, "test:")) - // Imitate a peer being removed during get - hashKey := testTx.conf.Prefix + getPeerHashKey(tempPeer) - _, err := testTx.Do("DEL", hashKey) - panicOnErr(err) + panicOnErr(testConn.addPeers(testPeers, "test:")) + // Imitate a peer being removed during get + hashKey := testConn.conf.Prefix + getPeerHashKey(tempPeer) + _, err := testConn.Do("DEL", hashKey) + panicOnErr(err) - peerMap, err := testTx.getPeers(testTorrentID, "test:") - panicOnErr(err) - // Expect 1 less peer due to delete - if len(peerMap) != len(testPeers)-1 { - t.Error("Num Peers not equal ", len(peerMap), len(testPeers)-1) - } - panicOnErr(testTx.removePeers(testTorrentID, testPeers, "test:")) - if len(testPeers) != 0 { - t.Errorf("All peers not removed, %d peers remain!", len(testPeers)) - } + peerMap, err := testConn.getPeers(testTorrentID, "test:") + panicOnErr(err) + // Expect 1 less peer due to delete + if len(peerMap) != len(testPeers)-1 { + t.Error("Num Peers not equal ", len(peerMap), len(testPeers)-1) + } + panicOnErr(testConn.removePeers(testTorrentID, testPeers, "test:")) + if len(testPeers) != 0 { + t.Errorf("All peers not removed, %d peers remain!", len(testPeers)) + } } diff --git a/storage/tracker/redis/tx_test.go b/storage/tracker/redis/tx_test.go deleted file mode 100644 index 3f793c2..0000000 --- a/storage/tracker/redis/tx_test.go +++ /dev/null @@ -1,563 +0,0 @@ -// Copyright 2013 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package redis - -import ( - "math/rand" - "os" - "reflect" - "testing" - "time" - - "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/storage" - "github.com/pushrax/chihaya/storage/tracker" -) - -func createTestTx() tracker.Conn { - testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) - panicOnErr(err) - conf := &testConfig.Cache - - testPool, err := tracker.Open(conf) - panicOnErr(err) - - txObj, err := testPool.Get() - panicOnErr(err) - - return txObj -} - -func TestFindUserSuccess(t *testing.T) { - tx := createTestTx() - testUser := createTestUser() - - panicOnErr(tx.AddUser(testUser)) - foundUser, found, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - if !found { - t.Error("user not found", testUser) - } - if *foundUser != *testUser { - t.Error("found user mismatch", *foundUser, testUser) - } - // Cleanup - panicOnErr(tx.RemoveUser(testUser)) -} - -func TestFindUserFail(t *testing.T) { - tx := createTestTx() - testUser := createTestUser() - - foundUser, found, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - if found { - t.Error("user found", foundUser) - } -} - -func TestRemoveUser(t *testing.T) { - tx := createTestTx() - testUser := createTestUser() - - panicOnErr(tx.AddUser(testUser)) - err := tx.RemoveUser(testUser) - panicOnErr(err) - foundUser, found, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - if found { - t.Error("removed user found", foundUser) - } -} - -func TestFindTorrentSuccess(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if !found { - t.Error("torrent not found", testTorrent) - } - if !reflect.DeepEqual(foundTorrent, testTorrent) { - t.Error("found torrent mismatch", foundTorrent, testTorrent) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestFindTorrentFail(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if found { - t.Error("torrent found", foundTorrent) - } -} - -func TestRemoveTorrent(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - - panicOnErr(tx.RemoveTorrent(testTorrent)) - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if found { - t.Error("removed torrent found", foundTorrent) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestClientWhitelistSuccess(t *testing.T) { - tx := createTestTx() - testPeerID := "-lt0D30-" - - panicOnErr(tx.WhitelistClient(testPeerID)) - found, err := tx.ClientWhitelisted(testPeerID) - panicOnErr(err) - if !found { - t.Error("peerID not found", testPeerID) - } - // Cleanup - panicOnErr(tx.UnWhitelistClient(testPeerID)) -} - -func TestClientWhitelistFail(t *testing.T) { - tx := createTestTx() - testPeerID2 := "TIX0192" - - found, err := tx.ClientWhitelisted(testPeerID2) - panicOnErr(err) - if found { - t.Error("peerID found", testPeerID2) - } -} - -func TestRecordSnatch(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - testUser := createTestUser() - panicOnErr(tx.AddTorrent(testTorrent)) - panicOnErr(tx.AddUser(testUser)) - - userSnatches := testUser.Snatches - torrentSnatches := testTorrent.Snatches - - panicOnErr(tx.RecordSnatch(testUser, testTorrent)) - - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundUser, _, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - - if testUser.Snatches != userSnatches+1 { - t.Error("snatch not recorded to local user", testUser.Snatches, userSnatches+1) - } - if testTorrent.Snatches != torrentSnatches+1 { - t.Error("snatch not recorded to local torrent") - } - if foundUser.Snatches != userSnatches+1 { - t.Error("snatch not recorded to cached user", foundUser.Snatches, userSnatches+1) - } - if foundTorrent.Snatches != torrentSnatches+1 { - t.Error("snatch not recorded to cached torrent") - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) - panicOnErr(tx.RemoveUser(testUser)) -} - -func TestMarkActive(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - testTorrent.Active = false - panicOnErr(tx.AddTorrent(testTorrent)) - - panicOnErr(tx.MarkActive(testTorrent)) - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - - if foundTorrent.Active != true { - t.Error("cached torrent not activated") - } - if testTorrent.Active != true { - t.Error("cached torrent not activated") - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestClientWhitelistRemove(t *testing.T) { - tx := createTestTx() - testPeerID := "-lt0D30-" - panicOnErr(tx.WhitelistClient(testPeerID)) - panicOnErr(tx.UnWhitelistClient(testPeerID)) - - found, err := tx.ClientWhitelisted(testPeerID) - panicOnErr(err) - if found { - t.Error("removed peerID found", testPeerID) - } -} - -func TestAddSeeder(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundSeeder, found := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if found && foundSeeder != *testSeeder { - t.Error("seeder not added to cache", testSeeder) - } - foundSeeder, found = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if found && foundSeeder != *testSeeder { - t.Error("seeder not added to local", testSeeder) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestAddLeecher(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found && foundLeecher != *testLeecher { - t.Error("leecher not added to cache", testLeecher) - } - foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found && foundLeecher != *testLeecher { - t.Error("leecher not added to local", testLeecher) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestRemoveSeeder(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - - panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) - foundSeeder, found := testTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if found || foundSeeder == *testSeeder { - t.Error("seeder not removed from local", foundSeeder) - } - - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundSeeder, found = foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if found || foundSeeder == *testSeeder { - t.Error("seeder not removed from cache", foundSeeder, *testSeeder) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestRemoveLeecher(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - - panicOnErr(tx.RemoveLeecher(testTorrent, testLeecher)) - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found || foundLeecher == *testLeecher { - t.Error("leecher not removed from cache", foundLeecher, *testLeecher) - } - foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found || foundLeecher == *testLeecher { - t.Error("leecher not removed from local", foundLeecher, *testLeecher) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestSetSeeder(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - testSeeder.Uploaded += uint64(r.Int63()) - - panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) - - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if foundSeeder != *testSeeder { - t.Error("seeder not updated in cache", foundSeeder, *testSeeder) - } - foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if foundSeeder != *testSeeder { - t.Error("seeder not updated in local", foundSeeder, *testSeeder) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestSetLeecher(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - testLeecher.Uploaded += uint64(r.Int63()) - - panicOnErr(tx.SetLeecher(testTorrent, testLeecher)) - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundLeecher, _ := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if foundLeecher != *testLeecher { - t.Error("leecher not updated in cache", testLeecher) - } - foundLeecher, _ = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if foundLeecher != *testLeecher { - t.Error("leecher not updated in local", testLeecher) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestIncrementSlots(t *testing.T) { - tx := createTestTx() - testUser := createTestUser() - panicOnErr(tx.AddUser(testUser)) - numSlots := testUser.Slots - - panicOnErr(tx.IncrementSlots(testUser)) - foundUser, _, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - - if foundUser.Slots != numSlots+1 { - t.Error("cached slots not incremented") - } - if testUser.Slots != numSlots+1 { - t.Error("local slots not incremented") - } - // Cleanup - panicOnErr(tx.RemoveUser(testUser)) -} - -func TestDecrementSlots(t *testing.T) { - tx := createTestTx() - testUser := createTestUser() - panicOnErr(tx.AddUser(testUser)) - numSlots := testUser.Slots - - panicOnErr(tx.DecrementSlots(testUser)) - foundUser, _, err := tx.FindUser(testUser.Passkey) - panicOnErr(err) - - if foundUser.Slots != numSlots-1 { - t.Error("cached slots not incremented") - } - if testUser.Slots != numSlots-1 { - t.Error("local slots not incremented") - } - // Cleanup - panicOnErr(tx.RemoveUser(testUser)) -} - -func TestLeecherFinished(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - testLeecher.Left = 0 - - panicOnErr(tx.LeecherFinished(testTorrent, testLeecher)) - - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testLeecher)] - if foundSeeder != *testLeecher { - t.Error("seeder not added to cache", foundSeeder, *testLeecher) - } - foundSeeder, _ = foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if foundSeeder == *testLeecher { - t.Error("leecher not removed from cache", testLeecher) - } - foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testLeecher)] - if foundSeeder != *testLeecher { - t.Error("seeder not added to local", testLeecher) - } - foundSeeder, _ = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if foundSeeder == *testLeecher { - t.Error("leecher not removed from local", testLeecher) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -// Add, update, verify remove -func TestUpdatePeer(t *testing.T) { - tx := createTestTx() - testTorrent := createTestTorrent() - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddTorrent(testTorrent)) - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - // Update a seeder, set it, then check to make sure it updated - r := rand.New(rand.NewSource(time.Now().UnixNano())) - testSeeder.Uploaded += uint64(r.Int63()) - - panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) - - panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - if seeder, exists := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)]; exists { - t.Error("seeder not removed from cache", seeder) - } - if seeder, exists := testTorrent.Seeders[storage.PeerMapKey(testSeeder)]; exists { - t.Error("seeder not removed from local", seeder) - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestParallelFindUser(t *testing.T) { - t.Parallel() - if testing.Short() { - t.Skip() - } - tx := createTestTx() - testUserSuccess := createTestUser() - testUserFail := createTestUser() - panicOnErr(tx.AddUser(testUserSuccess)) - - for i := 0; i < 10; i++ { - foundUser, found, err := tx.FindUser(testUserFail.Passkey) - panicOnErr(err) - if found { - t.Error("user found", foundUser) - } - foundUser, found, err = tx.FindUser(testUserSuccess.Passkey) - panicOnErr(err) - if !found { - t.Error("user not found", testUserSuccess) - } - if *foundUser != *testUserSuccess { - t.Error("found user mismatch", *foundUser, testUserSuccess) - } - } - // Cleanup - panicOnErr(tx.RemoveUser(testUserSuccess)) -} - -func TestParallelFindTorrent(t *testing.T) { - t.Parallel() - if testing.Short() { - t.Skip() - } - tx := createTestTx() - testTorrentSuccess := createTestTorrent() - testTorrentFail := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrentSuccess)) - - for i := 0; i < 10; i++ { - foundTorrent, found, err := tx.FindTorrent(testTorrentSuccess.Infohash) - panicOnErr(err) - if !found { - t.Error("torrent not found", testTorrentSuccess) - } - if !reflect.DeepEqual(foundTorrent, testTorrentSuccess) { - t.Error("found torrent mismatch", foundTorrent, testTorrentSuccess) - } - foundTorrent, found, err = tx.FindTorrent(testTorrentFail.Infohash) - panicOnErr(err) - if found { - t.Error("torrent found", foundTorrent) - } - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrentSuccess)) -} - -func TestParallelSetSeeder(t *testing.T) { - t.Parallel() - if testing.Short() { - t.Skip() - } - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) - panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) - r := rand.New(rand.NewSource(time.Now().UnixNano())) - - for i := 0; i < 10; i++ { - testSeeder.Uploaded += uint64(r.Int63()) - - panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) - - foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundSeeder, _ := foundTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if foundSeeder != *testSeeder { - t.Error("seeder not updated in cache", foundSeeder, *testSeeder) - } - foundSeeder, _ = testTorrent.Seeders[storage.PeerMapKey(testSeeder)] - if foundSeeder != *testSeeder { - t.Error("seeder not updated in local", foundSeeder, *testSeeder) - } - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -} - -func TestParallelAddLeecher(t *testing.T) { - t.Parallel() - if testing.Short() { - t.Skip() - } - tx := createTestTx() - testTorrent := createTestTorrent() - panicOnErr(tx.AddTorrent(testTorrent)) - - for i := 0; i < 10; i++ { - testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) - - panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) - - foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) - panicOnErr(err) - foundLeecher, found := foundTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found && foundLeecher != *testLeecher { - t.Error("leecher not added to cache", testLeecher) - } - foundLeecher, found = testTorrent.Leechers[storage.PeerMapKey(testLeecher)] - if found && foundLeecher != *testLeecher { - t.Error("leecher not added to local", testLeecher) - } - } - // Cleanup - panicOnErr(tx.RemoveTorrent(testTorrent)) -}