Refactors store implementations for config loading

Updates store implementations to load configurations from YAML files.

This change introduces a configuration loader that reads store parameters from a YAML file, allowing for more flexible and manageable store configurations.
This commit is contained in:
Niko Storni 2025-04-24 05:11:52 +02:00
parent 185900582d
commit 071a7907c1
37 changed files with 1309 additions and 358 deletions

59
cmd/blobcache.go Normal file
View file

@ -0,0 +1,59 @@
package cmd
import (
"os"
"os/signal"
"strconv"
"syscall"
"github.com/lbryio/reflector.go/config"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/server/http"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var ()
func init() {
var cmd = &cobra.Command{
Use: "blobcache",
Short: "Run blobcache server",
Run: blobcacheCmd,
}
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol")
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
cmd.Flags().IntVar(&httpPeerPort, "http-peer-port", 5569, "The port reflector will distribute content from over HTTP protocol")
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content")
rootCmd.AddCommand(cmd)
}
func blobcacheCmd(cmd *cobra.Command, args []string) {
stores, err := config.LoadStores("blobcache.yaml")
if err != nil {
log.Fatal(err)
}
store := stores["caching"]
defer store.Shutdown()
httpServer := http.NewServer(store, requestQueueSize, upstreamEdgeToken)
err = httpServer.Start(":" + strconv.Itoa(httpPeerPort))
if err != nil {
log.Fatal(err)
}
defer httpServer.Shutdown()
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start()
defer metricsServer.Shutdown()
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
}

View file

@ -5,7 +5,6 @@ import (
"os"
"time"
"github.com/lbryio/reflector.go/server/peer"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/stream"
@ -28,11 +27,19 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
addr := args[0]
sdHash := args[1]
s := store.NewCachingStore(
"getstream",
peer.NewStore(peer.StoreOpts{Address: addr}),
store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
)
s := store.NewCachingStore(store.CachingParams{
Name: "getstream",
Cache: store.NewPeerStore(store.PeerParams{
Name: "getstream",
Address: addr,
Timeout: 30 * time.Second,
}),
Origin: store.NewDiskStore(store.DiskParams{
Name: "getstream",
MountPoint: "/tmp/lbry_downloaded_blobs",
ShardingSize: 2,
}),
})
wd, err := os.Getwd()
if err != nil {

View file

@ -29,7 +29,14 @@ func init() {
func peerCmd(cmd *cobra.Command, args []string) {
var err error
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint)
s3 := store.NewS3Store(store.S3Params{
Name: "peer",
AwsID: globalConfig.AwsID,
AwsSecret: globalConfig.AwsSecret,
Region: globalConfig.BucketRegion,
Bucket: globalConfig.BucketName,
Endpoint: globalConfig.S3Endpoint,
})
peerServer := peer.NewServer(s3)
if !peerNoDB {
@ -39,7 +46,13 @@ func peerCmd(cmd *cobra.Command, args []string) {
err = db.Connect(globalConfig.DBConn)
checkErr(err)
combo := store.NewDBBackedStore(s3, db, false, nil)
combo := store.NewDBBackedStore(store.DBBackedParams{
Name: "peer",
Store: s3,
DB: db,
DeleteOnMiss: false,
MaxSize: nil,
})
peerServer = peer.NewServer(combo)
}

View file

@ -32,9 +32,9 @@ func populateDbCmd(cmd *cobra.Command, args []string) {
log.Fatal("store-path must be defined")
}
localDb := &db.SQL{
SoftDelete: true,
TrackAccess: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
SoftDelete: true,
TrackingLevel: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
if err != nil {

View file

@ -155,33 +155,63 @@ func initUpstreamStore() store.BlobStore {
}
switch upstreamProtocol {
case "tcp":
s = peer.NewStore(peer.StoreOpts{
s = store.NewPeerStore(store.PeerParams{
Name: "reflector",
Address: upstreamReflector,
Timeout: 30 * time.Second,
})
case "http3":
s = http3.NewStore(http3.StoreOpts{
s = store.NewHttp3Store(store.Http3Params{
Name: "reflector",
Address: upstreamReflector,
Timeout: 30 * time.Second,
})
case "http":
s = store.NewUpstreamStore(upstreamReflector, upstreamEdgeToken)
s = store.NewUpstreamStore(store.UpstreamParams{
Name: "reflector",
Upstream: upstreamReflector,
EdgeToken: upstreamEdgeToken,
})
default:
log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
}
return s
}
func initEdgeStore() store.BlobStore {
var s3Store *store.S3Store
var s store.BlobStore
if conf != "none" {
s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint)
s3Store = store.NewS3Store(store.S3Params{
Name: "reflector",
AwsID: globalConfig.AwsID,
AwsSecret: globalConfig.AwsSecret,
Region: globalConfig.BucketRegion,
Bucket: globalConfig.BucketName,
Endpoint: globalConfig.S3Endpoint,
})
}
if originEndpointFallback != "" && originEndpoint != "" {
ittt := store.NewITTTStore(store.NewHttpStore(originEndpoint, 0), store.NewHttpStore(originEndpointFallback, 0))
ittt := store.NewITTTStore(store.ITTTParams{
Name: "reflector",
This: store.NewHttpStore(store.HttpParams{
Name: "owns3",
Endpoint: originEndpoint,
PrefixLength: 0,
}),
That: store.NewHttpStore(store.HttpParams{
Name: "wasabi",
Endpoint: originEndpointFallback,
PrefixLength: 0,
}),
})
if s3Store != nil {
s = store.NewProxiedS3Store(ittt, s3Store)
s = store.NewProxiedS3Store(store.ProxiedS3Params{
Name: "reflector",
Proxied: ittt,
S3: s3Store,
})
} else {
s = ittt
}
@ -196,14 +226,20 @@ func initEdgeStore() store.BlobStore {
func initDBStore(s store.BlobStore) store.BlobStore {
if useDB {
dbInst := &db.SQL{
TrackAccess: db.TrackAccessStreams,
LogQueries: log.GetLevel() == log.DebugLevel,
TrackingLevel: db.TrackAccessStreams,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err := dbInst.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s = store.NewDBBackedStore(s, dbInst, false, nil)
s = store.NewDBBackedStore(store.DBBackedParams{
Name: "global",
Store: s,
DB: dbInst,
DeleteOnMiss: false,
MaxSize: nil,
})
}
return s
}
@ -222,11 +258,16 @@ func initCaches(s store.BlobStore) store.BlobStore {
diskStore := initDiskStore(s, diskCache)
finalStore := initDiskStore(diskStore, secondaryDiskCache)
if memCache > 0 {
finalStore = store.NewCachingStore(
"reflector",
finalStore,
store.NewGcacheStore("mem", store.NewMemStore(), memCache, store.LRU),
)
finalStore = store.NewCachingStore(store.CachingParams{
Name: "reflector",
Origin: finalStore,
Cache: store.NewGcacheStore(store.GcacheParams{
Name: "volatile-cache",
Store: store.NewMemStore(store.MemParams{Name: "volatile-cache"}),
MaxSize: memCache,
Strategy: store.LRU,
}),
})
}
return finalStore
}
@ -244,29 +285,44 @@ func initDiskStore(upstreamStore store.BlobStore, diskParams string) store.BlobS
log.Fatal(err)
}
diskStore := store.NewDiskStore(diskCachePath, 2)
diskStore := store.NewDiskStore(store.DiskParams{
Name: "big-drive",
MountPoint: diskCachePath,
ShardingSize: 2,
})
var unwrappedStore store.BlobStore
if cacheManager == "localdb" {
localDb := &db.SQL{
SoftDelete: true,
TrackAccess: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
SoftDelete: true,
TrackingLevel: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
if err != nil {
log.Fatal(err)
}
unwrappedStore = store.NewDBBackedStore(diskStore, localDb, true, &realCacheSize)
unwrappedStore = store.NewDBBackedStore(store.DBBackedParams{
Name: "local",
Store: diskStore,
DB: localDb,
DeleteOnMiss: true,
MaxSize: &realCacheSize,
})
} else {
unwrappedStore = store.NewGcacheStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize, cacheMangerToGcache[cacheManager])
unwrappedStore = store.NewGcacheStore(store.GcacheParams{
Name: "flash",
Store: store.NewDiskStore(store.DiskParams{Name: "flash", MountPoint: diskCachePath, ShardingSize: 2}),
MaxSize: realCacheSize,
Strategy: cacheMangerToGcache[cacheManager],
})
}
wrapped := store.NewCachingStore(
"reflector",
upstreamStore,
unwrappedStore,
)
wrapped := store.NewCachingStore(store.CachingParams{
Name: "reflector",
Origin: upstreamStore,
Cache: unwrappedStore,
})
return wrapped
}

View file

@ -58,8 +58,15 @@ func startCmd(cmd *cobra.Command, args []string) {
}
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint)
comboStore := store.NewDBBackedStore(s3, db, false, nil)
s3 := store.NewS3Store(store.S3Params{
Name: "prism",
AwsID: globalConfig.AwsID,
AwsSecret: globalConfig.AwsSecret,
Region: globalConfig.BucketRegion,
Bucket: globalConfig.BucketName,
Endpoint: globalConfig.S3Endpoint,
})
comboStore := store.NewDBBackedStore(store.DBBackedParams{Name: "global", Store: s3, DB: db, DeleteOnMiss: false, MaxSize: nil})
conf := prism.DefaultConf()

View file

@ -29,7 +29,7 @@ func init() {
func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemStore()
memStore := store.NewMemStore(store.MemParams{Name: "test"})
reflectorServer := reflector.NewServer(memStore, memStore)
reflectorServer.Timeout = 3 * time.Minute

View file

@ -37,9 +37,17 @@ func uploadCmd(cmd *cobra.Command, args []string) {
err := db.Connect(globalConfig.DBConn)
checkErr(err)
st := store.NewDBBackedStore(
store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint),
db, false, nil)
st := store.NewDBBackedStore(store.DBBackedParams{
Name: "global",
Store: store.NewS3Store(store.S3Params{
Name: "owns3",
AwsID: globalConfig.AwsID,
AwsSecret: globalConfig.AwsSecret,
Region: globalConfig.BucketRegion,
Bucket: globalConfig.BucketName,
Endpoint: globalConfig.S3Endpoint,
}),
})
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)

30
config/loader.go Normal file
View file

@ -0,0 +1,30 @@
package config
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/store"
"github.com/spf13/viper"
)
func LoadStores(configFile string) (map[string]store.BlobStore, error) {
v := viper.New()
v.SetConfigFile(configFile)
if err := v.ReadInConfig(); err != nil {
return nil, err
}
stores := make(map[string]store.BlobStore)
for storeType := range v.AllSettings() {
factory, exists := store.Factories[storeType]
if !exists {
return nil, errors.Err("unknown store type: %s", storeType)
}
storeConfig := v.Sub(storeType)
s, err := factory(storeConfig)
if err != nil {
return nil, errors.Err(err)
}
stores[storeType] = s
}
return stores, nil
}

View file

@ -35,12 +35,12 @@ type SdBlob struct {
StreamHash string `json:"stream_hash"`
}
type trackAccess int
type AccessTrackingLevel int
const (
TrackAccessNone trackAccess = iota // Don't track accesses
TrackAccessStreams // Track accesses at the stream level
TrackAccessBlobs // Track accesses at the blob level
TrackAccessNone AccessTrackingLevel = iota // Don't track accesses
TrackAccessStreams // Track accesses at the stream level
TrackAccessBlobs // Track accesses at the blob level
)
// SQL implements the DB interface
@ -48,7 +48,7 @@ type SQL struct {
conn *sql.DB
// Track the approx last time a blob or stream was accessed
TrackAccess trackAccess
TrackingLevel AccessTrackingLevel
// Instead of deleting a blob, marked it as not stored in the db
SoftDelete bool
@ -179,7 +179,7 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
q string
args []interface{}
)
if s.TrackAccess == TrackAccessBlobs {
if s.TrackingLevel == TrackAccessBlobs {
args = []interface{}{hash, isStored, length, time.Now()}
q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
} else {
@ -201,7 +201,7 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
}
if s.TrackAccess == TrackAccessBlobs {
if s.TrackingLevel == TrackAccessBlobs {
err := s.touchBlobs([]uint64{uint64(blobID)})
if err != nil {
return 0, errors.Err(err)
@ -218,7 +218,7 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
args []interface{}
)
if s.TrackAccess == TrackAccessStreams {
if s.TrackingLevel == TrackAccessStreams {
args = []interface{}{hash, sdBlobID, time.Now()}
q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
} else {
@ -240,7 +240,7 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
}
if s.TrackAccess == TrackAccessStreams {
if s.TrackingLevel == TrackAccessStreams {
err := s.touchStreams([]uint64{uint64(streamID)})
if err != nil {
return 0, errors.Err(err)
@ -264,9 +264,9 @@ func (s *SQL) HasBlobs(hashes []string, touch bool) (map[string]bool, error) {
exists, idsNeedingTouch, err := s.hasBlobs(hashes)
if touch {
if s.TrackAccess == TrackAccessBlobs {
if s.TrackingLevel == TrackAccessBlobs {
_ = s.touchBlobs(idsNeedingTouch)
} else if s.TrackAccess == TrackAccessStreams {
} else if s.TrackingLevel == TrackAccessStreams {
_ = s.touchStreams(idsNeedingTouch)
}
}
@ -338,11 +338,11 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
batch := hashes[doneIndex:sliceEnd]
var query string
if s.TrackAccess == TrackAccessBlobs {
if s.TrackingLevel == TrackAccessBlobs {
query = `SELECT b.hash, b.id, NULL, b.last_accessed_at
FROM blob_ b
WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
} else if s.TrackAccess == TrackAccessStreams {
} else if s.TrackingLevel == TrackAccessStreams {
query = `SELECT b.hash, b.id, s.id, s.last_accessed_at
FROM blob_ b
LEFT JOIN stream_blob sb ON b.id = sb.blob_id
@ -377,9 +377,9 @@ WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
}
exists[hash] = true
if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
if s.TrackAccess == TrackAccessBlobs {
if s.TrackingLevel == TrackAccessBlobs {
needsTouch = append(needsTouch, blobID)
} else if s.TrackAccess == TrackAccessStreams && !streamID.IsZero() {
} else if s.TrackingLevel == TrackAccessStreams && !streamID.IsZero() {
needsTouch = append(needsTouch, streamID.Uint64)
}
}
@ -424,7 +424,7 @@ func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
return nil, errors.Err("not connected")
}
if s.TrackAccess != TrackAccessBlobs {
if s.TrackingLevel != TrackAccessBlobs {
return nil, errors.Err("blob access tracking is disabled")
}

2
go.mod
View file

@ -34,6 +34,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cast v1.7.1
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/volatiletech/null/v8 v8.1.2
go.uber.org/atomic v1.11.0
@ -106,7 +107,6 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.12.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect

View file

@ -23,7 +23,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
t.Fatal(err)
}
srv := NewServer(store.NewMemStore(), store.NewMemStore())
srv := NewServer(store.NewMemStore(store.MemParams{Name: "test"}), store.NewMemStore(store.MemParams{Name: "test"}))
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil {
t.Fatal(err)
@ -120,7 +120,7 @@ func TestServer_Timeout(t *testing.T) {
t.Fatal(err)
}
srv := NewServer(store.NewMemStore(), store.NewMemStore())
srv := NewServer(store.NewMemStore(store.MemParams{Name: "test"}), store.NewMemStore(store.MemParams{Name: "test"}))
srv.Timeout = testTimeout
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil {
@ -182,7 +182,7 @@ func TestServer_PartialUpload(t *testing.T) {
missing[i] = bits.Rand().String()
}
st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(store.MemParams{Name: "test"}), missing: missing})
if _, ok := st.(neededBlobChecker); !ok {
t.Fatal("mock does not implement the relevant interface")
}

View file

@ -24,7 +24,7 @@ import (
type Client struct {
Timeout time.Duration
conn *http.Client
roundTripper *http3.RoundTripper
roundTripper *http3.Transport
ServerAddr string
}

View file

@ -1,117 +0,0 @@
package http3
import (
"crypto/tls"
"crypto/x509"
"net/http"
"strings"
"sync"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
)
// Store is a blob store that gets blobs from a peer.
// It satisfies the store.BlobStore interface but cannot put or delete blobs.
type Store struct {
opts StoreOpts
NotFoundCache *sync.Map
}
// StoreOpts allows to set options for a new Store.
type StoreOpts struct {
Address string
Timeout time.Duration
}
// NewStore makes a new peer store.
func NewStore(opts StoreOpts) *Store {
return &Store{opts: opts, NotFoundCache: &sync.Map{}}
}
func (p *Store) getClient() (*Client, error) {
var qconf quic.Config
window500M := 500 * 1 << 20
qconf.MaxStreamReceiveWindow = uint64(window500M)
qconf.MaxConnectionReceiveWindow = uint64(window500M)
qconf.EnableDatagrams = true
qconf.HandshakeIdleTimeout = 4 * time.Second
qconf.MaxIdleTimeout = 20 * time.Second
pool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
roundTripper := &http3.Transport{
TLSClientConfig: &tls.Config{
RootCAs: pool,
InsecureSkipVerify: true,
},
QUICConfig: &qconf,
}
connection := &http.Client{
Transport: roundTripper,
}
c := &Client{
conn: connection,
roundTripper: roundTripper,
ServerAddr: p.opts.Address,
}
return c, errors.Prefix("connection error", err)
}
func (p *Store) Name() string { return "http3" }
// Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient()
if err != nil {
return false, err
}
defer func() { _ = c.Close() }()
return c.HasBlob(hash)
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
if lastChecked, ok := p.NotFoundCache.Load(hash); ok {
if lastChecked.(time.Time).After(time.Now().Add(-5 * time.Minute)) {
return nil, shared.NewBlobTrace(time.Since(start), p.Name()+"-notfoundcache"), store.ErrBlobNotFound
}
}
c, err := p.getClient()
if err != nil && strings.Contains(err.Error(), "blob not found") {
p.NotFoundCache.Store(hash, time.Now())
}
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
}
defer func() { _ = c.Close() }()
return c.GetBlob(hash)
}
// Put is not supported
func (p *Store) Put(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// PutSD is not supported
func (p *Store) PutSD(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// Delete is not supported
func (p *Store) Delete(hash string) error {
return errors.Err(shared.ErrNotImplemented)
}
// Shutdown is not supported
func (p *Store) Shutdown() {
}

View file

@ -37,7 +37,9 @@ var availabilityRequests = []pair{
}
func getServer(t *testing.T, withBlobs bool) *Server {
st := store.NewMemStore()
st := store.NewMemStore(store.MemParams{
Name: "test",
})
if withBlobs {
for k, v := range blobs {
err := st.Put(k, v)

View file

@ -1,82 +0,0 @@
package peer
import (
"strings"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
)
// Store is a blob store that gets blobs from a peer.
// It satisfies the store.BlobStore interface but cannot put or delete blobs.
type Store struct {
opts StoreOpts
}
// StoreOpts allows to set options for a new Store.
type StoreOpts struct {
Address string
Timeout time.Duration
}
// NewStore makes a new peer store.
func NewStore(opts StoreOpts) *Store {
return &Store{opts: opts}
}
func (p *Store) getClient() (*Client, error) {
c := &Client{Timeout: p.opts.Timeout}
err := c.Connect(p.opts.Address)
return c, errors.Prefix("connection error", err)
}
func (p *Store) Name() string { return "peer" }
// Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient()
if err != nil {
return false, err
}
defer func() { _ = c.Close() }()
return c.HasBlob(hash)
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
c, err := p.getClient()
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
}
defer func() { _ = c.Close() }()
blob, trace, err := c.GetBlob(hash)
if err != nil && strings.Contains(err.Error(), "blob not found") {
return nil, trace, store.ErrBlobNotFound
}
return blob, trace, err
}
// Put is not supported
func (p *Store) Put(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// PutSD is not supported
func (p *Store) PutSD(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// Delete is not supported
func (p *Store) Delete(hash string) error {
return errors.Err(shared.ErrNotImplemented)
}
// Shutdown is not supported
func (p *Store) Shutdown() {
}

View file

@ -1,6 +1,7 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
@ -10,6 +11,7 @@ import (
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// CachingStore combines two stores, typically a local and a remote store, to improve performance.
@ -17,20 +19,77 @@ import (
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
type CachingStore struct {
origin, cache BlobStore
component string
name string
}
type CachingParams struct {
Name string `mapstructure:"name"`
Origin BlobStore `mapstructure:"origin"`
Cache BlobStore `mapstructure:"cache"`
}
type CachingConfig struct {
Name string `mapstructure:"name"`
Origin *viper.Viper
Cache *viper.Viper
}
// NewCachingStore makes a new caching disk store and returns a pointer to it.
func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
func NewCachingStore(params CachingParams) *CachingStore {
return &CachingStore{
component: component,
origin: WithSingleFlight(component, origin),
cache: WithSingleFlight(component, cache),
name: params.Name,
origin: WithSingleFlight(params.Name, params.Origin),
cache: WithSingleFlight(params.Name, params.Cache),
}
}
const nameCaching = "caching"
func CachingStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg CachingConfig
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.Cache = config.Sub("cache")
cfg.Origin = config.Sub("origin")
if cfg.Cache == nil || cfg.Origin == nil {
return nil, errors.Err("cache and origin missing")
}
originStoreType := strings.Split(cfg.Origin.AllKeys()[0], ".")[0]
originStoreConfig := cfg.Origin.Sub(originStoreType)
factory, ok := Factories[originStoreType]
if !ok {
return nil, errors.Err("unknown store type %s", originStoreType)
}
originStore, err := factory(originStoreConfig)
if err != nil {
return nil, errors.Err(err)
}
cacheStoreType := strings.Split(cfg.Cache.AllKeys()[0], ".")[0]
cacheStoreConfig := cfg.Cache.Sub(cacheStoreType)
factory, ok = Factories[cacheStoreType]
if !ok {
return nil, errors.Err("unknown store type %s", cacheStoreType)
}
cacheStore, err := factory(cacheStoreConfig)
if err != nil {
return nil, errors.Err(err)
}
return NewCachingStore(CachingParams{
Name: cfg.Name,
Origin: originStore,
Cache: cacheStore,
}), nil
}
func init() {
RegisterStore(nameCaching, CachingStoreFactory)
}
// Name is the cache type name
func (c *CachingStore) Name() string { return nameCaching }
@ -49,17 +108,17 @@ func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
blob, trace, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) {
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.name)).Inc()
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: c.cache.Name(),
metrics.LabelComponent: c.component,
metrics.LabelComponent: c.name,
metrics.LabelSource: "cache",
}).Set(rate)
return blob, trace.Stack(time.Since(start), c.Name()), err
}
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.name)).Inc()
blob, trace, err = c.origin.Get(hash)
if err != nil {

View file

@ -13,9 +13,9 @@ import (
)
func TestCachingStore_Put(t *testing.T) {
origin := NewMemStore()
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
origin := NewMemStore(MemParams{Name: "test"})
cache := NewMemStore(MemParams{Name: "test"})
s := NewCachingStore(CachingParams{Name: "test", Origin: origin, Cache: cache})
b := []byte("this is a blob of stuff")
hash := "hash"
@ -43,9 +43,9 @@ func TestCachingStore_Put(t *testing.T) {
}
func TestCachingStore_CacheMiss(t *testing.T) {
origin := NewMemStore()
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
origin := NewMemStore(MemParams{Name: "test"})
cache := NewMemStore(MemParams{Name: "test"})
s := NewCachingStore(CachingParams{Name: "test", Origin: origin, Cache: cache})
b := []byte("this is a blob of stuff")
hash := "hash"
@ -85,8 +85,8 @@ func TestCachingStore_CacheMiss(t *testing.T) {
func TestCachingStore_ThunderingHerd(t *testing.T) {
storeDelay := 100 * time.Millisecond
origin := NewSlowBlobStore(storeDelay)
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
cache := NewMemStore(MemParams{Name: "test"})
s := NewCachingStore(CachingParams{Name: "test", Origin: origin, Cache: cache})
b := []byte("this is a blob of stuff")
hash := "hash"
@ -141,7 +141,7 @@ type SlowBlobStore struct {
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
return &SlowBlobStore{
mem: NewMemStore(),
mem: NewMemStore(MemParams{Name: "test"}),
delay: delay,
}
}

View file

@ -2,9 +2,12 @@ package store
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/c2h5oh/datasize"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/shared"
@ -13,6 +16,7 @@ import (
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
@ -24,27 +28,53 @@ type DBBackedStore struct {
deleteOnMiss bool
maxSize int
cleanerStop *stop.Group
name string
}
type DBBackedParams struct {
Name string `mapstructure:"name"`
Store BlobStore `mapstructure:"store"`
DB *db.SQL `mapstructure:"db"`
DeleteOnMiss bool `mapstructure:"delete_on_miss"`
MaxSize *int `mapstructure:"max_size"`
}
type DBBackedConfig struct {
Name string `mapstructure:"name"`
Store *viper.Viper
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Database string `mapstructure:"database"`
DeleteOnMiss bool `mapstructure:"delete_on_miss"`
AccessTracking int `mapstructure:"access_tracking"`
SoftDeletes bool `mapstructure:"soft_deletes"`
LogQueries bool `mapstructure:"log_queries"`
HasCap bool `mapstructure:"has_cap"`
MaxSize string `mapstructure:"max_size"`
}
// NewDBBackedStore returns an initialized store pointer.
func NewDBBackedStore(blobs BlobStore, db *db.SQL, deleteOnMiss bool, maxSize *int) *DBBackedStore {
func NewDBBackedStore(params DBBackedParams) *DBBackedStore {
store := &DBBackedStore{
blobs: blobs,
db: db,
deleteOnMiss: deleteOnMiss,
blobs: params.Store,
db: params.DB,
deleteOnMiss: params.DeleteOnMiss,
cleanerStop: stop.New(),
name: params.Name,
}
if maxSize != nil {
store.maxSize = *maxSize
if params.MaxSize != nil {
store.maxSize = *params.MaxSize
go store.cleanOldestBlobs()
}
return store
}
const nameDBBacked = "db-backed"
const nameDBBacked = "db_backed"
// Name is the cache type name
func (d *DBBackedStore) Name() string { return nameDBBacked }
func (d *DBBackedStore) Name() string { return nameDBBacked + "-" + d.name }
// Has returns true if the blob is in the store
func (d *DBBackedStore) Has(hash string) (bool, error) {
@ -267,3 +297,56 @@ func (d *DBBackedStore) Shutdown() {
d.cleanerStop.Stop()
d.blobs.Shutdown()
}
func DBBackedStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg DBBackedConfig
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.Store = config.Sub("store")
storeType := strings.Split(cfg.Store.AllKeys()[0], ".")[0]
storeConfig := cfg.Store.Sub(storeType)
factory, ok := Factories[storeType]
if !ok {
return nil, errors.Err("unknown store type %s", storeType)
}
underlyingStore, err := factory(storeConfig)
if err != nil {
return nil, errors.Err(err)
}
parsedDb := &db.SQL{
TrackingLevel: db.AccessTrackingLevel(cfg.AccessTracking),
SoftDelete: cfg.SoftDeletes,
LogQueries: cfg.LogQueries || log.GetLevel() == log.DebugLevel,
}
err = parsedDb.Connect(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database))
if err != nil {
return nil, err
}
params := DBBackedParams{
Name: cfg.Name,
Store: underlyingStore,
DB: parsedDb,
DeleteOnMiss: cfg.DeleteOnMiss,
}
if cfg.HasCap {
var parsedSize datasize.ByteSize
err = parsedSize.UnmarshalText([]byte(cfg.MaxSize))
if err != nil {
return nil, errors.Err(err)
}
maxSize := int(parsedSize)
params.MaxSize = &maxSize
}
return NewDBBackedStore(params), nil
}
func init() {
RegisterStore(nameDBBacked, DBBackedStoreFactory)
}

View file

@ -7,6 +7,7 @@ import (
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store/speedwalk"
"github.com/spf13/viper"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
@ -14,27 +15,47 @@ import (
// DiskStore stores blobs on a local disk
type DiskStore struct {
// the location of blobs on disk
blobDir string
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
prefixLength int
name string
// true if initOnce ran, false otherwise
initialized bool
}
type DiskParams struct {
Name string `mapstructure:"name"`
MountPoint string `mapstructure:"mount_point"`
ShardingSize int `mapstructure:"sharding_size"`
}
// NewDiskStore returns an initialized file disk store pointer.
func NewDiskStore(dir string, prefixLength int) *DiskStore {
func NewDiskStore(params DiskParams) *DiskStore {
return &DiskStore{
blobDir: dir,
prefixLength: prefixLength,
blobDir: params.MountPoint,
prefixLength: params.ShardingSize,
name: params.Name,
}
}
const nameDisk = "disk"
func DiskStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg DiskParams
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewDiskStore(cfg), nil
}
func init() {
RegisterStore(nameDisk, DiskStoreFactory)
}
// Name is the cache type name
func (d *DiskStore) Name() string { return nameDisk }
func (d *DiskStore) Name() string { return nameDisk + "-" + d.name }
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
func (d *DiskStore) Has(hash string) (bool, error) {

View file

@ -16,7 +16,11 @@ func TestDiskStore_Get(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(tmpDir) }()
d := NewDiskStore(tmpDir, 2)
d := NewDiskStore(DiskParams{
Name: "test",
Dir: tmpDir,
PrefixLength: 2,
})
hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b"
data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf")
@ -36,7 +40,11 @@ func TestDiskStore_GetNonexistentBlob(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(tmpDir) }()
d := NewDiskStore(tmpDir, 2)
d := NewDiskStore(DiskParams{
Name: "test",
Dir: tmpDir,
PrefixLength: 2,
})
blob, _, err := d.Get("nonexistent")
assert.Nil(t, blob)

View file

@ -1,6 +1,7 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
@ -11,15 +12,16 @@ import (
"github.com/bluele/gcache"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// GcacheStore adds a max cache size and Greedy-Dual-Size-Frequency cache eviction strategy to a BlobStore
type GcacheStore struct {
// underlying store
store BlobStore
// cache implementation
cache gcache.Cache
underlyingStore BlobStore
cache gcache.Cache
name string
}
type EvictionStrategy int
const (
@ -33,16 +35,30 @@ const (
SIMPLE
)
type GcacheParams struct {
Name string `mapstructure:"name"`
Store BlobStore `mapstructure:"store"`
MaxSize int `mapstructure:"max_size"`
Strategy EvictionStrategy `mapstructure:"strategy"`
}
type GcacheConfig struct {
Name string `mapstructure:"name"`
Store *viper.Viper
MaxSize int `mapstructure:"max_size"`
Strategy EvictionStrategy `mapstructure:"strategy"`
}
// NewGcacheStore initialize a new LRUStore
func NewGcacheStore(component string, store BlobStore, maxSize int, strategy EvictionStrategy) *GcacheStore {
cacheBuilder := gcache.New(maxSize)
func NewGcacheStore(params GcacheParams) *GcacheStore {
cacheBuilder := gcache.New(params.MaxSize)
var cache gcache.Cache
evictFunc := func(key interface{}, value interface{}) {
logrus.Infof("evicting %s", key)
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(params.Store.Name(), params.Name)).Inc()
_ = params.Store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
}
switch strategy {
switch params.Strategy {
case LFU:
cache = cacheBuilder.LFU().EvictedFunc(evictFunc).Build()
case ARC:
@ -51,15 +67,15 @@ func NewGcacheStore(component string, store BlobStore, maxSize int, strategy Evi
cache = cacheBuilder.LRU().EvictedFunc(evictFunc).Build()
case SIMPLE:
cache = cacheBuilder.Simple().EvictedFunc(evictFunc).Build()
}
l := &GcacheStore{
store: store,
cache: cache,
underlyingStore: params.Store,
cache: cache,
name: params.Name,
}
go func() {
if lstr, ok := store.(lister); ok {
err := l.loadExisting(lstr, maxSize)
if lstr, ok := params.Store.(lister); ok {
err := l.loadExisting(lstr, params.MaxSize)
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
@ -71,8 +87,40 @@ func NewGcacheStore(component string, store BlobStore, maxSize int, strategy Evi
const nameGcache = "gcache"
func GcacheStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg GcacheConfig
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.Store = config.Sub("store")
storeType := strings.Split(cfg.Store.AllKeys()[0], ".")[0]
storeConfig := cfg.Store.Sub(storeType)
factory, ok := Factories[storeType]
if !ok {
return nil, errors.Err("unknown store type %s", storeType)
}
underlyingStore, err := factory(storeConfig)
if err != nil {
return nil, errors.Err(err)
}
return NewGcacheStore(GcacheParams{
Name: cfg.Name,
Store: underlyingStore,
MaxSize: cfg.MaxSize,
Strategy: cfg.Strategy,
}), nil
}
func init() {
RegisterStore(nameGcache, GcacheStoreFactory)
}
// Name is the cache type name
func (l *GcacheStore) Name() string { return nameGcache }
func (l *GcacheStore) Name() string { return nameGcache + "-" + l.name }
// Has returns whether the blob is in the store, without updating the recent-ness.
func (l *GcacheStore) Has(hash string) (bool, error) {
@ -86,7 +134,7 @@ func (l *GcacheStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
}
blob, stack, err := l.store.Get(hash)
blob, stack, err := l.underlyingStore.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.cache.Remove(hash)
@ -99,7 +147,7 @@ func (l *GcacheStore) Put(hash string, blob stream.Blob) error {
_ = l.cache.Set(hash, true)
has, _ := l.Has(hash)
if has {
err := l.store.Put(hash, blob)
err := l.underlyingStore.Put(hash, blob)
if err != nil {
return err
}
@ -112,7 +160,7 @@ func (l *GcacheStore) PutSD(hash string, blob stream.Blob) error {
_ = l.cache.Set(hash, true)
has, _ := l.Has(hash)
if has {
err := l.store.PutSD(hash, blob)
err := l.underlyingStore.PutSD(hash, blob)
if err != nil {
return err
}
@ -122,7 +170,7 @@ func (l *GcacheStore) PutSD(hash string, blob stream.Blob) error {
// Delete deletes the blob from the store
func (l *GcacheStore) Delete(hash string) error {
err := l.store.Delete(hash)
err := l.underlyingStore.Delete(hash)
if err != nil {
return err
}

View file

@ -16,8 +16,8 @@ import (
const cacheMaxSize = 3
func getTestGcacheStore() (*GcacheStore, *MemStore) {
m := NewMemStore()
return NewGcacheStore("test", m, cacheMaxSize, LFU), m
m := NewMemStore(MemParams{Name: "test"})
return NewGcacheStore(GcacheParams{Name: "test", Store: m, MaxSize: cacheMaxSize, Strategy: LFU}), m
}
func TestGcacheStore_Eviction(t *testing.T) {
@ -90,7 +90,11 @@ func TestGcacheStore_loadExisting(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(tmpDir) }()
d := NewDiskStore(tmpDir, 2)
d := NewDiskStore(DiskParams{
Name: "test",
Dir: tmpDir,
PrefixLength: 2,
})
hash := "hash"
b := []byte("this is a blob of stuff")
@ -102,8 +106,8 @@ func TestGcacheStore_loadExisting(t *testing.T) {
require.Equal(t, 1, len(existing), "blob should exist in cache")
assert.Equal(t, hash, existing[0])
lfu := NewGcacheStore("test", d, 3, LFU) // lru should load existing blobs when it's created
time.Sleep(100 * time.Millisecond) // async load so let's wait...
lfu := NewGcacheStore(GcacheParams{Name: "test", Store: d, MaxSize: 3, Strategy: LFU}) // lru should load existing blobs when it's created
time.Sleep(100 * time.Millisecond) // async load so let's wait...
has, err := lfu.Has(hash)
require.NoError(t, err)
assert.True(t, has, "hash should be loaded from disk store but it's not")

View file

@ -15,6 +15,7 @@ import (
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// HttpStore reads from an HTTP endpoint that simply expects the hash to be appended to the endpoint
@ -22,21 +23,38 @@ type HttpStore struct {
endpoint string
httpClient *http.Client
prefixLength int
name string
}
type HttpParams struct {
Name string `mapstructure:"name"`
Endpoint string `mapstructure:"endpoint"`
PrefixLength int `mapstructure:"prefix_length"`
}
// NewHttpStore returns an initialized HttpStore store pointer.
func NewHttpStore(endpoint string, prefixLength int) *HttpStore {
func NewHttpStore(params HttpParams) *HttpStore {
return &HttpStore{
endpoint: endpoint,
endpoint: params.Endpoint,
httpClient: getClient(),
prefixLength: prefixLength,
prefixLength: params.PrefixLength,
name: params.Name,
}
}
const nameHttp = "http"
func HttpStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg HttpParams
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewHttpStore(cfg), nil
}
// Name is the cache type name
func (c *HttpStore) Name() string { return nameHttp }
func (c *HttpStore) Name() string { return nameHttp + "-" + c.name }
// Has checks if the hash is in the store.
func (c *HttpStore) Has(hash string) (bool, error) {
@ -60,7 +78,7 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
log.Debugf("Getting %s from HTTP(s) source", hash[:8])
start := time.Now()
defer func(t time.Time) {
log.Warnf("Getting %s from HTTP(s) source took %s", hash[:8], time.Since(t).String())
log.Debugf("Getting %s from HTTP(s) source took %s", hash[:8], time.Since(t).String())
}(start)
url := c.endpoint + c.shardedPath(hash)
@ -166,3 +184,7 @@ func (c *HttpStore) shardedPath(hash string) string {
}
return path.Join(hash[:c.prefixLength], hash)
}
func init() {
RegisterStore(nameHttp, HttpStoreFactory)
}

135
store/http3.go Normal file
View file

@ -0,0 +1,135 @@
package store
import (
"strings"
"sync"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// Http3Store is a blob store that gets blobs from a peer over HTTP3.
// It satisfies the BlobStore interface but cannot put or delete blobs.
type Http3Store struct {
NotFoundCache *sync.Map
name string
address string
timeout time.Duration
client *Http3Client
clientMu sync.RWMutex
}
// Http3Params allows to set options for a new Http3Store.
type Http3Params struct {
Name string `mapstructure:"name"`
Address string `mapstructure:"address"`
Timeout time.Duration `mapstructure:"timeout"`
}
// NewHttp3Store makes a new HTTP3 store.
func NewHttp3Store(params Http3Params) *Http3Store {
return &Http3Store{
name: params.Name,
NotFoundCache: &sync.Map{},
address: params.Address,
timeout: params.Timeout,
}
}
const nameHttp3 = "http3"
func Http3StoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg Http3Params
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewHttp3Store(cfg), nil
}
func init() {
RegisterStore(nameHttp3, Http3StoreFactory)
}
func (h *Http3Store) Name() string { return nameHttp3 + "-" + h.name }
func (h *Http3Store) getClient() (*Http3Client, error) {
h.clientMu.RLock()
if h.client != nil {
client := h.client
h.clientMu.RUnlock()
return client, nil
}
h.clientMu.RUnlock()
h.clientMu.Lock()
defer h.clientMu.Unlock()
// Check again in case another goroutine created the client
if h.client != nil {
return h.client, nil
}
client, err := NewHttp3Client(h.address)
if err != nil {
return nil, err
}
h.client = client
return client, nil
}
// Has asks the peer if they have a hash
func (h *Http3Store) Has(hash string) (bool, error) {
c, err := h.getClient()
if err != nil {
return false, err
}
return c.HasBlob(hash)
}
// Get downloads the blob from the peer
func (h *Http3Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
if lastChecked, ok := h.NotFoundCache.Load(hash); ok {
if lastChecked.(time.Time).After(time.Now().Add(-5 * time.Minute)) {
return nil, shared.NewBlobTrace(time.Since(start), h.Name()+"-notfoundcache"), ErrBlobNotFound
}
}
c, err := h.getClient()
if err != nil && strings.Contains(err.Error(), "blob not found") {
h.NotFoundCache.Store(hash, time.Now())
}
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), h.Name()), err
}
return c.GetBlob(hash)
}
// Put is not supported
func (h *Http3Store) Put(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// PutSD is not supported
func (h *Http3Store) PutSD(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// Delete is not supported
func (h *Http3Store) Delete(hash string) error {
return errors.Err(shared.ErrNotImplemented)
}
// Shutdown shuts down the store gracefully
func (h *Http3Store) Shutdown() {
h.clientMu.Lock()
defer h.clientMu.Unlock()
if h.client != nil {
_ = h.client.Close()
h.client = nil
}
}

137
store/http3_client.go Normal file
View file

@ -0,0 +1,137 @@
package store
import (
"crypto/tls"
"crypto/x509"
"io"
"net/http"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
)
// Http3Client is a client for HTTP3 blob store
type Http3Client struct {
conn *http.Client
roundTripper *http3.Transport
ServerAddr string
}
// NewHttp3Client creates a new HTTP3 client
func NewHttp3Client(address string) (*Http3Client, error) {
var qconf quic.Config
window500M := 500 * 1 << 20
qconf.MaxStreamReceiveWindow = uint64(window500M)
qconf.MaxConnectionReceiveWindow = uint64(window500M)
qconf.EnableDatagrams = true
qconf.HandshakeIdleTimeout = 4 * time.Second
qconf.MaxIdleTimeout = 20 * time.Second
pool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
roundTripper := &http3.Transport{
TLSClientConfig: &tls.Config{
RootCAs: pool,
InsecureSkipVerify: true,
},
QUICConfig: &qconf,
}
connection := &http.Client{
Transport: roundTripper,
}
return &Http3Client{
conn: connection,
roundTripper: roundTripper,
ServerAddr: address,
}, nil
}
// Close closes the client
func (c *Http3Client) Close() error {
return nil
}
// HasBlob checks if the peer has a blob
func (c *Http3Client) HasBlob(hash string) (bool, error) {
url := c.ServerAddr + "/blob?hash=" + hash
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return false, errors.Err(err)
}
res, err := c.conn.Do(req)
if err != nil {
return false, errors.Err(err)
}
defer func() { _ = res.Body.Close() }()
if res.StatusCode == http.StatusNotFound {
return false, nil
}
if res.StatusCode == http.StatusNoContent {
return true, nil
}
var body []byte
if res.Body != nil {
body, _ = io.ReadAll(res.Body)
}
return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
}
// GetBlob gets a blob from the peer
func (c *Http3Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
url := c.ServerAddr + "/blob?hash=" + hash
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
}
res, err := c.conn.Do(req)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
}
defer func() { _ = res.Body.Close() }()
viaHeader := res.Header.Get("Via")
var trace shared.BlobTrace
if viaHeader != "" {
parsedTrace, err := shared.Deserialize(viaHeader)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "http3"), err
}
trace = *parsedTrace
} else {
trace = shared.NewBlobTrace(0, "http3")
}
switch res.StatusCode {
case http.StatusNotFound:
return nil, trace.Stack(time.Since(start), "http3"), ErrBlobNotFound
case http.StatusOK:
buffer := getBuffer()
defer putBuffer(buffer)
if _, err := io.Copy(buffer, res.Body); err != nil {
return nil, trace.Stack(time.Since(start), "http3"), errors.Err(err)
}
blob := make([]byte, buffer.Len())
copy(blob, buffer.Bytes())
return blob, trace.Stack(time.Since(start), "http3"), nil
default:
body, _ := io.ReadAll(res.Body)
return nil, trace.Stack(time.Since(start), "http3"),
errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
}
}

View file

@ -1,6 +1,7 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
@ -8,25 +9,83 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// ITTTStore performs an operation on this storage, if this fails, it attempts to run it on that
type ITTTStore struct {
this, that BlobStore
name string
}
type ITTTParams struct {
Name string `mapstructure:"name"`
This BlobStore `mapstructure:"this"`
That BlobStore `mapstructure:"that"`
}
type ITTTConfig struct {
Name string `mapstructure:"name"`
This *viper.Viper
That *viper.Viper
}
// NewITTTStore returns a new instance of the IF THIS THAN THAT store
func NewITTTStore(this, that BlobStore) *ITTTStore {
func NewITTTStore(params ITTTParams) *ITTTStore {
return &ITTTStore{
this: this,
that: that,
this: params.This,
that: params.That,
name: params.Name,
}
}
const nameIttt = "ittt"
func ITTTStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg ITTTConfig
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.This = config.Sub("this")
cfg.That = config.Sub("that")
thisStoreType := strings.Split(cfg.This.AllKeys()[0], ".")[0]
thisStoreConfig := cfg.This.Sub(thisStoreType)
factory, ok := Factories[thisStoreType]
if !ok {
return nil, errors.Err("unknown store type %s", thisStoreType)
}
thisStore, err := factory(thisStoreConfig)
if err != nil {
return nil, errors.Err(err)
}
thatStoreType := strings.Split(cfg.That.AllKeys()[0], ".")[0]
thatStoreConfig := cfg.That.Sub(thatStoreType)
factory, ok = Factories[thatStoreType]
if !ok {
return nil, errors.Err("unknown store type %s", thatStoreType)
}
thatStore, err := factory(thatStoreConfig)
if err != nil {
return nil, errors.Err(err)
}
return NewITTTStore(ITTTParams{
Name: cfg.Name,
This: thisStore,
That: thatStore,
}), nil
}
func init() {
RegisterStore(nameIttt, ITTTStoreFactory)
}
// Name is the cache type name
func (c *ITTTStore) Name() string { return nameIttt }
func (c *ITTTStore) Name() string { return nameIttt + "-" + c.name }
// Has checks in this for a hash, if it fails it checks in that. It returns true if either store has it.
func (c *ITTTStore) Has(hash string) (bool, error) {

View file

@ -8,25 +8,45 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// MemStore is an in memory only blob store with no persistence.
type MemStore struct {
blobs map[string]stream.Blob
mu *sync.RWMutex
name string
}
func NewMemStore() *MemStore {
type MemParams struct {
Name string `mapstructure:"name"`
}
func NewMemStore(params MemParams) *MemStore {
return &MemStore{
blobs: make(map[string]stream.Blob),
mu: &sync.RWMutex{},
name: params.Name,
}
}
const nameMem = "mem"
func MemStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg MemParams
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewMemStore(cfg), nil
}
func init() {
RegisterStore(nameMem, MemStoreFactory)
}
// Name is the cache type name
func (m *MemStore) Name() string { return nameMem }
func (m *MemStore) Name() string { return nameMem + "-" + m.name }
// Has returns T/F if the blob is currently stored. It will never error.
func (m *MemStore) Has(hash string) (bool, error) {

View file

@ -8,7 +8,7 @@ import (
)
func TestMemStore_Put(t *testing.T) {
s := NewMemStore()
s := NewMemStore(MemParams{Name: "test"})
blob := []byte("abcdefg")
err := s.Put("abc", blob)
if err != nil {
@ -17,7 +17,7 @@ func TestMemStore_Put(t *testing.T) {
}
func TestMemStore_Get(t *testing.T) {
s := NewMemStore()
s := NewMemStore(MemParams{Name: "test"})
hash := "abc"
blob := []byte("abcdefg")
err := s.Put(hash, blob)

View file

@ -5,15 +5,38 @@ import (
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// NoopStore is a store that does nothing
type NoopStore struct{}
type NoopStore struct {
name string
}
func NewNoopStore(name string) *NoopStore {
return &NoopStore{name: name}
}
const nameNoop = "noop"
func (n *NoopStore) Name() string { return nameNoop }
func NoopStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg struct {
Name string `mapstructure:"name"`
}
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewNoopStore(cfg.Name), nil
}
func init() {
RegisterStore(nameNoop, NoopStoreFactory)
}
func (n *NoopStore) Name() string { return nameNoop + "-" + n.name }
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) {
return nil, shared.NewBlobTrace(time.Since(time.Now()), n.Name()), nil
@ -21,4 +44,4 @@ func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) {
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) Delete(_ string) error { return nil }
func (n *NoopStore) Shutdown() { return }
func (n *NoopStore) Shutdown() {}

99
store/peer.go Normal file
View file

@ -0,0 +1,99 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// PeerStore is a blob store that gets blobs from a peer.
// It satisfies the BlobStore interface but cannot put or delete blobs.
type PeerStore struct {
opts PeerParams
name string
}
// PeerParams allows to set options for a new PeerStore.
type PeerParams struct {
Name string `mapstructure:"name"`
Address string `mapstructure:"address"`
Timeout time.Duration `mapstructure:"timeout"`
}
// NewPeerStore makes a new peer store.
func NewPeerStore(params PeerParams) *PeerStore {
return &PeerStore{opts: params, name: params.Name}
}
const namePeer = "peer"
func PeerStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg PeerParams
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewPeerStore(cfg), nil
}
func init() {
RegisterStore(namePeer, PeerStoreFactory)
}
func (p *PeerStore) Name() string { return namePeer + "-" + p.name }
func (p *PeerStore) getClient() (*PeerClient, error) {
c := &PeerClient{Timeout: p.opts.Timeout}
err := c.Connect(p.opts.Address)
return c, errors.Prefix("connection error", err)
}
// Has asks the peer if they have a hash
func (p *PeerStore) Has(hash string) (bool, error) {
c, err := p.getClient()
if err != nil {
return false, err
}
defer func() { _ = c.Close() }()
return c.HasBlob(hash)
}
// Get downloads the blob from the peer
func (p *PeerStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
c, err := p.getClient()
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
}
defer func() { _ = c.Close() }()
blob, trace, err := c.GetBlob(hash)
if err != nil && strings.Contains(err.Error(), "blob not found") {
return nil, trace, ErrBlobNotFound
}
return blob, trace, err
}
// Put is not supported
func (p *PeerStore) Put(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// PutSD is not supported
func (p *PeerStore) PutSD(hash string, blob stream.Blob) error {
return errors.Err(shared.ErrNotImplemented)
}
// Delete is not supported
func (p *PeerStore) Delete(hash string) error {
return errors.Err(shared.ErrNotImplemented)
}
// Shutdown is not supported
func (p *PeerStore) Shutdown() {
}

95
store/peer_client.go Normal file
View file

@ -0,0 +1,95 @@
package store
import (
"bufio"
"bytes"
"encoding/binary"
"io"
"net"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/stream"
)
// PeerClient is a client for peer blob store
type PeerClient struct {
conn net.Conn
Timeout time.Duration
}
// Connect connects to a peer
func (c *PeerClient) Connect(address string) error {
var err error
c.conn, err = net.DialTimeout("tcp", address, c.Timeout)
return err
}
// Close closes the connection
func (c *PeerClient) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// HasBlob checks if the peer has a blob
func (c *PeerClient) HasBlob(hash string) (bool, error) {
err := c.writeRequest("has", hash)
if err != nil {
return false, err
}
response, err := c.readResponse()
if err != nil {
return false, err
}
return response == "yes", nil
}
// GetBlob gets a blob from the peer
func (c *PeerClient) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
err := c.writeRequest("get", hash)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "peer"), err
}
response, err := c.readResponse()
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "peer"), err
}
if response == "no" {
return nil, shared.NewBlobTrace(time.Since(start), "peer"), ErrBlobNotFound
}
size, err := binary.ReadVarint(bufio.NewReader(bytes.NewReader([]byte(response))))
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "peer"), err
}
blob := make([]byte, size)
_, err = io.ReadFull(c.conn, blob)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "peer"), err
}
return blob, shared.NewBlobTrace(time.Since(start), "peer"), nil
}
func (c *PeerClient) writeRequest(cmd, hash string) error {
_, err := c.conn.Write([]byte(cmd + " " + hash + "\n"))
return err
}
func (c *PeerClient) readResponse() (string, error) {
reader := bufio.NewReader(c.conn)
response, err := reader.ReadString('\n')
if err != nil {
return "", err
}
return response[:len(response)-1], nil
}

View file

@ -1,42 +1,62 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
)
// ProxiedS3Store writes to an S3 store and reads from any BlobStore (usually an ITTTStore of HttpStore endpoints).
type ProxiedS3Store struct {
cf BlobStore
s3 *S3Store
proxied BlobStore
s3 *S3Store
name string
}
type ProxiedS3Params struct {
Name string `mapstructure:"name"`
Proxied BlobStore `mapstructure:"proxied"`
S3 *S3Store `mapstructure:"s3"`
}
type ProxiedS3Config struct {
Name string `mapstructure:"name"`
Proxied *viper.Viper
S3 *viper.Viper
}
// NewProxiedS3Store returns an initialized ProxiedS3Store store pointer.
// NOTE: It panics if either argument is nil.
func NewProxiedS3Store(cf BlobStore, s3 *S3Store) *ProxiedS3Store {
if cf == nil || s3 == nil {
func NewProxiedS3Store(params ProxiedS3Params) *ProxiedS3Store {
if params.Proxied == nil || params.S3 == nil {
panic("both stores must be set")
}
return &ProxiedS3Store{cf: cf, s3: s3}
return &ProxiedS3Store{
proxied: params.Proxied,
s3: params.S3,
name: params.Name,
}
}
const nameProxiedS3 = "proxied-s3"
// Name is the cache type name
func (c *ProxiedS3Store) Name() string { return nameProxiedS3 }
func (c *ProxiedS3Store) Name() string { return nameProxiedS3 + "-" + c.name }
// Has checks if the hash is in the store.
func (c *ProxiedS3Store) Has(hash string) (bool, error) {
return c.cf.Has(hash)
return c.proxied.Has(hash)
}
// Get gets the blob from Cloudfront.
func (c *ProxiedS3Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
blob, trace, err := c.cf.Get(hash)
blob, trace, err := c.proxied.Get(hash)
return blob, trace.Stack(time.Since(start), c.Name()), err
}
@ -58,5 +78,53 @@ func (c *ProxiedS3Store) Delete(hash string) error {
// Shutdown shuts down the store gracefully
func (c *ProxiedS3Store) Shutdown() {
c.s3.Shutdown()
c.cf.Shutdown()
c.proxied.Shutdown()
}
func ProxiedS3StoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg ProxiedS3Config
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.Proxied = config.Sub("proxied")
cfg.S3 = config.Sub("s3")
proxiedStoreType := strings.Split(cfg.Proxied.AllKeys()[0], ".")[0]
proxiedStoreConfig := cfg.Proxied.Sub(proxiedStoreType)
factory, ok := Factories[proxiedStoreType]
if !ok {
return nil, errors.Err("unknown store type %s", proxiedStoreType)
}
proxiedStore, err := factory(proxiedStoreConfig)
if err != nil {
return nil, errors.Err(err)
}
s3StoreType := strings.Split(cfg.S3.AllKeys()[0], ".")[0]
s3StoreConfig := cfg.S3.Sub(s3StoreType)
factory, ok = Factories[s3StoreType]
if !ok {
return nil, errors.Err("unknown store type %s", s3StoreType)
}
s3Store, err := factory(s3StoreConfig)
if err != nil {
return nil, errors.Err(err)
}
s3StoreTyped, ok := s3Store.(*S3Store)
if !ok {
return nil, errors.Err("s3 store must be of type S3Store")
}
return NewProxiedS3Store(ProxiedS3Params{
Name: cfg.Name,
Proxied: proxiedStore,
S3: s3StoreTyped,
}), nil
}
func init() {
RegisterStore(nameProxiedS3, ProxiedS3StoreFactory)
}

View file

@ -17,6 +17,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// S3Store is an S3 store
@ -26,25 +27,49 @@ type S3Store struct {
region string
bucket string
endpoint string
name string
session *session.Session
}
type S3Params struct {
Name string `mapstructure:"name"`
AwsID string `mapstructure:"aws_id"`
AwsSecret string `mapstructure:"aws_secret"`
Region string `mapstructure:"region"`
Bucket string `mapstructure:"bucket"`
Endpoint string `mapstructure:"endpoint"`
}
// NewS3Store returns an initialized S3 store pointer.
func NewS3Store(awsID, awsSecret, region, bucket, endpoint string) *S3Store {
func NewS3Store(params S3Params) *S3Store {
return &S3Store{
awsID: awsID,
awsSecret: awsSecret,
region: region,
bucket: bucket,
endpoint: endpoint,
awsID: params.AwsID,
awsSecret: params.AwsSecret,
region: params.Region,
bucket: params.Bucket,
endpoint: params.Endpoint,
name: params.Name,
}
}
const nameS3 = "s3"
func S3StoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg S3Params
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewS3Store(cfg), nil
}
func init() {
RegisterStore(nameS3, S3StoreFactory)
}
// Name is the cache type name
func (s *S3Store) Name() string { return nameS3 }
func (s *S3Store) Name() string { return nameS3 + "-" + s.name }
// Has returns T/F or Error ( from S3 ) if the store contains the blob.
func (s *S3Store) Has(hash string) (bool, error) {
@ -169,5 +194,4 @@ func (s *S3Store) initOnce() error {
// Shutdown shuts down the store gracefully
func (s *S3Store) Shutdown() {
return
}

View file

@ -1,6 +1,7 @@
package store
import (
"strings"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
@ -9,6 +10,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/viper"
"golang.org/x/sync/singleflight"
)
@ -27,6 +29,38 @@ type singleflightStore struct {
sf *singleflight.Group
}
type SingleFlightConfig struct {
Component string `mapstructure:"component"`
Store *viper.Viper
}
func SingleFlightStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg SingleFlightConfig
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
cfg.Store = config.Sub("store")
storeType := strings.Split(cfg.Store.AllKeys()[0], ".")[0]
storeConfig := cfg.Store.Sub(storeType)
factory, ok := Factories[storeType]
if !ok {
return nil, errors.Err("unknown store type %s", storeType)
}
underlyingStore, err := factory(storeConfig)
if err != nil {
return nil, errors.Err(err)
}
return WithSingleFlight(cfg.Component, underlyingStore), nil
}
func init() {
RegisterStore("singleflight", SingleFlightStoreFactory)
}
func (s *singleflightStore) Name() string {
return "sf_" + s.BlobStore.Name()
}
@ -124,5 +158,4 @@ func (s *singleflightStore) putter(hash string, blob stream.Blob) func() (interf
// Shutdown shuts down the store gracefully
func (s *singleflightStore) Shutdown() {
s.BlobStore.Shutdown()
return
}

View file

@ -2,6 +2,7 @@ package store
import (
"github.com/lbryio/reflector.go/shared"
"github.com/spf13/viper"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
@ -39,5 +40,12 @@ type lister interface {
list() ([]string, error)
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.
// ErrBlobNotFound is a standard error when a blob is not found in the store.
var ErrBlobNotFound = errors.Base("blob not found")
var Factories = make(map[string]Factory)
func RegisterStore(name string, factory Factory) {
Factories[name] = factory
}
type Factory func(config *viper.Viper) (BlobStore, error)

View file

@ -15,6 +15,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// UpstreamStore is a store that works on top of the HTTP protocol
@ -22,19 +23,40 @@ type UpstreamStore struct {
upstream string
httpClient *http.Client
edgeToken string
name string
}
func NewUpstreamStore(upstream, edgeToken string) *UpstreamStore {
type UpstreamParams struct {
Name string `mapstructure:"name"`
Upstream string `mapstructure:"upstream"`
EdgeToken string `mapstructure:"edge_token"`
}
func NewUpstreamStore(params UpstreamParams) *UpstreamStore {
return &UpstreamStore{
upstream: upstream,
upstream: params.Upstream,
httpClient: getClient(),
edgeToken: edgeToken,
edgeToken: params.EdgeToken,
name: params.Name,
}
}
const nameUpstream = "upstream"
func (n *UpstreamStore) Name() string { return nameUpstream }
func UpstreamStoreFactory(config *viper.Viper) (BlobStore, error) {
var cfg UpstreamParams
err := config.Unmarshal(&cfg)
if err != nil {
return nil, errors.Err(err)
}
return NewUpstreamStore(cfg), nil
}
func init() {
RegisterStore(nameUpstream, UpstreamStoreFactory)
}
func (n *UpstreamStore) Name() string { return nameUpstream + "-" + n.name }
func (n *UpstreamStore) Has(hash string) (bool, error) {
url := n.upstream + "/blob?hash=" + hash