From 2ca83139dfe70df44ecfe72d6886a0daaba2018e Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 3 Oct 2019 16:34:57 -0400 Subject: [PATCH] use stream.Blob for BlobStore interface --- cmd/getstream.go | 6 ++-- peer/store.go | 7 +++-- store/caching.go | 7 +++-- store/dbbacked.go | 8 ++++-- store/{file.go => disk.go} | 57 +++++++++++++++++++------------------- store/memory.go | 11 +++++--- store/s3.go | 13 +++++---- store/store.go | 11 +++++--- 8 files changed, 65 insertions(+), 55 deletions(-) rename store/{file.go => disk.go} (54%) diff --git a/cmd/getstream.go b/cmd/getstream.go index 90990f1..9d45023 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -5,11 +5,10 @@ import ( "os" "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/stream" - "github.com/lbryio/reflector.go/store" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -56,11 +55,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) { } for i := 0; i < len(sd.BlobInfos)-1; i++ { - bb, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) if err != nil { log.Fatal(err) } - b := stream.Blob(bb) data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV) if err != nil { diff --git a/peer/store.go b/peer/store.go index a98d141..f642040 100644 --- a/peer/store.go +++ b/peer/store.go @@ -2,6 +2,7 @@ package peer import ( "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" ) // Store is a blob store that gets blobs from a peer. @@ -28,7 +29,7 @@ func (p *Store) Has(hash string) (bool, error) { } // Get downloads the blob from the peer -func (p *Store) Get(hash string) ([]byte, error) { +func (p *Store) Get(hash string) (stream.Blob, error) { if p.connErr != nil { return nil, errors.Prefix("connection error", p.connErr) } @@ -37,12 +38,12 @@ func (p *Store) Get(hash string) ([]byte, error) { } // Put is not supported -func (p *Store) Put(hash string, blob []byte) error { +func (p *Store) Put(hash string, blob stream.Blob) error { panic("PeerStore cannot put or delete blobs") } // PutSD is not supported -func (p *Store) PutSD(hash string, blob []byte) error { +func (p *Store) PutSD(hash string, blob stream.Blob) error { panic("PeerStore cannot put or delete blobs") } diff --git a/store/caching.go b/store/caching.go index 747f3a2..422410d 100644 --- a/store/caching.go +++ b/store/caching.go @@ -2,6 +2,7 @@ package store import ( "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" ) // CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. @@ -27,7 +28,7 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) { // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // from the origin, it is also stored in the cache. -func (c *CachingBlobStore) Get(hash string) ([]byte, error) { +func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { blob, err := c.cache.Get(hash) if err == nil || !errors.Is(err, ErrBlobNotFound) { return blob, err @@ -44,7 +45,7 @@ func (c *CachingBlobStore) Get(hash string) ([]byte, error) { } // Put stores the blob in the origin and the cache -func (c *CachingBlobStore) Put(hash string, blob []byte) error { +func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { err := c.origin.Put(hash, blob) if err != nil { return err @@ -53,7 +54,7 @@ func (c *CachingBlobStore) Put(hash string, blob []byte) error { } // PutSD stores the sd blob in the origin and the cache -func (c *CachingBlobStore) PutSD(hash string, blob []byte) error { +func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { err := c.origin.PutSD(hash, blob) if err != nil { return err diff --git a/store/dbbacked.go b/store/dbbacked.go index 528fe37..f5feb0a 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -7,6 +7,8 @@ import ( "github.com/lbryio/reflector.go/db" "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" + log "github.com/sirupsen/logrus" ) @@ -29,12 +31,12 @@ func (d *DBBackedS3Store) Has(hash string) (bool, error) { } // Get gets the blob -func (d *DBBackedS3Store) Get(hash string) ([]byte, error) { +func (d *DBBackedS3Store) Get(hash string) (stream.Blob, error) { return d.s3.Get(hash) } // Put stores the blob in the S3 store and stores the blob information in the DB. -func (d *DBBackedS3Store) Put(hash string, blob []byte) error { +func (d *DBBackedS3Store) Put(hash string, blob stream.Blob) error { err := d.s3.Put(hash, blob) if err != nil { return err @@ -45,7 +47,7 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error { // PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if // there is an error storing the blob information in the DB. -func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { +func (d *DBBackedS3Store) PutSD(hash string, blob stream.Blob) error { var blobContents db.SdBlob err := json.Unmarshal(blob, &blobContents) if err != nil { diff --git a/store/file.go b/store/disk.go similarity index 54% rename from store/file.go rename to store/disk.go index c47772c..765914f 100644 --- a/store/file.go +++ b/store/disk.go @@ -6,6 +6,7 @@ import ( "path" "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" ) // DiskBlobStore stores blobs on a local disk @@ -23,43 +24,43 @@ func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { return &DiskBlobStore{blobDir: dir, prefixLength: prefixLength} } -func (f *DiskBlobStore) dir(hash string) string { - if f.prefixLength <= 0 || len(hash) < f.prefixLength { - return f.blobDir +func (d *DiskBlobStore) dir(hash string) string { + if d.prefixLength <= 0 || len(hash) < d.prefixLength { + return d.blobDir } - return path.Join(f.blobDir, hash[:f.prefixLength]) + return path.Join(d.blobDir, hash[:d.prefixLength]) } -func (f *DiskBlobStore) path(hash string) string { - return path.Join(f.dir(hash), hash) +func (d *DiskBlobStore) path(hash string) string { + return path.Join(d.dir(hash), hash) } -func (f *DiskBlobStore) ensureDirExists(dir string) error { +func (d *DiskBlobStore) ensureDirExists(dir string) error { return errors.Err(os.MkdirAll(dir, 0755)) } -func (f *DiskBlobStore) initOnce() error { - if f.initialized { +func (d *DiskBlobStore) initOnce() error { + if d.initialized { return nil } - err := f.ensureDirExists(f.blobDir) + err := d.ensureDirExists(d.blobDir) if err != nil { return err } - f.initialized = true + d.initialized = true return nil } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. -func (f *DiskBlobStore) Has(hash string) (bool, error) { - err := f.initOnce() +func (d *DiskBlobStore) Has(hash string) (bool, error) { + err := d.initOnce() if err != nil { return false, err } - _, err = os.Stat(f.path(hash)) + _, err = os.Stat(d.path(hash)) if err != nil { if os.IsNotExist(err) { return false, nil @@ -69,14 +70,14 @@ func (f *DiskBlobStore) Has(hash string) (bool, error) { return true, nil } -// Get returns the byte slice of the blob stored or will error if the blob doesn't exist. -func (f *DiskBlobStore) Get(hash string) ([]byte, error) { - err := f.initOnce() +// Get returns the blob or an error if the blob doesn't exist. +func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { + err := d.initOnce() if err != nil { return nil, err } - file, err := os.Open(f.path(hash)) + file, err := os.Open(d.path(hash)) if err != nil { if os.IsNotExist(err) { return nil, errors.Err(ErrBlobNotFound) @@ -88,33 +89,33 @@ func (f *DiskBlobStore) Get(hash string) ([]byte, error) { } // Put stores the blob on disk -func (f *DiskBlobStore) Put(hash string, blob []byte) error { - err := f.initOnce() +func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { + err := d.initOnce() if err != nil { return err } - err = f.ensureDirExists(f.dir(hash)) + err = d.ensureDirExists(d.dir(hash)) if err != nil { return err } - return ioutil.WriteFile(f.path(hash), blob, 0644) + return ioutil.WriteFile(d.path(hash), blob, 0644) } // PutSD stores the sd blob on the disk -func (f *DiskBlobStore) PutSD(hash string, blob []byte) error { - return f.Put(hash, blob) +func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { + return d.Put(hash, blob) } // Delete deletes the blob from the store -func (f *DiskBlobStore) Delete(hash string) error { - err := f.initOnce() +func (d *DiskBlobStore) Delete(hash string) error { + err := d.initOnce() if err != nil { return err } - has, err := f.Has(hash) + has, err := d.Has(hash) if err != nil { return err } @@ -122,5 +123,5 @@ func (f *DiskBlobStore) Delete(hash string) error { return nil } - return os.Remove(f.path(hash)) + return os.Remove(d.path(hash)) } diff --git a/store/memory.go b/store/memory.go index f7eee95..5d333f3 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,6 +1,9 @@ package store -import "github.com/lbryio/lbry.go/extras/errors" +import ( + "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" +) // MemoryBlobStore is an in memory only blob store with no persistence. type MemoryBlobStore struct { @@ -17,7 +20,7 @@ func (m *MemoryBlobStore) Has(hash string) (bool, error) { } // Get returns the blob byte slice if present and errors if the blob is not found. -func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { +func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { if m.blobs == nil { m.blobs = make(map[string][]byte) } @@ -29,7 +32,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { } // Put stores the blob in memory -func (m *MemoryBlobStore) Put(hash string, blob []byte) error { +func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error { if m.blobs == nil { m.blobs = make(map[string][]byte) } @@ -38,7 +41,7 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error { } // PutSD stores the sd blob in memory -func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { +func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error { return m.Put(hash, blob) } diff --git a/store/s3.go b/store/s3.go index 8543c49..bdb6734 100644 --- a/store/s3.go +++ b/store/s3.go @@ -6,6 +6,7 @@ import ( "time" "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -75,11 +76,11 @@ func (s *S3BlobStore) Has(hash string) (bool, error) { } // Get returns the blob slice if present or errors on S3. -func (s *S3BlobStore) Get(hash string) ([]byte, error) { +func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { //Todo-Need to handle error for blob doesn't exist for consistency. err := s.initOnce() if err != nil { - return []byte{}, err + return nil, err } log.Debugf("Getting %s from S3", hash[:8]) @@ -96,9 +97,9 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { case s3.ErrCodeNoSuchBucket: - return []byte{}, errors.Err("bucket %s does not exist", s.bucket) + return nil, errors.Err("bucket %s does not exist", s.bucket) case s3.ErrCodeNoSuchKey: - return []byte{}, errors.Err(ErrBlobNotFound) + return nil, errors.Err(ErrBlobNotFound) } } return buf.Bytes(), err @@ -108,7 +109,7 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) { } // Put stores the blob on S3 or errors if S3 connection errors. -func (s *S3BlobStore) Put(hash string, blob []byte) error { +func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { err := s.initOnce() if err != nil { return err @@ -130,7 +131,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error { } // PutSD stores the sd blob on S3 or errors if S3 connection errors. -func (s *S3BlobStore) PutSD(hash string, blob []byte) error { +func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error { //Todo - handle missing stream for consistency return s.Put(hash, blob) } diff --git a/store/store.go b/store/store.go index 551e810..1e9f241 100644 --- a/store/store.go +++ b/store/store.go @@ -1,17 +1,20 @@ package store -import "github.com/lbryio/lbry.go/extras/errors" +import ( + "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" +) // BlobStore is an interface with methods for consistently handling blob storage. type BlobStore interface { // Does blob exist in the store Has(hash string) (bool, error) // Get the blob from the store - Get(hash string) ([]byte, error) + Get(hash string) (stream.Blob, error) // Put the blob into the store - Put(hash string, blob []byte) error + Put(hash string, blob stream.Blob) error // Put an SD blob into the store - PutSD(hash string, blob []byte) error + PutSD(hash string, blob stream.Blob) error // Delete the blob from the store Delete(hash string) error }