package cmd

import (
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/lbryio/lbry.go/v2/extras/errors"
	"github.com/lbryio/lbry.go/v2/extras/stop"
	"github.com/lbryio/reflector.go/db"
	"github.com/lbryio/reflector.go/internal/metrics"
	"github.com/lbryio/reflector.go/meta"
	"github.com/lbryio/reflector.go/peer"
	"github.com/lbryio/reflector.go/peer/http3"
	"github.com/lbryio/reflector.go/reflector"
	"github.com/lbryio/reflector.go/store"

	"github.com/lbryio/lbry.go/v2/stream"

	"github.com/c2h5oh/datasize"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
)

// Values of the "reflector" command flags; they are bound in init and read by
// the functions below.
var (
	// ports the individual servers listen on
	tcpPeerPort   int
	http3PeerPort int
	receiverPort  int
	metricsPort   int

	disableUploads   bool
	disableBlocklist bool

	// upstream reflector to fetch blobs from; used only when proxyAddress is set
	proxyAddress  string
	proxyPort     string
	proxyProtocol string

	useDB              bool
	cloudFrontEndpoint string

	// cache settings; the two disk caches use the "size:path" format parsed by
	// diskCacheParams, the memory cache is a number of blobs
	reflectorCmdDiskCache       string
	bufferReflectorCmdDiskCache string
	reflectorCmdMemCache        int
)

// init registers the "reflector" command and its flags on rootCmd.
func init() {
	var cmd = &cobra.Command{
		Use:   "reflector",
		Short: "Run reflector server",
		Run:   reflectorCmd,
	}
	cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
	cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
	cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
	cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval")
	cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
	cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
	cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
	cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
	cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
	cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
	cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
	cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", "enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
	cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "", "enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
	cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
	rootCmd.AddCommand(cmd)
}

// reflectorCmd is the Run handler of the "reflector" command. It builds the
// store chain, starts the receiving server (unless uploads are disabled), the
// TCP and HTTP3 peer servers and the metrics server, then blocks until the
// process receives os.Interrupt or SIGTERM.
//
// NOTE(review): the log.Fatal calls below exit the process immediately, so
// Shutdown calls that were already deferred do not run on a startup failure.
func reflectorCmd(cmd *cobra.Command, args []string) {
	log.Printf("reflector %s", meta.VersionString())
	cleanerStopper := stop.New()

	// the blocklist logic requires the db backed store to be the outer-most store
	underlyingStore := setupStore()
	outerStore := wrapWithCache(underlyingStore, cleanerStopper)

	if !disableUploads {
		reflectorServer := reflector.NewServer(underlyingStore, outerStore)
		reflectorServer.Timeout = 3 * time.Minute
		reflectorServer.EnableBlocklist = !disableBlocklist

		err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
		if err != nil {
			log.Fatal(err)
		}
		defer reflectorServer.Shutdown()
	}

	peerServer := peer.NewServer(outerStore)
	err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
	if err != nil {
		log.Fatal(err)
	}
	defer peerServer.Shutdown()

	http3PeerServer := http3.NewServer(outerStore)
	err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
	if err != nil {
		log.Fatal(err)
	}
	defer http3PeerServer.Shutdown()

	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
	metricsServer.Start()
	defer metricsServer.Shutdown()
	defer outerStore.Shutdown()
	defer underlyingStore.Shutdown()

	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
	<-interruptChan

	// Stop the cache cleaner first; the deferred shutdowns above then run, in
	// reverse order of registration, when this function returns.
	cleanerStopper.StopAndWait()
}

// setupStore builds the store that blobs are ultimately read from.
//
// When proxyAddress is set, the store is a client of another reflector server
// speaking proxyProtocol ("tcp" or "http3"). Otherwise it is built from S3
// (unless conf is "none") and/or the CloudFront endpoint. When useDB is set,
// the result is wrapped in a DB backed store connected to globalConfig.DBConn.
// Any invalid configuration or connection failure terminates the process.
func setupStore() store.BlobStore {
	var s store.BlobStore

	if proxyAddress != "" {
		switch proxyProtocol {
		case "tcp":
			s = peer.NewStore(peer.StoreOpts{
				Address: proxyAddress + ":" + proxyPort,
				Timeout: 30 * time.Second,
			})
		case "http3":
			s = http3.NewStore(http3.StoreOpts{
				Address: proxyAddress + ":" + proxyPort,
				Timeout: 30 * time.Second,
			})
		default:
			log.Fatalf("protocol is not recognized: %s", proxyProtocol)
		}
	} else {
		var s3Store *store.S3Store
		if conf != "none" {
			s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
		}

		// s3Store is only assigned to s when it is non-nil, so s never ends
		// up holding a typed nil pointer.
		switch {
		case cloudFrontEndpoint != "":
			cfs := store.NewCloudFrontROStore(cloudFrontEndpoint)
			if s3Store != nil {
				s = store.NewCloudFrontRWStore(cfs, s3Store)
			} else {
				s = cfs
			}
		case s3Store != nil:
			s = s3Store
		default:
			log.Fatalf("this configuration does not include a valid upstream source")
		}
	}

	if useDB {
		dbInst := &db.SQL{
			TrackAccess: db.TrackAccessStreams,
			LogQueries:  log.GetLevel() == log.DebugLevel,
		}
		err := dbInst.Connect(globalConfig.DBConn)
		if err != nil {
			log.Fatal(err)
		}

		s = store.NewDBBackedStore(s, dbInst, false)
	}

	return s
}

