diff --git a/cmd/reflector.go b/cmd/reflector.go index 11197b9..19db0ea 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -179,7 +179,7 @@ func initEdgeStore() store.BlobStore { s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName, globalConfig.S3Endpoint) } if originEndpointFallback != "" && originEndpoint != "" { - ittt := store.NewITTTStore(store.NewHttpStore(originEndpoint), store.NewHttpStore(originEndpointFallback)) + ittt := store.NewITTTStore(store.NewHttpStore(originEndpoint, 0), store.NewHttpStore(originEndpointFallback, 0)) if s3Store != nil { s = store.NewProxiedS3Store(ittt, s3Store) } else { diff --git a/store/http.go b/store/http.go index 66b21af..6cc254c 100644 --- a/store/http.go +++ b/store/http.go @@ -3,6 +3,8 @@ package store import ( "io" "net/http" + "path" + "strconv" "time" "github.com/lbryio/reflector.go/internal/metrics" @@ -17,15 +19,17 @@ import ( // HttpStore reads from an HTTP endpoint that simply expects the hash to be appended to the endpoint type HttpStore struct { - endpoint string // cloudflare endpoint - httpClient *http.Client + endpoint string + httpClient *http.Client + prefixLength int } // NewHttpStore returns an initialized HttpStore store pointer. -func NewHttpStore(endpoint string) *HttpStore { +func NewHttpStore(endpoint string, prefixLength int) *HttpStore { return &HttpStore{ - endpoint: endpoint, - httpClient: getClient(), + endpoint: endpoint, + httpClient: getClient(), + prefixLength: prefixLength, } } @@ -58,7 +62,8 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { defer func(t time.Time) { log.Warnf("Getting %s from HTTP(s) source took %s", hash[:8], time.Since(t).String()) }(start) - url := c.endpoint + hash + + url := c.endpoint + c.shardedPath(hash) req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err) @@ -91,21 +96,30 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { switch res.StatusCode { case http.StatusNotFound: return nil, trace.Stack(time.Since(start), c.Name()), ErrBlobNotFound - case http.StatusOK: + contentLength := res.Header.Get("Content-Length") + if contentLength != "" { + size, err := strconv.Atoi(contentLength) + if err == nil && size > 0 && size <= stream.MaxBlobSize { + blob := make([]byte, size) + _, err = io.ReadFull(res.Body, blob) + if err == nil { + metrics.MtrInBytesHttp.Add(float64(size)) + return blob, trace.Stack(time.Since(start), c.Name()), nil + } + log.Warnf("Error reading body with known size: %s", err.Error()) + } + } + buffer := getBuffer() defer putBuffer(buffer) - if _, err := io.Copy(buffer, res.Body); err != nil { return nil, trace.Stack(time.Since(start), c.Name()), errors.Err(err) } - blob := make([]byte, buffer.Len()) copy(blob, buffer.Bytes()) - metrics.MtrInBytesHttp.Add(float64(len(blob))) return blob, trace.Stack(time.Since(start), c.Name()), nil - default: body, _ := io.ReadAll(res.Body) log.Warnf("Got status code %d (%s)", res.StatusCode, string(body)) @@ -115,7 +129,7 @@ func (c *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { } func (c *HttpStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { - url := c.endpoint + hash + url := c.endpoint + c.shardedPath(hash) req, err := http.NewRequest(method, url, nil) if err != nil { return 0, nil, errors.Err(err) @@ -145,3 +159,10 @@ func (c *HttpStore) Delete(_ string) error { // Shutdown shuts down the store gracefully func (c *HttpStore) Shutdown() { } + +func (c *HttpStore) shardedPath(hash string) string { + if c.prefixLength <= 0 || len(hash) < c.prefixLength { + return hash + } + return path.Join(hash[:c.prefixLength], hash) +}