From 3b0a2df0ef6c84e4444865f8d81a7d44a392e586 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Mon, 14 May 2018 20:55:45 -0400
Subject: [PATCH] added upload command, --conf and --verbose flags

---
 cmd/cluster.go     |   4 +-
 cmd/root.go        |  52 ++++++++++++-
 cmd/upload.go      | 187 +++++++++++++++++++++++++++++++++++++++++++++
 db/db.go           |  55 +++++++++++++
 dht/node_finder.go |   3 +
 main.go            |  28 +------
 peer/server.go     |   4 +-
 7 files changed, 300 insertions(+), 33 deletions(-)
 create mode 100644 cmd/upload.go

diff --git a/cmd/cluster.go b/cmd/cluster.go
index 2e6cc52..cb6659f 100644
--- a/cmd/cluster.go
+++ b/cmd/cluster.go
@@ -95,7 +95,7 @@ func clusterCmd(cmd *cobra.Command, args []string) {
 }
 
 func getHashRangeStart(myName string, members []serf.Member) int {
-	names := []string{}
+	var names []string
 	for _, m := range members {
 		names = append(names, m.Name)
 	}
@@ -112,7 +112,7 @@ func getHashRangeStart(myName string, members []serf.Member) int {
 }
 
 func getAliveMembers(members []serf.Member) []serf.Member {
-	alive := []serf.Member{}
+	var alive []serf.Member
 	for _, m := range members {
 		if m.Status == serf.StatusAlive {
 			alive = append(alive, m)
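
The two cluster.go changes above replace empty-slice literals with nil-slice declarations. A nil slice supports append and range exactly like an empty one but skips an allocation; the only behavioral difference worth remembering is how the two encode to JSON. A standalone sketch (not part of the patch) illustrating both points:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var nilSlice []string    // nil slice: no backing array allocated yet
	emptySlice := []string{} // non-nil, zero-length slice

	// append and len behave identically on both
	nilSlice = append(nilSlice, "a")
	emptySlice = append(emptySlice, "a")
	fmt.Println(len(nilSlice), len(emptySlice)) // 1 1

	// the difference shows up only in nil checks and JSON encoding
	var n []string
	e := []string{}
	fmt.Println(n == nil, e == nil) // true false
	jn, _ := json.Marshal(n)
	je, _ := json.Marshal(e)
	fmt.Println(string(jn), string(je)) // null []
}
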
diff --git a/cmd/root.go b/cmd/root.go
index 95df43d..627fd92 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -1,9 +1,13 @@
 package cmd
 
 import (
-	"fmt"
+	"encoding/json"
+	"io/ioutil"
 	"os"
 
+	"github.com/lbryio/errors.go"
+
+	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
 
@@ -15,22 +19,47 @@ type Config struct {
 	DBConn string `json:"db_conn"`
 }
 
+var Verbose bool
+var Conf string
 var GlobalConfig Config
 
 // RootCmd represents the base command when called without any subcommands
 var RootCmd = &cobra.Command{
 	Use:   "reflector",
 	Short: "Reflector accepts blobs, stores them securely, and hosts them on the network",
+	PersistentPreRun: func(cmd *cobra.Command, args []string) {
+		if Verbose {
+			log.SetLevel(log.DebugLevel)
+		}
+
+		if Conf == "" {
+			log.Errorln("--conf flag required")
+			os.Exit(1)
+		}
+
+		var err error
+		GlobalConfig, err = loadConfig(Conf)
+		if err != nil {
+			log.Error(err)
+			os.Exit(1)
+		}
+	},
+
 	// Uncomment the following line if your bare application
 	// has an action associated with it:
 	// Run: func(cmd *cobra.Command, args []string) { },
 }
 
+func init() {
+	RootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "Enable verbose logging")
+	RootCmd.PersistentFlags().StringVar(&Conf, "conf", "config.json", "Path to config")
+}
+
 // Execute adds all child commands to the root command and sets flags appropriately.
 // This is called by main.main(). It only needs to happen once to the rootCmd.
 func Execute() {
 	if err := RootCmd.Execute(); err != nil {
-		fmt.Println(err)
+		log.Errorln(err)
 		os.Exit(1)
 	}
 }
@@ -52,3 +81,22 @@ func argFuncs(funcs ...cobra.PositionalArgs) cobra.PositionalArgs {
 		return nil
 	}
 }
+
+func loadConfig(path string) (Config, error) {
+	var c Config
+
+	raw, err := ioutil.ReadFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return c, errors.Err("config file not found")
+		}
+		return c, err
+	}
+
+	err = json.Unmarshal(raw, &c)
+	if err != nil {
+		return c, err
+	}
+
+	return c, nil
+}
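
loadConfig unmarshals the file named by --conf into Config. Only the db_conn tag is visible in this diff; going by the fields newBlobStore reads later (GlobalConfig.AwsID, AwsSecret, BucketRegion, BucketName), a config file would look roughly like the following, though every JSON tag other than db_conn is an assumption, not something the diff confirms:

{
  "aws_id": "AKIA...",
  "aws_secret": "...",
  "bucket_region": "us-east-1",
  "bucket_name": "blobs.example.com",
  "db_conn": "user:password@tcp(localhost:3306)/blobs"
}

With the flags registered in init(), an invocation of the new command would then look like: reflector upload --conf config.json --verbose /path/to/blobs
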
diff --git a/cmd/upload.go b/cmd/upload.go
new file mode 100644
index 0000000..cb81b03
--- /dev/null
+++ b/cmd/upload.go
@@ -0,0 +1,187 @@
+package cmd
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/reflector.go/db"
+	"github.com/lbryio/reflector.go/peer"
+	"github.com/lbryio/reflector.go/store"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/cobra"
+)
+
+func init() {
+	cmd := &cobra.Command{
+		Use:   "upload DIR",
+		Short: "Upload blobs to S3",
+		Args:  cobra.ExactArgs(1),
+		Run:   uploadCmd,
+	}
+	RootCmd.AddCommand(cmd)
+}
+
+func uploadCmd(cmd *cobra.Command, args []string) {
+	startTime := time.Now()
+	db := new(db.SQL)
+	err := db.Connect(GlobalConfig.DBConn)
+	checkErr(err)
+
+	stopper := stopOnce.New()
+	interruptChan := make(chan os.Signal, 1)
+	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-interruptChan
+		stopper.Stop()
+	}()
+
+	dir := args[0]
+
+	f, err := os.Open(dir)
+	checkErr(err)
+
+	files, err := f.Readdir(-1)
+	checkErr(err)
+	err = f.Close()
+	checkErr(err)
+
+	var filenames []string
+	for _, file := range files {
+		if file.IsDir() {
+			continue
+		}
+
+		filenames = append(filenames, file.Name())
+	}
+
+	totalCount := len(filenames)
+
+	log.Println("checking for existing blobs")
+
+	exists, err := db.HasBlobs(filenames)
+	checkErr(err)
+	existsCount := len(exists)
+
+	log.Printf("%d new blobs to upload", totalCount-existsCount)
+
+	sdCount := 0
+	blobCount := 0
+
+	concurrency := 2
+	wg := &sync.WaitGroup{}
+	filenameChan := make(chan string)
+	sdChan := make(chan int)
+	blobChan := make(chan int)
+
+	// the counter goroutine exits via the stopper after wg.Wait() returns,
+	// so it is deliberately not part of the WaitGroup
+	go func() {
+		for {
+			select {
+			case <-stopper.Chan():
+				return
+			case <-sdChan:
+				sdCount++
+			case <-blobChan:
+				blobCount++
+			}
+			if (sdCount+blobCount)%50 == 0 {
+				log.Printf("%d of %d done (%s elapsed)", sdCount+blobCount, totalCount-existsCount, time.Since(startTime))
+			}
+		}
+	}()
+
+	for i := 0; i < concurrency; i++ {
+		wg.Add(1) // Add before starting the goroutine so wg.Wait() cannot return early
+		go func(i int) {
+			defer wg.Done()
+			defer func(i int) {
+				log.Printf("worker %d quitting", i)
+			}(i)
+
+			blobStore := newBlobStore()
+
+			for {
+				select {
+				case <-stopper.Chan():
+					return
+				case filename, ok := <-filenameChan:
+					if !ok {
+						return
+					}
+
+					blob, err := ioutil.ReadFile(dir + "/" + filename)
+					checkErr(err)
+
+					hash := peer.GetBlobHash(blob)
+					if hash != filename {
+						log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", i, filename, hash)
+						continue
+					}
+
+					if isJSON(blob) {
+						log.Printf("worker %d: putting sd blob %s", i, hash)
+						checkErr(blobStore.PutSD(hash, blob))
+						select {
+						case sdChan <- 1:
+						case <-stopper.Chan():
+						}
+					} else {
+						log.Printf("worker %d: putting %s", i, hash)
+						checkErr(blobStore.Put(hash, blob))
+						select {
+						case blobChan <- 1:
+						case <-stopper.Chan():
+						}
+					}
+				}
+			}
+		}(i)
+	}
+
+Upload:
+	for _, filename := range filenames {
+		if exists[filename] {
+			continue
+		}
+
+		select {
+		case filenameChan <- filename:
+		case <-stopper.Chan():
+			log.Warnln("Caught interrupt, quitting at first opportunity...")
+			break Upload
+		}
+	}
+
+	close(filenameChan)
+	wg.Wait()
+	stopper.Stop()
+
+	log.Println("")
+	log.Println("SUMMARY")
+	log.Printf("%d blobs total", totalCount)
+	log.Printf("%d SD blobs uploaded", sdCount)
+	log.Printf("%d content blobs uploaded", blobCount)
+	log.Printf("%d blobs already stored", existsCount)
+}
+
+func isJSON(data []byte) bool {
+	var js json.RawMessage
+	return json.Unmarshal(data, &js) == nil
+}
+
+func newBlobStore() *store.DBBackedS3Store {
+	db := new(db.SQL)
+	err := db.Connect(GlobalConfig.DBConn)
+	checkErr(err)
+
+	s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName)
+	return store.NewDBBackedS3Store(s3, db)
+}
diff --git a/db/db.go b/db/db.go
index 49141fe..88e5445 100644
--- a/db/db.go
+++ b/db/db.go
@@ -93,6 +93,61 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
 	return exists, errors.Err(err)
 }
 
+func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
+	if s.conn == nil {
+		return nil, errors.Err("not connected")
+	}
+
+	var hash string
+	exists := make(map[string]bool)
+	maxBatchSize := 100
+	doneIndex := 0
+
+	for len(hashes) > doneIndex {
+		sliceEnd := doneIndex + maxBatchSize
+		if sliceEnd > len(hashes) {
+			sliceEnd = len(hashes)
+		}
+		log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
+		batch := hashes[doneIndex:sliceEnd]
+
+		query := "SELECT hash FROM blob_ WHERE stored = ? AND hash IN (" + querytools.Qs(len(batch)) + ")"
+		args := make([]interface{}, len(batch)+1)
+		args[0] = true
+		for i := range batch {
+			args[i+1] = batch[i]
+		}
+
+		logQuery(query, args...)
+
+		rows, err := s.conn.Query(query, args...)
+		if err != nil {
+			// rows is nil when Query returns an error, so there is nothing to close
+			return exists, err
+		}
+
+		for rows.Next() {
+			err := rows.Scan(&hash)
+			if err != nil {
+				rows.Close()
+				return exists, err
+			}
+			exists[hash] = true
+		}
+
+		err = rows.Err()
+		if err != nil {
+			rows.Close()
+			return exists, err
+		}
+
+		rows.Close()
+		doneIndex += len(batch)
+	}
+
+	return exists, nil
+}
+
 func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error {
 	if s.conn == nil {
 		return errors.Err("not connected")
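
HasBlobs checks hashes in batches of 100 so the IN() clause stays bounded, and hashes absent from the returned map read as false, which is exactly what uploadCmd relies on. Assuming querytools.Qs(n) expands to n comma-separated ? placeholders (inferred from its use here, the helper is not shown in this diff), a hypothetical caller would look like this, where sqlDB is an already-connected *db.SQL:

// hypothetical usage sketch, not part of the patch
hashes := []string{"aaa...", "bbb...", "ccc..."}

// for this batch the generated statement would be roughly
//   SELECT hash FROM blob_ WHERE stored = ? AND hash IN (?,?,?)
// with args: true, "aaa...", "bbb...", "ccc..."
exists, err := sqlDB.HasBlobs(hashes)
if err != nil {
	log.Fatal(err)
}

for _, h := range hashes {
	if !exists[h] { // missing keys read as false, i.e. not yet stored
		// h still needs to be uploaded
	}
}
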
diff --git a/dht/node_finder.go b/dht/node_finder.go
index d2993a5..f9f620f 100644
--- a/dht/node_finder.go
+++ b/dht/node_finder.go
@@ -12,6 +12,9 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// TODO: iterativeFindValue may be stopping early. If it gets a response with only one peer, it should keep going, because other nodes may know about more peers that have that blob.
+// TODO: alternatively, try a TCP handshake with each peer as it is found, to make sure the peer is still online and actually has the blob.
+
 type contactFinder struct {
 	findValue bool // true if we're using findValue
 	target    Bitmap
diff --git a/main.go b/main.go
index 4be1d4f..00f66cb 100644
--- a/main.go
+++ b/main.go
@@ -1,33 +1,7 @@
 package main
 
-import (
-	"encoding/json"
-	"io/ioutil"
-
-	"github.com/lbryio/reflector.go/cmd"
-
-	log "github.com/sirupsen/logrus"
-)
-
-func checkErr(err error) {
-	if err != nil {
-		panic(err)
-	}
-}
+import "github.com/lbryio/reflector.go/cmd"
 
 func main() {
-	log.SetLevel(log.DebugLevel)
-	cmd.GlobalConfig = loadConfig("config.json")
 	cmd.Execute()
 }
-
-func loadConfig(path string) cmd.Config {
-	raw, err := ioutil.ReadFile(path)
-	checkErr(err)
-
-	var c cmd.Config
-	err = json.Unmarshal(raw, &c)
-	checkErr(err)
-
-	return c
-}
diff --git a/peer/server.go b/peer/server.go
index 1b56a30..9dbd9e0 100644
--- a/peer/server.go
+++ b/peer/server.go
@@ -150,7 +150,7 @@ func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
 	}
 
 	response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
-		BlobHash: getBlobHash(blob),
+		BlobHash: GetBlobHash(blob),
 		Length:   len(blob),
 	}})
 	if err != nil {
@@ -213,7 +213,7 @@ func isValidJSON(b []byte) bool {
 	return json.Unmarshal(b, &r) == nil
 }
 
-func getBlobHash(blob []byte) string {
+func GetBlobHash(blob []byte) string {
 	hashBytes := sha512.Sum384(blob)
 	return hex.EncodeToString(hashBytes[:])
 }
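
Exporting GetBlobHash is what lets cmd/upload.go verify that each file's name is the blob's own hash before uploading. A minimal standalone check of the same invariant, with an illustrative path (the helper below mirrors peer.GetBlobHash, a hex-encoded SHA-512/384 digest, rather than importing it):

package main

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
	"io/ioutil"
	"path/filepath"
)

// same construction as peer.GetBlobHash
func getBlobHash(blob []byte) string {
	hashBytes := sha512.Sum384(blob)
	return hex.EncodeToString(hashBytes[:])
}

func main() {
	path := "/path/to/blobs/somehash" // illustrative path
	blob, err := ioutil.ReadFile(path)
	if err != nil {
		panic(err)
	}
	if getBlobHash(blob) != filepath.Base(path) {
		fmt.Println("filename does not match blob hash")
	}
}
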