diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index b14435c..5c9dc92 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -12,8 +12,6 @@ import (
 
 	ee "github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
-	"github.com/lbryio/reflector.go/store"
-
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -67,6 +65,8 @@ const (
 	DirectionUpload   = "upload"   // to reflector
 	DirectionDownload = "download" // from reflector
 
+	MtrLabelSource = "source"
+
 	errConnReset         = "conn_reset"
 	errReadConnReset     = "read_conn_reset"
 	errWriteConnReset    = "write_conn_reset"
@@ -96,6 +96,26 @@ var (
 		Name:      "blob_download_total",
 		Help:      "Total number of blobs downloaded from reflector",
 	})
+	PeerDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: ns,
+		Name:      "peer_download_total",
+		Help:      "Total number of blobs downloaded from reflector through tcp protocol",
+	})
+	Http3DownloadCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: ns,
+		Name:      "http3_blob_download_total",
+		Help:      "Total number of blobs downloaded from reflector through QUIC protocol",
+	})
+	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: ns,
+		Name:      "cache_hit_total",
+		Help:      "Total number of blobs retrieved from the cache storage",
+	})
+	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: ns,
+		Name:      "cache_miss_total",
+		Help:      "Total number of blobs retrieved from origin rather than cache storage",
+	})
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "blob_upload_total",
@@ -106,6 +126,11 @@ var (
 		Name:      "sdblob_upload_total",
 		Help:      "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
+	RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: ns,
+		Name:      "speed_mbps",
+		Help:      "Speed of blob retrieval in MiB/s",
+	}, []string{MtrLabelSource})
 	ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "error_total",
@@ -148,8 +173,6 @@ func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a
 		errType = errUnexpectedEOFStr
 	} else if errors.Is(e, syscall.EPIPE) {
 		errType = errEPipe
-	} else if errors.Is(e, store.ErrBlobNotFound) {
-		errType = errBlobNotFound
 	} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
 		// I believe this is the same as EPipe when direction == "download", but not for upload
 		errType = errWriteBrokenPipe
diff --git a/peer/http3/client.go b/peer/http3/client.go
index aba77c3..85bd57d 100644
--- a/peer/http3/client.go
+++ b/peer/http3/client.go
@@ -8,10 +8,9 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/lbryio/reflector.go/store"
-
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/store"
 	"github.com/lucas-clemente/quic-go/http3"
 )
 
diff --git a/peer/http3/server.go b/peer/http3/server.go
index 9be4f9c..9cc4f80 100644
--- a/peer/http3/server.go
+++ b/peer/http3/server.go
@@ -84,6 +84,8 @@ func (s *Server) Start(address string) error {
 		if err != nil {
 			s.logError(err)
 		}
+		metrics.BlobDownloadCount.Inc()
+		metrics.Http3DownloadCount.Inc()
 	})
 	r.HandleFunc("/has/{hash}", func(w http.ResponseWriter, r *http.Request) {
 		vars := mux.Vars(r)
diff --git a/peer/server.go b/peer/server.go
index d7db443..9a53ef5 100644
--- a/peer/server.go
+++ b/peer/server.go
@@ -273,6 +273,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 				Length:   len(blob),
 			}
 			metrics.BlobDownloadCount.Inc()
+			metrics.PeerDownloadCount.Inc()
 		}
 	}
 
diff --git a/store/caching.go b/store/caching.go
index 9d5149f..dfbff8f 100644
--- a/store/caching.go
+++ b/store/caching.go
@@ -1,8 +1,12 @@
 package store
 
 import (
+	"time"
+
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
 )
 
 // CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
@@ -29,18 +33,35 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
 // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
 // from the origin, it is also stored in the cache.
 func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
+	start := time.Now()
 	blob, err := c.cache.Get(hash)
+	retrievalTime := time.Since(start)
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
+		// Only a successful read is a cache hit; other cache errors pass through uncounted.
+		if err == nil {
+			metrics.CacheHitCount.Inc()
+			// Skip the sample when no time elapsed so the gauge never holds +Inf or NaN.
+			if seconds := retrievalTime.Seconds(); seconds > 0 {
+				rate := float64(len(blob)) / 1024 / 1024 / seconds
+				metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
+			}
+		}
 		return blob, err
 	}
 
+	start = time.Now()
 	blob, err = c.origin.Get(hash)
 	if err != nil {
 		return nil, err
 	}
+	retrievalTime = time.Since(start)
+	metrics.CacheMissCount.Inc()
+	if seconds := retrievalTime.Seconds(); seconds > 0 {
+		rate := float64(len(blob)) / 1024 / 1024 / seconds
+		metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
+	}
 
 	err = c.cache.Put(hash, blob)
-
 	return blob, err
 }
 