From 2b458a6bd054dfc61ffce125f4901447e1354ea7 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 21 Jul 2021 18:32:48 +0200 Subject: [PATCH] fix params more cleanups --- cmd/reflector.go | 8 ++++---- db/db.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/reflector.go b/cmd/reflector.go index 3ee853d..93e9a9f 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -31,7 +31,7 @@ var ( //port configuration tcpPeerPort int http3PeerPort int - httpPort int + httpPeerPort int receiverPort int metricsPort int @@ -73,7 +73,7 @@ func init() { cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") - cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol") + cmd.Flags().IntVar(&httpPeerPort, "http-peer-port", 5569, "The port reflector will distribute content from over HTTP protocol") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics") @@ -82,7 +82,7 @@ func init() { cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not") cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") - cmd.Flags().StringVar(&upstreamProtocol, "proxy-protocol", "http", "protocol used to fetch blobs from another reflector server (tcp/http3/http)") + cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)") cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") @@ -130,7 +130,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { defer http3PeerServer.Shutdown() httpServer := http.NewServer(underlyingStoreWithCaches, requestQueueSize) - err = httpServer.Start(":" + strconv.Itoa(httpPort)) + err = httpServer.Start(":" + strconv.Itoa(httpPeerPort)) if err != nil { log.Fatal(err) } diff --git a/db/db.go b/db/db.go index eca573c..a8abe82 100644 --- a/db/db.go +++ b/db/db.go @@ -39,12 +39,9 @@ type SdBlob struct { type trackAccess int const ( - //TrackAccessNone Don't track accesses - TrackAccessNone trackAccess = iota - //TrackAccessStreams Track accesses at the stream level - TrackAccessStreams - //TrackAccessBlobs Track accesses at the blob level - TrackAccessBlobs + TrackAccessNone trackAccess = iota // Don't track accesses + TrackAccessStreams // Track accesses at the stream level + TrackAccessBlobs // Track accesses at the blob level ) // SQL implements the DB interface @@ -106,13 +103,14 @@ func (s *SQL) AddBlobs(hash []string) error { if s.conn == nil { return errors.Err("not connected") } - // Split the slice into batches of 20 items. + batch := 10000 totalBlobs := int64(len(hash)) work := make(chan []string, 1000) stopper := stop.New() var totalInserted atomic.Int64 start := time.Now() + go func() { for i := 0; i < len(hash); i += batch { j := i + batch @@ -124,6 +122,7 @@ func (s *SQL) AddBlobs(hash []string) error { log.Infof("done loading %d hashes in the work queue", len(hash)) close(work) }() + for i := 0; i < runtime.NumCPU(); i++ { stopper.Add(1) go func(worker int) { @@ -145,6 +144,7 @@ func (s *SQL) AddBlobs(hash []string) error { } }(i) } + stopper.Wait() return nil }