From 560e180e368ca18dd5dc3c23d31e0c3454ae7256 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Thu, 29 Oct 2020 12:39:53 -0400
Subject: [PATCH] separate singleflight cache wrapper, component names for cache metrics

---
 cmd/getstream.go            |  1 +
 cmd/reflector.go            | 14 +++++---
 internal/metrics/metrics.go | 38 ++++++++++++---------
 store/caching.go            | 51 ++++++++++-------------
 store/caching_test.go       |  6 ++--
 store/lru.go                |  4 +--
 store/lru_test.go           |  4 +--
 store/singleflight.go       | 67 +++++++++++++++++++++++++++++++++++++
 8 files changed, 126 insertions(+), 59 deletions(-)
 create mode 100644 store/singleflight.go

diff --git a/cmd/getstream.go b/cmd/getstream.go
index ae586d7..c02ad0b 100644
--- a/cmd/getstream.go
+++ b/cmd/getstream.go
@@ -29,6 +29,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
 	sdHash := args[1]
 
 	s := store.NewCachingStore(
+		"getstream",
 		peer.NewStore(peer.StoreOpts{Address: addr}),
 		store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
 	)
diff --git a/cmd/reflector.go b/cmd/reflector.go
index ff379ad..2f687d0 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -153,13 +153,19 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
+		)
 	}
 
 	if reflectorCmdMemCache > 0 {
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
+		)
 	}
 
 	return wrapped
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 59cf050..8d46266 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -12,6 +12,7 @@ import (
 
 	ee "github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -67,6 +68,7 @@ const (
 	DirectionDownload = "download" // from reflector
 
 	LabelCacheType = "cache_type"
+	LabelComponent = "component"
 	LabelSource    = "source"
 
 	errConnReset = "conn_reset"
@@ -116,37 +118,42 @@ var (
 		Help:      "Total number of blobs downloaded from reflector through QUIC protocol",
 	})
 
-	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
+	CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "hit_total",
 		Help:      "Total number of blobs retrieved from the cache storage",
-	})
-	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "miss_total",
 		Help:      "Total number of blobs retrieved from origin rather than cache storage",
-	})
-	CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "origin_requests_total",
 		Help:      "How many Get requests are in flight from the cache to the origin",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	// during thundering-herd situations, the metric below should be a lot smaller than the metric above
-	CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "waiting_requests_total",
 		Help:      "How many cache requests are waiting for an in-flight origin request",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "evict_total",
 		Help:      "Count of blobs evicted from cache",
-	}, []string{LabelCacheType})
+	}, []string{LabelCacheType, LabelComponent})
+	CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: ns,
+		Name:      "speed_mbps",
+		Help:      "Speed of blob retrieval from cache or from origin",
+	}, []string{LabelCacheType, LabelComponent, LabelSource})
 
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
@@ -159,12 +166,6 @@ var (
 		Help:      "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
 
-	RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: ns,
-		Name:      "speed_mbps",
-		Help:      "Speed of blob retrieval",
-	}, []string{LabelSource})
-
 	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "tcp_in_bytes",
@@ -202,6 +203,13 @@ var (
 	})
 )
 
+func CacheLabels(name, component string) prometheus.Labels {
+	return prometheus.Labels{
+		LabelCacheType: name,
+		LabelComponent: component,
+	}
+}
+
 func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
 	if e == nil {
 		return
diff --git a/store/caching.go b/store/caching.go
index c5ee404..0a06395 100644
--- a/store/caching.go
+++ b/store/caching.go
@@ -7,8 +7,6 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 
 	"github.com/lbryio/reflector.go/internal/metrics"
-
-	"golang.org/x/sync/singleflight"
 )
 
 // CachingStore combines two stores, typically a local and a remote store, to improve performance.
@@ -16,13 +14,16 @@ import (
 // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
 type CachingStore struct {
 	origin, cache BlobStore
-
-	sf *singleflight.Group
+	component     string
 }
 
 // NewCachingStore makes a new caching disk store and returns a pointer to it.
-func NewCachingStore(origin, cache BlobStore) *CachingStore {
-	return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
+func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
+	return &CachingStore{
+		component: component,
+		origin:    WithSingleFlight(component, origin),
+		cache:     cache,
+	}
 }
 
 const nameCaching = "caching"
@@ -45,41 +46,25 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	start := time.Now()
 	blob, err := c.cache.Get(hash)
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
-		metrics.CacheHitCount.Inc()
+		metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: c.cache.Name(),
+			metrics.LabelComponent: c.component,
+			metrics.LabelSource:    "cache",
+		}).Set(rate)
 		return blob, err
 	}
 
-	metrics.CacheMissCount.Inc()
-	return c.getFromOrigin(hash)
-}
+	metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 
-// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
-// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
-func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
-	metrics.CacheWaitingRequestsCount.Inc()
-	defer metrics.CacheWaitingRequestsCount.Dec()
-	originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
-		metrics.CacheOriginRequestsCount.Inc()
-		defer metrics.CacheOriginRequestsCount.Dec()
-
-		start := time.Now()
-		blob, err := c.origin.Get(hash)
-		if err != nil {
-			return nil, err
-		}
-
-		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
-
-		err = c.cache.Put(hash, blob)
-		return blob, err
-	})
+	blob, err = c.origin.Get(hash)
 	if err != nil {
 		return nil, err
 	}
-	return originBlob.(stream.Blob), nil
+
+	err = c.cache.Put(hash, blob)
+	return blob, err
 }
 
 // Put stores the blob in the origin and the cache
diff --git a/store/caching_test.go b/store/caching_test.go
index 5ea61d5..34f928c 100644
--- a/store/caching_test.go
+++ b/store/caching_test.go
@@ -12,7 +12,7 @@ import (
 func TestCachingStore_Put(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -42,7 +42,7 @@ func TestCachingStore_Put(t *testing.T) {
 func TestCachingStore_CacheMiss(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -80,7 +80,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
diff --git a/store/lru.go b/store/lru.go
index a9199c4..4ef4908 100644
--- a/store/lru.go
+++ b/store/lru.go
@@ -17,9 +17,9 @@ type LRUStore struct {
 }
 
 // NewLRUStore initialize a new LRUStore
-func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
+func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
 	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
-		metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc()
+		metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
 		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
 	})
 	if err != nil {
diff --git a/store/lru_test.go b/store/lru_test.go
index 9a2a94a..0576377 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -17,7 +17,7 @@ const cacheMaxBlobs = 3
 func getTestLRUStore() (*LRUStore, *DiskStore) {
 	d := NewDiskStore("/", 2)
 	d.fs = afero.NewMemMapFs()
-	return NewLRUStore(d, 3), d
+	return NewLRUStore("test", d, 3), d
 }
 
 func countOnDisk(t *testing.T, disk *DiskStore) int {
@@ -134,7 +134,7 @@ func TestLRUStore_loadExisting(t *testing.T) {
 	require.Equal(t, 1, len(existing), "blob should exist in cache")
 	assert.Equal(t, hash, existing[0])
 
-	lru := NewLRUStore(d, 3) // lru should load existing blobs when it's created
+	lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
 	has, err := lru.Has(hash)
 	require.NoError(t, err)
 	assert.True(t, has, "hash should be loaded from disk store but it's not")
diff --git a/store/singleflight.go b/store/singleflight.go
new file mode 100644
index 0000000..fbe314f
--- /dev/null
+++ b/store/singleflight.go
@@ -0,0 +1,67 @@
+package store
+
+import (
+	"time"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
+
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	"golang.org/x/sync/singleflight"
+)
+
+func WithSingleFlight(component string, origin BlobStore) BlobStore {
+	return &singleflightStore{
+		BlobStore: origin,
+		component: component,
+		sf:        new(singleflight.Group),
+	}
+}
+
+type singleflightStore struct {
+	BlobStore
+
+	component string
+	sf        *singleflight.Group
+}
+
+func (s *singleflightStore) Name() string {
+	return "sf_" + s.BlobStore.Name()
+}
+
+// Get ensures that only one request per hash is sent to the origin at a time,
+// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
+func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
+	metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+	defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+	blob, err, _ := s.sf.Do(hash, s.getter(hash))
+	if err != nil {
+		return nil, err
+	}
+	return blob.(stream.Blob), nil
+}
+
+// getter returns a function that gets a blob from the origin
+// only one getter per hash will be executing at a time
+func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
+	return func() (interface{}, error) {
+		metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+		defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+		start := time.Now()
+		blob, err := s.BlobStore.Get(hash)
+		if err != nil {
+			return nil, err
+		}
+
+		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: s.BlobStore.Name(),
+			metrics.LabelComponent: s.component,
+			metrics.LabelSource:    "origin",
+		}).Set(rate)
+
+		return blob, nil
+	}
+}
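
Usage sketch (not part of the patch): the snippet below shows how the constructors changed above compose after this commit, following the wiring in cmd/getstream.go and cmd/reflector.go. The import paths are the repo's own packages; the component name "example", the peer address, the disk path, the LRU size, and the blob hash are illustrative placeholders. The component string becomes the new "component" label on the cache metrics, and NewCachingStore now wraps its origin with WithSingleFlight, so concurrent Gets for the same hash trigger only one origin request.

package main

import (
	"fmt"

	"github.com/lbryio/reflector.go/peer"
	"github.com/lbryio/reflector.go/store"
)

func main() {
	// Origin store reached over the peer protocol (address is a placeholder).
	origin := peer.NewStore(peer.StoreOpts{Address: "reflector.example.com:5567"})

	// Local disk cache bounded by an LRU; "example" is the component label
	// attached to the cache hit/miss/eviction/speed metrics.
	cache := store.NewLRUStore("example", store.NewDiskStore("/tmp/blob_cache", 2), 1000)

	// NewCachingStore wraps origin with the singleflight store internally, so
	// only one origin request per hash is in flight; other callers wait for it.
	s := store.NewCachingStore("example", origin, cache)

	blob, err := s.Get("0123abcd") // placeholder blob hash
	if err != nil {
		fmt.Println("get failed:", err)
		return
	}
	fmt.Println("got blob of", len(blob), "bytes")
}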