From 7b3cca45b4398ebfa140fdeb51f0b737f5ac82af Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Fri, 22 Aug 2025 01:56:39 +0200 Subject: [PATCH] update readme remove dead code --- cmd/peer.go.old | 68 ------ cmd/{reflector2.go => reflector.go} | 0 cmd/reflector.go.old | 361 ---------------------------- cmd/start.go.old | 109 --------- readme.md | 271 +++++++++++++-------- 5 files changed, 168 insertions(+), 641 deletions(-) delete mode 100644 cmd/peer.go.old rename cmd/{reflector2.go => reflector.go} (100%) delete mode 100644 cmd/reflector.go.old delete mode 100644 cmd/start.go.old diff --git a/cmd/peer.go.old b/cmd/peer.go.old deleted file mode 100644 index 6bd781e..0000000 --- a/cmd/peer.go.old +++ /dev/null @@ -1,68 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - "os/signal" - "syscall" - - "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/server/peer" - "github.com/lbryio/reflector.go/store" - - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -var peerNoDB bool - -func init() { - var cmd = &cobra.Command{ - Use: "peer", - Short: "Run peer server", - Run: peerCmd, - } - cmd.Flags().BoolVar(&peerNoDB, "nodb", false, "Don't connect to a db and don't use a db-backed blob store") - rootCmd.AddCommand(cmd) -} - -func peerCmd(cmd *cobra.Command, args []string) { - var err error - - s3 := store.NewS3Store(store.S3Params{ - Name: "peer", - AwsID: globalConfig.AwsID, - AwsSecret: globalConfig.AwsSecret, - Region: globalConfig.BucketRegion, - Bucket: globalConfig.BucketName, - Endpoint: globalConfig.S3Endpoint, - }) - peerServer := peer.NewServer(s3, fmt.Sprintf(":%d", peer.DefaultPort)) - - if !peerNoDB { - db := &db.SQL{ - LogQueries: log.GetLevel() == log.DebugLevel, - } - err = db.Connect(globalConfig.DBConn) - checkErr(err) - - combo := store.NewDBBackedStore(store.DBBackedParams{ - Name: "peer", - Store: s3, - DB: db, - DeleteOnMiss: false, - MaxSize: nil, - }) - peerServer = peer.NewServer(combo, fmt.Sprintf(":%d", peer.DefaultPort)) - } - - err = peerServer.Start() - if err != nil { - log.Fatal(err) - } - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - <-interruptChan - peerServer.Shutdown() -} diff --git a/cmd/reflector2.go b/cmd/reflector.go similarity index 100% rename from cmd/reflector2.go rename to cmd/reflector.go diff --git a/cmd/reflector.go.old b/cmd/reflector.go.old deleted file mode 100644 index cd7df16..0000000 --- a/cmd/reflector.go.old +++ /dev/null @@ -1,361 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/meta" - "github.com/lbryio/reflector.go/reflector" - "github.com/lbryio/reflector.go/server/http" - "github.com/lbryio/reflector.go/server/http3" - "github.com/lbryio/reflector.go/server/peer" - "github.com/lbryio/reflector.go/store" - - "github.com/lbryio/lbry.go/v2/stream" - - "github.com/c2h5oh/datasize" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -var ( - //port configuration - tcpPeerPort int - http3PeerPort int - httpPeerPort int - receiverPort int - metricsPort int - - //flags configuration - disableUploads bool - disableBlocklist bool - useDB bool - - //upstream configuration - upstreamReflector string - upstreamProtocol string - upstreamEdgeToken string - - //downstream configuration - requestQueueSize int - - //upstream edge configuration (to "cold" storage) - originEndpoint string - originEndpointFallback string - - //cache configuration - diskCache string - secondaryDiskCache string - memCache int -) -var cacheManagers = []string{"localdb", "lfu", "arc", "lru", "simple"} - -var cacheMangerToGcache = map[string]store.EvictionStrategy{ - "lfu": store.LFU, - "arc": store.ARC, - "lru": store.LRU, - "simple": store.SIMPLE, -} - -func init() { - var cmd = &cobra.Command{ - Use: "reflector", - Short: "Run reflector server", - Run: reflectorCmd, - } - - cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol") - cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") - cmd.Flags().IntVar(&httpPeerPort, "http-peer-port", 5569, "The port reflector will distribute content from over HTTP protocol") - cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") - cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics") - - cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") - cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") - cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not") - - cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") - cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)") - cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content") - - cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") - - cmd.Flags().StringVar(&originEndpoint, "origin-endpoint", "", "HTTP edge endpoint for standard HTTP retrieval") - cmd.Flags().StringVar(&originEndpointFallback, "origin-endpoint-fallback", "", "HTTP edge endpoint for standard HTTP retrieval if first origin fails") - - cmd.Flags().StringVar(&diskCache, "disk-cache", "100GB:/tmp/downloaded_blobs:localdb", "Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru)") - cmd.Flags().StringVar(&secondaryDiskCache, "optional-disk-cache", "", "Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru) (this would get hit before the one specified in disk-cache)") - cmd.Flags().IntVar(&memCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") - - rootCmd.AddCommand(cmd) -} - -func reflectorCmd(cmd *cobra.Command, args []string) { - log.Printf("reflector %s", meta.VersionString()) - - // the blocklist logic requires the db backed store to be the outer-most store - underlyingStore := initStores() - underlyingStoreWithCaches := initCaches(underlyingStore) - - if !disableUploads { - reflectorServer := reflector.NewServer(underlyingStore, underlyingStoreWithCaches) - reflectorServer.Timeout = 3 * time.Minute - reflectorServer.EnableBlocklist = !disableBlocklist - - err := reflectorServer.Start(":" + strconv.Itoa(receiverPort)) - if err != nil { - log.Fatal(err) - } - defer reflectorServer.Shutdown() - } - - peerServer := peer.NewServer(underlyingStoreWithCaches, fmt.Sprintf(":%d", tcpPeerPort)) - err := peerServer.Start() - if err != nil { - log.Fatal(err) - } - defer peerServer.Shutdown() - - http3PeerServer := http3.NewServer(underlyingStoreWithCaches, requestQueueSize, fmt.Sprintf(":%d", http3PeerPort)) - err = http3PeerServer.Start() - if err != nil { - log.Fatal(err) - } - defer http3PeerServer.Shutdown() - - httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize, upstreamEdgeToken, fmt.Sprintf(":%d", httpPeerPort)) - err = httpServer.Start() - if err != nil { - log.Fatal(err) - } - defer httpServer.Shutdown() - - metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") - metricsServer.Start() - defer metricsServer.Shutdown() - defer underlyingStoreWithCaches.Shutdown() - defer underlyingStore.Shutdown() //do we actually need this? Oo - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - <-interruptChan -} - -func initUpstreamStore() store.BlobStore { - var s store.BlobStore - if upstreamReflector == "" { - return nil - } - switch upstreamProtocol { - case "tcp": - s = store.NewPeerStore(store.PeerParams{ - Name: "reflector", - Address: upstreamReflector, - Timeout: 30 * time.Second, - }) - case "http3": - s = store.NewHttp3Store(store.Http3Params{ - Name: "reflector", - Address: upstreamReflector, - Timeout: 30 * time.Second, - }) - case "http": - s = store.NewUpstreamStore(store.UpstreamParams{ - Name: "reflector", - Upstream: upstreamReflector, - EdgeToken: upstreamEdgeToken, - }) - default: - log.Fatalf("protocol is not recognized: %s", upstreamProtocol) - } - return s -} - -func initEdgeStore() store.BlobStore { - var s3Store *store.S3Store - var s store.BlobStore - - if conf != "none" { - s3Store = store.NewS3Store(store.S3Params{ - Name: "reflector", - AwsID: globalConfig.AwsID, - AwsSecret: globalConfig.AwsSecret, - Region: globalConfig.BucketRegion, - Bucket: globalConfig.BucketName, - Endpoint: globalConfig.S3Endpoint, - }) - } - if originEndpointFallback != "" && originEndpoint != "" { - ittt := store.NewITTTStore(store.ITTTParams{ - Name: "reflector", - This: store.NewHttpStore(store.HttpParams{ - Name: "owns3", - Endpoint: originEndpoint, - ShardingSize: 0, - }), - That: store.NewHttpStore(store.HttpParams{ - Name: "wasabi", - Endpoint: originEndpointFallback, - ShardingSize: 0, - }), - }) - if s3Store != nil { - s = store.NewProxiedS3Store(store.ProxiedS3Params{ - Name: "reflector", - Reader: ittt, - Writer: s3Store, - }) - } else { - s = ittt - } - } else if s3Store != nil { - s = s3Store - } else { - log.Fatalf("this configuration does not include a valid upstream source") - } - return s -} - -func initDBStore(s store.BlobStore) store.BlobStore { - if useDB { - dbInst := &db.SQL{ - TrackingLevel: db.TrackAccessStreams, - LogQueries: log.GetLevel() == log.DebugLevel, - } - err := dbInst.Connect(globalConfig.DBConn) - if err != nil { - log.Fatal(err) - } - s = store.NewDBBackedStore(store.DBBackedParams{ - Name: "global", - Store: s, - DB: dbInst, - DeleteOnMiss: false, - MaxSize: nil, - }) - } - return s -} - -func initStores() store.BlobStore { - s := initUpstreamStore() - if s == nil { - s = initEdgeStore() - } - s = initDBStore(s) - return s -} - -// initCaches returns a store wrapped with caches and a stop group to execute a clean shutdown -func initCaches(s store.BlobStore) store.BlobStore { - diskStore := initDiskStore(s, diskCache) - finalStore := initDiskStore(diskStore, secondaryDiskCache) - if memCache > 0 { - finalStore = store.NewCachingStore(store.CachingParams{ - Name: "reflector", - Origin: finalStore, - Cache: store.NewGcacheStore(store.GcacheParams{ - Name: "volatile-cache", - Store: store.NewMemStore(store.MemParams{Name: "volatile-cache"}), - MaxSize: memCache, - Strategy: store.LRU, - }), - }) - } - return finalStore -} - -func initDiskStore(upstreamStore store.BlobStore, diskParams string) store.BlobStore { - diskCacheMaxSize, diskCachePath, cacheManager := diskCacheParams(diskParams) - //we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte - // so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup - realCacheSize := int(float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)) - if diskCacheMaxSize == 0 { - return upstreamStore - } - err := os.MkdirAll(diskCachePath, os.ModePerm) - if err != nil { - log.Fatal(err) - } - - diskStore := store.NewDiskStore(store.DiskParams{ - Name: "big-drive", - MountPoint: diskCachePath, - ShardingSize: 2, - }) - var unwrappedStore store.BlobStore - - if cacheManager == "localdb" { - localDb := &db.SQL{ - SoftDelete: true, - TrackingLevel: db.TrackAccessBlobs, - LogQueries: log.GetLevel() == log.DebugLevel, - } - err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector") - if err != nil { - log.Fatal(err) - } - unwrappedStore = store.NewDBBackedStore(store.DBBackedParams{ - Name: "local", - Store: diskStore, - DB: localDb, - DeleteOnMiss: true, - MaxSize: &realCacheSize, - }) - } else { - unwrappedStore = store.NewGcacheStore(store.GcacheParams{ - Name: "flash", - Store: store.NewDiskStore(store.DiskParams{Name: "flash", MountPoint: diskCachePath, ShardingSize: 2}), - MaxSize: realCacheSize, - Strategy: cacheMangerToGcache[cacheManager], - }) - } - - wrapped := store.NewCachingStore(store.CachingParams{ - Name: "reflector", - Origin: upstreamStore, - Cache: unwrappedStore, - }) - return wrapped -} - -func diskCacheParams(diskParams string) (int, string, string) { - if diskParams == "" { - return 0, "", "" - } - - parts := strings.Split(diskParams, ":") - if len(parts) != 3 { - log.Fatalf("%s does is formatted incorrectly. Expected format: 'sizeGB:CACHE_PATH:cachemanager' for example: '100GB:/tmp/downloaded_blobs:localdb'", diskParams) - } - - diskCacheSize := parts[0] - path := parts[1] - cacheManager := parts[2] - - if len(path) == 0 || path[0] != '/' { - log.Fatalf("disk cache paths must start with '/'") - } - - if !util.InSlice(cacheManager, cacheManagers) { - log.Fatalf("specified cache manager '%s' is not supported. Use one of the following: %v", cacheManager, cacheManagers) - } - - var maxSize datasize.ByteSize - err := maxSize.UnmarshalText([]byte(diskCacheSize)) - if err != nil { - log.Fatal(err) - } - if maxSize <= 0 { - log.Fatal("disk cache size must be more than 0") - } - return int(maxSize), path, cacheManager -} diff --git a/cmd/start.go.old b/cmd/start.go.old deleted file mode 100644 index 76e8eaa..0000000 --- a/cmd/start.go.old +++ /dev/null @@ -1,109 +0,0 @@ -package cmd - -import ( - "os" - "os/signal" - "strconv" - "strings" - "syscall" - - "github.com/lbryio/reflector.go/cluster" - "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/prism" - "github.com/lbryio/reflector.go/reflector" - "github.com/lbryio/reflector.go/server/peer" - "github.com/lbryio/reflector.go/store" - - "github.com/lbryio/lbry.go/v2/dht" - "github.com/lbryio/lbry.go/v2/dht/bits" - - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -const ( - startNewCluster = "new" -) - -var ( - startClusterPort int - startPeerPort int - startReflectorPort int - startDhtPort int - startDhtSeeds []string - startHashRange string -) - -func init() { - var cmd = &cobra.Command{ - Use: `start [cluster-address|"new"]`, - Short: "Runs full prism application with cluster, dht, peer server, and reflector server.", - Run: startCmd, - Args: cobra.ExactArgs(1), - } - cmd.PersistentFlags().IntVar(&startClusterPort, "cluster-port", cluster.DefaultPort, "Port that cluster listens on") - cmd.PersistentFlags().IntVar(&startPeerPort, "peer-port", peer.DefaultPort, "Port to start peer protocol on") - cmd.PersistentFlags().IntVar(&startReflectorPort, "reflector-port", reflector.DefaultPort, "Port to start reflector protocol on") - cmd.PersistentFlags().IntVar(&startDhtPort, "dht-port", dht.DefaultPort, "Port that dht will listen on") - cmd.PersistentFlags().StringSliceVar(&startDhtSeeds, "dht-seeds", []string{}, "Comma-separated list of dht seed nodes (addr:port,addr:port,...)") - - cmd.PersistentFlags().StringVar(&startHashRange, "hash-range", "", "Limit on range of hashes to announce (start-end)") - - rootCmd.AddCommand(cmd) -} - -func startCmd(cmd *cobra.Command, args []string) { - db := &db.SQL{ - LogQueries: log.GetLevel() == log.DebugLevel, - } - err := db.Connect(globalConfig.DBConn) - checkErr(err) - s3 := store.NewS3Store(store.S3Params{ - Name: "prism", - AwsID: globalConfig.AwsID, - AwsSecret: globalConfig.AwsSecret, - Region: globalConfig.BucketRegion, - Bucket: globalConfig.BucketName, - Endpoint: globalConfig.S3Endpoint, - }) - comboStore := store.NewDBBackedStore(store.DBBackedParams{Name: "global", Store: s3, DB: db, DeleteOnMiss: false, MaxSize: nil}) - - conf := prism.DefaultConf() - - // TODO: args we need: - // minNodes - minimum number of nodes before announcing starts. otherwise first node will try to announce all the blobs in the db - // or maybe we should do maxHashesPerNode? - // in either case, this should not kill the cluster, but should only limit announces (and notify when some hashes are being left unannounced) - - if args[0] != startNewCluster { - conf.ClusterSeedAddr = args[0] - } - - conf.DB = db - conf.Blobs = comboStore - conf.DhtAddress = "0.0.0.0:" + strconv.Itoa(startDhtPort) - conf.DhtSeedNodes = startDhtSeeds - conf.ClusterPort = startClusterPort - conf.PeerPort = startPeerPort - conf.ReflectorPort = startReflectorPort - - if startHashRange != "" { - hashRange := strings.Split(startHashRange, "-") - if len(hashRange) != 2 { - log.Fatal("invalid hash range") - } - r := bits.Range{Start: bits.FromShortHexP(hashRange[0]), End: bits.FromShortHexP(hashRange[1])} - conf.HashRange = &r - } - - p := prism.New(conf) - err = p.Start() - if err != nil { - log.Fatal(err) - } - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - <-interruptChan - p.Shutdown() -} diff --git a/readme.md b/readme.md index 7c3ffca..3e51a8b 100644 --- a/readme.md +++ b/readme.md @@ -1,124 +1,189 @@ # Reflector -Reflector is a central piece of software that providers LBRY with the following features: -- Blobs reflection: when something is published, we capture the data and store it on our servers for quicker retrieval -- Blobs distribution: when a piece of content is requested and the LBRY network doesn't have it, reflector will retrieve it from its storage and distribute it -- Blobs caching: reflectors can be chained together in multiple regions or servers to form a chain of cached content. We call those "blobcaches". They are layered so that content distribution is favorable in all the regions we deploy it to +Production-ready blob reflection, distribution, and caching for Odysee. -There are a few other features embedded in reflector.go including publishing streams from Go, downloading or upload blobs, resolving content and more unfinished tools. +This repository provides the components used in production: +- Reflector ingestion server (command name: `reflector`) +- Blob cache/edge server (`blobcache`) +- Uploader to object storage (`upload`) -This code includes a Go implementations of the LBRY peer protocol, reflector protocol, and DHT. +Other commands exist in the tree for historical/legacy reasons and are not supported. -## Installation +## How it works (at a glance) +- Ingestion (reflector): accepts uploaded blobs, persists them to object storage (e.g., S3/Wasabi) and tracks state in MySQL. +- Distribution: serves blobs over HTTP/HTTP3/Peer. Blobcaches can be deployed in front of the origin to reduce latency and egress. +- Caching (blobcache): layered disk caches backed by HTTP(S) origins (e.g., S3 endpoints), with optional local DB metadata for capacity/eviction. -- Install mysql 8 (5.7 might work too) -- add a reflector user and database with password `reflector` with localhost access only -- Create the tables as described [here](https://github.com/lbryio/reflector.go/blob/master/db/db.go#L735) (the link might not update as the code does so just look for the schema in that file) +All services are started by the `prism` binary and are configured via YAML files loaded from a configuration directory. -#### We do not support running reflector.go as a blob receiver, however if you want to run it as a private blobcache you may compile it yourself and run it as following: +## Supported commands +The following are the only supported commands for production use: + +- Reflector ingestion: `prism reflector` + - Flags: `--receiver-port` (default 5566), `--metrics-port` (default 2112), `--disable-blocklist` + - Loads `reflector.yaml` from the config directory. + +- Blob cache: `prism blobcache` + - Flags: `--metrics-port` (default 2112), `--disable-blocklist` + - Loads `blobcache.yaml` from the config directory. + +- Uploader: `prism upload PATH` + - Flags: `--workers`, `--skipExistsCheck`, `--deleteBlobsAfterUpload` + - Loads `upload.yaml` from the config directory. + +Global flag for all commands: +- `--conf-dir` (default `./`): directory containing YAML config files. + +## Configuration +Configuration is per-command. The loader reads `.yaml` from `--conf-dir`. + +Common sections: +- `servers`: enables HTTP/HTTP3/Peer servers. Keys: `http`, `http3`, `peer`. Each accepts: + - `port` (int) + - `max_concurrent_requests` (int, http/http3) + - `edge_token` (string, http) + - `address` (string, optional; bind address, omit for all interfaces) +- `store`: defines the storage topology using composable stores. Frequently used: + - `proxied-s3`: production pattern with a `writer` (DB-backed -> S3/multiwriter) and a `reader` (caching -> disk + HTTP origins). + - `caching`: layered cache with a `cache` (often `db_backed` -> `disk`) and an `origin` chain (`http`, `http3`, or `ittt` fan-in). + - `s3`, `disk`, `multiwriter`, `db_backed`, `http`, `http3`, `peer`, `upstream` are also available building blocks. + +### Minimal examples +Reflector – conf-dir contains `reflector.yaml`: +```yaml +servers: + http: + port: 5569 + max_concurrent_requests: 200 + http3: + port: 5568 + max_concurrent_requests: 200 + peer: + port: 5567 +store: + proxied-s3: + name: s3_read_proxy + writer: + db_backed: + user: reflector + password: reflector + database: reflector + host: localhost + port: 3306 + access_tracking: 1 + soft_deletes: true + store: + s3: + name: primary + aws_id: YOUR_KEY + aws_secret: YOUR_SECRET + region: us-east-1 + bucket: blobs-bucket + endpoint: https://s3.yourendpoint.tv + reader: + caching: + cache: + disk: + name: local_cache + mount_point: /mnt/reflector/cache + sharding_size: 2 + origin: + http: + endpoint: https://s3.yourendpoint.tv/blobs-bucket/ + sharding_size: 4 +``` + +Blobcache – conf-dir contains `blobcache.yaml`: +```yaml +servers: + http: + port: 5569 + max_concurrent_requests: 200 + http3: + port: 5568 + max_concurrent_requests: 200 + peer: + port: 5567 +store: + caching: + cache: + db_backed: + user: reflector + password: reflector + database: reflector + host: localhost + port: 3306 + has_cap: true + max_size: 500GB + store: + disk: + name: blobcache + mount_point: /mnt/blobcache/cache + sharding_size: 2 + origin: + http: + endpoint: https://s3.yourendpoint.tv/blobs-bucket/ + sharding_size: 4 +``` + +Uploader – conf-dir contains `upload.yaml` (points to the same writer/backend as reflector): +```yaml +database: + user: reflector + password: reflector + database: reflector + host: localhost + port: 3306 +store: + proxied-s3: + writer: + db_backed: + user: reflector + password: reflector + database: reflector + host: localhost + port: 3306 + store: + s3: + aws_id: YOUR_KEY + aws_secret: YOUR_SECRET + region: us-east-1 + bucket: blobs-bucket + endpoint: https://s3.yourendpoint.tv +``` + +## Quick start +1) Build +- Requires Go 1.23+ +- `make` (binaries in `dist//prism-bin`) + +2) Run a local blobcache ```bash -./prism-bin reflector \ ---conf="none" \ ---disable-uploads=true \ ---use-db=false \ ---upstream-reflector="reflector.lbry.com" \ ---upstream-protocol="http" \ ---request-queue-size=200 \ ---disk-cache="2GB:/path/to/your/storage/:localdb" \ +./dist/linux_amd64/prism-bin --conf-dir=./ blobcache ``` +Place your `blobcache.yaml` in the `--conf-dir` directory. -Create a systemd script if you want to run it automatically on startup or as a service. - -## Usage - -Usage as reflector/blobcache: +3) Run reflector ingestion ```bash -Run reflector server - -Usage: - prism reflector [flags] - -Flags: - --disable-blocklist Disable blocklist watching/updating - --disable-uploads Disable uploads to this reflector server - --disk-cache string Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (default "100GB:/tmp/downloaded_blobs:localdb") - -h, --help help for reflector - --http-peer-port int The port reflector will distribute content from over HTTP protocol (default 5569) - --http3-peer-port int The port reflector will distribute content from over HTTP3 protocol (default 5568) - --mem-cache int enable in-memory cache with a max size of this many blobs - --metrics-port int The port reflector will use for prometheus metrics (default 2112) - --optional-disk-cache string Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (this would get hit before the one specified in disk-cache) - --origin-endpoint string HTTP edge endpoint for standard HTTP retrieval - --origin-endpoint-fallback string HTTP edge endpoint for standard HTTP retrieval if first origin fails - --receiver-port int The port reflector will receive content from (default 5566) - --request-queue-size int How many concurrent requests from downstream should be handled at once (the rest will wait) (default 200) - --tcp-peer-port int The port reflector will distribute content from for the TCP (LBRY) protocol (default 5567) - --upstream-protocol string protocol used to fetch blobs from another upstream reflector server (tcp/http3/http) (default "http") - --upstream-reflector string host:port of a reflector server where blobs are fetched from - --use-db Whether to connect to the reflector db or not (default true) - -Global Flags: - --conf string Path to config. Use 'none' to disable (default "config.json") - -v, --verbose strings Verbose logging for specific components +./dist/linux_amd64/prism-bin --conf-dir=./ reflector --receiver-port=5566 --metrics-port=2112 ``` -Other uses: - +4) Upload blobs ```bash -Prism is a single entry point application with multiple sub modules which can be leveraged individually or together - -Usage: - prism [command] - -Available Commands: - check-integrity check blobs integrity for a given path - cluster Start(join) to or Start a new cluster - decode Decode a claim value - dht Run dht node - getstream Get a stream from a reflector server - help Help about any command - peer Run peer server - populate-db populate local database with blobs from a disk storage - publish Publish a file - reflector Run reflector server - resolve Resolve a URL - send Send a file to a reflector - sendblob Send a random blob to a reflector server - start Runs full prism application with cluster, dht, peer server, and reflector server. - test Test things - upload Upload blobs to S3 - version Print the version - -Flags: - --conf string Path to config. Use 'none' to disable (default "config.json") - -h, --help help for prism - -v, --verbose strings Verbose logging for specific components -``` -## Running from Source - -This project requires [Go v1.23](https://golang.org/doc/install). - -On Ubuntu you can install it with `sudo snap install go --classic` - -``` -git clone git@github.com:lbryio/reflector.go.git -cd reflector.go -make -./dist/linux_amd64/prism-bin +./dist/linux_amd64/prism-bin --conf-dir=./ upload /path/to/blobs \ + --workers=4 --skipExistsCheck ``` -## Contributing - -coming soon - -## License - -This project is MIT licensed. +## Notes +- Only reflector, blobcache, and upload are supported. All other commands are legacy and may be removed in the future. +- Metrics are exposed on the configured `--metrics-port` at `/metrics` (Prometheus format). +- MySQL is required when using DB-backed stores (e.g., ingestion writer, capacity-aware caches). ## Security +If you discover a security issue, please email security@lbry.com. Our PGP key is available at https://lbry.com/faq/pgp-key. -We take security seriously. Please contact security@lbry.com regarding any security issues. -Our PGP key is [here](https://lbry.com/faq/pgp-key) if you need it. +## License +MIT License. See LICENSE. ## Contact -The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777) (niko-at-lbry.com) +The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777)