diff --git a/cmd/reflector.go b/cmd/reflector.go index b4e19f4..8659056 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -51,6 +51,10 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } peerServer := peer.NewServer(combo) + if globalConfig.SlackHookURL != "" { + peerServer.StatLogger = log.StandardLogger() + peerServer.StatReportFrequency = 1 * time.Hour + } err = peerServer.Start(":5567") if err != nil { log.Fatal(err) diff --git a/peer/server.go b/peer/server.go index 12ba14b..cd0be6e 100644 --- a/peer/server.go +++ b/peer/server.go @@ -10,9 +10,11 @@ import ( "strings" "time" + "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/stop" - "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" ) @@ -26,10 +28,14 @@ const ( // Server is an instance of a peer server that houses the listener and store. type Server struct { + StatLogger *log.Logger // logger to log stats + StatReportFrequency time.Duration // how often to log stats + store store.BlobStore closed bool - grp *stop.Group + grp *stop.Group + stats *reflector.Stats } // NewServer returns an initialized Server pointer. @@ -43,6 +49,7 @@ func NewServer(store store.BlobStore) *Server { // Shutdown gracefully shuts down the peer server. func (s *Server) Shutdown() { log.Debug("shutting down peer server...") + s.stats.Shutdown() s.grp.StopAndWait() log.Debug("peer server stopped") } @@ -62,6 +69,11 @@ func (s *Server) Start(address string) error { s.grp.Done() }() + s.stats = reflector.NewStatLogger("DOWNLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child()) + if s.StatLogger != nil && s.StatReportFrequency > 0 { + s.stats.Start() + } + return nil } @@ -286,6 +298,14 @@ func (s *Server) logError(e error) { if e == nil { return } + shouldLog := s.stats.AddError(e) + if shouldLog { + log.Errorln(errors.FullTrace(e)) + } + + return + + // old stuff below. its here for posterity, because we're gonna have to deal with these errors someday for real err := errors.Wrap(e, 0) diff --git a/reflector/server.go b/reflector/server.go index 8638272..4584b63 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -42,7 +42,7 @@ type Server struct { store store.BlobStore grp *stop.Group - stats *stats + stats *Stats } // NewServer returns an initialized reflector server pointer. @@ -57,9 +57,7 @@ func NewServer(store store.BlobStore) *Server { // Shutdown shuts down the reflector server gracefully. func (s *Server) Shutdown() { log.Println("shutting down reflector server...") - if s.isReportStats() { - s.stats.Shutdown() - } + s.stats.Shutdown() s.grp.StopAndWait() log.Println("reflector server stopped") } @@ -88,8 +86,8 @@ func (s *Server) Start(address string) error { s.grp.Done() }() - s.stats = newStatLogger(s.StatLogger, s.StatReportFrequency, s.grp.Child()) - if s.isReportStats() { + s.stats = NewStatLogger("UPLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child()) + if s.StatLogger != nil && s.StatReportFrequency > 0 { s.stats.Start() } @@ -393,10 +391,6 @@ func (s *Server) quitting() bool { } } -func (s *Server) isReportStats() bool { - return s.StatLogger != nil && s.StatReportFrequency > 0 -} - func BlobHash(blob []byte) string { hashBytes := sha512.Sum384(blob) return hex.EncodeToString(hashBytes[:]) diff --git a/reflector/stats.go b/reflector/stats.go index 095f0ca..c41c964 100644 --- a/reflector/stats.go +++ b/reflector/stats.go @@ -14,28 +14,32 @@ import ( // TODO: store daily stats too. and maybe other intervals -type stats struct { +type Stats struct { mu *sync.Mutex blobs int streams int errors map[string]int + started bool + name string logger *log.Logger logFreq time.Duration grp *stop.Group } -func newStatLogger(logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *stats { - return &stats{ +func NewStatLogger(name string, logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *Stats { + return &Stats{ mu: &sync.Mutex{}, grp: stop.New(parentGrp), logger: logger, logFreq: logFreq, errors: make(map[string]int), + name: name, } } -func (s *stats) Start() { +func (s *Stats) Start() { + s.started = true s.grp.Add(1) go func() { defer s.grp.Done() @@ -43,23 +47,27 @@ func (s *stats) Start() { }() } -func (s *stats) Shutdown() { +func (s *Stats) Shutdown() { + if !s.started { + return + } s.log() s.grp.StopAndWait() + s.started = false } -func (s *stats) AddBlob() { +func (s *Stats) AddBlob() { s.mu.Lock() defer s.mu.Unlock() s.blobs++ } -func (s *stats) AddStream() { +func (s *Stats) AddStream() { s.mu.Lock() defer s.mu.Unlock() s.streams++ } -func (s *stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever +func (s *Stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever if e == nil { return } @@ -83,7 +91,7 @@ func (s *stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but return } -func (s *stats) runSlackLogger() { +func (s *Stats) runSlackLogger() { t := time.NewTicker(s.logFreq) for { select { @@ -95,7 +103,7 @@ func (s *stats) runSlackLogger() { } } -func (s *stats) log() { +func (s *Stats) log() { s.mu.Lock() blobs, streams := s.blobs, s.streams s.blobs, s.streams = 0, 0 @@ -110,5 +118,5 @@ func (s *stats) log() { errStr = errStr[:len(errStr)-2] // trim last comma and space } - s.logger.Printf("Stats: %d blobs, %d streams, errors: %s", blobs, streams, errStr) + s.logger.Printf("%s stats: %d blobs, %d streams, errors: %s", s.name, blobs, streams, errStr) }