mirror of
https://github.com/LBRYFoundation/reflector.go.git
synced 2025-09-21 02:19:46 +00:00
add support for sharding in http stores
This commit is contained in:
parent
03445476ec
commit
185900582d
2 changed files with 34 additions and 13 deletions
|
@ -179,7 +179,7 @@ func initEdgeStore() store.BlobStore {
|
|||
s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint)
|
||||
}
|
||||
if originEndpointFallback != "" && originEndpoint != "" {
|
||||
ittt := store.NewITTTStore(store.NewHttpStore(originEndpoint), store.NewHttpStore(originEndpointFallback))
|
||||
ittt := store.NewITTTStore(store.NewHttpStore(originEndpoint, 0), store.NewHttpStore(originEndpointFallback, 0))
|
||||
if s3Store != nil {
|
||||
s = store.NewProxiedS3Store(ittt, s3Store)
|
||||
} else {
|
||||
|
|
|
@ -3,6 +3,8 @@ package store
|
|||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
|
@ -17,15 +19,17 @@ import (
|
|||
|
||||
// HttpStore reads from an HTTP endpoint that simply expects the hash to be appended to the endpoint
|
||||
type HttpStore struct {
|
||||
endpoint string // cloudflare endpoint
|
||||
httpClient *http.Client
|
||||
endpoint string
|
||||
httpClient *http.Client
|
||||
prefixLength int
|
||||
}
|
||||
|
||||
// NewHttpStore returns an initialized HttpStore store pointer.
|
||||
func NewHttpStore(endpoint string) *HttpStore {
|
||||
func NewHttpStore(endpoint string, prefixLength int) *HttpStore {
|
||||
return &HttpStore{
|
||||
endpoint: endpoint,
|
||||
httpClient: getClient(),
|
||||
endpoint: endpoint,
|
||||
httpClient: getClient(),
|
||||
prefixLength: prefixLength,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,7 +62,8 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
|||
defer func(t time.Time) {
|
||||
log.Warnf("Getting %s from HTTP(s) source took %s", hash[:8], time.Since(t).String())
|
||||
}(start)
|
||||
url := c.endpoint + hash
|
||||
|
||||
url := c.endpoint + c.shardedPath(hash)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
|
||||
|
@ -91,21 +96,30 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
|||
switch res.StatusCode {
|
||||
case http.StatusNotFound:
|
||||
return nil, trace.Stack(time.Since(start), c.Name()), ErrBlobNotFound
|
||||
|
||||
case http.StatusOK:
|
||||
contentLength := res.Header.Get("Content-Length")
|
||||
if contentLength != "" {
|
||||
size, err := strconv.Atoi(contentLength)
|
||||
if err == nil && size > 0 && size <= stream.MaxBlobSize {
|
||||
blob := make([]byte, size)
|
||||
_, err = io.ReadFull(res.Body, blob)
|
||||
if err == nil {
|
||||
metrics.MtrInBytesHttp.Add(float64(size))
|
||||
return blob, trace.Stack(time.Since(start), c.Name()), nil
|
||||
}
|
||||
log.Warnf("Error reading body with known size: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
buffer := getBuffer()
|
||||
defer putBuffer(buffer)
|
||||
|
||||
if _, err := io.Copy(buffer, res.Body); err != nil {
|
||||
return nil, trace.Stack(time.Since(start), c.Name()), errors.Err(err)
|
||||
}
|
||||
|
||||
blob := make([]byte, buffer.Len())
|
||||
copy(blob, buffer.Bytes())
|
||||
|
||||
metrics.MtrInBytesHttp.Add(float64(len(blob)))
|
||||
return blob, trace.Stack(time.Since(start), c.Name()), nil
|
||||
|
||||
default:
|
||||
body, _ := io.ReadAll(res.Body)
|
||||
log.Warnf("Got status code %d (%s)", res.StatusCode, string(body))
|
||||
|
@ -115,7 +129,7 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
|||
}
|
||||
|
||||
func (c *HttpStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
|
||||
url := c.endpoint + hash
|
||||
url := c.endpoint + c.shardedPath(hash)
|
||||
req, err := http.NewRequest(method, url, nil)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Err(err)
|
||||
|
@ -145,3 +159,10 @@ func (c *HttpStore) Delete(_ string) error {
|
|||
// Shutdown shuts down the store gracefully
|
||||
func (c *HttpStore) Shutdown() {
|
||||
}
|
||||
|
||||
func (c *HttpStore) shardedPath(hash string) string {
|
||||
if c.prefixLength <= 0 || len(hash) < c.prefixLength {
|
||||
return hash
|
||||
}
|
||||
return path.Join(hash[:c.prefixLength], hash)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue