use speedwalk for faster file listing

This commit is contained in:
Alex Grintsvayg 2020-11-02 14:48:56 -05:00
parent aaae3ffa5b
commit 659a6e73cc
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
3 changed files with 27 additions and 30 deletions

View file

@ -4,10 +4,10 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"path/filepath"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/store/speedwalk"
) )
// DiskStore stores blobs on a local disk // DiskStore stores blobs on a local disk
@ -118,28 +118,7 @@ func (d *DiskStore) list() ([]string, error) {
return nil, err return nil, err
} }
dirs, err := ioutil.ReadDir(d.blobDir) return speedwalk.AllFiles(d.blobDir, true)
if err != nil {
return nil, err
}
var existing []string
for _, dir := range dirs {
if dir.IsDir() {
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
if err != nil {
return nil, err
}
for _, file := range files {
if file.Mode().IsRegular() && !file.IsDir() {
existing = append(existing, file.Name())
}
}
}
}
return existing, nil
} }
func (d *DiskStore) dir(hash string) string { func (d *DiskStore) dir(hash string) string {

View file

@ -1,19 +1,22 @@
package store package store
import ( import (
"sync"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
) )
// MemStore is an in memory only blob store with no persistence. // MemStore is an in memory only blob store with no persistence.
// MemStore is NOT THREAD-SAFE
type MemStore struct { type MemStore struct {
blobs map[string]stream.Blob blobs map[string]stream.Blob
mu *sync.RWMutex
} }
func NewMemStore() *MemStore { func NewMemStore() *MemStore {
return &MemStore{ return &MemStore{
blobs: make(map[string]stream.Blob), blobs: make(map[string]stream.Blob),
mu: &sync.RWMutex{},
} }
} }
@ -24,12 +27,16 @@ func (m *MemStore) Name() string { return nameMem }
// Has returns T/F if the blob is currently stored. It will never error. // Has returns T/F if the blob is currently stored. It will never error.
func (m *MemStore) Has(hash string) (bool, error) { func (m *MemStore) Has(hash string) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.blobs[hash] _, ok := m.blobs[hash]
return ok, nil return ok, nil
} }
// Get returns the blob byte slice if present and errors if the blob is not found. // Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemStore) Get(hash string) (stream.Blob, error) { func (m *MemStore) Get(hash string) (stream.Blob, error) {
m.mu.RLock()
defer m.mu.RUnlock()
blob, ok := m.blobs[hash] blob, ok := m.blobs[hash]
if !ok { if !ok {
return nil, errors.Err(ErrBlobNotFound) return nil, errors.Err(ErrBlobNotFound)
@ -39,6 +46,8 @@ func (m *MemStore) Get(hash string) (stream.Blob, error) {
// Put stores the blob in memory // Put stores the blob in memory
func (m *MemStore) Put(hash string, blob stream.Blob) error { func (m *MemStore) Put(hash string, blob stream.Blob) error {
m.mu.Lock()
defer m.mu.Unlock()
m.blobs[hash] = blob m.blobs[hash] = blob
return nil return nil
} }
@ -50,11 +59,15 @@ func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (m *MemStore) Delete(hash string) error { func (m *MemStore) Delete(hash string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.blobs, hash) delete(m.blobs, hash)
return nil return nil
} }
// Debug returns the blobs in memory. It's useful for testing and debugging. // Debug returns the blobs in memory. It's useful for testing and debugging.
func (m *MemStore) Debug() map[string]stream.Blob { func (m *MemStore) Debug() map[string]stream.Blob {
m.mu.RLock()
defer m.mu.RUnlock()
return m.blobs return m.blobs
} }

View file

@ -13,7 +13,7 @@ import (
) )
// AllFiles recursively lists every file in every subdirectory of a given directory // AllFiles recursively lists every file in every subdirectory of a given directory
// If basename is true, retun the basename of each file. Otherwise return the full path starting at startDir. // If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
func AllFiles(startDir string, basename bool) ([]string, error) { func AllFiles(startDir string, basename bool) ([]string, error) {
items, err := ioutil.ReadDir(startDir) items, err := ioutil.ReadDir(startDir)
if err != nil { if err != nil {
@ -22,7 +22,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
pathChan := make(chan string) pathChan := make(chan string)
paths := make([]string, 0, 1000) paths := make([]string, 0, 1000)
pathWG := &sync.WaitGroup{}
pathWG.Add(1)
go func() { go func() {
defer pathWG.Done()
for { for {
path, ok := <-pathChan path, ok := <-pathChan
if !ok { if !ok {
@ -32,13 +35,13 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
} }
}() }()
wg := &sync.WaitGroup{}
maxThreads := runtime.NumCPU() - 1 maxThreads := runtime.NumCPU() - 1
goroutineLimiter := make(chan struct{}, maxThreads) goroutineLimiter := make(chan struct{}, maxThreads)
for i := 0; i < maxThreads; i++ { for i := 0; i < maxThreads; i++ {
goroutineLimiter <- struct{}{} goroutineLimiter <- struct{}{}
} }
walkerWG := &sync.WaitGroup{}
for _, item := range items { for _, item := range items {
if !item.IsDir() { if !item.IsDir() {
if basename { if basename {
@ -50,11 +53,11 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
} }
<-goroutineLimiter <-goroutineLimiter
wg.Add(1) walkerWG.Add(1)
go func(dir string) { go func(dir string) {
defer func() { defer func() {
wg.Done() walkerWG.Done()
goroutineLimiter <- struct{}{} goroutineLimiter <- struct{}{}
}() }()
@ -65,7 +68,7 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
if basename { if basename {
pathChan <- de.Name() pathChan <- de.Name()
} else { } else {
pathChan <- filepath.Join(startDir, osPathname) pathChan <- osPathname
} }
} }
return nil return nil
@ -77,8 +80,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
}(item.Name()) }(item.Name())
} }
wg.Wait() walkerWG.Wait()
close(pathChan) close(pathChan)
pathWG.Wait()
return paths, nil return paths, nil
} }