diff --git a/cmd/reflector.go b/cmd/reflector.go index 738a066..6fe44ad 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -37,6 +37,7 @@ var ( proxyProtocol string useDB bool cloudFrontEndpoint string + WasabiEndpoint string reflectorCmdDiskCache string bufferReflectorCmdDiskCache string reflectorCmdMemCache int @@ -52,6 +53,7 @@ func init() { cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval") + cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval") cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") @@ -137,12 +139,12 @@ func setupStore() store.BlobStore { if conf != "none" { s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) } - if cloudFrontEndpoint != "" { - cfs := store.NewCloudFrontROStore(cloudFrontEndpoint) + if cloudFrontEndpoint != "" && WasabiEndpoint != "" { + ittt := store.NewITTTStore(store.NewCloudFrontROStore(WasabiEndpoint), store.NewCloudFrontROStore(cloudFrontEndpoint)) if s3Store != nil { - s = store.NewCloudFrontRWStore(cfs, s3Store) + s = store.NewCloudFrontRWStore(ittt, s3Store) } else { - s = cfs + s = ittt } } else if s3Store != nil { s = s3Store diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 2793fc0..2570677 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -60,6 +60,7 @@ func (s *Server) Shutdown() { const ( ns = "reflector" subsystemCache = "cache" + subsystemITTT = "ittt" labelDirection = "direction" labelErrorType = "error_type" @@ -124,6 +125,18 @@ var ( Name: "hit_total", Help: "Total number of blobs retrieved from the cache storage", }, []string{LabelCacheType, LabelComponent}) + ThisHitCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: subsystemITTT, + Name: "this_hit_total", + Help: "Total number of blobs retrieved from the this storage", + }) + ThatHitCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: subsystemITTT, + Name: "that_hit_total", + Help: "Total number of blobs retrieved from the that storage", + }) CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: ns, Subsystem: subsystemCache, diff --git a/peer/http3/server.go b/peer/http3/server.go index 89a64db..30ae3fd 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -112,7 +112,7 @@ func (s *Server) Start(address string) error { }, QuicConfig: quicConf, } - go InitWorkers(s, 100) + go InitWorkers(s, 200) go s.listenForShutdown(&server) s.grp.Add(1) go func() { diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go index 32ee418..6b293a8 100644 --- a/store/cloudfront_rw.go +++ b/store/cloudfront_rw.go @@ -7,15 +7,15 @@ import ( "github.com/lbryio/reflector.go/shared" ) -// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3. +// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront/Wasabi, writes go to S3. type CloudFrontRWStore struct { - cf *CloudFrontROStore + cf *ITTTStore s3 *S3Store } // NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer. // NOTE: It panics if either argument is nil. -func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore { +func NewCloudFrontRWStore(cf *ITTTStore, s3 *S3Store) *CloudFrontRWStore { if cf == nil || s3 == nil { panic("both stores must be set") } diff --git a/store/ittt.go b/store/ittt.go new file mode 100644 index 0000000..5edc67d --- /dev/null +++ b/store/ittt.go @@ -0,0 +1,75 @@ +package store + +import ( + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" +) + +// ITTT store performs an operation on this storage, if this fails, it attempts to run it on that +type ITTTStore struct { + this, that BlobStore +} + +// NewCachingStore makes a new caching disk store and returns a pointer to it. +func NewITTTStore(this, that BlobStore) *ITTTStore { + return &ITTTStore{ + this: this, + that: that, + } +} + +const nameIttt = "ittt" + +// Name is the cache type name +func (c *ITTTStore) Name() string { return nameIttt } + +// Has checks the cache and then the origin for a hash. It returns true if either store has it. +func (c *ITTTStore) Has(hash string) (bool, error) { + has, err := c.this.Has(hash) + if err != nil || !has { + has, err = c.that.Has(hash) + } + return has, err +} + +// Get tries to get the blob from this first, falling back to that. +func (c *ITTTStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() + blob, trace, err := c.this.Get(hash) + if err == nil { + metrics.ThisHitCount.Inc() + return blob, trace.Stack(time.Since(start), c.Name()), err + } + + blob, trace, err = c.that.Get(hash) + if err != nil { + return nil, trace.Stack(time.Since(start), c.Name()), err + } + metrics.ThatHitCount.Inc() + return blob, trace.Stack(time.Since(start), c.Name()), nil +} + +// Put not implemented +func (c *ITTTStore) Put(hash string, blob stream.Blob) error { + return errors.Err(shared.ErrNotImplemented) +} + +// PutSD not implemented +func (c *ITTTStore) PutSD(hash string, blob stream.Blob) error { + return errors.Err(shared.ErrNotImplemented) +} + +// Delete not implemented +func (c *ITTTStore) Delete(hash string) error { + return errors.Err(shared.ErrNotImplemented) +} + +// Shutdown shuts down the store gracefully +func (c *ITTTStore) Shutdown() { + return +} diff --git a/store/s3.go b/store/s3.go index 75d66cc..75ba459 100644 --- a/store/s3.go +++ b/store/s3.go @@ -112,10 +112,11 @@ func (s *S3Store) Put(hash string, blob stream.Blob) error { }(time.Now()) _, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(hash), - Body: bytes.NewBuffer(blob), - StorageClass: aws.String(s3.StorageClassIntelligentTiering), + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + Body: bytes.NewBuffer(blob), + ACL: aws.String("public-read"), + //StorageClass: aws.String(s3.StorageClassIntelligentTiering), }) metrics.MtrOutBytesReflector.Add(float64(blob.Size())) @@ -152,6 +153,7 @@ func (s *S3Store) initOnce() error { sess, err := session.NewSession(&aws.Config{ Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), Region: aws.String(s.region), + Endpoint: aws.String("https://s3.wasabisys.com"), }) if err != nil { return err