diff --git a/chihaya.go b/chihaya.go index 3c20394..9245072 100644 --- a/chihaya.go +++ b/chihaya.go @@ -7,9 +7,11 @@ package chihaya import ( "flag" "os" + "os/signal" "runtime" "runtime/pprof" "sync" + "syscall" "github.com/golang/glog" @@ -80,28 +82,49 @@ func Boot() { } var wg sync.WaitGroup + var servers []tracker.Server if cfg.HTTPListenAddr != "" { wg.Add(1) + srv := http.NewServer(cfg, tkr) + servers = append(servers, srv) + go func() { defer wg.Done() - http.Serve(cfg, tkr) + srv.Serve() }() } if cfg.UDPListenAddr != "" { wg.Add(1) + srv := udp.NewServer(cfg, tkr) + servers = append(servers, srv) + go func() { defer wg.Done() - udp.Serve(cfg, tkr) + srv.Serve() }() } - wg.Wait() + shutdown := make(chan os.Signal) + signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) + + go func() { + wg.Wait() + signal.Stop(shutdown) + close(shutdown) + }() + + <-shutdown + glog.Info("Shutting down...") + + for _, srv := range servers { + srv.Stop() + } + + <-shutdown if err := tkr.Close(); err != nil { glog.Errorf("Failed to shut down tracker cleanly: %s", err.Error()) } - - glog.Info("Gracefully shut down") } diff --git a/http/http.go b/http/http.go index 06771c9..0031ed0 100644 --- a/http/http.go +++ b/http/http.go @@ -24,8 +24,10 @@ type ResponseHandler func(http.ResponseWriter, *http.Request, httprouter.Params) // Server represents an HTTP serving torrent tracker. type Server struct { - config *config.Config - tracker *tracker.Tracker + config *config.Config + tracker *tracker.Tracker + grace *graceful.Server + stopping bool } // makeHandler wraps our ResponseHandlers while timing requests, collecting, @@ -120,34 +122,50 @@ func (s *Server) connState(conn net.Conn, state http.ConnState) { // Serve creates a new Server and proceeds to block while handling requests // until a graceful shutdown. -func Serve(cfg *config.Config, tkr *tracker.Tracker) { - srv := &Server{ - config: cfg, - tracker: tkr, - } +func (s *Server) Serve() { + glog.V(0).Info("Starting HTTP on ", s.config.HTTPListenAddr) - glog.V(0).Info("Starting HTTP on ", cfg.HTTPListenAddr) - if cfg.HTTPListenLimit != 0 { - glog.V(0).Info("Limiting connections to ", cfg.HTTPListenLimit) + if s.config.HTTPListenLimit != 0 { + glog.V(0).Info("Limiting connections to ", s.config.HTTPListenLimit) } grace := &graceful.Server{ - Timeout: cfg.HTTPRequestTimeout.Duration, - ConnState: srv.connState, - ListenLimit: cfg.HTTPListenLimit, + Timeout: s.config.HTTPRequestTimeout.Duration, + ConnState: s.connState, + ListenLimit: s.config.HTTPListenLimit, Server: &http.Server{ - Addr: cfg.HTTPListenAddr, - Handler: newRouter(srv), - ReadTimeout: cfg.HTTPReadTimeout.Duration, - WriteTimeout: cfg.HTTPWriteTimeout.Duration, + Addr: s.config.HTTPListenAddr, + Handler: newRouter(s), + ReadTimeout: s.config.HTTPReadTimeout.Duration, + WriteTimeout: s.config.HTTPWriteTimeout.Duration, }, } + s.grace = grace grace.SetKeepAlivesEnabled(false) + grace.ShutdownInitiated = func() { s.stopping = true } if err := grace.ListenAndServe(); err != nil { if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") { glog.Errorf("Failed to gracefully run HTTP server: %s", err.Error()) + return } } + + glog.Info("HTTP server shut down cleanly") +} + +// Stop cleanly shuts down the server. +func (s *Server) Stop() { + if !s.stopping { + s.grace.Stop(s.grace.Timeout) + } +} + +// NewServer returns a new HTTP server for a given configuration and tracker. +func NewServer(cfg *config.Config, tkr *tracker.Tracker) *Server { + return &Server{ + config: cfg, + tracker: tkr, + } } diff --git a/tracker/tracker.go b/tracker/tracker.go index eae116f..2576f77 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -24,6 +24,15 @@ type Tracker struct { *Storage } +// Server represents a server for a given BitTorrent tracker protocol. +type Server interface { + // Serve runs the server and blocks until the server has shut down. + Serve() + + // Stop cleanly shuts down the server in a non-blocking manner. + Stop() +} + // New creates a new Tracker, and opens any necessary connections. // Maintenance routines are automatically spawned in the background. func New(cfg *config.Config) (*Tracker, error) { diff --git a/udp/udp.go b/udp/udp.go index 113814b..efde844 100644 --- a/udp/udp.go +++ b/udp/udp.go @@ -8,6 +8,7 @@ package udp import ( "net" + "time" "github.com/golang/glog" "github.com/pushrax/bufferpool" @@ -20,10 +21,12 @@ import ( type Server struct { config *config.Config tracker *tracker.Tracker + + done bool } -func (srv *Server) ListenAndServe() error { - listenAddr, err := net.ResolveUDPAddr("udp", srv.config.UDPListenAddr) +func (s *Server) serve() error { + listenAddr, err := net.ResolveUDPAddr("udp", s.config.UDPListenAddr) if err != nil { return err } @@ -34,37 +37,55 @@ func (srv *Server) ListenAndServe() error { return err } - if srv.config.UDPReadBufferSize > 0 { - sock.SetReadBuffer(srv.config.UDPReadBufferSize) + if s.config.UDPReadBufferSize > 0 { + sock.SetReadBuffer(s.config.UDPReadBufferSize) } pool := bufferpool.New(1000, 2048) - for { + for !s.done { buffer := pool.TakeSlice() + sock.SetReadDeadline(time.Now().Add(time.Second)) n, addr, err := sock.ReadFromUDP(buffer) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + continue + } return err } go func() { - response := srv.handlePacket(buffer[:n], addr) + response := s.handlePacket(buffer[:n], addr) if response != nil { sock.WriteToUDP(response, addr) } pool.GiveSlice(buffer) }() } + + return nil } -func Serve(cfg *config.Config, tkr *tracker.Tracker) { - srv := &Server{ +func (s *Server) Serve() { + glog.V(0).Info("Starting UDP on ", s.config.UDPListenAddr) + + if err := s.serve(); err != nil { + glog.Errorf("Failed to run UDP server: %s", err.Error()) + } else { + glog.Info("UDP server shut down cleanly") + } +} + +// Stop cleanly shuts down the server. +func (s *Server) Stop() { + s.done = true +} + +// NewServer returns a new UDP server for a given configuration and tracker. +func NewServer(cfg *config.Config, tkr *tracker.Tracker) *Server { + return &Server{ config: cfg, tracker: tkr, } - - glog.V(0).Info("Starting UDP on ", cfg.UDPListenAddr) - if err := srv.ListenAndServe(); err != nil { - glog.Errorf("Failed to run UDP server: %s", err.Error()) - } }