From 4d4a97986450d053b4852d40d8b5d18f13e1e77a Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 12 Jul 2013 00:36:24 -0400 Subject: [PATCH] announce handler progress --- config/config.go | 1 + server/announce.go | 239 ++++++++++++++++++++++++++++++++++------- server/query.go | 80 +++----------- server/scrape.go | 71 +++--------- server/server.go | 12 +-- server/torrent.go | 52 +++++++++ storage/data.go | 24 ++--- storage/redis/redis.go | 8 +- storage/storage.go | 14 ++- 9 files changed, 313 insertions(+), 188 deletions(-) create mode 100644 server/torrent.go diff --git a/config/config.go b/config/config.go index b843651..8407422 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type Config struct { Private bool `json:"private"` Freeleech bool `json:"freeleech"` + Slots bool `json:"slots"` Announce Duration `json:"announce"` MinAnnounce Duration `json:"min_announce"` diff --git a/server/announce.go b/server/announce.go index d247a36..6df632c 100644 --- a/server/announce.go +++ b/server/announce.go @@ -11,35 +11,27 @@ import ( "net/http" "path" "strconv" + "time" + + "github.com/pushrax/chihaya/storage" ) func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { passkey, _ := path.Split(r.URL.Path) - _, err := s.validatePasskey(passkey) + user, err := s.FindUser(passkey) if err != nil { fail(err, w, r) return } - pq, err := parseQuery(r.URL.RawQuery) - if err != nil { - fail(errors.New("Error parsing query"), w, r) - return - } - - _, err = pq.determineIP(r) - if err != nil { - fail(err, w, r) - return - } - - err = pq.validateAnnounceParams() + aq, err := newAnnounceQuery(r) if err != nil { fail(errors.New("Malformed request"), w, r) return } - ok, err := s.dataStore.ClientWhitelisted(pq.params["peer_id"]) + peerID := aq.PeerID() + ok, err := s.dataStore.ClientWhitelisted(peerID) if err != nil { log.Panicf("server: %s", err) } @@ -48,7 +40,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } - torrent, exists, err := s.dataStore.FindTorrent(pq.params["infohash"]) + torrent, exists, err := s.dataStore.FindTorrent(aq.Infohash()) if err != nil { log.Panicf("server: %s", err) } @@ -62,34 +54,207 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { log.Panicf("server: %s", err) } - if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 { - err := tx.UnpruneTorrent(torrent) + left := aq.Left() + if torrent.Pruned && left == 0 { + err := tx.Unprune(torrent.ID) if err != nil { log.Panicf("server: %s", err) } - torrent.Status = 0 - } else if torrent.Status != 0 { - fail( - fmt.Errorf( - "This torrent does not exist (status: %d, left: %d)", - torrent.Status, - left, - ), - w, - r, - ) + } else if torrent.Pruned { + e := fmt.Errorf("This torrent does not exist (pruned: %t, left: %d)", torrent.Pruned, left) + fail(e, w, r) return } - var numWant int - if numWantStr, exists := pq.params["numWant"]; exists { - numWant, err := strconv.Atoi(numWantStr) - if err != nil { - numWant = s.conf.DefaultNumWant + _ = aq.NumWant(s.conf.DefaultNumWant) + + if s.conf.Slots && user.Slots != -1 && aq.Left() != 0 { + if user.UsedSlots >= user.Slots { + fail(errors.New("You've run out of download slots."), w, r) + return + } + } + + _, isLeecher := torrent.Leechers[peerID] + _, isSeeder := torrent.Seeders[peerID] + + event := aq.Event() + completed := "completed" == event + + if event == "stopped" || event == "paused" { + if left == 0 { + err := tx.RmSeeder(torrent.ID, peerID) + if err != nil { + log.Panicf("server: %s", err) + } + } else { + err := tx.RmLeecher(torrent.ID, peerID) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.DecrementSlots(user.ID) + if err != nil { + log.Panicf("server: %s", err) + } + } + } else if completed { + err := tx.Snatch(user.ID, torrent.ID) + if err != nil { + log.Panicf("server: %s", err) } - } else { - numWant = s.conf.DefaultNumWant } - // TODO continue +} + +// An AnnounceQuery is a parsedQuery that guarantees the existance +// of parameters required for torrent client announces. +type announceQuery struct { + pq *parsedQuery + ip string + created int64 +} + +func newAnnounceQuery(r *http.Request) (*announceQuery, error) { + pq, err := parseQuery(r.URL.RawQuery) + if err != nil { + return nil, err + } + + infohash, _ := pq.Params["info_hash"] + if infohash == "" { + return nil, errors.New("infohash does not exist") + } + peerId, _ := pq.Params["peer_id"] + if peerId == "" { + return nil, errors.New("peerId does not exist") + } + _, err = pq.getUint64("port") + if err != nil { + return nil, errors.New("port does not exist") + } + _, err = pq.getUint64("uploaded") + if err != nil { + return nil, errors.New("uploaded does not exist") + } + _, err = pq.getUint64("downloaded") + if err != nil { + return nil, errors.New("downloaded does not exist") + } + _, err = pq.getUint64("left") + if err != nil { + return nil, errors.New("left does not exist") + } + + aq := &announceQuery{ + pq: pq, + created: time.Now().Unix(), + } + aq.ip, err = aq.determineIP(r) + if err != nil { + return nil, err + } + return aq, nil +} + +func (aq *announceQuery) Infohash() string { + infohash, _ := aq.pq.Params["info_hash"] + if infohash == "" { + panic("announceQuery missing infohash") + } + return infohash +} + +func (aq *announceQuery) PeerID() string { + peerID, _ := aq.pq.Params["peer_id"] + if peerID == "" { + panic("announceQuery missing peer_id") + } + return peerID +} + +func (aq *announceQuery) Port() uint64 { + port, err := aq.pq.getUint64("port") + if err != nil { + panic("announceQuery missing port") + } + return port +} + +func (aq *announceQuery) IP() string { + return aq.ip +} + +func (aq *announceQuery) Uploaded() uint64 { + ul, err := aq.pq.getUint64("uploaded") + if err != nil { + panic("announceQuery missing uploaded") + } + return ul +} +func (aq *announceQuery) Downloaded() uint64 { + dl, err := aq.pq.getUint64("downloaded") + if err != nil { + panic("announceQuery missing downloaded") + } + return dl +} +func (aq *announceQuery) Left() uint64 { + left, err := aq.pq.getUint64("left") + if err != nil { + panic("announceQuery missing left") + } + return left +} + +func (aq *announceQuery) Event() string { + return aq.pq.Params["event"] +} + +func (aq *announceQuery) determineIP(r *http.Request) (string, error) { + if ip, ok := aq.pq.Params["ip"]; ok { + return ip, nil + } else if ip, ok := aq.pq.Params["ipv4"]; ok { + return ip, nil + } else if ips, ok := aq.pq.Params["X-Real-Ip"]; ok && len(ips) > 0 { + return string(ips[0]), nil + } else { + portIndex := len(r.RemoteAddr) - 1 + for ; portIndex >= 0; portIndex-- { + if r.RemoteAddr[portIndex] == ':' { + break + } + } + if portIndex != -1 { + return r.RemoteAddr[0:portIndex], nil + } else { + return "", errors.New("Failed to parse IP address") + } + } +} + +func (aq *announceQuery) NumWant(fallback int) int { + if numWantStr, exists := aq.pq.Params["numWant"]; exists { + numWant, err := strconv.Atoi(numWantStr) + if err != nil { + return fallback + } + return numWant + } else { + return fallback + } +} + +func (aq *announceQuery) Peer(uid, tid uint64) *storage.Peer { + return &storage.Peer{ + ID: aq.PeerID(), + UserID: uid, + TorrentID: tid, + + IP: aq.IP(), + Port: aq.Port(), + + LastAnnounce: aq.created, + Uploaded: aq.Uploaded(), + Downloaded: aq.Downloaded(), + } } diff --git a/server/query.go b/server/query.go index f189c6a..2e72e73 100644 --- a/server/query.go +++ b/server/query.go @@ -6,28 +6,29 @@ package server import ( "errors" - "net/http" "net/url" "strconv" ) +// parsedQuery represents a parsed URL.Query. type parsedQuery struct { - infohashes []string - params map[string]string + Infohashes []string + Params map[string]string } -func (pq *parsedQuery) getUint64(key string) (uint64, bool) { - str, exists := pq.params[key] +func (pq *parsedQuery) getUint64(key string) (uint64, error) { + str, exists := pq.Params[key] if !exists { - return 0, false + return 0, errors.New("Value does not exist for key: " + key) } val, err := strconv.ParseUint(str, 10, 64) if err != nil { - return 0, false + return 0, err } - return val, true + return val, nil } +// parseQuery parses a raw url query. func parseQuery(query string) (*parsedQuery, error) { var ( keyStart, keyEnd int @@ -38,8 +39,8 @@ func parseQuery(query string) (*parsedQuery, error) { hasInfohash = false pq = &parsedQuery{ - infohashes: nil, - params: make(map[string]string), + Infohashes: nil, + Params: make(map[string]string), } ) @@ -67,15 +68,15 @@ func parseQuery(query string) (*parsedQuery, error) { return nil, err } - pq.params[keyStr] = valStr + pq.Params[keyStr] = valStr if keyStr == "info_hash" { if hasInfohash { // Multiple infohashes - if pq.infohashes == nil { - pq.infohashes = []string{firstInfohash} + if pq.Infohashes == nil { + pq.Infohashes = []string{firstInfohash} } - pq.infohashes = append(pq.infohashes, valStr) + pq.Infohashes = append(pq.Infohashes, valStr) } else { firstInfohash = valStr hasInfohash = true @@ -95,54 +96,3 @@ func parseQuery(query string) (*parsedQuery, error) { } return pq, nil } - -func (pq *parsedQuery) validateAnnounceParams() error { - infohash, _ := pq.params["info_hash"] - if infohash == "" { - return errors.New("infohash does not exist") - } - peerId, _ := pq.params["peer_id"] - if peerId == "" { - return errors.New("peerId does not exist") - } - _, ok := pq.getUint64("port") - if ok == false { - return errors.New("port does not exist") - } - _, ok = pq.getUint64("uploaded") - if ok == false { - return errors.New("uploaded does not exist") - } - _, ok = pq.getUint64("downloaded") - if ok == false { - return errors.New("downloaded does not exist") - } - _, ok = pq.getUint64("left") - if ok == false { - return errors.New("left does not exist") - } - return nil -} - -// TODO IPv6 support -func (pq *parsedQuery) determineIP(r *http.Request) (string, error) { - if ip, ok := pq.params["ip"]; ok { - return ip, nil - } else if ip, ok := pq.params["ipv4"]; ok { - return ip, nil - } else if ips, ok := pq.params["X-Real-Ip"]; ok && len(ips) > 0 { - return string(ips[0]), nil - } else { - portIndex := len(r.RemoteAddr) - 1 - for ; portIndex >= 0; portIndex-- { - if r.RemoteAddr[portIndex] == ':' { - break - } - } - if portIndex != -1 { - return r.RemoteAddr[0:portIndex], nil - } else { - return "", errors.New("Failed to parse IP address") - } - } -} diff --git a/server/scrape.go b/server/scrape.go index 4dda80f..7285674 100644 --- a/server/scrape.go +++ b/server/scrape.go @@ -6,20 +6,17 @@ package server import ( "errors" - "fmt" "io" "log" "net/http" "path" - "strconv" - "time" "github.com/pushrax/chihaya/storage" ) func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { passkey, _ := path.Split(r.URL.Path) - _, err := s.validatePasskey(passkey) + _, err := s.FindUser(passkey) if err != nil { fail(err, w, r) return @@ -32,25 +29,25 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { } io.WriteString(w, "d") - bencode(w, "files") - if pq.infohashes != nil { - for _, infohash := range pq.infohashes { + writeBencoded(w, "files") + if pq.Infohashes != nil { + for _, infohash := range pq.Infohashes { torrent, exists, err := s.dataStore.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } if exists { - bencode(w, infohash) + writeBencoded(w, infohash) writeScrapeInfo(w, torrent) } } - } else if infohash, exists := pq.params["info_hash"]; exists { + } else if infohash, exists := pq.Params["info_hash"]; exists { torrent, exists, err := s.dataStore.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } if exists { - bencode(w, infohash) + writeBencoded(w, infohash) writeScrapeInfo(w, torrent) } } @@ -64,53 +61,11 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { func writeScrapeInfo(w io.Writer, torrent *storage.Torrent) { io.WriteString(w, "d") - bencode(w, "complete") - bencode(w, len(torrent.Seeders)) - bencode(w, "downloaded") - bencode(w, torrent.Snatched) - bencode(w, "incomplete") - bencode(w, len(torrent.Leechers)) + writeBencoded(w, "complete") + writeBencoded(w, len(torrent.Seeders)) + writeBencoded(w, "downloaded") + writeBencoded(w, torrent.Snatches) + writeBencoded(w, "incomplete") + writeBencoded(w, len(torrent.Leechers)) io.WriteString(w, "e") } - -func bencode(w io.Writer, data interface{}) { - // A massive switch is faster than reflection - switch v := data.(type) { - case string: - str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v) - io.WriteString(w, str) - case int: - str := fmt.Sprintf("i%se", strconv.Itoa(v)) - io.WriteString(w, str) - case uint: - str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10)) - io.WriteString(w, str) - case int64: - str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10)) - io.WriteString(w, str) - case uint64: - str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10)) - io.WriteString(w, str) - case time.Duration: // Assume seconds - str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10)) - io.WriteString(w, str) - case map[string]interface{}: - io.WriteString(w, "d") - for key, val := range v { - str := fmt.Sprintf("%s:%s", strconv.Itoa(len(key)), key) - io.WriteString(w, str) - bencode(w, val) - } - io.WriteString(w, "e") - case []string: - io.WriteString(w, "l") - for _, val := range v { - bencode(w, val) - } - io.WriteString(w, "e") - default: - // Although not currently necessary, - // should handle []interface{} manually; Go can't do it implicitly - panic("Tried to bencode an unsupported type!") - } -} diff --git a/server/server.go b/server/server.go index d3b262b..7fda1a6 100644 --- a/server/server.go +++ b/server/server.go @@ -7,7 +7,6 @@ package server import ( "errors" - "fmt" "io" "log" "net" @@ -113,14 +112,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func fail(err error, w http.ResponseWriter, r *http.Request) { errmsg := err.Error() - message := fmt.Sprintf( - "%s%s%s%s%s", - "d14:failure reason", - strconv.Itoa(len(errmsg)), - ":", - errmsg, - "e", - ) + message := "d14:failure reason" + strconv.Itoa(len(errmsg)) + ":" + errmsg + "e" length, _ := io.WriteString(w, message) r.Close = true w.Header().Add("Content-Type", "text/plain") @@ -129,7 +121,7 @@ func fail(err error, w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } -func (s *Server) validatePasskey(dir string) (*storage.User, error) { +func (s *Server) FindUser(dir string) (*storage.User, error) { if len(dir) != 34 { return nil, errors.New("Passkey is invalid") } diff --git a/server/torrent.go b/server/torrent.go new file mode 100644 index 0000000..f416c87 --- /dev/null +++ b/server/torrent.go @@ -0,0 +1,52 @@ +package server + +import ( + "fmt" + "io" + "strconv" + "time" +) + +func writeBencoded(w io.Writer, data interface{}) { + switch v := data.(type) { + case string: + str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v) + io.WriteString(w, str) + case int: + str := fmt.Sprintf("i%se", strconv.Itoa(v)) + io.WriteString(w, str) + case uint: + str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10)) + io.WriteString(w, str) + case int64: + str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10)) + io.WriteString(w, str) + case uint64: + str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10)) + io.WriteString(w, str) + case time.Duration: // Assume seconds + str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10)) + io.WriteString(w, str) + case map[string]interface{}: + io.WriteString(w, "d") + for key, val := range v { + str := fmt.Sprintf("%s:%s", strconv.Itoa(len(key)), key) + io.WriteString(w, str) + writeBencoded(w, val) + } + io.WriteString(w, "e") + case []string: + io.WriteString(w, "l") + for _, val := range v { + writeBencoded(w, val) + } + io.WriteString(w, "e") + default: + // Although not currently necessary, + // should handle []interface{} manually; Go can't do it implicitly + panic("Tried to bencode an unsupported type!") + } +} + +func compact() { +} diff --git a/storage/data.go b/storage/data.go index 2402a49..379c4e2 100644 --- a/storage/data.go +++ b/storage/data.go @@ -9,17 +9,14 @@ type Peer struct { UserID uint64 TorrentID uint64 - Port uint IP string - Addr []byte + Port uint64 Uploaded uint64 Downloaded uint64 Left uint64 - Seeding bool - StartTimeUnix int64 - LastAnnounce int64 + LastAnnounce int64 } type Torrent struct { @@ -28,21 +25,22 @@ type Torrent struct { UpMultiplier float64 DownMultiplier float64 - Seeders map[string]*Peer - Leechers map[string]*Peer + Seeders map[string]Peer + Leechers map[string]Peer - Snatched uint - Status int64 + Snatches uint + Pruned bool LastAction int64 } type User struct { - ID uint64 - Passkey string + ID uint64 + Passkey string + UpMultiplier float64 DownMultiplier float64 - Slots int64 - UsedSlots int64 + Slots int64 + UsedSlots int64 SlotsLastChecked int64 } diff --git a/storage/redis/redis.go b/storage/redis/redis.go index e364ee6..653038c 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -67,7 +67,7 @@ func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) { conn := ds.Get() defer conn.Close() - key := ds.conf.Prefix + "user:" + passkey + key := ds.conf.Prefix + "User:" + passkey reply, err := redis.Values(conn.Do("HGETALL", key)) if err != nil { return nil, true, err @@ -90,7 +90,7 @@ func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) { conn := ds.Get() defer conn.Close() - key := ds.conf.Prefix + "torrent:" + infohash + key := ds.conf.Prefix + "Torrent:" + infohash reply, err := redis.Values(conn.Do("HGETALL", key)) if err != nil { return nil, false, err @@ -113,7 +113,7 @@ func (ds *DS) ClientWhitelisted(peerID string) (bool, error) { conn := ds.Get() defer conn.Close() - key := ds.conf.Prefix + "whitelist:" + peerID + key := ds.conf.Prefix + "Whitelist:" + peerID exists, err := redis.Bool(conn.Do("EXISTS", key)) if err != nil { return false, err @@ -151,7 +151,7 @@ func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error { if t.done { return storage.ErrTxDone } - key := t.conf.Prefix + "torrent:" + torrent.Infohash + key := t.conf.Prefix + "Torrent:" + torrent.Infohash err := t.Send("HSET " + key + " Status 0") if err != nil { return err diff --git a/storage/storage.go b/storage/storage.go index 104f790..c09ab7e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -72,5 +72,17 @@ type Tx interface { Commit() error Rollback() error - UnpruneTorrent(torrent *Torrent) error + // Torrents + Snatch(userID, torrentID uint64) error + Unprune(torrentID uint64) error + + // Peers + NewLeecher(torrent *Torrent, p *Peer) error + RmLeecher(torrentID uint64, peerID string) error + + NewSeeder(torrent *Torrent, p *Peer) error + RmSeeder(torrentID uint64, peerID string) error + + // Users + DecrementSlots(userID uint64) error }