// wrapWithCache layers the caches selected by the command flags on top of s
// and returns the outer-most store. Layers are added in this order, each one
// wrapping the previous: the DB backed disk cache (--disk-cache), the buffer
// disk cache (--buffer-disk-cache) and the in-memory cache (--mem-cache).
//
// When the disk cache is enabled, a background goroutine running
// cleanOldestBlobs is started. It is registered with cleanerStopper, so
// cleanerStopper.StopAndWait() returns only after that goroutine has exited.
func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
	// NOTE(review): the connection string of the disk cache's database is
	// hard-coded, credentials included.
	const localCacheDBConn = "reflector:reflector@tcp(localhost:3306)/reflector"

	wrapped := s

	diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
	// Cache limits are expressed as a number of blobs rather than bytes:
	// the configured byte size divided by the maximum size of one blob.
	realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
	if diskCacheMaxSize > 0 {
		err := os.MkdirAll(diskCachePath, os.ModePerm)
		if err != nil {
			log.Fatal(err)
		}

		localDb := &db.SQL{
			SoftDelete:  true,
			TrackAccess: db.TrackAccessBlobs,
			LogQueries:  log.GetLevel() == log.DebugLevel,
		}
		err = localDb.Connect(localCacheDBConn)
		if err != nil {
			log.Fatal(err)
		}

		dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true)
		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			dbBackedDiskStore,
		)

		// Register the cleaner with the stopper before launching it, so that
		// StopAndWait actually waits for an in-flight cleanup to finish.
		maxBlobs := int(realCacheSize)
		cleanerStopper.Add(1)
		go func() {
			defer cleanerStopper.Done()
			cleanOldestBlobs(maxBlobs, localDb, dbBackedDiskStore, cleanerStopper)
		}()
	}

	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
	realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
	if diskCacheMaxSize > 0 {
		err := os.MkdirAll(diskCachePath, os.ModePerm)
		if err != nil {
			log.Fatal(err)
		}

		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize),
		)
	}

	if reflectorCmdMemCache > 0 {
		wrapped = store.NewCachingStore(
			"reflector",
			wrapped,
			store.NewLRUStore("mem", store.NewMemStore(), reflectorCmdMemCache),
		)
	}

	return wrapped
}

// diskCacheParams parses a disk cache flag value of the form "size:path",
// e.g. "10GB:/var/cache/blobs", and returns the size in bytes and the path.
//
// An empty value means the cache is disabled and yields (0, ""). The value is
// split on the first ':' only, so the path itself may contain colons. The
// path must start with '/' and the size must be greater than zero; any
// malformed value terminates the process.
func diskCacheParams(diskParams string) (int, string) {
	if diskParams == "" {
		return 0, ""
	}

	parts := strings.SplitN(diskParams, ":", 2)
	if len(parts) != 2 {
		log.Fatalf("disk cache parameter %q must be a size, followed by ':', followed by a path", diskParams)
	}

	diskCacheSize, path := parts[0], parts[1]
	if !strings.HasPrefix(path, "/") {
		log.Fatalf("disk cache path %q must start with '/'", path)
	}

	var maxSize datasize.ByteSize
	err := maxSize.UnmarshalText([]byte(diskCacheSize))
	if err != nil {
		log.Fatal(err)
	}
	if maxSize == 0 {
		log.Fatalf("disk cache size %q must be more than 0", diskCacheSize)
	}

	return int(maxSize), path
}

// cleanOldestBlobs calls doClean every ten minutes, the interval being counted
// from the end of the previous run, until stopper is stopped. Errors returned
// by doClean are logged and do not end the loop.
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
	const cleanupInterval = 10 * time.Minute

	for {
		select {
		case <-stopper.Ch():
			log.Infoln("stopping self cleanup")
			return
		case <-time.After(cleanupInterval):
			err := doClean(maxItems, db, store, stopper)
			if err != nil {
				log.Error(errors.FullTrace(err))
			}
		}
	}
}

// doClean performs one cleanup pass. If db.Count() reports at least maxItems
// blobs, it requests a tenth of that count from
// db.LeastRecentlyAccessedHashes and deletes each returned hash from store.
//
// The pass ends early, returning nil, as soon as stopper is stopped. The
// first error from the database or from store.Delete is returned.
func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
	blobsCount, err := db.Count()
	if err != nil {
		return err
	}
	if blobsCount < maxItems {
		return nil
	}

	itemsToDelete := blobsCount / 10
	blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
	if err != nil {
		return err
	}

	for _, hash := range blobs {
		// check for a stop request between deletions without blocking
		select {
		case <-stopper.Ch():
			return nil
		default:
		}

		if err := store.Delete(hash); err != nil {
			return err
		}
	}

	return nil
}