diff --git a/main.go b/main.go index 0eb0aef..16c9941 100644 --- a/main.go +++ b/main.go @@ -18,25 +18,18 @@ import ( var ( profile bool - configFile string + configPath string ) func init() { flag.BoolVar(&profile, "profile", false, "Generate profiling data for pprof into chihaya.cpu") - flag.StringVar(&configFile, "config", "", "The location of a valid configuration file.") + flag.StringVar(&configPath, "config", "", "The location of a valid configuration file.") } func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU()) - if configFile != "" { - conf, err := config.New(configFile) - if err != nil { - log.Fatalf("Failed to parse configuration file: %s\n", err) - } - } - if profile { log.Println("Running with profiling enabled") f, err := os.Create("chihaya.cpu") @@ -47,6 +40,13 @@ func main() { pprof.StartCPUProfile(f) } + if configPath == "" { + log.Fatalf("Must specify a configuration file") + } + conf, err := config.New(configPath) + if err != nil { + log.Fatalf("Failed to parse configuration file: %s\n", err) + } s := server.New(conf) go func() { @@ -68,7 +68,7 @@ func main() { os.Exit(0) }() - err = s.Start() + err = s.ListenAndServe() if err != nil { log.Fatalf("Failed to start server: %s\n", err) } diff --git a/server/announce.go b/server/announce.go index e2b544a..eaf7fdd 100644 --- a/server/announce.go +++ b/server/announce.go @@ -7,56 +7,57 @@ package server import ( "errors" "fmt" + "log" "net/http" "path" "github.com/pushrax/chihaya/config" ) -func (h *handler) serveAnnounce(w http.ResponseWriter, r *http.Request) { - passkey, action := path.Split(r.URL.Path) - user, err := validatePasskey(passkey, h.storage) +func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { + passkey, _ := path.Split(r.URL.Path) + user, err := validatePasskey(passkey, s.storage) if err != nil { - fail(err, w) + fail(err, w, r) return } pq, err := parseQuery(r.URL.RawQuery) if err != nil { - fail(errors.New("Error parsing query"), w) + fail(errors.New("Error parsing query"), w, r) return } ip, err := pq.determineIP(r) if err != nil { - fail(err, w) + fail(err, w, r) return } err = validateParsedQuery(pq) if err != nil { - fail(errors.New("Malformed request"), w) + fail(errors.New("Malformed request"), w, r) return } - if !whitelisted(pq.params["peerId"], h.conf) { - fail(errors.New("Your client is not approved"), w) + if !whitelisted(pq.params["peerId"], s.conf) { + fail(errors.New("Your client is not approved"), w, r) return } - torrent, exists, err := h.storage.FindTorrent(pq.params["infohash"]) + torrent, exists, err := s.storage.FindTorrent(pq.params["infohash"]) if err != nil { - panic("server: failed to find torrent") + log.Panicf("server: %s", err) } if !exists { - fail(errors.New("This torrent does not exist"), w) + fail(errors.New("This torrent does not exist"), w, r) return } if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 { - err := h.storage.UnpruneTorrent(torrent) + err := s.storage.UnpruneTorrent(torrent) if err != nil { - panic("server: failed to unprune torrent") + log.Panicf("server: %s", err) } torrent.Status = 0 } else if torrent.Status != 0 { @@ -67,14 +68,34 @@ func (h *handler) serveAnnounce(w http.ResponseWriter, r *http.Request) { left, ), w, + r, ) return } - // TODO + // TODO continue } func whitelisted(peerId string, conf *config.Config) bool { - // TODO + var ( + widLen int + matched bool + ) + + for _, whitelistedId := range conf.Whitelist { + widLen = len(whitelistedId) + if widLen <= len(peerId) { + matched = true + for i := 0; i < widLen; i++ { + if peerId[i] != whitelistedId[i] { + matched = false + break + } + } + if matched { + return true + } + } + } return false } diff --git a/server/query.go b/server/query.go index b263ead..45573d5 100644 --- a/server/query.go +++ b/server/query.go @@ -124,6 +124,7 @@ func validateParsedQuery(pq *parsedQuery) error { return nil } +// TODO IPv6 support func (pq *parsedQuery) determineIP(r *http.Request) (string, error) { ip, ok := pq.params["ip"] if !ok { diff --git a/server/scrape.go b/server/scrape.go new file mode 100644 index 0000000..f16cfff --- /dev/null +++ b/server/scrape.go @@ -0,0 +1,106 @@ +package server + +import ( + "errors" + "fmt" + "io" + "net/http" + "path" + "strconv" + "time" + + "github.com/pushrax/chihaya/storage" +) + +func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { + passkey, _ := path.Split(r.URL.Path) + user, err := validatePasskey(passkey, s.storage) + if err != nil { + fail(err, w, r) + return + } + + pq, err := parseQuery(r.URL.RawQuery) + if err != nil { + fail(errors.New("Error parsing query"), w, r) + return + } + + io.WriteString(w, "d") + bencode(w, "files") + if pq.infohashes != nil { + for _, infohash := range pq.infohashes { + torrent, exists, err := s.storage.FindTorrent(infohash) + if err != nil { + panic("server: failed to find torrent") + } + if exists { + bencode(w, infohash) + writeScrapeInfo(w, torrent) + } + } + } else if infohash, exists := pq.params["info_hash"]; exists { + torrent, exists, err := s.storage.FindTorrent(infohash) + if err != nil { + panic("server: failed to find torrent") + } + if exists { + bencode(w, infohash) + writeScrapeInfo(w, torrent) + } + } + io.WriteString(w, "e") + finalizeResponse(w, r) +} + +func writeScrapeInfo(w io.Writer, torrent *storage.Torrent) { + io.WriteString(w, "d") + bencode(w, "complete") + bencode(w, len(torrent.Seeders)) + bencode(w, "downloaded") + bencode(w, torrent.Snatched) + bencode(w, "incomplete") + bencode(w, len(torrent.Leechers)) + io.WriteString(w, "e") +} + +func bencode(w io.Writer, data interface{}) { + switch v := data.(type) { + case string: + str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v) + io.WriteString(w, str) + case int: + str := fmt.Sprintf("i%se", strconv.Itoa(v)) + io.WriteString(w, str) + case uint: + str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10)) + io.WriteString(w, str) + case int64: + str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10)) + io.WriteString(w, str) + case uint64: + str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10)) + io.WriteString(w, str) + case time.Duration: // Assume seconds + str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10)) + io.WriteString(w, str) + case map[string]interface{}: + io.WriteString(w, "d") + for key, val := range v { + str := fmt.Sprintf("%s:%s", strconv.Itoa(len(key)), key) + io.WriteString(w, str) + bencode(w, val) + } + io.WriteString(w, "e") + case []string: + io.WriteString(w, "l") + for _, val := range v { + bencode(w, val) + } + io.WriteString(w, "e") + default: + // Although not currently necessary, + // should handle []interface{} manually; Go can't do it implicitly + panic("Tried to bencode an unsupported type!") + } +} diff --git a/server/server.go b/server/server.go index 2647bf7..096eb5e 100644 --- a/server/server.go +++ b/server/server.go @@ -14,134 +14,114 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/storage" ) type Server struct { - conf *config.Config - listener net.Listener - storage storage.Storage - terminated *bool - waitgroup *sync.WaitGroup + conf *config.Config + listener net.Listener + storage storage.Storage + + serving bool + startTime time.Time + + deltaRequests int64 + rpm int64 + + waitgroup sync.WaitGroup http.Server } func New(conf *config.Config) (*Server, error) { - var ( - wg sync.WaitGroup - terminated bool - ) - store, err := storage.New(&conf.Storage) if err != nil { return nil, err } - handler := &handler{ - conf: conf, - storage: store, - terminated: &terminated, - waitgroup: &wg, - } - s := &Server{ - conf: conf, - storage: store, - terminated: &terminated, - waitgroup: &wg, + conf: conf, + storage: store, + Server: http.Server{ + Addr: conf.Addr, + }, } + s.Server.Handler = s - s.Server.Addr = conf.Addr - s.Server.Handler = handler return s, nil } -func (s *Server) Start() error { - listener, err := net.Listen("tcp", s.conf.Addr) +func (s *Server) ListenAndServe() error { + listener, err := net.Listen("tcp", s.Addr) + s.listener = listener if err != nil { return err } - *s.terminated = false + s.serving = true + s.startTime = time.Now() + + go s.updateRPM() s.Serve(s.listener) + s.waitgroup.Wait() return nil } func (s *Server) Stop() error { - *s.terminated = true + s.serving = false s.waitgroup.Wait() - err := s.storage.Shutdown() + err := s.storage.Close() if err != nil { return err } return s.listener.Close() } -type handler struct { - conf *config.Config - deltaRequests int64 - storage storage.Storage - terminated *bool - waitgroup *sync.WaitGroup -} - -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if *h.terminated { +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !s.serving { return } - h.waitgroup.Add(1) - defer h.waitgroup.Done() + s.waitgroup.Add(1) + defer s.waitgroup.Done() + defer atomic.AddInt64(&s.deltaRequests, 1) + defer finalizeResponse(w, r) - if r.URL.Path == "/stats" { - h.serveStats(w, r) - return - } - - passkey, action := path.Split(r.URL.Path) + _, action := path.Split(r.URL.Path) switch action { case "announce": - h.serveAnnounce(w, r) + s.serveAnnounce(w, r) return case "scrape": - // TODO - h.serveScrape(w, r) + s.serveScrape(w, r) return default: - written := fail(errors.New("Unknown action"), w) - h.finalizeResponse(w, r, written) + fail(errors.New("Unknown action"), w, r) return } } -func (h *handler) finalizeResponse( - w http.ResponseWriter, - r *http.Request, - written int, -) { +func finalizeResponse(w http.ResponseWriter, r *http.Request) { r.Close = true w.Header().Add("Content-Type", "text/plain") w.Header().Add("Connection", "close") - w.Header().Add("Content-Length", strconv.Itoa(written)) w.(http.Flusher).Flush() - atomic.AddInt64(&h.deltaRequests, 1) } -func fail(err error, w http.ResponseWriter) int { - e := err.Error() +func fail(err error, w http.ResponseWriter, r *http.Request) { + errmsg := err.Error() message := fmt.Sprintf( "%s%s%s%s%s", "d14:failure reason", - strconv.Itoa(len(e)), - ':', - e, - 'e', + strconv.Itoa(len(errmsg)), + ":", + errmsg, + "e", ) - written, _ := io.WriteString(w, message) - return written + io.WriteString(w, message) } func validatePasskey(dir string, s storage.Storage) (*storage.User, error) { diff --git a/server/stats.go b/server/stats.go new file mode 100644 index 0000000..56f5f70 --- /dev/null +++ b/server/stats.go @@ -0,0 +1,34 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package server + +import ( + "encoding/json" + "net/http" + "sync/atomic" + "time" + + "github.com/pushrax/chihaya/config" +) + +type stats struct { + Uptime config.Duration `json:"uptime"` + RPM int64 `json:"req_per_min"` +} + +func (s *Server) serveStats(w http.ResponseWriter, r *http.Request) { + stats, _ := json.Marshal(&stats{ + config.Duration{time.Now().Sub(s.startTime)}, + s.rpm, + }) + w.Write(stats) +} + +func (s *Server) updateRPM() { + for _ = range time.NewTicker(time.Minute).C { + s.rpm = s.deltaRequests + atomic.StoreInt64(&s.deltaRequests, 0) + } +} diff --git a/storage/storage.go b/storage/storage.go index 06e02b4..76ff92c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -42,7 +42,7 @@ func New(conf *config.Storage) (Storage, error) { } type Storage interface { - Shutdown() error + Close() error FindUser(passkey string) (*User, bool, error) FindTorrent(infohash string) (*Torrent, bool, error)