mirror of
https://github.com/LBRYFoundation/reflector.go.git
synced 2025-09-21 02:19:46 +00:00
update readme
remove dead code
This commit is contained in:
parent
ad710dfeea
commit
7b3cca45b4
5 changed files with 168 additions and 641 deletions
|
@ -1,68 +0,0 @@
|
||||||
package cmd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/db"
|
|
||||||
"github.com/lbryio/reflector.go/server/peer"
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
var peerNoDB bool
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var cmd = &cobra.Command{
|
|
||||||
Use: "peer",
|
|
||||||
Short: "Run peer server",
|
|
||||||
Run: peerCmd,
|
|
||||||
}
|
|
||||||
cmd.Flags().BoolVar(&peerNoDB, "nodb", false, "Don't connect to a db and don't use a db-backed blob store")
|
|
||||||
rootCmd.AddCommand(cmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func peerCmd(cmd *cobra.Command, args []string) {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
s3 := store.NewS3Store(store.S3Params{
|
|
||||||
Name: "peer",
|
|
||||||
AwsID: globalConfig.AwsID,
|
|
||||||
AwsSecret: globalConfig.AwsSecret,
|
|
||||||
Region: globalConfig.BucketRegion,
|
|
||||||
Bucket: globalConfig.BucketName,
|
|
||||||
Endpoint: globalConfig.S3Endpoint,
|
|
||||||
})
|
|
||||||
peerServer := peer.NewServer(s3, fmt.Sprintf(":%d", peer.DefaultPort))
|
|
||||||
|
|
||||||
if !peerNoDB {
|
|
||||||
db := &db.SQL{
|
|
||||||
LogQueries: log.GetLevel() == log.DebugLevel,
|
|
||||||
}
|
|
||||||
err = db.Connect(globalConfig.DBConn)
|
|
||||||
checkErr(err)
|
|
||||||
|
|
||||||
combo := store.NewDBBackedStore(store.DBBackedParams{
|
|
||||||
Name: "peer",
|
|
||||||
Store: s3,
|
|
||||||
DB: db,
|
|
||||||
DeleteOnMiss: false,
|
|
||||||
MaxSize: nil,
|
|
||||||
})
|
|
||||||
peerServer = peer.NewServer(combo, fmt.Sprintf(":%d", peer.DefaultPort))
|
|
||||||
}
|
|
||||||
|
|
||||||
err = peerServer.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
interruptChan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
|
||||||
<-interruptChan
|
|
||||||
peerServer.Shutdown()
|
|
||||||
}
|
|
|
@ -1,361 +0,0 @@
|
||||||
package cmd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/util"
|
|
||||||
"github.com/lbryio/reflector.go/db"
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
|
||||||
"github.com/lbryio/reflector.go/meta"
|
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
|
||||||
"github.com/lbryio/reflector.go/server/http"
|
|
||||||
"github.com/lbryio/reflector.go/server/http3"
|
|
||||||
"github.com/lbryio/reflector.go/server/peer"
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
|
||||||
|
|
||||||
"github.com/c2h5oh/datasize"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
//port configuration
|
|
||||||
tcpPeerPort int
|
|
||||||
http3PeerPort int
|
|
||||||
httpPeerPort int
|
|
||||||
receiverPort int
|
|
||||||
metricsPort int
|
|
||||||
|
|
||||||
//flags configuration
|
|
||||||
disableUploads bool
|
|
||||||
disableBlocklist bool
|
|
||||||
useDB bool
|
|
||||||
|
|
||||||
//upstream configuration
|
|
||||||
upstreamReflector string
|
|
||||||
upstreamProtocol string
|
|
||||||
upstreamEdgeToken string
|
|
||||||
|
|
||||||
//downstream configuration
|
|
||||||
requestQueueSize int
|
|
||||||
|
|
||||||
//upstream edge configuration (to "cold" storage)
|
|
||||||
originEndpoint string
|
|
||||||
originEndpointFallback string
|
|
||||||
|
|
||||||
//cache configuration
|
|
||||||
diskCache string
|
|
||||||
secondaryDiskCache string
|
|
||||||
memCache int
|
|
||||||
)
|
|
||||||
var cacheManagers = []string{"localdb", "lfu", "arc", "lru", "simple"}
|
|
||||||
|
|
||||||
var cacheMangerToGcache = map[string]store.EvictionStrategy{
|
|
||||||
"lfu": store.LFU,
|
|
||||||
"arc": store.ARC,
|
|
||||||
"lru": store.LRU,
|
|
||||||
"simple": store.SIMPLE,
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var cmd = &cobra.Command{
|
|
||||||
Use: "reflector",
|
|
||||||
Short: "Run reflector server",
|
|
||||||
Run: reflectorCmd,
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol")
|
|
||||||
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
|
|
||||||
cmd.Flags().IntVar(&httpPeerPort, "http-peer-port", 5569, "The port reflector will distribute content from over HTTP protocol")
|
|
||||||
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
|
|
||||||
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics")
|
|
||||||
|
|
||||||
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
|
|
||||||
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
|
|
||||||
cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not")
|
|
||||||
|
|
||||||
cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from")
|
|
||||||
cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)")
|
|
||||||
cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content")
|
|
||||||
|
|
||||||
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
|
|
||||||
|
|
||||||
cmd.Flags().StringVar(&originEndpoint, "origin-endpoint", "", "HTTP edge endpoint for standard HTTP retrieval")
|
|
||||||
cmd.Flags().StringVar(&originEndpointFallback, "origin-endpoint-fallback", "", "HTTP edge endpoint for standard HTTP retrieval if first origin fails")
|
|
||||||
|
|
||||||
cmd.Flags().StringVar(&diskCache, "disk-cache", "100GB:/tmp/downloaded_blobs:localdb", "Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru)")
|
|
||||||
cmd.Flags().StringVar(&secondaryDiskCache, "optional-disk-cache", "", "Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru) (this would get hit before the one specified in disk-cache)")
|
|
||||||
cmd.Flags().IntVar(&memCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
|
|
||||||
|
|
||||||
rootCmd.AddCommand(cmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func reflectorCmd(cmd *cobra.Command, args []string) {
|
|
||||||
log.Printf("reflector %s", meta.VersionString())
|
|
||||||
|
|
||||||
// the blocklist logic requires the db backed store to be the outer-most store
|
|
||||||
underlyingStore := initStores()
|
|
||||||
underlyingStoreWithCaches := initCaches(underlyingStore)
|
|
||||||
|
|
||||||
if !disableUploads {
|
|
||||||
reflectorServer := reflector.NewServer(underlyingStore, underlyingStoreWithCaches)
|
|
||||||
reflectorServer.Timeout = 3 * time.Minute
|
|
||||||
reflectorServer.EnableBlocklist = !disableBlocklist
|
|
||||||
|
|
||||||
err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
defer reflectorServer.Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
peerServer := peer.NewServer(underlyingStoreWithCaches, fmt.Sprintf(":%d", tcpPeerPort))
|
|
||||||
err := peerServer.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
defer peerServer.Shutdown()
|
|
||||||
|
|
||||||
http3PeerServer := http3.NewServer(underlyingStoreWithCaches, requestQueueSize, fmt.Sprintf(":%d", http3PeerPort))
|
|
||||||
err = http3PeerServer.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
defer http3PeerServer.Shutdown()
|
|
||||||
|
|
||||||
httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize, upstreamEdgeToken, fmt.Sprintf(":%d", httpPeerPort))
|
|
||||||
err = httpServer.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
defer httpServer.Shutdown()
|
|
||||||
|
|
||||||
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
|
||||||
metricsServer.Start()
|
|
||||||
defer metricsServer.Shutdown()
|
|
||||||
defer underlyingStoreWithCaches.Shutdown()
|
|
||||||
defer underlyingStore.Shutdown() //do we actually need this? Oo
|
|
||||||
|
|
||||||
interruptChan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
|
||||||
<-interruptChan
|
|
||||||
}
|
|
||||||
|
|
||||||
func initUpstreamStore() store.BlobStore {
|
|
||||||
var s store.BlobStore
|
|
||||||
if upstreamReflector == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
switch upstreamProtocol {
|
|
||||||
case "tcp":
|
|
||||||
s = store.NewPeerStore(store.PeerParams{
|
|
||||||
Name: "reflector",
|
|
||||||
Address: upstreamReflector,
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
})
|
|
||||||
case "http3":
|
|
||||||
s = store.NewHttp3Store(store.Http3Params{
|
|
||||||
Name: "reflector",
|
|
||||||
Address: upstreamReflector,
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
})
|
|
||||||
case "http":
|
|
||||||
s = store.NewUpstreamStore(store.UpstreamParams{
|
|
||||||
Name: "reflector",
|
|
||||||
Upstream: upstreamReflector,
|
|
||||||
EdgeToken: upstreamEdgeToken,
|
|
||||||
})
|
|
||||||
default:
|
|
||||||
log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func initEdgeStore() store.BlobStore {
|
|
||||||
var s3Store *store.S3Store
|
|
||||||
var s store.BlobStore
|
|
||||||
|
|
||||||
if conf != "none" {
|
|
||||||
s3Store = store.NewS3Store(store.S3Params{
|
|
||||||
Name: "reflector",
|
|
||||||
AwsID: globalConfig.AwsID,
|
|
||||||
AwsSecret: globalConfig.AwsSecret,
|
|
||||||
Region: globalConfig.BucketRegion,
|
|
||||||
Bucket: globalConfig.BucketName,
|
|
||||||
Endpoint: globalConfig.S3Endpoint,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if originEndpointFallback != "" && originEndpoint != "" {
|
|
||||||
ittt := store.NewITTTStore(store.ITTTParams{
|
|
||||||
Name: "reflector",
|
|
||||||
This: store.NewHttpStore(store.HttpParams{
|
|
||||||
Name: "owns3",
|
|
||||||
Endpoint: originEndpoint,
|
|
||||||
ShardingSize: 0,
|
|
||||||
}),
|
|
||||||
That: store.NewHttpStore(store.HttpParams{
|
|
||||||
Name: "wasabi",
|
|
||||||
Endpoint: originEndpointFallback,
|
|
||||||
ShardingSize: 0,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
if s3Store != nil {
|
|
||||||
s = store.NewProxiedS3Store(store.ProxiedS3Params{
|
|
||||||
Name: "reflector",
|
|
||||||
Reader: ittt,
|
|
||||||
Writer: s3Store,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
s = ittt
|
|
||||||
}
|
|
||||||
} else if s3Store != nil {
|
|
||||||
s = s3Store
|
|
||||||
} else {
|
|
||||||
log.Fatalf("this configuration does not include a valid upstream source")
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func initDBStore(s store.BlobStore) store.BlobStore {
|
|
||||||
if useDB {
|
|
||||||
dbInst := &db.SQL{
|
|
||||||
TrackingLevel: db.TrackAccessStreams,
|
|
||||||
LogQueries: log.GetLevel() == log.DebugLevel,
|
|
||||||
}
|
|
||||||
err := dbInst.Connect(globalConfig.DBConn)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
s = store.NewDBBackedStore(store.DBBackedParams{
|
|
||||||
Name: "global",
|
|
||||||
Store: s,
|
|
||||||
DB: dbInst,
|
|
||||||
DeleteOnMiss: false,
|
|
||||||
MaxSize: nil,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func initStores() store.BlobStore {
|
|
||||||
s := initUpstreamStore()
|
|
||||||
if s == nil {
|
|
||||||
s = initEdgeStore()
|
|
||||||
}
|
|
||||||
s = initDBStore(s)
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// initCaches returns a store wrapped with caches and a stop group to execute a clean shutdown
|
|
||||||
func initCaches(s store.BlobStore) store.BlobStore {
|
|
||||||
diskStore := initDiskStore(s, diskCache)
|
|
||||||
finalStore := initDiskStore(diskStore, secondaryDiskCache)
|
|
||||||
if memCache > 0 {
|
|
||||||
finalStore = store.NewCachingStore(store.CachingParams{
|
|
||||||
Name: "reflector",
|
|
||||||
Origin: finalStore,
|
|
||||||
Cache: store.NewGcacheStore(store.GcacheParams{
|
|
||||||
Name: "volatile-cache",
|
|
||||||
Store: store.NewMemStore(store.MemParams{Name: "volatile-cache"}),
|
|
||||||
MaxSize: memCache,
|
|
||||||
Strategy: store.LRU,
|
|
||||||
}),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return finalStore
|
|
||||||
}
|
|
||||||
|
|
||||||
func initDiskStore(upstreamStore store.BlobStore, diskParams string) store.BlobStore {
|
|
||||||
diskCacheMaxSize, diskCachePath, cacheManager := diskCacheParams(diskParams)
|
|
||||||
//we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte
|
|
||||||
// so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup
|
|
||||||
realCacheSize := int(float64(diskCacheMaxSize) / float64(stream.MaxBlobSize))
|
|
||||||
if diskCacheMaxSize == 0 {
|
|
||||||
return upstreamStore
|
|
||||||
}
|
|
||||||
err := os.MkdirAll(diskCachePath, os.ModePerm)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
diskStore := store.NewDiskStore(store.DiskParams{
|
|
||||||
Name: "big-drive",
|
|
||||||
MountPoint: diskCachePath,
|
|
||||||
ShardingSize: 2,
|
|
||||||
})
|
|
||||||
var unwrappedStore store.BlobStore
|
|
||||||
|
|
||||||
if cacheManager == "localdb" {
|
|
||||||
localDb := &db.SQL{
|
|
||||||
SoftDelete: true,
|
|
||||||
TrackingLevel: db.TrackAccessBlobs,
|
|
||||||
LogQueries: log.GetLevel() == log.DebugLevel,
|
|
||||||
}
|
|
||||||
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
unwrappedStore = store.NewDBBackedStore(store.DBBackedParams{
|
|
||||||
Name: "local",
|
|
||||||
Store: diskStore,
|
|
||||||
DB: localDb,
|
|
||||||
DeleteOnMiss: true,
|
|
||||||
MaxSize: &realCacheSize,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
unwrappedStore = store.NewGcacheStore(store.GcacheParams{
|
|
||||||
Name: "flash",
|
|
||||||
Store: store.NewDiskStore(store.DiskParams{Name: "flash", MountPoint: diskCachePath, ShardingSize: 2}),
|
|
||||||
MaxSize: realCacheSize,
|
|
||||||
Strategy: cacheMangerToGcache[cacheManager],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
wrapped := store.NewCachingStore(store.CachingParams{
|
|
||||||
Name: "reflector",
|
|
||||||
Origin: upstreamStore,
|
|
||||||
Cache: unwrappedStore,
|
|
||||||
})
|
|
||||||
return wrapped
|
|
||||||
}
|
|
||||||
|
|
||||||
func diskCacheParams(diskParams string) (int, string, string) {
|
|
||||||
if diskParams == "" {
|
|
||||||
return 0, "", ""
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.Split(diskParams, ":")
|
|
||||||
if len(parts) != 3 {
|
|
||||||
log.Fatalf("%s does is formatted incorrectly. Expected format: 'sizeGB:CACHE_PATH:cachemanager' for example: '100GB:/tmp/downloaded_blobs:localdb'", diskParams)
|
|
||||||
}
|
|
||||||
|
|
||||||
diskCacheSize := parts[0]
|
|
||||||
path := parts[1]
|
|
||||||
cacheManager := parts[2]
|
|
||||||
|
|
||||||
if len(path) == 0 || path[0] != '/' {
|
|
||||||
log.Fatalf("disk cache paths must start with '/'")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !util.InSlice(cacheManager, cacheManagers) {
|
|
||||||
log.Fatalf("specified cache manager '%s' is not supported. Use one of the following: %v", cacheManager, cacheManagers)
|
|
||||||
}
|
|
||||||
|
|
||||||
var maxSize datasize.ByteSize
|
|
||||||
err := maxSize.UnmarshalText([]byte(diskCacheSize))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
if maxSize <= 0 {
|
|
||||||
log.Fatal("disk cache size must be more than 0")
|
|
||||||
}
|
|
||||||
return int(maxSize), path, cacheManager
|
|
||||||
}
|
|
109
cmd/start.go.old
109
cmd/start.go.old
|
@ -1,109 +0,0 @@
|
||||||
package cmd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/cluster"
|
|
||||||
"github.com/lbryio/reflector.go/db"
|
|
||||||
"github.com/lbryio/reflector.go/prism"
|
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
|
||||||
"github.com/lbryio/reflector.go/server/peer"
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/dht"
|
|
||||||
"github.com/lbryio/lbry.go/v2/dht/bits"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
startNewCluster = "new"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
startClusterPort int
|
|
||||||
startPeerPort int
|
|
||||||
startReflectorPort int
|
|
||||||
startDhtPort int
|
|
||||||
startDhtSeeds []string
|
|
||||||
startHashRange string
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var cmd = &cobra.Command{
|
|
||||||
Use: `start [cluster-address|"new"]`,
|
|
||||||
Short: "Runs full prism application with cluster, dht, peer server, and reflector server.",
|
|
||||||
Run: startCmd,
|
|
||||||
Args: cobra.ExactArgs(1),
|
|
||||||
}
|
|
||||||
cmd.PersistentFlags().IntVar(&startClusterPort, "cluster-port", cluster.DefaultPort, "Port that cluster listens on")
|
|
||||||
cmd.PersistentFlags().IntVar(&startPeerPort, "peer-port", peer.DefaultPort, "Port to start peer protocol on")
|
|
||||||
cmd.PersistentFlags().IntVar(&startReflectorPort, "reflector-port", reflector.DefaultPort, "Port to start reflector protocol on")
|
|
||||||
cmd.PersistentFlags().IntVar(&startDhtPort, "dht-port", dht.DefaultPort, "Port that dht will listen on")
|
|
||||||
cmd.PersistentFlags().StringSliceVar(&startDhtSeeds, "dht-seeds", []string{}, "Comma-separated list of dht seed nodes (addr:port,addr:port,...)")
|
|
||||||
|
|
||||||
cmd.PersistentFlags().StringVar(&startHashRange, "hash-range", "", "Limit on range of hashes to announce (start-end)")
|
|
||||||
|
|
||||||
rootCmd.AddCommand(cmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func startCmd(cmd *cobra.Command, args []string) {
|
|
||||||
db := &db.SQL{
|
|
||||||
LogQueries: log.GetLevel() == log.DebugLevel,
|
|
||||||
}
|
|
||||||
err := db.Connect(globalConfig.DBConn)
|
|
||||||
checkErr(err)
|
|
||||||
s3 := store.NewS3Store(store.S3Params{
|
|
||||||
Name: "prism",
|
|
||||||
AwsID: globalConfig.AwsID,
|
|
||||||
AwsSecret: globalConfig.AwsSecret,
|
|
||||||
Region: globalConfig.BucketRegion,
|
|
||||||
Bucket: globalConfig.BucketName,
|
|
||||||
Endpoint: globalConfig.S3Endpoint,
|
|
||||||
})
|
|
||||||
comboStore := store.NewDBBackedStore(store.DBBackedParams{Name: "global", Store: s3, DB: db, DeleteOnMiss: false, MaxSize: nil})
|
|
||||||
|
|
||||||
conf := prism.DefaultConf()
|
|
||||||
|
|
||||||
// TODO: args we need:
|
|
||||||
// minNodes - minimum number of nodes before announcing starts. otherwise first node will try to announce all the blobs in the db
|
|
||||||
// or maybe we should do maxHashesPerNode?
|
|
||||||
// in either case, this should not kill the cluster, but should only limit announces (and notify when some hashes are being left unannounced)
|
|
||||||
|
|
||||||
if args[0] != startNewCluster {
|
|
||||||
conf.ClusterSeedAddr = args[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
conf.DB = db
|
|
||||||
conf.Blobs = comboStore
|
|
||||||
conf.DhtAddress = "0.0.0.0:" + strconv.Itoa(startDhtPort)
|
|
||||||
conf.DhtSeedNodes = startDhtSeeds
|
|
||||||
conf.ClusterPort = startClusterPort
|
|
||||||
conf.PeerPort = startPeerPort
|
|
||||||
conf.ReflectorPort = startReflectorPort
|
|
||||||
|
|
||||||
if startHashRange != "" {
|
|
||||||
hashRange := strings.Split(startHashRange, "-")
|
|
||||||
if len(hashRange) != 2 {
|
|
||||||
log.Fatal("invalid hash range")
|
|
||||||
}
|
|
||||||
r := bits.Range{Start: bits.FromShortHexP(hashRange[0]), End: bits.FromShortHexP(hashRange[1])}
|
|
||||||
conf.HashRange = &r
|
|
||||||
}
|
|
||||||
|
|
||||||
p := prism.New(conf)
|
|
||||||
err = p.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
interruptChan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
|
||||||
<-interruptChan
|
|
||||||
p.Shutdown()
|
|
||||||
}
|
|
271
readme.md
271
readme.md
|
@ -1,124 +1,189 @@
|
||||||
# Reflector
|
# Reflector
|
||||||
|
|
||||||
Reflector is a central piece of software that providers LBRY with the following features:
|
Production-ready blob reflection, distribution, and caching for Odysee.
|
||||||
- Blobs reflection: when something is published, we capture the data and store it on our servers for quicker retrieval
|
|
||||||
- Blobs distribution: when a piece of content is requested and the LBRY network doesn't have it, reflector will retrieve it from its storage and distribute it
|
|
||||||
- Blobs caching: reflectors can be chained together in multiple regions or servers to form a chain of cached content. We call those "blobcaches". They are layered so that content distribution is favorable in all the regions we deploy it to
|
|
||||||
|
|
||||||
There are a few other features embedded in reflector.go including publishing streams from Go, downloading or upload blobs, resolving content and more unfinished tools.
|
This repository provides the components used in production:
|
||||||
|
- Reflector ingestion server (command name: `reflector`)
|
||||||
|
- Blob cache/edge server (`blobcache`)
|
||||||
|
- Uploader to object storage (`upload`)
|
||||||
|
|
||||||
This code includes a Go implementations of the LBRY peer protocol, reflector protocol, and DHT.
|
Other commands exist in the tree for historical/legacy reasons and are not supported.
|
||||||
|
|
||||||
## Installation
|
## How it works (at a glance)
|
||||||
|
- Ingestion (reflector): accepts uploaded blobs, persists them to object storage (e.g., S3/Wasabi) and tracks state in MySQL.
|
||||||
|
- Distribution: serves blobs over HTTP/HTTP3/Peer. Blobcaches can be deployed in front of the origin to reduce latency and egress.
|
||||||
|
- Caching (blobcache): layered disk caches backed by HTTP(S) origins (e.g., S3 endpoints), with optional local DB metadata for capacity/eviction.
|
||||||
|
|
||||||
- Install mysql 8 (5.7 might work too)
|
All services are started by the `prism` binary and are configured via YAML files loaded from a configuration directory.
|
||||||
- add a reflector user and database with password `reflector` with localhost access only
|
|
||||||
- Create the tables as described [here](https://github.com/lbryio/reflector.go/blob/master/db/db.go#L735) (the link might not update as the code does so just look for the schema in that file)
|
|
||||||
|
|
||||||
#### We do not support running reflector.go as a blob receiver, however if you want to run it as a private blobcache you may compile it yourself and run it as following:
|
## Supported commands
|
||||||
|
The following are the only supported commands for production use:
|
||||||
|
|
||||||
|
- Reflector ingestion: `prism reflector`
|
||||||
|
- Flags: `--receiver-port` (default 5566), `--metrics-port` (default 2112), `--disable-blocklist`
|
||||||
|
- Loads `reflector.yaml` from the config directory.
|
||||||
|
|
||||||
|
- Blob cache: `prism blobcache`
|
||||||
|
- Flags: `--metrics-port` (default 2112), `--disable-blocklist`
|
||||||
|
- Loads `blobcache.yaml` from the config directory.
|
||||||
|
|
||||||
|
- Uploader: `prism upload PATH`
|
||||||
|
- Flags: `--workers`, `--skipExistsCheck`, `--deleteBlobsAfterUpload`
|
||||||
|
- Loads `upload.yaml` from the config directory.
|
||||||
|
|
||||||
|
Global flag for all commands:
|
||||||
|
- `--conf-dir` (default `./`): directory containing YAML config files.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
Configuration is per-command. The loader reads `<command>.yaml` from `--conf-dir`.
|
||||||
|
|
||||||
|
Common sections:
|
||||||
|
- `servers`: enables HTTP/HTTP3/Peer servers. Keys: `http`, `http3`, `peer`. Each accepts:
|
||||||
|
- `port` (int)
|
||||||
|
- `max_concurrent_requests` (int, http/http3)
|
||||||
|
- `edge_token` (string, http)
|
||||||
|
- `address` (string, optional; bind address, omit for all interfaces)
|
||||||
|
- `store`: defines the storage topology using composable stores. Frequently used:
|
||||||
|
- `proxied-s3`: production pattern with a `writer` (DB-backed -> S3/multiwriter) and a `reader` (caching -> disk + HTTP origins).
|
||||||
|
- `caching`: layered cache with a `cache` (often `db_backed` -> `disk`) and an `origin` chain (`http`, `http3`, or `ittt` fan-in).
|
||||||
|
- `s3`, `disk`, `multiwriter`, `db_backed`, `http`, `http3`, `peer`, `upstream` are also available building blocks.
|
||||||
|
|
||||||
|
### Minimal examples
|
||||||
|
Reflector – conf-dir contains `reflector.yaml`:
|
||||||
|
```yaml
|
||||||
|
servers:
|
||||||
|
http:
|
||||||
|
port: 5569
|
||||||
|
max_concurrent_requests: 200
|
||||||
|
http3:
|
||||||
|
port: 5568
|
||||||
|
max_concurrent_requests: 200
|
||||||
|
peer:
|
||||||
|
port: 5567
|
||||||
|
store:
|
||||||
|
proxied-s3:
|
||||||
|
name: s3_read_proxy
|
||||||
|
writer:
|
||||||
|
db_backed:
|
||||||
|
user: reflector
|
||||||
|
password: reflector
|
||||||
|
database: reflector
|
||||||
|
host: localhost
|
||||||
|
port: 3306
|
||||||
|
access_tracking: 1
|
||||||
|
soft_deletes: true
|
||||||
|
store:
|
||||||
|
s3:
|
||||||
|
name: primary
|
||||||
|
aws_id: YOUR_KEY
|
||||||
|
aws_secret: YOUR_SECRET
|
||||||
|
region: us-east-1
|
||||||
|
bucket: blobs-bucket
|
||||||
|
endpoint: https://s3.yourendpoint.tv
|
||||||
|
reader:
|
||||||
|
caching:
|
||||||
|
cache:
|
||||||
|
disk:
|
||||||
|
name: local_cache
|
||||||
|
mount_point: /mnt/reflector/cache
|
||||||
|
sharding_size: 2
|
||||||
|
origin:
|
||||||
|
http:
|
||||||
|
endpoint: https://s3.yourendpoint.tv/blobs-bucket/
|
||||||
|
sharding_size: 4
|
||||||
|
```
|
||||||
|
|
||||||
|
Blobcache – conf-dir contains `blobcache.yaml`:
|
||||||
|
```yaml
|
||||||
|
servers:
|
||||||
|
http:
|
||||||
|
port: 5569
|
||||||
|
max_concurrent_requests: 200
|
||||||
|
http3:
|
||||||
|
port: 5568
|
||||||
|
max_concurrent_requests: 200
|
||||||
|
peer:
|
||||||
|
port: 5567
|
||||||
|
store:
|
||||||
|
caching:
|
||||||
|
cache:
|
||||||
|
db_backed:
|
||||||
|
user: reflector
|
||||||
|
password: reflector
|
||||||
|
database: reflector
|
||||||
|
host: localhost
|
||||||
|
port: 3306
|
||||||
|
has_cap: true
|
||||||
|
max_size: 500GB
|
||||||
|
store:
|
||||||
|
disk:
|
||||||
|
name: blobcache
|
||||||
|
mount_point: /mnt/blobcache/cache
|
||||||
|
sharding_size: 2
|
||||||
|
origin:
|
||||||
|
http:
|
||||||
|
endpoint: https://s3.yourendpoint.tv/blobs-bucket/
|
||||||
|
sharding_size: 4
|
||||||
|
```
|
||||||
|
|
||||||
|
Uploader – conf-dir contains `upload.yaml` (points to the same writer/backend as reflector):
|
||||||
|
```yaml
|
||||||
|
database:
|
||||||
|
user: reflector
|
||||||
|
password: reflector
|
||||||
|
database: reflector
|
||||||
|
host: localhost
|
||||||
|
port: 3306
|
||||||
|
store:
|
||||||
|
proxied-s3:
|
||||||
|
writer:
|
||||||
|
db_backed:
|
||||||
|
user: reflector
|
||||||
|
password: reflector
|
||||||
|
database: reflector
|
||||||
|
host: localhost
|
||||||
|
port: 3306
|
||||||
|
store:
|
||||||
|
s3:
|
||||||
|
aws_id: YOUR_KEY
|
||||||
|
aws_secret: YOUR_SECRET
|
||||||
|
region: us-east-1
|
||||||
|
bucket: blobs-bucket
|
||||||
|
endpoint: https://s3.yourendpoint.tv
|
||||||
|
```
|
||||||
|
|
||||||
|
## Quick start
|
||||||
|
1) Build
|
||||||
|
- Requires Go 1.23+
|
||||||
|
- `make` (binaries in `dist/<platform>/prism-bin`)
|
||||||
|
|
||||||
|
2) Run a local blobcache
|
||||||
```bash
|
```bash
|
||||||
./prism-bin reflector \
|
./dist/linux_amd64/prism-bin --conf-dir=./ blobcache
|
||||||
--conf="none" \
|
|
||||||
--disable-uploads=true \
|
|
||||||
--use-db=false \
|
|
||||||
--upstream-reflector="reflector.lbry.com" \
|
|
||||||
--upstream-protocol="http" \
|
|
||||||
--request-queue-size=200 \
|
|
||||||
--disk-cache="2GB:/path/to/your/storage/:localdb" \
|
|
||||||
```
|
```
|
||||||
|
Place your `blobcache.yaml` in the `--conf-dir` directory.
|
||||||
|
|
||||||
Create a systemd script if you want to run it automatically on startup or as a service.
|
3) Run reflector ingestion
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
Usage as reflector/blobcache:
|
|
||||||
```bash
|
```bash
|
||||||
Run reflector server
|
./dist/linux_amd64/prism-bin --conf-dir=./ reflector --receiver-port=5566 --metrics-port=2112
|
||||||
|
|
||||||
Usage:
|
|
||||||
prism reflector [flags]
|
|
||||||
|
|
||||||
Flags:
|
|
||||||
--disable-blocklist Disable blocklist watching/updating
|
|
||||||
--disable-uploads Disable uploads to this reflector server
|
|
||||||
--disk-cache string Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (default "100GB:/tmp/downloaded_blobs:localdb")
|
|
||||||
-h, --help help for reflector
|
|
||||||
--http-peer-port int The port reflector will distribute content from over HTTP protocol (default 5569)
|
|
||||||
--http3-peer-port int The port reflector will distribute content from over HTTP3 protocol (default 5568)
|
|
||||||
--mem-cache int enable in-memory cache with a max size of this many blobs
|
|
||||||
--metrics-port int The port reflector will use for prometheus metrics (default 2112)
|
|
||||||
--optional-disk-cache string Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (this would get hit before the one specified in disk-cache)
|
|
||||||
--origin-endpoint string HTTP edge endpoint for standard HTTP retrieval
|
|
||||||
--origin-endpoint-fallback string HTTP edge endpoint for standard HTTP retrieval if first origin fails
|
|
||||||
--receiver-port int The port reflector will receive content from (default 5566)
|
|
||||||
--request-queue-size int How many concurrent requests from downstream should be handled at once (the rest will wait) (default 200)
|
|
||||||
--tcp-peer-port int The port reflector will distribute content from for the TCP (LBRY) protocol (default 5567)
|
|
||||||
--upstream-protocol string protocol used to fetch blobs from another upstream reflector server (tcp/http3/http) (default "http")
|
|
||||||
--upstream-reflector string host:port of a reflector server where blobs are fetched from
|
|
||||||
--use-db Whether to connect to the reflector db or not (default true)
|
|
||||||
|
|
||||||
Global Flags:
|
|
||||||
--conf string Path to config. Use 'none' to disable (default "config.json")
|
|
||||||
-v, --verbose strings Verbose logging for specific components
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Other uses:
|
4) Upload blobs
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
Prism is a single entry point application with multiple sub modules which can be leveraged individually or together
|
./dist/linux_amd64/prism-bin --conf-dir=./ upload /path/to/blobs \
|
||||||
|
--workers=4 --skipExistsCheck
|
||||||
Usage:
|
|
||||||
prism [command]
|
|
||||||
|
|
||||||
Available Commands:
|
|
||||||
check-integrity check blobs integrity for a given path
|
|
||||||
cluster Start(join) to or Start a new cluster
|
|
||||||
decode Decode a claim value
|
|
||||||
dht Run dht node
|
|
||||||
getstream Get a stream from a reflector server
|
|
||||||
help Help about any command
|
|
||||||
peer Run peer server
|
|
||||||
populate-db populate local database with blobs from a disk storage
|
|
||||||
publish Publish a file
|
|
||||||
reflector Run reflector server
|
|
||||||
resolve Resolve a URL
|
|
||||||
send Send a file to a reflector
|
|
||||||
sendblob Send a random blob to a reflector server
|
|
||||||
start Runs full prism application with cluster, dht, peer server, and reflector server.
|
|
||||||
test Test things
|
|
||||||
upload Upload blobs to S3
|
|
||||||
version Print the version
|
|
||||||
|
|
||||||
Flags:
|
|
||||||
--conf string Path to config. Use 'none' to disable (default "config.json")
|
|
||||||
-h, --help help for prism
|
|
||||||
-v, --verbose strings Verbose logging for specific components
|
|
||||||
```
|
|
||||||
## Running from Source
|
|
||||||
|
|
||||||
This project requires [Go v1.23](https://golang.org/doc/install).
|
|
||||||
|
|
||||||
On Ubuntu you can install it with `sudo snap install go --classic`
|
|
||||||
|
|
||||||
```
|
|
||||||
git clone git@github.com:lbryio/reflector.go.git
|
|
||||||
cd reflector.go
|
|
||||||
make
|
|
||||||
./dist/linux_amd64/prism-bin
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Contributing
|
## Notes
|
||||||
|
- Only reflector, blobcache, and upload are supported. All other commands are legacy and may be removed in the future.
|
||||||
coming soon
|
- Metrics are exposed on the configured `--metrics-port` at `/metrics` (Prometheus format).
|
||||||
|
- MySQL is required when using DB-backed stores (e.g., ingestion writer, capacity-aware caches).
|
||||||
## License
|
|
||||||
|
|
||||||
This project is MIT licensed.
|
|
||||||
|
|
||||||
## Security
|
## Security
|
||||||
|
If you discover a security issue, please email security@lbry.com. Our PGP key is available at https://lbry.com/faq/pgp-key.
|
||||||
|
|
||||||
We take security seriously. Please contact security@lbry.com regarding any security issues.
|
## License
|
||||||
Our PGP key is [here](https://lbry.com/faq/pgp-key) if you need it.
|
MIT License. See LICENSE.
|
||||||
|
|
||||||
## Contact
|
## Contact
|
||||||
The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777) (niko-at-lbry.com)
|
The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777)
|
||||||
|
|
Loading…
Add table
Reference in a new issue