diff --git a/store/disk.go b/store/disk.go index 5c4d2ff..dfbc0f2 100644 --- a/store/disk.go +++ b/store/disk.go @@ -8,22 +8,17 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/spf13/afero" - lru "github.com/hashicorp/golang-lru" + "github.com/spf13/afero" ) // DiskBlobStore stores blobs on a local disk type DiskBlobStore struct { // the location of blobs on disk blobDir string - // max number of blobs to store - maxBlobs int // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - // lru cache - lru *lru.Cache // filesystem abstraction fs afero.Fs @@ -32,14 +27,12 @@ type DiskBlobStore struct { } // NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore { - dbs := DiskBlobStore{ +func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { + return &DiskBlobStore{ blobDir: dir, - maxBlobs: maxBlobs, prefixLength: prefixLength, fs: afero.NewOsFs(), } - return &dbs } func (d *DiskBlobStore) dir(hash string) string { @@ -67,19 +60,6 @@ func (d *DiskBlobStore) initOnce() error { return err } - l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) { - _ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there? - }) - if err != nil { - return errors.Err(err) - } - d.lru = l - - err = d.loadExisting() - if err != nil { - return err - } - d.initialized = true return nil } @@ -91,7 +71,14 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { return false, err } - return d.lru.Contains(hash), nil + _, err = d.fs.Stat(d.path(hash)) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, errors.Err(err) + } + return true, nil } // Get returns the blob or an error if the blob doesn't exist. @@ -101,22 +88,17 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { return nil, err } - _, has := d.lru.Get(hash) - if !has { - return nil, errors.Err(ErrBlobNotFound) - } - file, err := d.fs.Open(d.path(hash)) if err != nil { if os.IsNotExist(err) { - d.lru.Remove(hash) return nil, errors.Err(ErrBlobNotFound) } return nil, err } defer file.Close() - return ioutil.ReadAll(file) + blob, err := ioutil.ReadAll(file) + return blob, errors.Err(err) } // Put stores the blob on disk @@ -132,13 +114,7 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { } err = afero.WriteFile(d.fs, d.path(hash), blob, 0644) - if err != nil { - return errors.Err(err) - } - - d.lru.Add(hash, true) - - return nil + return errors.Err(err) } // PutSD stores the sd blob on the disk @@ -153,30 +129,40 @@ func (d *DiskBlobStore) Delete(hash string) error { return err } - d.lru.Remove(hash) - return nil -} - -// loadExisting scans the blobDir and imports existing blobs into lru cache -func (d *DiskBlobStore) loadExisting() error { - dirs, err := afero.ReadDir(d.fs, d.blobDir) + has, err := d.Has(hash) if err != nil { return err } + if !has { + return nil + } + + err = d.fs.Remove(d.path(hash)) + return errors.Err(err) +} + +// list returns a slice of blobs that already exist in the blobDir +func (d *DiskBlobStore) list() ([]string, error) { + dirs, err := afero.ReadDir(d.fs, d.blobDir) + if err != nil { + return nil, err + } + + var existing []string for _, dir := range dirs { if dir.IsDir() { files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name())) if err != nil { - return err + return nil, err } for _, file := range files { if file.Mode().IsRegular() && !file.IsDir() { - d.lru.Add(file.Name(), true) + existing = append(existing, file.Name()) } } } } - return nil + return existing, nil } diff --git a/store/lru.go b/store/lru.go new file mode 100644 index 0000000..22386fa --- /dev/null +++ b/store/lru.go @@ -0,0 +1,113 @@ +package store + +import ( + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + golru "github.com/hashicorp/golang-lru" +) + +// LRUStore adds a max cache size and LRU eviction to a BlobStore +type LRUStore struct { + // underlying store + store BlobStore + // lru implementation + lru *golru.Cache +} + +// NewLRUStore initialize a new LRUStore +func NewLRUStore(store BlobStore, maxItems int) *LRUStore { + lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) { + _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there + }) + if err != nil { + panic(err) + } + + l := &LRUStore{ + store: store, + lru: lru, + } + + if lstr, ok := store.(lister); ok { + err = l.loadExisting(lstr, maxItems) + if err != nil { + panic(err) // TODO: what should happen here? panic? return nil? just keep going? + } + } + + return l +} + +// Has returns whether the blob is in the store, without updating the recent-ness. +func (l *LRUStore) Has(hash string) (bool, error) { + return l.lru.Contains(hash), nil +} + +// Get returns the blob or an error if the blob doesn't exist. +func (l *LRUStore) Get(hash string) (stream.Blob, error) { + _, has := l.lru.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + blob, err := l.store.Get(hash) + if errors.Is(err, ErrBlobNotFound) { + // Blob disappeared from underlying store + l.lru.Remove(hash) + } + return blob, err +} + +// Put stores the blob +func (l *LRUStore) Put(hash string, blob stream.Blob) error { + err := l.store.Put(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// PutSD stores the sd blob +func (l *LRUStore) PutSD(hash string, blob stream.Blob) error { + err := l.store.PutSD(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// Delete deletes the blob from the store +func (l *LRUStore) Delete(hash string) error { + err := l.store.Delete(hash) + if err != nil { + return err + } + + // This must come after store.Delete() + // Remove triggers onEvict function, which also tries to delete blob from store + // We need to delete it manually first so any errors can be propagated up + l.lru.Remove(hash) + return nil +} + +// loadExisting imports existing blobs from the underlying store into the LRU cache +func (l *LRUStore) loadExisting(store lister, maxItems int) error { + existing, err := store.list() + if err != nil { + return err + } + + added := 0 + for _, h := range existing { + l.lru.Add(h, true) + added++ + if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache + break + } + } + return nil +} diff --git a/store/disk_test.go b/store/lru_test.go similarity index 52% rename from store/disk_test.go rename to store/lru_test.go index d4f6a80..92ddb7d 100644 --- a/store/disk_test.go +++ b/store/lru_test.go @@ -14,16 +14,17 @@ import ( const cacheMaxBlobs = 3 -func memDiskStore() *DiskBlobStore { - d := NewDiskBlobStore("/", cacheMaxBlobs, 2) +func testLRUStore() (*LRUStore, *DiskBlobStore) { + d := NewDiskBlobStore("/", 2) d.fs = afero.NewMemMapFs() - return d + return NewLRUStore(d, 3), d } -func countOnDisk(t *testing.T, fs afero.Fs) int { +func countOnDisk(t *testing.T, disk *DiskBlobStore) int { t.Helper() + count := 0 - afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error { + afero.Walk(disk.fs, "/", func(path string, info os.FileInfo, err error) error { if err != nil { t.Fatal(err) } @@ -32,24 +33,29 @@ func countOnDisk(t *testing.T, fs afero.Fs) int { } return nil }) + + list, err := disk.list() + require.NoError(t, err) + require.Equal(t, count, len(list)) + return count } -func TestDiskBlobStore_LRU(t *testing.T) { - d := memDiskStore() +func TestLRUStore_Eviction(t *testing.T) { + lru, disk := testLRUStore() b := []byte("x") - err := d.Put("one", b) + err := lru.Put("one", b) require.NoError(t, err) - err = d.Put("two", b) + err = lru.Put("two", b) require.NoError(t, err) - err = d.Put("three", b) + err = lru.Put("three", b) require.NoError(t, err) - err = d.Put("four", b) + err = lru.Put("four", b) require.NoError(t, err) - err = d.Put("five", b) + err = lru.Put("five", b) require.NoError(t, err) - assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk)) for k, v := range map[string]bool{ "one": false, @@ -59,15 +65,15 @@ func TestDiskBlobStore_LRU(t *testing.T) { "five": true, "six": false, } { - has, err := d.Has(k) + has, err := lru.Has(k) assert.NoError(t, err) assert.Equal(t, v, has) } - d.Get("three") // touch so it stays in cache - d.Put("six", b) + lru.Get("three") // touch so it stays in cache + lru.Put("six", b) - assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk)) for k, v := range map[string]bool{ "one": false, @@ -77,33 +83,39 @@ func TestDiskBlobStore_LRU(t *testing.T) { "five": true, "six": true, } { - has, err := d.Has(k) + has, err := lru.Has(k) assert.NoError(t, err) assert.Equal(t, v, has) } - err = d.Delete("three") + err = lru.Delete("three") assert.NoError(t, err) - err = d.Delete("five") + err = lru.Delete("five") assert.NoError(t, err) - err = d.Delete("six") + err = lru.Delete("six") assert.NoError(t, err) - assert.Equal(t, 0, countOnDisk(t, d.fs)) + assert.Equal(t, 0, countOnDisk(t, disk)) } -func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) { - d := memDiskStore() +func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { + lru, disk := testLRUStore() hash := "hash" b := []byte("this is a blob of stuff") - err := d.Put(hash, b) + err := lru.Put(hash, b) require.NoError(t, err) - err = d.fs.Remove("/ha/hash") + err = disk.fs.Remove("/ha/hash") require.NoError(t, err) - blob, err := d.Get(hash) + // hash still exists in lru + assert.True(t, lru.lru.Contains(hash)) + + blob, err := lru.Get(hash) assert.Nil(t, blob) assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), reflect.TypeOf(err).String(), err.Error()) + + // lru.Get() removes hash if underlying store doesn't have it + assert.False(t, lru.lru.Contains(hash)) } diff --git a/store/store.go b/store/store.go index e2614e5..f8f6dd3 100644 --- a/store/store.go +++ b/store/store.go @@ -27,5 +27,11 @@ type Blocklister interface { Wants(hash string) (bool, error) } +// lister is a store that can list cached blobs. This is helpful when an overlay +// cache needs to track blob existence. +type lister interface { + list() ([]string, error) +} + //ErrBlobNotFound is a standard error when a blob is not found in the store. var ErrBlobNotFound = errors.Base("blob not found")