diff --git a/store/disk.go b/store/disk.go index 008897b..4ac2a0f 100644 --- a/store/disk.go +++ b/store/disk.go @@ -4,10 +4,10 @@ import ( "io/ioutil" "os" "path" - "path/filepath" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/store/speedwalk" ) // DiskStore stores blobs on a local disk @@ -118,28 +118,7 @@ func (d *DiskStore) list() ([]string, error) { return nil, err } - dirs, err := ioutil.ReadDir(d.blobDir) - if err != nil { - return nil, err - } - - var existing []string - - for _, dir := range dirs { - if dir.IsDir() { - files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name())) - if err != nil { - return nil, err - } - for _, file := range files { - if file.Mode().IsRegular() && !file.IsDir() { - existing = append(existing, file.Name()) - } - } - } - } - - return existing, nil + return speedwalk.AllFiles(d.blobDir, true) } func (d *DiskStore) dir(hash string) string { diff --git a/store/memory.go b/store/memory.go index f46a0f2..f462a8d 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,19 +1,22 @@ package store import ( + "sync" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" ) // MemStore is an in memory only blob store with no persistence. -// MemStore is NOT THREAD-SAFE type MemStore struct { blobs map[string]stream.Blob + mu *sync.RWMutex } func NewMemStore() *MemStore { return &MemStore{ blobs: make(map[string]stream.Blob), + mu: &sync.RWMutex{}, } } @@ -24,12 +27,16 @@ func (m *MemStore) Name() string { return nameMem } // Has returns T/F if the blob is currently stored. It will never error. func (m *MemStore) Has(hash string) (bool, error) { + m.mu.RLock() + defer m.mu.RUnlock() _, ok := m.blobs[hash] return ok, nil } // Get returns the blob byte slice if present and errors if the blob is not found. func (m *MemStore) Get(hash string) (stream.Blob, error) { + m.mu.RLock() + defer m.mu.RUnlock() blob, ok := m.blobs[hash] if !ok { return nil, errors.Err(ErrBlobNotFound) @@ -39,6 +46,8 @@ func (m *MemStore) Get(hash string) (stream.Blob, error) { // Put stores the blob in memory func (m *MemStore) Put(hash string, blob stream.Blob) error { + m.mu.Lock() + defer m.mu.Unlock() m.blobs[hash] = blob return nil } @@ -50,11 +59,15 @@ func (m *MemStore) PutSD(hash string, blob stream.Blob) error { // Delete deletes the blob from the store func (m *MemStore) Delete(hash string) error { + m.mu.Lock() + defer m.mu.Unlock() delete(m.blobs, hash) return nil } // Debug returns the blobs in memory. It's useful for testing and debugging. func (m *MemStore) Debug() map[string]stream.Blob { + m.mu.RLock() + defer m.mu.RUnlock() return m.blobs } diff --git a/store/speedwalk/speedwalk.go b/store/speedwalk/speedwalk.go index 1c94ed1..e2563ba 100644 --- a/store/speedwalk/speedwalk.go +++ b/store/speedwalk/speedwalk.go @@ -13,7 +13,7 @@ import ( ) // AllFiles recursively lists every file in every subdirectory of a given directory -// If basename is true, retun the basename of each file. Otherwise return the full path starting at startDir. +// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir. func AllFiles(startDir string, basename bool) ([]string, error) { items, err := ioutil.ReadDir(startDir) if err != nil { @@ -22,7 +22,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) { pathChan := make(chan string) paths := make([]string, 0, 1000) + pathWG := &sync.WaitGroup{} + pathWG.Add(1) go func() { + defer pathWG.Done() for { path, ok := <-pathChan if !ok { @@ -32,13 +35,13 @@ func AllFiles(startDir string, basename bool) ([]string, error) { } }() - wg := &sync.WaitGroup{} maxThreads := runtime.NumCPU() - 1 goroutineLimiter := make(chan struct{}, maxThreads) for i := 0; i < maxThreads; i++ { goroutineLimiter <- struct{}{} } + walkerWG := &sync.WaitGroup{} for _, item := range items { if !item.IsDir() { if basename { @@ -50,11 +53,11 @@ func AllFiles(startDir string, basename bool) ([]string, error) { } <-goroutineLimiter - wg.Add(1) + walkerWG.Add(1) go func(dir string) { defer func() { - wg.Done() + walkerWG.Done() goroutineLimiter <- struct{}{} }() @@ -65,7 +68,7 @@ func AllFiles(startDir string, basename bool) ([]string, error) { if basename { pathChan <- de.Name() } else { - pathChan <- filepath.Join(startDir, osPathname) + pathChan <- osPathname } } return nil @@ -77,8 +80,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) { }(item.Name()) } - wg.Wait() + walkerWG.Wait() close(pathChan) + pathWG.Wait() + return paths, nil }