diff --git a/cmd/getstream.go b/cmd/getstream.go index f3f6d8f..355824c 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -30,7 +30,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) { s := store.NewCachingBlobStore( peer.NewStore(peer.StoreOpts{Address: addr}), - store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2), + store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2), ) wd, err := os.Getwd() diff --git a/cmd/reflector.go b/cmd/reflector.go index f071f60..7a97e72 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -19,18 +19,21 @@ import ( "github.com/spf13/cobra" ) -var reflectorCmdCacheDir string -var tcpPeerPort int -var http3PeerPort int -var receiverPort int -var metricsPort int -var disableUploads bool -var disableBlocklist bool -var proxyAddress string -var proxyPort string -var proxyProtocol string -var useDB bool -var cloudFrontEndpoint string +var ( + tcpPeerPort int + http3PeerPort int + receiverPort int + metricsPort int + disableUploads bool + disableBlocklist bool + proxyAddress string + proxyPort string + proxyProtocol string + useDB bool + cloudFrontEndpoint string + reflectorCmdCacheDir string + reflectorCmdCacheMaxBlobs int +) func init() { var cmd = &cobra.Command{ @@ -38,7 +41,6 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") @@ -50,6 +52,8 @@ func init() { cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") + cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") + cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold") rootCmd.AddCommand(cmd) } @@ -113,7 +117,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { if err != nil { log.Fatal(err) } - blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2)) + blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2)) } peerServer := peer.NewServer(blobStore) diff --git a/go.mod b/go.mod index 4002591..b7de5f7 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/gops v0.3.7 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/golang-lru v0.5.3 // indirect + github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/serf v0.8.2 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf @@ -27,9 +27,11 @@ require ( github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 + github.com/spf13/afero v1.4.1 github.com/spf13/cast v1.3.0 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/stretchr/testify v1.4.0 github.com/volatiletech/null v8.0.0+incompatible go.uber.org/atomic v1.5.1 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 @@ -37,6 +39,7 @@ require ( golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect google.golang.org/appengine v1.6.2 // indirect + gotest.tools v2.2.0+incompatible ) go 1.15 diff --git a/go.sum b/go.sum index 4e9282a..01d1580 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -192,6 +194,7 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -280,6 +283,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -346,6 +350,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I= github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ= +github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -394,6 +400,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -471,6 +478,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4d70cd1..17ce4e0 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -92,6 +92,12 @@ const ( ) var ( + ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "error_total", + Help: "Total number of errors", + }, []string{labelDirection, labelErrorType}) + BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_download_total", @@ -107,6 +113,7 @@ var ( Name: "http3_blob_download_total", Help: "Total number of blobs downloaded from reflector through QUIC protocol", }) + CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "cache_hit_total", @@ -128,6 +135,7 @@ var ( Name: "cache_waiting_requests_total", Help: "How many cache requests are waiting for an in-flight origin request", }) + BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_upload_total", @@ -138,16 +146,13 @@ var ( Name: "sdblob_upload_total", Help: "Total number of SD blobs (and therefore streams) uploaded to reflector", }) + RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, Name: "speed_mbps", Help: "Speed of blob retrieval", }, []string{MtrLabelSource}) - ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: ns, - Name: "error_total", - Help: "Total number of errors", - }, []string{labelDirection, labelErrorType}) + MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "tcp_in_bytes", diff --git a/store/disk.go b/store/disk.go index 82e40ac..5c4d2ff 100644 --- a/store/disk.go +++ b/store/disk.go @@ -5,32 +5,40 @@ import ( "os" "path" "path/filepath" - "sort" - "syscall" - "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/spf13/afero" - log "github.com/sirupsen/logrus" + lru "github.com/hashicorp/golang-lru" ) // DiskBlobStore stores blobs on a local disk type DiskBlobStore struct { // the location of blobs on disk blobDir string + // max number of blobs to store + maxBlobs int // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - initialized bool - lastChecked time.Time - diskCleanupBusy chan bool + // lru cache + lru *lru.Cache + // filesystem abstraction + fs afero.Fs + + // true if initOnce ran, false otherwise + initialized bool } // NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)} - dbs.diskCleanupBusy <- true +func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore { + dbs := DiskBlobStore{ + blobDir: dir, + maxBlobs: maxBlobs, + prefixLength: prefixLength, + fs: afero.NewOsFs(), + } return &dbs } @@ -41,27 +49,12 @@ func (d *DiskBlobStore) dir(hash string) string { return path.Join(d.blobDir, hash[:d.prefixLength]) } -// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path -func (d *DiskBlobStore) getUsedSpace() (float32, error) { - var stat syscall.Statfs_t - err := syscall.Statfs(d.blobDir, &stat) - if err != nil { - return 0, err - } - // Available blocks * size per block = available space in bytes - all := stat.Blocks * uint64(stat.Bsize) - free := stat.Bfree * uint64(stat.Bsize) - used := all - free - - return float32(used) / float32(all), nil -} - func (d *DiskBlobStore) path(hash string) string { return path.Join(d.dir(hash), hash) } func (d *DiskBlobStore) ensureDirExists(dir string) error { - return errors.Err(os.MkdirAll(dir, 0755)) + return errors.Err(d.fs.MkdirAll(dir, 0755)) } func (d *DiskBlobStore) initOnce() error { @@ -74,6 +67,19 @@ func (d *DiskBlobStore) initOnce() error { return err } + l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) { + _ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there? + }) + if err != nil { + return errors.Err(err) + } + d.lru = l + + err = d.loadExisting() + if err != nil { + return err + } + d.initialized = true return nil } @@ -85,14 +91,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { return false, err } - _, err = os.Stat(d.path(hash)) - if err != nil { - if os.IsNotExist(err) { - return false, nil - } - return false, err - } - return true, nil + return d.lru.Contains(hash), nil } // Get returns the blob or an error if the blob doesn't exist. @@ -102,9 +101,15 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { return nil, err } - file, err := os.Open(d.path(hash)) + _, has := d.lru.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + + file, err := d.fs.Open(d.path(hash)) if err != nil { if os.IsNotExist(err) { + d.lru.Remove(hash) return nil, errors.Err(ErrBlobNotFound) } return nil, err @@ -126,7 +131,14 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { return err } - return ioutil.WriteFile(d.path(hash), blob, 0644) + err = afero.WriteFile(d.fs, d.path(hash), blob, 0644) + if err != nil { + return errors.Err(err) + } + + d.lru.Add(hash, true) + + return nil } // PutSD stores the sd blob on the disk @@ -141,83 +153,30 @@ func (d *DiskBlobStore) Delete(hash string) error { return err } - has, err := d.Has(hash) + d.lru.Remove(hash) + return nil +} + +// loadExisting scans the blobDir and imports existing blobs into lru cache +func (d *DiskBlobStore) loadExisting() error { + dirs, err := afero.ReadDir(d.fs, d.blobDir) if err != nil { return err } - if !has { - return nil - } - return os.Remove(d.path(hash)) -} - -func (d *DiskBlobStore) ensureDiskSpace() { - defer func() { - d.lastChecked = time.Now() - d.diskCleanupBusy <- true - }() - - used, err := d.getUsedSpace() - if err != nil { - log.Errorln(err.Error()) - return - } - log.Infof("disk usage: %.2f%%\n", used*100) - if used > 0.90 { - log.Infoln("over 0.90, cleaning up") - err = d.WipeOldestBlobs() - if err != nil { - log.Errorln(err.Error()) - return - } - log.Infoln("Done cleaning up") - } -} - -func (d *DiskBlobStore) WipeOldestBlobs() (err error) { - dirs, err := ioutil.ReadDir(d.blobDir) - if err != nil { - return err - } - type datedFile struct { - Atime time.Time - File *os.FileInfo - FullPath string - } - datedFiles := make([]datedFile, 0, 5000) for _, dir := range dirs { if dir.IsDir() { - files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name())) + files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name())) if err != nil { return err } for _, file := range files { if file.Mode().IsRegular() && !file.IsDir() { - datedFiles = append(datedFiles, datedFile{ - Atime: atime(file), - File: &file, - FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()), - }) + d.lru.Add(file.Name(), true) } } } } - sort.Slice(datedFiles, func(i, j int) bool { - return datedFiles[i].Atime.Before(datedFiles[j].Atime) - }) - //delete the first 50000 blobs - for i, df := range datedFiles { - if i >= 50000 { - break - } - log.Infoln(df.FullPath) - log.Infoln(df.Atime.String()) - err = os.Remove(df.FullPath) - if err != nil { - return err - } - } return nil } diff --git a/store/disk_test.go b/store/disk_test.go new file mode 100644 index 0000000..d4f6a80 --- /dev/null +++ b/store/disk_test.go @@ -0,0 +1,109 @@ +package store + +import ( + "os" + "reflect" + "testing" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const cacheMaxBlobs = 3 + +func memDiskStore() *DiskBlobStore { + d := NewDiskBlobStore("/", cacheMaxBlobs, 2) + d.fs = afero.NewMemMapFs() + return d +} + +func countOnDisk(t *testing.T, fs afero.Fs) int { + t.Helper() + count := 0 + afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatal(err) + } + if !info.IsDir() { + count++ + } + return nil + }) + return count +} + +func TestDiskBlobStore_LRU(t *testing.T) { + d := memDiskStore() + b := []byte("x") + err := d.Put("one", b) + require.NoError(t, err) + err = d.Put("two", b) + require.NoError(t, err) + err = d.Put("three", b) + require.NoError(t, err) + err = d.Put("four", b) + require.NoError(t, err) + err = d.Put("five", b) + require.NoError(t, err) + + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": true, + "five": true, + "six": false, + } { + has, err := d.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + d.Get("three") // touch so it stays in cache + d.Put("six", b) + + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": false, + "five": true, + "six": true, + } { + has, err := d.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + err = d.Delete("three") + assert.NoError(t, err) + err = d.Delete("five") + assert.NoError(t, err) + err = d.Delete("six") + assert.NoError(t, err) + assert.Equal(t, 0, countOnDisk(t, d.fs)) +} + +func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) { + d := memDiskStore() + hash := "hash" + b := []byte("this is a blob of stuff") + err := d.Put(hash, b) + require.NoError(t, err) + + err = d.fs.Remove("/ha/hash") + require.NoError(t, err) + + blob, err := d.Get(hash) + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", + reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), + reflect.TypeOf(err).String(), err.Error()) +}