diff --git a/cmd/reflector.go b/cmd/reflector.go index ddac74b..d46fd09 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -159,7 +159,7 @@ func wrapWithCache(s store.BlobStore) store.BlobStore { if reflectorCmdMemCache > 0 { wrapped = store.NewCachingStore(wrapped, - store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache)) + store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache)) } return wrapped diff --git a/cmd/test.go b/cmd/test.go index e611b22..a330856 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -29,7 +29,7 @@ func init() { func testCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - memStore := store.NewMemoryStore() + memStore := store.NewMemStore() reflectorServer := reflector.NewServer(memStore) reflectorServer.Timeout = 3 * time.Minute diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 17ce4e0..59cf050 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -57,7 +57,8 @@ func (s *Server) Shutdown() { } const ( - ns = "reflector" + ns = "reflector" + subsystemCache = "cache" labelDirection = "direction" labelErrorType = "error_type" @@ -65,7 +66,8 @@ const ( DirectionUpload = "upload" // to reflector DirectionDownload = "download" // from reflector - MtrLabelSource = "source" + LabelCacheType = "cache_type" + LabelSource = "source" errConnReset = "conn_reset" errReadConnReset = "read_conn_reset" @@ -116,25 +118,35 @@ var ( CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, - Name: "cache_hit_total", + Subsystem: subsystemCache, + Name: "hit_total", Help: "Total number of blobs retrieved from the cache storage", }) CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, - Name: "cache_miss_total", + Subsystem: subsystemCache, + Name: "miss_total", Help: "Total number of blobs retrieved from origin rather than cache storage", }) CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: ns, - Name: "cache_origin_requests_total", + Subsystem: subsystemCache, + Name: "origin_requests_total", Help: "How many Get requests are in flight from the cache to the origin", }) // during thundering-herd situations, the metric below should be a lot smaller than the metric above CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: ns, - Name: "cache_waiting_requests_total", + Subsystem: subsystemCache, + Name: "waiting_requests_total", Help: "How many cache requests are waiting for an in-flight origin request", }) + CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: subsystemCache, + Name: "evict_total", + Help: "Count of blobs evicted from cache", + }, []string{LabelCacheType}) BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, @@ -151,7 +163,7 @@ var ( Namespace: ns, Name: "speed_mbps", Help: "Speed of blob retrieval", - }, []string{MtrLabelSource}) + }, []string{LabelSource}) MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, diff --git a/peer/http3/store.go b/peer/http3/store.go index dea0290..4749ab0 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) { return c, errors.Prefix("connection error", err) } +func (p *Store) Name() string { return "http3" } + // Has asks the peer if they have a hash func (p *Store) Has(hash string) (bool, error) { c, err := p.getClient() diff --git a/peer/server_test.go b/peer/server_test.go index 7c9b5f2..7d21921 100644 --- a/peer/server_test.go +++ b/peer/server_test.go @@ -34,7 +34,7 @@ var availabilityRequests = []pair{ } func getServer(t *testing.T, withBlobs bool) *Server { - st := store.NewMemoryStore() + st := store.NewMemStore() if withBlobs { for k, v := range blobs { err := st.Put(k, v) diff --git a/peer/store.go b/peer/store.go index 3857426..b8abedd 100644 --- a/peer/store.go +++ b/peer/store.go @@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) { return c, errors.Prefix("connection error", err) } +func (p *Store) Name() string { return "peer" } + // Has asks the peer if they have a hash func (p *Store) Has(hash string) (bool, error) { c, err := p.getClient() diff --git a/reflector/server_test.go b/reflector/server_test.go index 1039417..0de200e 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) { t.Fatal(err) } - srv := NewServer(store.NewMemoryStore()) + srv := NewServer(store.NewMemStore()) err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { t.Fatal(err) @@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) { t.Fatal(err) } - srv := NewServer(store.NewMemoryStore()) + srv := NewServer(store.NewMemStore()) srv.Timeout = testTimeout err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { @@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) { //} type mockPartialStore struct { - *store.MemoryStore + *store.MemStore missing []string } @@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) { missing[i] = bits.Rand().String() } - st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing}) + st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing}) if _, ok := st.(neededBlobChecker); !ok { t.Fatal("mock does not implement the relevant interface") } diff --git a/store/caching.go b/store/caching.go index e4ff6b3..c5ee404 100644 --- a/store/caching.go +++ b/store/caching.go @@ -25,6 +25,11 @@ func NewCachingStore(origin, cache BlobStore) *CachingStore { return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)} } +const nameCaching = "caching" + +// Name is the cache type name +func (c *CachingStore) Name() string { return nameCaching } + // Has checks the cache and then the origin for a hash. It returns true if either store has it. func (c *CachingStore) Has(hash string) (bool, error) { has, err := c.cache.Has(hash) @@ -42,7 +47,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) { if err == nil || !errors.Is(err, ErrBlobNotFound) { metrics.CacheHitCount.Inc() rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() - metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate) + metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate) return blob, err } @@ -66,7 +71,7 @@ func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) { } rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() - metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate) + metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate) err = c.cache.Put(hash, blob) return blob, err diff --git a/store/caching_test.go b/store/caching_test.go index f56c833..5ea61d5 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -9,9 +9,9 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -func TestCachingBlobStore_Put(t *testing.T) { - origin := NewMemoryStore() - cache := NewMemoryStore() +func TestCachingStore_Put(t *testing.T) { + origin := NewMemStore() + cache := NewMemStore() s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") @@ -39,9 +39,9 @@ func TestCachingBlobStore_Put(t *testing.T) { } } -func TestCachingBlobStore_CacheMiss(t *testing.T) { - origin := NewMemoryStore() - cache := NewMemoryStore() +func TestCachingStore_CacheMiss(t *testing.T) { + origin := NewMemStore() + cache := NewMemStore() s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") @@ -76,10 +76,10 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) { } } -func TestCachingBlobStore_ThunderingHerd(t *testing.T) { +func TestCachingStore_ThunderingHerd(t *testing.T) { storeDelay := 100 * time.Millisecond origin := NewSlowBlobStore(storeDelay) - cache := NewMemoryStore() + cache := NewMemStore() s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") @@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) { // SlowBlobStore adds a delay to each request type SlowBlobStore struct { - mem *MemoryStore + mem *MemStore delay time.Duration } func NewSlowBlobStore(delay time.Duration) *SlowBlobStore { return &SlowBlobStore{ - mem: NewMemoryStore(), + mem: NewMemStore(), delay: delay, } } +func (s *SlowBlobStore) Name() string { + return "slow" +} func (s *SlowBlobStore) Has(hash string) (bool, error) { time.Sleep(s.delay) diff --git a/store/cloudfront.go b/store/cloudfront.go index c4adb3f..32f1246 100644 --- a/store/cloudfront.go +++ b/store/cloudfront.go @@ -34,6 +34,11 @@ func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore { } } +const nameCloudFront = "cloudfront" + +// Name is the cache type name +func (c *CloudFrontStore) Name() string { return nameCloudFront } + // Has checks if the hash is in the store. func (c *CloudFrontStore) Has(hash string) (bool, error) { status, body, err := c.cfRequest(http.MethodHead, hash) diff --git a/store/dbbacked.go b/store/dbbacked.go index 1b554ca..59c47b5 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore { return &DBBackedStore{blobs: blobs, db: db} } +const nameDBBacked = "db-backed" + +// Name is the cache type name +func (d *DBBackedStore) Name() string { return nameDBBacked } + // Has returns true if the blob is in the store func (d *DBBackedStore) Has(hash string) (bool, error) { return d.db.HasBlob(hash) diff --git a/store/disk.go b/store/disk.go index 71c8058..2a11e8e 100644 --- a/store/disk.go +++ b/store/disk.go @@ -35,34 +35,10 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore { } } -func (d *DiskStore) dir(hash string) string { - if d.prefixLength <= 0 || len(hash) < d.prefixLength { - return d.blobDir - } - return path.Join(d.blobDir, hash[:d.prefixLength]) -} +const nameDisk = "disk" -func (d *DiskStore) path(hash string) string { - return path.Join(d.dir(hash), hash) -} - -func (d *DiskStore) ensureDirExists(dir string) error { - return errors.Err(d.fs.MkdirAll(dir, 0755)) -} - -func (d *DiskStore) initOnce() error { - if d.initialized { - return nil - } - - err := d.ensureDirExists(d.blobDir) - if err != nil { - return err - } - - d.initialized = true - return nil -} +// Name is the cache type name +func (d *DiskStore) Name() string { return nameDisk } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. func (d *DiskStore) Has(hash string) (bool, error) { @@ -166,3 +142,32 @@ func (d *DiskStore) list() ([]string, error) { return existing, nil } + +func (d *DiskStore) dir(hash string) string { + if d.prefixLength <= 0 || len(hash) < d.prefixLength { + return d.blobDir + } + return path.Join(d.blobDir, hash[:d.prefixLength]) +} + +func (d *DiskStore) path(hash string) string { + return path.Join(d.dir(hash), hash) +} + +func (d *DiskStore) ensureDirExists(dir string) error { + return errors.Err(d.fs.MkdirAll(dir, 0755)) +} + +func (d *DiskStore) initOnce() error { + if d.initialized { + return nil + } + + err := d.ensureDirExists(d.blobDir) + if err != nil { + return err + } + + d.initialized = true + return nil +} diff --git a/store/lru.go b/store/lru.go index 22386fa..a9199c4 100644 --- a/store/lru.go +++ b/store/lru.go @@ -3,6 +3,7 @@ package store import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" golru "github.com/hashicorp/golang-lru" ) @@ -18,6 +19,7 @@ type LRUStore struct { // NewLRUStore initialize a new LRUStore func NewLRUStore(store BlobStore, maxItems int) *LRUStore { lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) { + metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc() _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there }) if err != nil { @@ -39,6 +41,11 @@ func NewLRUStore(store BlobStore, maxItems int) *LRUStore { return l } +const nameLRU = "lru" + +// Name is the cache type name +func (l *LRUStore) Name() string { return nameLRU } + // Has returns whether the blob is in the store, without updating the recent-ness. func (l *LRUStore) Has(hash string) (bool, error) { return l.lru.Contains(hash), nil diff --git a/store/lru_test.go b/store/lru_test.go index c95ca6b..5b41220 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -14,7 +14,7 @@ import ( const cacheMaxBlobs = 3 -func testLRUStore() (*LRUStore, *DiskStore) { +func getTestLRUStore() (*LRUStore, *DiskStore) { d := NewDiskStore("/", 2) d.fs = afero.NewMemMapFs() return NewLRUStore(d, 3), d @@ -42,7 +42,7 @@ func countOnDisk(t *testing.T, disk *DiskStore) int { } func TestLRUStore_Eviction(t *testing.T) { - lru, disk := testLRUStore() + lru, disk := getTestLRUStore() b := []byte("x") err := lru.Put("one", b) require.NoError(t, err) @@ -98,7 +98,7 @@ func TestLRUStore_Eviction(t *testing.T) { } func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { - lru, disk := testLRUStore() + lru, disk := getTestLRUStore() hash := "hash" b := []byte("this is a blob of stuff") err := lru.Put(hash, b) diff --git a/store/memory.go b/store/memory.go index c50f282..47969fc 100644 --- a/store/memory.go +++ b/store/memory.go @@ -5,25 +5,30 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -// MemoryStore is an in memory only blob store with no persistence. -type MemoryStore struct { +// MemStore is an in memory only blob store with no persistence. +type MemStore struct { blobs map[string]stream.Blob } -func NewMemoryStore() *MemoryStore { - return &MemoryStore{ +func NewMemStore() *MemStore { + return &MemStore{ blobs: make(map[string]stream.Blob), } } +const nameMem = "mem" + +// Name is the cache type name +func (m *MemStore) Name() string { return nameMem } + // Has returns T/F if the blob is currently stored. It will never error. -func (m *MemoryStore) Has(hash string) (bool, error) { +func (m *MemStore) Has(hash string) (bool, error) { _, ok := m.blobs[hash] return ok, nil } // Get returns the blob byte slice if present and errors if the blob is not found. -func (m *MemoryStore) Get(hash string) (stream.Blob, error) { +func (m *MemStore) Get(hash string) (stream.Blob, error) { blob, ok := m.blobs[hash] if !ok { return nil, errors.Err(ErrBlobNotFound) @@ -32,23 +37,23 @@ func (m *MemoryStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob in memory -func (m *MemoryStore) Put(hash string, blob stream.Blob) error { +func (m *MemStore) Put(hash string, blob stream.Blob) error { m.blobs[hash] = blob return nil } // PutSD stores the sd blob in memory -func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error { +func (m *MemStore) PutSD(hash string, blob stream.Blob) error { return m.Put(hash, blob) } // Delete deletes the blob from the store -func (m *MemoryStore) Delete(hash string) error { +func (m *MemStore) Delete(hash string) error { delete(m.blobs, hash) return nil } // Debug returns the blobs in memory. It's useful for testing and debugging. -func (m *MemoryStore) Debug() map[string]stream.Blob { +func (m *MemStore) Debug() map[string]stream.Blob { return m.blobs } diff --git a/store/memory_test.go b/store/memory_test.go index 892c735..8d85114 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -7,8 +7,8 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" ) -func TestMemoryBlobStore_Put(t *testing.T) { - s := NewMemoryStore() +func TestMemStore_Put(t *testing.T) { + s := NewMemStore() blob := []byte("abcdefg") err := s.Put("abc", blob) if err != nil { @@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) { } } -func TestMemoryBlobStore_Get(t *testing.T) { - s := NewMemoryStore() +func TestMemStore_Get(t *testing.T) { + s := NewMemStore() hash := "abc" blob := []byte("abcdefg") err := s.Put(hash, blob) diff --git a/store/s3.go b/store/s3.go index 2189ef1..53451be 100644 --- a/store/s3.go +++ b/store/s3.go @@ -38,22 +38,10 @@ func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store { } } -func (s *S3Store) initOnce() error { - if s.session != nil { - return nil - } +const nameS3 = "s3" - sess, err := session.NewSession(&aws.Config{ - Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), - Region: aws.String(s.region), - }) - if err != nil { - return err - } - - s.session = sess - return nil -} +// Name is the cache type name +func (s *S3Store) Name() string { return nameS3 } // Has returns T/F or Error ( from S3 ) if the store contains the blob. func (s *S3Store) Has(hash string) (bool, error) { @@ -153,3 +141,20 @@ func (s *S3Store) Delete(hash string) error { return err } + +func (s *S3Store) initOnce() error { + if s.session != nil { + return nil + } + + sess, err := session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), + Region: aws.String(s.region), + }) + if err != nil { + return err + } + + s.session = sess + return nil +} diff --git a/store/store.go b/store/store.go index f8f6dd3..80d56c1 100644 --- a/store/store.go +++ b/store/store.go @@ -7,6 +7,8 @@ import ( // BlobStore is an interface for handling blob storage. type BlobStore interface { + // Name of blob store (useful for metrics) + Name() string // Does blob exist in the store Has(hash string) (bool, error) // Get the blob from the store