From 35f98ce1622ab6101370470ab580b248b291ed5d Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Tue, 29 May 2018 21:38:55 -0400 Subject: [PATCH] code cleanup -Added travis support -updated travis to analyze code beneath the root. -refactored upload.go to fix travis errors. -gocyclo should ignore test files. $GOFILES needed to be adjusted. -fix rows.Close() ignoring error. Created func to handle so defer can be used when needed also. -fixed ignored errors. -fixed unit test that was not passing correctly to anonymous function. -fixed govet error for passing param inside go func. -removed returned error, in favor of logging instead. -added error logging for ignored error. -fixed potential race conditions. -removed unused append -fixed time usage to align with go standards. -removed unused variables -made changes for code review. -code comments for exported functions. -Documented bitmap.go and insert into contact list. -Documented dht, message, bootstrap -Fixed comment typos -Documented message,node, routing_table, testing in DHT package. -Documented server, client, prism, server and shared in peer and reflector packages. -Documented the stores in Store package. -made defer adjustments inline and deleted the separate function. -adjusted method in upload to take the only parameter it requires. --- .travis.yml | 71 ++++++++++ cluster/cluster.go | 10 +- cmd/cluster.go | 2 +- cmd/dht.go | 2 +- cmd/peer.go | 6 +- cmd/reflector.go | 6 +- cmd/root.go | 22 +-- cmd/start.go | 6 +- cmd/upload.go | 286 ++++++++++++++++++++++----------------- db/db.go | 68 ++++++++-- dht/bitmap.go | 54 +++++++- dht/bitmap_test.go | 3 - dht/bootstrap.go | 15 +- dht/bootstrap_test.go | 4 +- dht/dht.go | 15 +- dht/dht_test.go | 16 ++- dht/message.go | 19 ++- dht/message_test.go | 2 +- dht/node.go | 53 +++++--- dht/node_finder.go | 3 + dht/node_test.go | 16 +-- dht/routing_table.go | 18 ++- dht/testing.go | 17 ++- peer/server.go | 36 ++++- peer/server_test.go | 10 +- reflector/client.go | 5 + reflector/client_test.go | 18 ++- reflector/prism.go | 4 + reflector/server.go | 30 +++- reflector/shared.go | 2 + store/dbbacked.go | 7 + store/file.go | 7 + store/memory.go | 6 + store/memory_test.go | 4 +- store/s3.go | 8 ++ store/store.go | 2 + types/types.go | 1 + 37 files changed, 615 insertions(+), 239 deletions(-) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c247e1e --- /dev/null +++ b/.travis.yml @@ -0,0 +1,71 @@ +os: linux +dist: trusty +language: go + +# Only the last two Go releases are supported by the Go team with security +# updates. Any versions older than that should be considered deprecated. +# Don't bother testing with them. tip builds your code with the latest +# development version of Go. This can warn you that your code will break +# in the next version of Go. Don't worry! Later we declare that test runs +# are allowed to fail on Go tip. +go: + - 1.10.2 + - master + +# Skip the install step. Don't `go get` dependencies. Only build with the +# code in vendor/ +install: true + +matrix: + # It's ok if our code fails on unstable development versions of Go. + allow_failures: + - go: master + # Don't wait for tip tests to finish. Mark the test run green if the + # tests pass on the stable versions of Go. + fast_finish: true + +# Don't email me the results of the test runs. +notifications: + email: false + +# Anything in before_script that returns a nonzero exit code will +# flunk the build and immediately stop. It's sorta like having +# set -e enabled in bash. +before_script: +# All the .go files, excluding vendor/ and model (auto generated) + - GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) + - go get golang.org/x/tools/cmd/goimports # Used in build script for generated files + - go get github.com/golang/lint/golint # Linter + - go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter + - go get github.com/jgautheron/gocyclo # Check against high complexity + - go get github.com/mdempsky/unconvert # Identifies unnecessary type conversions + - go get github.com/kisielk/errcheck # Checks for unhandled errors + - go get github.com/opennota/check/cmd/varcheck # Checks for unused vars + - go get github.com/opennota/check/cmd/structcheck # Checks for unused fields in structs + + + +# script always run to completion (set +e). All of these code checks are must haves +# in a modern Go project. +script: + # Build Prism successfully + - make + # Fail if a .go file hasn't been formatted with gofmt + - test -z $(gofmt -s -l $GO_FILES) + # Run unit tests + - go test ./... + # Checks for unused vars and fields on structs + - varcheck ./... + - structcheck ./... + # go vet is the official Go static analyzer + - go vet ./... + # forbid code with huge functions + - gocyclo -ignore "_test.go" -avg -over 19 $GO_FILES + # checks for unhandled errors + - errcheck ./... + # "go vet on steroids" + linter - ignore autogen code + - megacheck -simple.exit-non-zero=true ./... + # check for unnecessary conversions - ignore autogen code + - unconvert ./... + # one last linter - ignore autogen code + - golint -set_exit_status $(go list ./... | grep -v /vendor/ ) diff --git a/cluster/cluster.go b/cluster/cluster.go index 30803b1..6ac8e76 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -14,9 +14,11 @@ import ( ) const ( + // DefaultClusterPort is the default port used when starting up a Cluster. DefaultClusterPort = 17946 ) +// Cluster is a management type for Serf which is used to maintain cluster membership of lbry nodes. type Cluster struct { name string port int @@ -27,6 +29,7 @@ type Cluster struct { stop *stopOnce.Stopper } +// New returns a new Cluster instance that is not connected. func New(port int, seedAddr string) *Cluster { return &Cluster{ name: crypto.RandString(12), @@ -36,6 +39,8 @@ func New(port int, seedAddr string) *Cluster { } } +// Connect Initializes the Cluster based on a configuration passed via the New function. It then stores the seed +// address, starts gossiping and listens for gossip. func (c *Cluster) Connect() error { var err error @@ -66,9 +71,12 @@ func (c *Cluster) Connect() error { return nil } +// Shutdown safely shuts down the cluster. func (c *Cluster) Shutdown() { c.stop.StopAndWait() - c.s.Leave() + if err := c.s.Leave(); err != nil { + log.Error("error shutting down cluster - ", err) + } } func (c *Cluster) listen() { diff --git a/cmd/cluster.go b/cmd/cluster.go index 2f4db81..3d41f63 100644 --- a/cmd/cluster.go +++ b/cmd/cluster.go @@ -21,7 +21,7 @@ func init() { Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), Run: clusterCmd, } - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func clusterCmd(cmd *cobra.Command, args []string) { diff --git a/cmd/dht.go b/cmd/dht.go index 3e5afe9..993d306 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -12,7 +12,7 @@ func init() { Short: "Run interactive dht node", Run: dhtCmd, } - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func dhtCmd(cmd *cobra.Command, args []string) { diff --git a/cmd/peer.go b/cmd/peer.go index ced40bc..edf864f 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -17,15 +17,15 @@ func init() { Short: "Run peer server", Run: peerCmd, } - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func peerCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) - err := db.Connect(GlobalConfig.DBConn) + err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName) + s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) combo := store.NewDBBackedS3Store(s3, db) log.Fatal(peer.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(peer.DefaultPort))) } diff --git a/cmd/reflector.go b/cmd/reflector.go index 06765fe..c2975f2 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -17,15 +17,15 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) - err := db.Connect(GlobalConfig.DBConn) + err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName) + s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) combo := store.NewDBBackedS3Store(s3, db) log.Fatal(reflector.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(reflector.DefaultPort))) } diff --git a/cmd/root.go b/cmd/root.go index e09d4b1..85c1bd9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" ) +// Config is the base configuration for Prism when no sub commands are used. type Config struct { AwsID string `json:"aws_id"` AwsSecret string `json:"aws_secret"` @@ -19,25 +20,24 @@ type Config struct { DBConn string `json:"db_conn"` } -var Verbose bool -var Conf string -var GlobalConfig Config +var verbose bool +var conf string +var globalConfig Config -// RootCmd represents the base command when called without any subcommands -var RootCmd = &cobra.Command{ +var rootCmd = &cobra.Command{ Use: "reflector", Short: "Reflector accepts blobs, stores them securely, and hosts them on the network", PersistentPreRun: func(cmd *cobra.Command, args []string) { - if Verbose { + if verbose { log.SetLevel(log.DebugLevel) } var err error - if Conf == "" { + if conf == "" { log.Errorln("--conf flag required") os.Exit(1) } else { - GlobalConfig, err = loadConfig(Conf) + globalConfig, err = loadConfig(conf) if err != nil { log.Error(err) os.Exit(1) @@ -51,14 +51,14 @@ var RootCmd = &cobra.Command{ } func init() { - RootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "Enable verbose logging") - RootCmd.PersistentFlags().StringVar(&Conf, "conf", "config.json", "Path to config") + rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging") + rootCmd.PersistentFlags().StringVar(&conf, "conf", "config.json", "Path to config") } // Execute adds all child commands to the root command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { - if err := RootCmd.Execute(); err != nil { + if err := rootCmd.Execute(); err != nil { log.Errorln(err) os.Exit(1) } diff --git a/cmd/start.go b/cmd/start.go index 2a89d2e..7a25d81 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -20,15 +20,15 @@ func init() { Run: startCmd, Args: cobra.RangeArgs(0, 1), } - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func startCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) - err := db.Connect(GlobalConfig.DBConn) + err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName) + s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) comboStore := store.NewDBBackedS3Store(s3, db) clusterAddr := "" diff --git a/cmd/upload.go b/cmd/upload.go index 7c937c8..86643f2 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -20,6 +20,23 @@ import ( var workers int +const ( + sdInc = 1 + blobInc = 2 + errInc = 3 +) + +type uploaderParams struct { + workerWG *sync.WaitGroup + counterWG *sync.WaitGroup + stopper *stopOnce.Stopper + filenameChan chan string + countChan chan int + sdCount int + blobCount int + errCount int +} + func init() { var cmd = &cobra.Command{ Use: "upload DIR", @@ -28,40 +45,27 @@ func init() { Run: uploadCmd, } cmd.PersistentFlags().IntVar(&workers, "workers", 1, "How many worker threads to run at once") - RootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd) } func uploadCmd(cmd *cobra.Command, args []string) { startTime := time.Now() db := new(db.SQL) - err := db.Connect(GlobalConfig.DBConn) + err := db.Connect(globalConfig.DBConn) checkErr(err) - stopper := stopOnce.New() - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - go func() { - <-interruptChan - stopper.Stop() - }() + params := uploaderParams{ + workerWG: &sync.WaitGroup{}, + counterWG: &sync.WaitGroup{}, + filenameChan: make(chan string), + countChan: make(chan int), + stopper: stopOnce.New()} - dir := args[0] + setInterrupt(params.stopper) - f, err := os.Open(dir) + filenames, err := getFileNames(args[0]) checkErr(err) - files, err := f.Readdir(-1) - checkErr(err) - err = f.Close() - checkErr(err) - - var filenames []string - for _, file := range files { - if !file.IsDir() { - filenames = append(filenames, file.Name()) - } - } - totalCount := len(filenames) log.Println("checking for existing blobs") @@ -72,96 +76,11 @@ func uploadCmd(cmd *cobra.Command, args []string) { log.Printf("%d new blobs to upload", totalCount-existsCount) - sdCount := 0 - blobCount := 0 - errCount := 0 - - workerWG := &sync.WaitGroup{} - filenameChan := make(chan string) - counterWG := &sync.WaitGroup{} - countChan := make(chan int) - const ( - sdInc = 1 - blobInc = 2 - errInc = 3 - ) - - for i := 0; i < workers; i++ { - go func(i int) { - workerWG.Add(1) - defer workerWG.Done() - defer func(i int) { - log.Printf("worker %d quitting", i) - }(i) - - blobStore := newBlobStore() - - for { - select { - case <-stopper.Ch(): - return - case filename, ok := <-filenameChan: - if !ok { - return - } - - blob, err := ioutil.ReadFile(dir + "/" + filename) - checkErr(err) - - hash := peer.GetBlobHash(blob) - if hash != filename { - log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", i, filename, hash) - select { - case countChan <- errInc: - case <-stopper.Ch(): - } - continue - } - - if isJSON(blob) { - log.Printf("worker %d: PUTTING SD BLOB %s", i, hash) - blobStore.PutSD(hash, blob) - select { - case countChan <- sdInc: - case <-stopper.Ch(): - } - } else { - log.Printf("worker %d: putting %s", i, hash) - blobStore.Put(hash, blob) - select { - case countChan <- blobInc: - case <-stopper.Ch(): - } - } - } - } - }(i) - } - + startUploadWorkers(¶ms, args[0]) + params.counterWG.Add(1) go func() { - counterWG.Add(1) - defer counterWG.Done() - for { - select { - case <-stopper.Ch(): - return - case countType, ok := <-countChan: - if !ok { - return - } - switch countType { - case sdInc: - sdCount++ - case blobInc: - blobCount++ - case errInc: - errCount++ - } - } - if (sdCount+blobCount)%50 == 0 { - log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", sdCount+blobCount, totalCount-existsCount, time.Now().Sub(startTime).String(), time.Now().Sub(startTime).Seconds()/float64(sdCount+blobCount)) - } - } + defer params.counterWG.Done() + runCountReceiver(¶ms, startTime, totalCount, existsCount) }() Upload: @@ -171,25 +90,25 @@ Upload: } select { - case filenameChan <- filename: - case <-stopper.Ch(): + case params.filenameChan <- filename: + case <-params.stopper.Ch(): log.Warnln("Caught interrupt, quitting at first opportunity...") break Upload } } - close(filenameChan) - workerWG.Wait() - close(countChan) - counterWG.Wait() - stopper.Stop() + close(params.filenameChan) + params.workerWG.Wait() + close(params.countChan) + params.counterWG.Wait() + params.stopper.Stop() log.Println("SUMMARY") log.Printf("%d blobs total", totalCount) - log.Printf("%d SD blobs uploaded", sdCount) - log.Printf("%d content blobs uploaded", blobCount) + log.Printf("%d SD blobs uploaded", params.sdCount) + log.Printf("%d content blobs uploaded", params.blobCount) log.Printf("%d blobs already stored", existsCount) - log.Printf("%d errors encountered", errCount) + log.Printf("%d errors encountered", params.errCount) } func isJSON(data []byte) bool { @@ -199,9 +118,128 @@ func isJSON(data []byte) bool { func newBlobStore() *store.DBBackedS3Store { db := new(db.SQL) - err := db.Connect(GlobalConfig.DBConn) + err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName) + s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) return store.NewDBBackedS3Store(s3, db) } + +func setInterrupt(stopper *stopOnce.Stopper) { + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-interruptChan + stopper.Stop() + }() +} + +func startUploadWorkers(params *uploaderParams, dir string) { + for i := 0; i < workers; i++ { + params.workerWG.Add(1) + go func(i int) { + defer params.workerWG.Done() + defer func(i int) { + log.Printf("worker %d quitting", i) + }(i) + + blobStore := newBlobStore() + launchFileUploader(params, blobStore, dir, i) + }(i) + } +} + +func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, dir string, worker int) { + for { + select { + case <-params.stopper.Ch(): + return + case filename, ok := <-params.filenameChan: + if !ok { + return + } + + blob, err := ioutil.ReadFile(dir + "/" + filename) + checkErr(err) + + hash := peer.GetBlobHash(blob) + if hash != filename { + log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", worker, filename, hash) + select { + case params.countChan <- errInc: + case <-params.stopper.Ch(): + } + continue + } + + if isJSON(blob) { + log.Printf("worker %d: PUTTING SD BLOB %s", worker, hash) + if err := blobStore.PutSD(hash, blob); err != nil { + log.Error("PutSD Error: ", err) + } + select { + case params.countChan <- sdInc: + case <-params.stopper.Ch(): + } + } else { + log.Printf("worker %d: putting %s", worker, hash) + if err := blobStore.Put(hash, blob); err != nil { + log.Error("Put Blob Error: ", err) + } + select { + case params.countChan <- blobInc: + case <-params.stopper.Ch(): + } + } + } + } +} + +func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount int, existsCount int) { + for { + select { + case <-params.stopper.Ch(): + return + case countType, ok := <-params.countChan: + if !ok { + return + } + switch countType { + case sdInc: + params.sdCount++ + case blobInc: + params.blobCount++ + case errInc: + params.errCount++ + } + } + if (params.sdCount+params.blobCount)%50 == 0 { + log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", params.sdCount+params.blobCount, totalCount-existsCount, time.Since(startTime).String(), time.Since(startTime).Seconds()/float64(params.sdCount+params.blobCount)) + } + } +} + +func getFileNames(dir string) ([]string, error) { + f, err := os.Open(dir) + if err != nil { + return nil, err + } + + files, err := f.Readdir(-1) + if err != nil { + return nil, err + } + err = f.Close() + if err != nil { + return nil, err + } + + var filenames []string + for _, file := range files { + if !file.IsDir() { + filenames = append(filenames, file.Name()) + } + } + + return filenames, nil +} diff --git a/db/db.go b/db/db.go index 88e5445..7b39dfe 100644 --- a/db/db.go +++ b/db/db.go @@ -6,11 +6,13 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/querytools" "github.com/lbryio/reflector.go/types" - + // blank import for db driver _ "github.com/go-sql-driver/mysql" log "github.com/sirupsen/logrus" ) +// DB interface communicates to a backend database with a simple set of methods that supports tracking blobs that are +// used together with a BlobStore. The DB tracks pointers and the BlobStore stores the data. type DB interface { Connect(string) error HasBlob(string) (bool, error) @@ -18,6 +20,7 @@ type DB interface { AddSDBlob(string, int, types.SdBlob) error } +// SQL is the container for the supporting MySQL database connection. type SQL struct { conn *sql.DB } @@ -31,6 +34,7 @@ func logQuery(query string, args ...interface{}) { } } +// Connect will create a connection to the database func (s *SQL) Connect(dsn string) error { var err error dsn += "?parseTime=1&collation=utf8mb4_unicode_ci" @@ -42,6 +46,7 @@ func (s *SQL) Connect(dsn string) error { return errors.Err(s.conn.Ping()) } +// AddBlob adds a blobs information to the database. func (s *SQL) AddBlob(hash string, length int, stored bool) error { if s.conn == nil { return errors.Err("not connected") @@ -75,6 +80,7 @@ func addBlob(tx *sql.Tx, hash string, length int, stored bool) error { return nil } +// HasBlob checks if the database contains the blob information. func (s *SQL) HasBlob(hash string) (bool, error) { if s.conn == nil { return false, errors.Err("not connected") @@ -93,6 +99,7 @@ func (s *SQL) HasBlob(hash string) (bool, error) { return exists, errors.Err(err) } +// HasBlobs checks if the database contains the set of blobs and returns a bool map. func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { if s.conn == nil { return nil, errors.Err("not connected") @@ -122,14 +129,14 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { rows, err := s.conn.Query(query, args...) if err != nil { - rows.Close() + closeRows(rows) return exists, err } for rows.Next() { err := rows.Scan(&hash) if err != nil { - rows.Close() + closeRows(rows) return exists, err } exists[hash] = true @@ -137,17 +144,20 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { err = rows.Err() if err != nil { - rows.Close() + closeRows(rows) return exists, err } - rows.Close() + closeRows(rows) doneIndex += len(batch) } return exists, nil } +// AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information +// into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is +// rolled back and error(s) are returned. func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error { if s.conn == nil { return errors.Err("not connected") @@ -225,10 +235,14 @@ func withTx(dbOrTx interface{}, f txFunc) (err error) { } defer func() { if p := recover(); p != nil { - tx.Rollback() + if rollBackError := tx.Rollback(); rollBackError != nil { + log.Error("failed to rollback tx on panic - ", rollBackError) + } panic(p) } else if err != nil { - tx.Rollback() + if rollBackError := tx.Rollback(); rollBackError != nil { + log.Error("failed to rollback tx on panic - ", rollBackError) + } } else { err = errors.Err(tx.Commit()) } @@ -240,6 +254,13 @@ func withTx(dbOrTx interface{}, f txFunc) (err error) { return f(tx) } +func closeRows(rows *sql.Rows) { + if err := rows.Close(); err != nil { + log.Error("error closing rows: ", err) + } +} + +/*// func to generate schema. SQL below that. func schema() { _ = ` CREATE TABLE blob_ ( @@ -269,4 +290,35 @@ CREATE TABLE stream_blob ( ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; ` -} +}*/ + +/* SQL script to create schema +CREATE TABLE `reflector`.`blob_` +( + `hash` char(96) NOT NULL, + `stored` TINYINT(1) NOT NULL DEFAULT 0, + `length` bigint(20) unsigned DEFAULT NULL, + `last_announced_at` datetime DEFAULT NULL, + PRIMARY KEY (`hash`), + KEY `last_announced_at_idx` (`last_announced_at`) +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE TABLE `reflector`.`stream` +( + `hash` char(96) NOT NULL, + `sd_hash` char(96) NOT NULL, + PRIMARY KEY (hash), + KEY `sd_hash_idx` (`sd_hash`), + FOREIGN KEY (`sd_hash`) REFERENCES `blob_` (`hash`) ON DELETE RESTRICT ON UPDATE CASCADE +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE TABLE `reflector`.`stream_blob` +( + `stream_hash` char(96) NOT NULL, + `blob_hash` char(96) NOT NULL, + `num` int NOT NULL, + PRIMARY KEY (`stream_hash`, `blob_hash`), + FOREIGN KEY (`stream_hash`) REFERENCES `stream` (`hash`) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (`blob_hash`) REFERENCES `blob_` (`hash`) ON DELETE CASCADE ON UPDATE CASCADE +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +*/ diff --git a/dht/bitmap.go b/dht/bitmap.go index 0da55a0..8b9e7e2 100644 --- a/dht/bitmap.go +++ b/dht/bitmap.go @@ -6,15 +6,19 @@ import ( "encoding/hex" "strings" + "strconv" + "github.com/lbryio/lbry.go/errors" "github.com/lyoshenka/bencode" ) // TODO: http://roaringbitmap.org/ +// Bitmap is a generalized representation of an identifier or data that can be sorted, compared fast. Used by the DHT +// package as a way to handle the unique identifiers of a DHT node. type Bitmap [nodeIDLength]byte -func (b Bitmap) RawString() string { +func (b Bitmap) rawString() string { return string(b[:]) } @@ -31,14 +35,17 @@ func (b Bitmap) BString() string { return buf.String() } +// Hex returns a hexadecimal representation of the bitmap. func (b Bitmap) Hex() string { return hex.EncodeToString(b[:]) } +// HexShort returns a hexadecimal representation of the first 4 bytes. func (b Bitmap) HexShort() string { return hex.EncodeToString(b[:4]) } +// HexSimplified returns the hexadecimal representation with all leading 0's removed func (b Bitmap) HexSimplified() string { simple := strings.TrimLeft(b.Hex(), "0") if simple == "" { @@ -47,6 +54,7 @@ func (b Bitmap) HexSimplified() string { return simple } +// Equals returns T/F if every byte in bitmap are equal. func (b Bitmap) Equals(other Bitmap) bool { for k := range b { if b[k] != other[k] { @@ -56,6 +64,7 @@ func (b Bitmap) Equals(other Bitmap) bool { return true } +// Less returns T/F if there exists a byte pair that is not equal AND this bitmap is less than the other. func (b Bitmap) Less(other interface{}) bool { for k := range b { if b[k] != other.(Bitmap)[k] { @@ -65,6 +74,7 @@ func (b Bitmap) Less(other interface{}) bool { return false } +// LessOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is less than the other. func (b Bitmap) LessOrEqual(other interface{}) bool { if bm, ok := other.(Bitmap); ok && b.Equals(bm) { return true @@ -72,6 +82,7 @@ func (b Bitmap) LessOrEqual(other interface{}) bool { return b.Less(other) } +// Greater returns T/F if there exists a byte pair that is not equal AND this bitmap byte is greater than the other. func (b Bitmap) Greater(other interface{}) bool { for k := range b { if b[k] != other.(Bitmap)[k] { @@ -81,6 +92,7 @@ func (b Bitmap) Greater(other interface{}) bool { return false } +// GreaterOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is greater than the other. func (b Bitmap) GreaterOrEqual(other interface{}) bool { if bm, ok := other.(Bitmap); ok && b.Equals(bm) { return true @@ -88,12 +100,15 @@ func (b Bitmap) GreaterOrEqual(other interface{}) bool { return b.Greater(other) } +// Copy returns a duplicate value for the bitmap. func (b Bitmap) Copy() Bitmap { var ret Bitmap copy(ret[:], b[:]) return ret } +// Xor returns a diff bitmap. If they are equal, the returned bitmap will be all 0's. If 100% unique the returned +// bitmap will be all 1's. func (b Bitmap) Xor(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -102,6 +117,7 @@ func (b Bitmap) Xor(other Bitmap) Bitmap { return ret } +// And returns a comparison bitmap, that for each byte returns the AND true table result func (b Bitmap) And(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -110,6 +126,7 @@ func (b Bitmap) And(other Bitmap) Bitmap { return ret } +// Or returns a comparison bitmap, that for each byte returns the OR true table result func (b Bitmap) Or(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -118,6 +135,7 @@ func (b Bitmap) Or(other Bitmap) Bitmap { return ret } +// Not returns a complimentary bitmap that is an inverse. So b.NOT.NOT = b func (b Bitmap) Not() Bitmap { var ret Bitmap for k := range b { @@ -138,16 +156,21 @@ func (b Bitmap) add(other Bitmap) (Bitmap, bool) { return ret, carry } +// Add returns a bitmap that treats both bitmaps as numbers and adding them together. Since the size of a bitmap is +// limited, an overflow is possible when adding bitmaps. func (b Bitmap) Add(other Bitmap) Bitmap { ret, carry := b.add(other) if carry { - panic("overflow in bitmap addition") + panic("overflow in bitmap addition. limited to " + strconv.Itoa(nodeIDBits) + " bits.") } return ret } +// Sub returns a bitmap that treats both bitmaps as numbers and subtracts then via the inverse of the other and adding +// then together a + (-b). Negative bitmaps are not supported so other must be greater than this. func (b Bitmap) Sub(other Bitmap) Bitmap { if b.Less(other) { + // ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht. panic("negative bitmaps not supported") } complement, _ := other.Not().add(BitmapFromShortHexP("1")) @@ -155,10 +178,12 @@ func (b Bitmap) Sub(other Bitmap) Bitmap { return ret } +// Get returns the binary bit at the position passed. func (b Bitmap) Get(n int) bool { return getBit(b[:], n) } +// Set sets the binary bit at the position passed. func (b Bitmap) Set(n int, one bool) Bitmap { ret := b.Copy() setBit(ret[:], n, one) @@ -200,7 +225,7 @@ Outer: return ret } -// Syffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false) +// Suffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false) // https://stackoverflow.com/a/23192263/182709 func (b Bitmap) Suffix(n int, one bool) Bitmap { ret := b.Copy() @@ -223,11 +248,13 @@ Outer: return ret } +// MarshalBencode implements the Marshaller(bencode)/Message interface. func (b Bitmap) MarshalBencode() ([]byte, error) { str := string(b[:]) return bencode.EncodeBytes(str) } +// UnmarshalBencode implements the Marshaller(bencode)/Message interface. func (b *Bitmap) UnmarshalBencode(encoded []byte) error { var str string err := bencode.DecodeBytes(encoded, &str) @@ -241,6 +268,7 @@ func (b *Bitmap) UnmarshalBencode(encoded []byte) error { return nil } +// BitmapFromBytes returns a bitmap as long as the byte array is of a specific length specified in the parameters. func BitmapFromBytes(data []byte) (Bitmap, error) { var bmp Bitmap @@ -252,6 +280,8 @@ func BitmapFromBytes(data []byte) (Bitmap, error) { return bmp, nil } +// BitmapFromBytesP returns a bitmap as long as the byte array is of a specific length specified in the parameters +// otherwise it wil panic. func BitmapFromBytesP(data []byte) Bitmap { bmp, err := BitmapFromBytes(data) if err != nil { @@ -260,10 +290,14 @@ func BitmapFromBytesP(data []byte) Bitmap { return bmp } +//BitmapFromString returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters func BitmapFromString(data string) (Bitmap, error) { return BitmapFromBytes([]byte(data)) } +//BitmapFromStringP returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters otherwise it wil panic. func BitmapFromStringP(data string) Bitmap { bmp, err := BitmapFromString(data) if err != nil { @@ -272,6 +306,8 @@ func BitmapFromStringP(data string) Bitmap { return bmp } +//BitmapFromHex returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters func BitmapFromHex(hexStr string) (Bitmap, error) { decoded, err := hex.DecodeString(hexStr) if err != nil { @@ -280,6 +316,8 @@ func BitmapFromHex(hexStr string) (Bitmap, error) { return BitmapFromBytes(decoded) } +//BitmapFromHexP returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters otherwise it wil panic. func BitmapFromHexP(hexStr string) Bitmap { bmp, err := BitmapFromHex(hexStr) if err != nil { @@ -288,10 +326,15 @@ func BitmapFromHexP(hexStr string) Bitmap { return bmp } +//BitmapFromShortHex returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the +// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters func BitmapFromShortHex(hexStr string) (Bitmap, error) { return BitmapFromHex(strings.Repeat("0", nodeIDLength*2-len(hexStr)) + hexStr) } +//BitmapFromShortHexP returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the +// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters +// otherwise it wil panic. func BitmapFromShortHexP(hexStr string) Bitmap { bmp, err := BitmapFromShortHex(hexStr) if err != nil { @@ -300,6 +343,7 @@ func BitmapFromShortHexP(hexStr string) Bitmap { return bmp } +// RandomBitmapP generates a cryptographically random bitmap with the confines of the parameters specified. func RandomBitmapP() Bitmap { var id Bitmap _, err := rand.Read(id[:]) @@ -309,12 +353,16 @@ func RandomBitmapP() Bitmap { return id } +// RandomBitmapInRangeP generates a cryptographically random bitmap and while it is greater than the high threshold +// bitmap will subtract the diff between high and low until it is no longer greater that the high. func RandomBitmapInRangeP(low, high Bitmap) Bitmap { diff := high.Sub(low) r := RandomBitmapP() for r.Greater(diff) { r = r.Sub(diff) } + //ToDo - Adding the low at this point doesn't gurantee it will be within the range. Consider bitmaps as numbers and + // I have a range of 50-100. If get to say 60, and add 50, I would be at 110. Should protect against this? return r.Add(low) } diff --git a/dht/bitmap_test.go b/dht/bitmap_test.go index cd9f533..3b541a2 100644 --- a/dht/bitmap_test.go +++ b/dht/bitmap_test.go @@ -54,13 +54,10 @@ func TestBitmap(t *testing.T) { func TestBitmap_GetBit(t *testing.T) { tt := []struct { - hex string bit int expected bool panic bool }{ - //{hex: "0", bit: 385, one: true, expected: "1", panic:true}, // should error - //{hex: "0", bit: 384, one: true, expected: "1", panic:true}, {bit: 383, expected: false, panic: false}, {bit: 382, expected: true, panic: false}, {bit: 381, expected: false, panic: false}, diff --git a/dht/bootstrap.go b/dht/bootstrap.go index a4d629a..30077c2 100644 --- a/dht/bootstrap.go +++ b/dht/bootstrap.go @@ -13,6 +13,7 @@ const ( bootstrapDefaultRefreshDuration = 15 * time.Minute ) +// BootstrapNode is a configured node setup for testing. type BootstrapNode struct { Node @@ -24,7 +25,7 @@ type BootstrapNode struct { nodeKeys map[Bitmap]int } -// New returns a BootstrapNode pointer. +// NewBootstrapNode returns a BootstrapNode pointer. func NewBootstrapNode(id Bitmap, initialPingInterval, rePingInterval time.Duration) *BootstrapNode { b := &BootstrapNode{ Node: *NewNode(id), @@ -71,7 +72,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error { return nil } -// ypsert adds the contact to the list, or updates the lastPinged time +// upsert adds the contact to the list, or updates the lastPinged time func (b *BootstrapNode) upsert(c Contact) { b.nlock.Lock() defer b.nlock.Unlock() @@ -157,17 +158,21 @@ func (b *BootstrapNode) check() { func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) { switch request.Method { case pingMethod: - b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}) + if err := b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}); err != nil { + log.Error("error sending response message - ", err) + } case findNodeMethod: if request.Arg == nil { log.Errorln("request is missing arg") return } - b.sendMessage(addr, Response{ + if err := b.sendMessage(addr, Response{ ID: request.ID, NodeID: b.id, Contacts: b.get(bucketSize), - }) + }); err != nil { + log.Error("error sending 'findnodemethod' response message - ", err) + } } go func() { diff --git a/dht/bootstrap_test.go b/dht/bootstrap_test.go index 8b45dee..e57fc83 100644 --- a/dht/bootstrap_test.go +++ b/dht/bootstrap_test.go @@ -13,7 +13,9 @@ func TestBootstrapPing(t *testing.T) { panic(err) } - b.Connect(listener.(*net.UDPConn)) + if err := b.Connect(listener.(*net.UDPConn)); err != nil { + t.Error(err) + } defer b.Shutdown() b.Shutdown() diff --git a/dht/dht.go b/dht/dht.go index 7595c84..250fcc1 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -35,8 +35,7 @@ const ( udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table - - tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date + //tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed //tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database @@ -165,6 +164,7 @@ func (dht *DHT) Start() error { return nil } +// WaitUntilJoined blocks until the node joins the network. func (dht *DHT) WaitUntilJoined() { if dht.joined == nil { panic("dht not initialized") @@ -181,7 +181,8 @@ func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } -// Get returns the list of nodes that have the blob for the given hash +// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication +// fails. func (dht *DHT) Ping(addr string) error { raddr, err := net.ResolveUDPAddr(network, addr) if err != nil { @@ -254,7 +255,11 @@ func (dht *DHT) startReannouncer() { case <-tick.C: dht.lock.RLock() for h := range dht.announced { - go dht.Announce(h) + go func(bm Bitmap) { + if err := dht.Announce(bm); err != nil { + log.Error("error re-announcing bitmap - ", err) + } + }(h) } dht.lock.RUnlock() } @@ -310,6 +315,8 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { }() } +// PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well +// as current bucket information. func (dht *DHT) PrintState() { log.Printf("DHT node %s at %s", dht.contact.String(), time.Now().Format(time.RFC822Z)) log.Printf("Outstanding transactions: %d", dht.node.CountActiveTransactions()) diff --git a/dht/dht_test.go b/dht/dht_test.go index e702fa4..5b4bb00 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -8,7 +8,7 @@ import ( ) func TestNodeFinder_FindNodes(t *testing.T) { - bs, dhts := TestingCreateDHT(3, true, false) + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -59,7 +59,7 @@ func TestNodeFinder_FindNodes(t *testing.T) { } func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) { - _, dhts := TestingCreateDHT(3, false, false) + _, dhts := TestingCreateDHT(t, 3, false, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -74,7 +74,7 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) { } func TestNodeFinder_FindValue(t *testing.T) { - bs, dhts := TestingCreateDHT(3, true, false) + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -108,7 +108,7 @@ func TestNodeFinder_FindValue(t *testing.T) { func TestDHT_LargeDHT(t *testing.T) { nodes := 100 - bs, dhts := TestingCreateDHT(nodes, true, true) + bs, dhts := TestingCreateDHT(t, nodes, true, true) defer func() { for _, d := range dhts { go d.Shutdown() @@ -121,10 +121,12 @@ func TestDHT_LargeDHT(t *testing.T) { ids := make([]Bitmap, nodes) for i := range ids { ids[i] = RandomBitmapP() - go func(i int) { - wg.Add(1) + wg.Add(1) + go func(index int) { defer wg.Done() - dhts[i].Announce(ids[i]) + if err := dhts[index].Announce(ids[index]); err != nil { + t.Error("error announcing random bitmap - ", err) + } }(i) } wg.Wait() diff --git a/dht/message.go b/dht/message.go index d858621..e67f42c 100644 --- a/dht/message.go +++ b/dht/message.go @@ -42,16 +42,19 @@ const ( tokenField = "token" ) +// Message is an extension of the bencode marshalling interface for serialized message passing. type Message interface { bencode.Marshaler } type messageID [messageIDLength]byte +// HexShort returns the first 8 hex characters of the hex encoded message id. func (m messageID) HexShort() string { return hex.EncodeToString(m[:])[:8] } +// UnmarshalBencode takes a byte slice and unmarshals the message id. func (m *messageID) UnmarshalBencode(encoded []byte) error { var str string err := bencode.DecodeBytes(encoded, &str) @@ -62,6 +65,7 @@ func (m *messageID) UnmarshalBencode(encoded []byte) error { return nil } +// MarshallBencode returns the encoded byte slice of the message id. func (m messageID) MarshalBencode() ([]byte, error) { str := string(m[:]) return bencode.EncodeBytes(str) @@ -76,6 +80,7 @@ func newMessageID() messageID { return m } +// Request represents the structured request from one node to another. type Request struct { ID messageID NodeID Bitmap @@ -84,6 +89,7 @@ type Request struct { StoreArgs *storeArgs } +// MarshalBencode returns the serialized byte slice representation of the request func (r Request) MarshalBencode() ([]byte, error) { var args interface{} if r.StoreArgs != nil { @@ -102,6 +108,7 @@ func (r Request) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the request. func (r *Request) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` @@ -136,7 +143,7 @@ func (r *Request) UnmarshalBencode(b []byte) error { return nil } -func (r Request) ArgsDebug() string { +func (r Request) argsDebug() string { if r.StoreArgs != nil { return r.StoreArgs.BlobHash.HexShort() + ", " + r.StoreArgs.Value.LbryID.HexShort() + ":" + strconv.Itoa(r.StoreArgs.Value.Port) } else if r.Arg != nil { @@ -158,6 +165,7 @@ type storeArgs struct { SelfStore bool // this is an int on the wire } +// MarshalBencode returns the serialized byte slice representation of the storage arguments. func (s storeArgs) MarshalBencode() ([]byte, error) { encodedValue, err := bencode.EncodeString(s.Value) if err != nil { @@ -177,6 +185,7 @@ func (s storeArgs) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments. func (s *storeArgs) UnmarshalBencode(b []byte) error { var argsInt []bencode.RawMessage err := bencode.DecodeBytes(b, &argsInt) @@ -219,6 +228,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error { return nil } +// Response represents the structured response one node returns to another. type Response struct { ID messageID NodeID Bitmap @@ -228,7 +238,7 @@ type Response struct { Token string } -func (r Response) ArgsDebug() string { +func (r Response) argsDebug() string { if r.Data != "" { return r.Data } @@ -251,6 +261,7 @@ func (r Response) ArgsDebug() string { return str } +// MarshalBencode returns the serialized byte slice representation of the response. func (r Response) MarshalBencode() ([]byte, error) { data := map[string]interface{}{ headerTypeField: responseType, @@ -293,6 +304,7 @@ func (r Response) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes(data) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments. func (r *Response) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` @@ -362,6 +374,7 @@ func (r *Response) UnmarshalBencode(b []byte) error { return nil } +// Error represents an error message that is returned from one node to another in communication. type Error struct { ID messageID NodeID Bitmap @@ -369,6 +382,7 @@ type Error struct { Response []string } +// MarshalBencode returns the serialized byte slice representation of an error message. func (e Error) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes(map[string]interface{}{ headerTypeField: errorType, @@ -379,6 +393,7 @@ func (e Error) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the error message. func (e *Error) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` diff --git a/dht/message_test.go b/dht/message_test.go index 0eb0f52..1cac9e3 100644 --- a/dht/message_test.go +++ b/dht/message_test.go @@ -101,7 +101,7 @@ func TestBencodeFindValueResponse(t *testing.T) { res := Response{ ID: newMessageID(), NodeID: RandomBitmapP(), - FindValueKey: RandomBitmapP().RawString(), + FindValueKey: RandomBitmapP().rawString(), Token: "arst", Contacts: []Contact{ {ID: RandomBitmapP(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678}, diff --git a/dht/node.go b/dht/node.go index 4bcb4ba..556b30a 100644 --- a/dht/node.go +++ b/dht/node.go @@ -33,8 +33,10 @@ type UDPConn interface { Close() error } +// RequestHandlerFunc is exported handler for requests. type RequestHandlerFunc func(addr *net.UDPAddr, request Request) +// Node is a type representation of a node on the network. type Node struct { // the node's id id Bitmap @@ -61,7 +63,7 @@ type Node struct { stop *stopOnce.Stopper } -// New returns a Node pointer. +// NewNode returns an initialized Node's pointer. func NewNode(id Bitmap) *Node { return &Node{ id: id, @@ -87,13 +89,14 @@ func (n *Node) Connect(conn UDPConn) error { <-n.stop.Ch() n.tokens.Stop() n.connClosed = true - n.conn.Close() + if err := n.conn.Close(); err != nil { + log.Error("error closing node connection on shutdown - ", err) + } }() packets := make(chan packet) - + n.stop.Add(1) go func() { - n.stop.Add(1) defer n.stop.Done() buf := make([]byte, udpMaxMessageLength) @@ -121,9 +124,8 @@ func (n *Node) Connect(conn UDPConn) error { } } }() - + n.stop.Add(1) go func() { - n.stop.Add(1) defer n.stop.Done() var pkt packet @@ -171,7 +173,7 @@ func (n *Node) handlePacket(pkt packet) { log.Errorf("[%s] error decoding request from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data)) return } - log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.ArgsDebug()) + log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.argsDebug()) n.handleRequest(pkt.raddr, request) case '0' + responseType: @@ -181,7 +183,7 @@ func (n *Node) handlePacket(pkt packet) { log.Errorf("[%s] error decoding response from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data)) return } - log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.ArgsDebug()) + log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.argsDebug()) n.handleResponse(pkt.raddr, response) case '0' + errorType: @@ -219,26 +221,34 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { log.Errorln("invalid request method") return case pingMethod: - n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}) + if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}); err != nil { + log.Error("error sending 'pingmethod' response message - ", err) + } case storeMethod: // TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port}) - n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) + if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}); err != nil { + log.Error("error sending 'storemethod' response message - ", err) + } } else { - n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}) + if err := n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}); err != nil { + log.Error("error sending 'storemethod'response message for invalid-token - ", err) + } } case findNodeMethod: if request.Arg == nil { log.Errorln("request is missing arg") return } - n.sendMessage(addr, Response{ + if err := n.sendMessage(addr, Response{ ID: request.ID, NodeID: n.id, Contacts: n.rt.GetClosest(*request.Arg, bucketSize), - }) + }); err != nil { + log.Error("error sending 'findnodemethod' response message - ", err) + } case findValueMethod: if request.Arg == nil { @@ -253,13 +263,15 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { } if contacts := n.store.Get(*request.Arg); len(contacts) > 0 { - res.FindValueKey = request.Arg.RawString() + res.FindValueKey = request.Arg.rawString() res.Contacts = contacts } else { res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) } - n.sendMessage(addr, res) + if err := n.sendMessage(addr, res); err != nil { + log.Error("error sending 'findvaluemethod' response message - ", err) + } } // nodes that send us requests should not be inserted, only refreshed. @@ -294,15 +306,17 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error { if req, ok := data.(Request); ok { log.Debugf("[%s] query %s: sending request to %s (%d bytes) %s(%s)", - n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.ArgsDebug()) + n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.argsDebug()) } else if res, ok := data.(Response); ok { log.Debugf("[%s] query %s: sending response to %s (%d bytes) %s", - n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.ArgsDebug()) + n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.argsDebug()) } else { log.Debugf("[%s] (%d bytes) %s", n.id.HexShort(), len(encoded), spew.Sdump(data)) } - n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { + log.Error("error setting write deadline - ", err) + } _, err = n.conn.WriteToUDP(encoded, addr) return errors.Err(err) @@ -405,7 +419,7 @@ func (n *Node) SendCancelable(contact Contact, req Request) (<-chan *Response, c return n.SendAsync(ctx, contact, req), cancel } -// Count returns the number of transactions in the manager +// CountActiveTransactions returns the number of transactions in the manager func (n *Node) CountActiveTransactions() int { n.txLock.Lock() defer n.txLock.Unlock() @@ -428,6 +442,7 @@ func (n *Node) startRoutingTableGrooming() { }() } +// Store stores a node contact in the node's contact store. func (n *Node) Store(hash Bitmap, c Contact) { n.store.Upsert(hash, c) } diff --git a/dht/node_finder.go b/dht/node_finder.go index d38eec9..6fbd35f 100644 --- a/dht/node_finder.go +++ b/dht/node_finder.go @@ -195,6 +195,9 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) { inserted := false for i, n := range cf.activeContacts { + // 5000ft: insert contact into sorted active contacts list + // Detail: if diff between new contact id and the target id has fewer changes than the n contact from target + // it should be inserted in between the previous and current. if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) { cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...) inserted = true diff --git a/dht/node_test.go b/dht/node_test.go index e0f2c88..f9fbdd5 100644 --- a/dht/node_test.go +++ b/dht/node_test.go @@ -30,7 +30,7 @@ func TestPing(t *testing.T) { data, err := bencode.EncodeBytes(map[string]interface{}{ headerTypeField: requestType, headerMessageIDField: messageID, - headerNodeIDField: testNodeID.RawString(), + headerNodeIDField: testNodeID.rawString(), headerPayloadField: "ping", headerArgsField: []string{}, }) @@ -86,7 +86,7 @@ func TestPing(t *testing.T) { rNodeID, ok := response[headerNodeIDField].(string) if !ok { t.Error("node ID is not a string") - } else if rNodeID != dhtNodeID.RawString() { + } else if rNodeID != dhtNodeID.rawString() { t.Error("unexpected node ID") } } @@ -176,7 +176,7 @@ func TestStore(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -257,7 +257,7 @@ func TestFindNode(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -290,10 +290,8 @@ func TestFindValueExisting(t *testing.T) { defer dht.Shutdown() nodesToInsert := 3 - var nodes []Contact for i := 0; i < nodesToInsert; i++ { n := Contact{ID: RandomBitmapP(), IP: net.ParseIP("127.0.0.1"), Port: 10000 + i} - nodes = append(nodes, n) dht.node.rt.Update(n) } @@ -333,7 +331,7 @@ func TestFindValueExisting(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -345,7 +343,7 @@ func TestFindValueExisting(t *testing.T) { t.Fatal("payload is not a dictionary") } - compactContacts, ok := payload[valueToFind.RawString()] + compactContacts, ok := payload[valueToFind.rawString()] if !ok { t.Fatal("payload is missing key for search value") } @@ -412,7 +410,7 @@ func TestFindValueFallbackToFindNode(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { diff --git a/dht/routing_table.go b/dht/routing_table.go index 29bbb81..6507f9d 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -14,6 +14,7 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lyoshenka/bencode" + log "github.com/sirupsen/logrus" ) // TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap @@ -21,24 +22,29 @@ import ( // TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg) // https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41 +// Contact is a type representation of another node that a specific node is in communication with. type Contact struct { ID Bitmap IP net.IP Port int } +// Equals returns T/F if two contacts are the same. func (c Contact) Equals(other Contact) bool { return c.ID == other.ID } +// Addr returns the UPD Address of the contact. func (c Contact) Addr() *net.UDPAddr { return &net.UDPAddr{IP: c.IP, Port: c.Port} } +// String returns the concatenated short hex encoded string of its ID + @ + string represention of its UPD Address. func (c Contact) String() string { return c.ID.HexShort() + "@" + c.Addr().String() } +// MarshalCompact returns the compact byte slice representation of a contact. func (c Contact) MarshalCompact() ([]byte, error) { if c.IP.To4() == nil { return nil, errors.Err("ip not set") @@ -60,6 +66,7 @@ func (c Contact) MarshalCompact() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalCompact unmarshals the compact byte slice representation of a contact. func (c *Contact) UnmarshalCompact(b []byte) error { if len(b) != compactNodeInfoLength { return errors.Err("invalid compact length") @@ -70,10 +77,12 @@ func (c *Contact) UnmarshalCompact(b []byte) error { return nil } +// MarshalBencode returns the serialized byte slice representation of a contact. func (c Contact) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes([]interface{}{c.ID, c.IP.String(), c.Port}) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the contact. func (c *Contact) UnmarshalBencode(b []byte) error { var raw []bencode.RawMessage err := bencode.DecodeBytes(b, &raw) @@ -139,7 +148,7 @@ func (p *peer) Touch() { // ActiveSince returns whether a peer has responded in the last `d` duration // this is used to check if the peer is "good", meaning that we believe the peer will respond to our requests func (p *peer) ActiveInLast(d time.Duration) bool { - return time.Now().Sub(p.LastActivity) > d + return time.Since(p.LastActivity) > d } // IsBad returns whether a peer is "bad", meaning that it has failed to respond to multiple pings in a row @@ -236,7 +245,7 @@ func find(id Bitmap, peers []peer) int { func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool { b.lock.RLock() defer b.lock.RUnlock() - return time.Now().Sub(b.lastUpdate) > refreshInterval + return time.Since(b.lastUpdate) > refreshInterval } type routingTable struct { @@ -341,6 +350,7 @@ func (rt *routingTable) Count() int { return count } +// Range is a structure that holds a min and max bitmaps. The range is used in bucket sizing. type Range struct { start Bitmap end Bitmap @@ -457,7 +467,9 @@ func RoutingTableRefresh(n *Node, refreshInterval time.Duration, cancel <-chan s }() } - nf.Find() + if _, err := nf.Find(); err != nil { + log.Error("error finding contact during routing table refresh - ", err) + } }(id) } diff --git a/dht/testing.go b/dht/testing.go index 0a69439..b93d2a0 100644 --- a/dht/testing.go +++ b/dht/testing.go @@ -13,7 +13,8 @@ import ( var testingDHTIP = "127.0.0.1" var testingDHTFirstPort = 21000 -func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) { +// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options. +func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) { var bootstrapNode *BootstrapNode var seeds []string @@ -25,7 +26,9 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, if err != nil { panic(err) } - bootstrapNode.Connect(listener.(*net.UDPConn)) + if err := bootstrapNode.Connect(listener.(*net.UDPConn)); err != nil { + t.Error("error connecting bootstrap node - ", err) + } } if numNodes < 1 { @@ -41,7 +44,11 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, panic(err) } - go dht.Start() + go func() { + if err := dht.Start(); err != nil { + t.Error("error starting dht - ", err) + } + }() if !concurrent { dht.WaitUntilJoined() } @@ -103,7 +110,7 @@ func newTestUDPConn(addr string) *testUDPConn { func (t testUDPConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) { var timeoutCh <-chan time.Time if !t.readDeadline.IsZero() { - timeoutCh = time.After(t.readDeadline.Sub(time.Now())) + timeoutCh = time.After(time.Until(t.readDeadline)) } select { @@ -218,7 +225,7 @@ func verifyContacts(t *testing.T, contacts []interface{}, nodes []Contact) { continue } for _, n := range nodes { - if n.ID.RawString() == id { + if n.ID.rawString() == id { currNode = n currNodeFound = true foundNodes[id] = true diff --git a/peer/server.go b/peer/server.go index c232593..2b115b6 100644 --- a/peer/server.go +++ b/peer/server.go @@ -18,35 +18,47 @@ import ( ) const ( - DefaultPort = 3333 + // DefaultPort is the port the peer server listens on if not passed in. + DefaultPort = 3333 + // LbrycrdAddress to be used when paying for data. Not implemented yet. LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct" ) +// Server is an instance of a peer server that houses the listener and store. type Server struct { store store.BlobStore l net.Listener closed bool } +// NewServer returns an initialized Server pointer. func NewServer(store store.BlobStore) *Server { return &Server{ store: store, } } +// Shutdown gracefully shuts down the peer server. func (s *Server) Shutdown() { // TODO: need waitgroup so we can finish whatever we're doing before stopping s.closed = true - s.l.Close() + if err := s.l.Close(); err != nil { + log.Error("error shuting down peer server - ", err) + } } +// ListenAndServe starts the server listener to handle connections. func (s *Server) ListenAndServe(address string) error { log.Println("Listening on " + address) l, err := net.Listen("tcp", address) if err != nil { return err } - defer l.Close() + defer func(listener net.Listener) { + if err := listener.Close(); err != nil { + log.Error("error closing listener for peer server - ", err) + } + }(l) for { conn, err := l.Accept() @@ -62,7 +74,11 @@ func (s *Server) ListenAndServe(address string) error { } func (s *Server) handleConnection(conn net.Conn) { - defer conn.Close() + defer func(conn net.Conn) { + if err := conn.Close(); err != nil { + log.Error("error closing client connection for peer server - ", err) + } + }(conn) timeoutDuration := 5 * time.Second @@ -71,7 +87,9 @@ func (s *Server) handleConnection(conn net.Conn) { var response []byte var err error - conn.SetReadDeadline(time.Now().Add(timeoutDuration)) + if err := conn.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil { + log.Error("error setting read deadline for client connection - ", err) + } request, err = readNextRequest(conn) if err != nil { if err != io.EOF { @@ -79,7 +97,9 @@ func (s *Server) handleConnection(conn net.Conn) { } return } - conn.SetReadDeadline(time.Time{}) + if err := conn.SetReadDeadline(time.Time{}); err != nil { + log.Error("error setting read deadline client connection - ", err) + } if strings.Contains(string(request), `"requested_blobs"`) { log.Debugln("received availability request") @@ -225,6 +245,7 @@ func isValidJSON(b []byte) bool { return json.Unmarshal(b, &r) == nil } +// GetBlobHash returns the sha512 hash hex encoded string of the blob byte slice. func GetBlobHash(blob []byte) string { hashBytes := sha512.Sum384(blob) return hex.EncodeToString(hashBytes[:]) @@ -234,7 +255,8 @@ const ( maxRequestSize = 64 * (2 ^ 10) // 64kb paymentRateAccepted = "RATE_ACCEPTED" paymentRateTooLow = "RATE_TOO_LOW" - paymentRateUnset = "RATE_UNSET" + //ToDo: paymentRateUnset is not used but exists in the protocol. + //paymentRateUnset = "RATE_UNSET" ) var errRequestTooLarge = errors.Base("request is too large") diff --git a/peer/server_test.go b/peer/server_test.go index 193ab4a..d8c0837 100644 --- a/peer/server_test.go +++ b/peer/server_test.go @@ -33,18 +33,20 @@ var availabilityRequests = []pair{ }, } -func getServer(withBlobs bool) *Server { +func getServer(t *testing.T, withBlobs bool) *Server { st := store.MemoryBlobStore{} if withBlobs { for k, v := range blobs { - st.Put(k, v) + if err := st.Put(k, v); err != nil { + t.Error("error during put operation of memory blobstore - ", err) + } } } return NewServer(&st) } func TestAvailabilityRequest_NoBlobs(t *testing.T) { - s := getServer(false) + s := getServer(t, false) for _, p := range availabilityRequests { response, err := s.handleAvailabilityRequest(p.request) @@ -59,7 +61,7 @@ func TestAvailabilityRequest_NoBlobs(t *testing.T) { } func TestAvailabilityRequest_WithBlobs(t *testing.T) { - s := getServer(true) + s := getServer(t, true) for _, p := range availabilityRequests { response, err := s.handleAvailabilityRequest(p.request) diff --git a/reflector/client.go b/reflector/client.go index ec40f86..b26d51d 100644 --- a/reflector/client.go +++ b/reflector/client.go @@ -9,11 +9,13 @@ import ( log "github.com/sirupsen/logrus" ) +// Client is an instance of a client connected to a server. type Client struct { conn net.Conn connected bool } +// Connect connects to a specific clients and errors if it cannot be contacted. func (c *Client) Connect(address string) error { var err error c.conn, err = net.Dial("tcp", address) @@ -23,11 +25,14 @@ func (c *Client) Connect(address string) error { c.connected = true return c.doHandshake(protocolVersion1) } + +// Close closes the connection with the client. func (c *Client) Close() error { c.connected = false return c.conn.Close() } +// SendBlob sends a send blob request to the client. func (c *Client) SendBlob(blob []byte) error { if !c.connected { return errors.Err("not connected") diff --git a/reflector/client_test.go b/reflector/client_test.go index 674cc97..6067e9f 100644 --- a/reflector/client_test.go +++ b/reflector/client_test.go @@ -8,21 +8,29 @@ import ( "testing" "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" ) var address = "localhost:" + strconv.Itoa(DefaultPort) -var s Server func TestMain(m *testing.M) { dir, err := ioutil.TempDir("", "reflector_client_test") if err != nil { - panic(err) + log.Panic("could not create temp directory - ", err) } - defer os.RemoveAll(dir) + defer func(directory string) { + if err := os.RemoveAll(dir); err != nil { + log.Panic("error removing files and directory - ", err) + } + }(dir) ms := store.MemoryBlobStore{} s := NewServer(&ms) - go s.ListenAndServe(address) + go func() { + if err := s.ListenAndServe(address); err != nil { + log.Panic("error starting up reflector server - ", err) + } + }() os.Exit(m.Run()) } @@ -39,7 +47,7 @@ func TestSmallBlob(t *testing.T) { c := Client{} err := c.Connect(address) if err != nil { - t.Error(err) + t.Error("error connecting client to server - ", err) } err = c.SendBlob([]byte{}) diff --git a/reflector/prism.go b/reflector/prism.go index e1f865c..63c1ff2 100644 --- a/reflector/prism.go +++ b/reflector/prism.go @@ -8,6 +8,7 @@ import ( "github.com/lbryio/reflector.go/store" ) +// Prism is the root instance of the application and houses the DHT, Peer Server, Reflector Server, and Cluster. type Prism struct { dht *dht.DHT peer *peer.Server @@ -17,6 +18,7 @@ type Prism struct { stop *stopOnce.Stopper } +// NewPrism returns an initialized Prism instance pointer. func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism { d, err := dht.New(nil) if err != nil { @@ -31,6 +33,7 @@ func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism { } } +// Connect starts the components of the application. func (p *Prism) Connect() error { err := p.dht.Start() if err != nil { @@ -49,6 +52,7 @@ func (p *Prism) Connect() error { return nil } +// Shutdown gracefully shuts down the different prism components before exiting. func (p *Prism) Shutdown() { p.stop.StopAndWait() p.reflector.Shutdown() diff --git a/reflector/server.go b/reflector/server.go index e5fb7a4..9ca9983 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -14,31 +14,43 @@ import ( log "github.com/sirupsen/logrus" ) +// Server is and instance of the reflector server. It houses the blob store and listener. type Server struct { store store.BlobStore l net.Listener closed bool } +// NewServer returns an initialized reflector server pointer. func NewServer(store store.BlobStore) *Server { return &Server{ store: store, } } +// Shutdown shuts down the reflector server gracefully. func (s *Server) Shutdown() { // TODO: need waitgroup so we can finish whatever we're doing before stopping s.closed = true - s.l.Close() + if err := s.l.Close(); err != nil { + log.Error("error shutting down reflector server - ", err) + } } +//ListenAndServe starts the server listener to handle connections. func (s *Server) ListenAndServe(address string) error { + //ToDo - We should make this DRY as it is the same code in both servers. log.Println("Listening on " + address) l, err := net.Listen("tcp", address) if err != nil { return err } - defer l.Close() + + defer func(listener net.Listener) { + if err := listener.Close(); err != nil { + log.Error("error closing reflector server listener - ", err) + } + }(l) for { conn, err := l.Accept() @@ -55,14 +67,20 @@ func (s *Server) ListenAndServe(address string) error { func (s *Server) handleConn(conn net.Conn) { // TODO: connection should time out eventually - defer conn.Close() + defer func(conn net.Conn) { + if err := conn.Close(); err != nil { + log.Error("error closing reflector client connection - ", err) + } + }(conn) err := s.doHandshake(conn) if err != nil { if err == io.EOF { return } - s.doError(conn, err) + if err := s.doError(conn, err); err != nil { + log.Error("error sending error response to reflector client connection - ", err) + } return } @@ -70,7 +88,9 @@ func (s *Server) handleConn(conn net.Conn) { err = s.receiveBlob(conn) if err != nil { if err != io.EOF { - s.doError(conn, err) + if err := s.doError(conn, err); err != nil { + log.Error("error sending error response for receiving a blob to reflector client connection - ", err) + } } return } diff --git a/reflector/shared.go b/reflector/shared.go index cee4855..44ff5a6 100644 --- a/reflector/shared.go +++ b/reflector/shared.go @@ -8,6 +8,7 @@ import ( ) const ( + // DefaultPort is the port the reflector server listens on if not passed in. DefaultPort = 5566 maxBlobSize = 2 * 1024 * 1024 @@ -16,6 +17,7 @@ const ( protocolVersion2 = 1 ) +// ErrBlobExists is a default error for when a blob already exists on the reflector server. var ErrBlobExists = errors.Base("blob exists on server") type errorResponse struct { diff --git a/store/dbbacked.go b/store/dbbacked.go index cc1bd4d..f5d2e60 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -8,23 +8,28 @@ import ( "github.com/lbryio/reflector.go/types" ) +// DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored. type DBBackedS3Store struct { s3 *S3BlobStore db db.DB } +// NewDBBackedS3Store returns an initialized store pointer. func NewDBBackedS3Store(s3 *S3BlobStore, db db.DB) *DBBackedS3Store { return &DBBackedS3Store{s3: s3, db: db} } +// Has returns T/F or Error ( if the DB errors ) if store contains the blob. func (d *DBBackedS3Store) Has(hash string) (bool, error) { return d.db.HasBlob(hash) } +// Get returns the byte slice of the blob or an error. func (d *DBBackedS3Store) Get(hash string) ([]byte, error) { return d.s3.Get(hash) } +// Put stores the blob in the S3 store and stores the blob information in the DB. func (d *DBBackedS3Store) Put(hash string, blob []byte) error { err := d.s3.Put(hash, blob) if err != nil { @@ -34,6 +39,8 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error { return d.db.AddBlob(hash, len(blob), true) } +// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if +// there is an error storing the blob information in the DB. func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { var blobContents types.SdBlob err := json.Unmarshal(blob, &blobContents) diff --git a/store/file.go b/store/file.go index dc37ac8..a6acdbe 100644 --- a/store/file.go +++ b/store/file.go @@ -8,12 +8,14 @@ import ( "github.com/lbryio/lbry.go/errors" ) +// FileBlobStore is a local disk store. type FileBlobStore struct { dir string initialized bool } +// NewFileBlobStore returns an initialized file disk store pointer. func NewFileBlobStore(dir string) *FileBlobStore { return &FileBlobStore{dir: dir} } @@ -43,6 +45,7 @@ func (f *FileBlobStore) initOnce() error { return nil } +// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. func (f *FileBlobStore) Has(hash string) (bool, error) { err := f.initOnce() if err != nil { @@ -59,6 +62,7 @@ func (f *FileBlobStore) Has(hash string) (bool, error) { return true, nil } +// Get returns the byte slice of the blob stored or will error if the blob doesn't exist. func (f *FileBlobStore) Get(hash string) ([]byte, error) { err := f.initOnce() if err != nil { @@ -76,6 +80,7 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) { return ioutil.ReadAll(file) } +// Put stores the blob on disk or errors with any IO error. func (f *FileBlobStore) Put(hash string, blob []byte) error { err := f.initOnce() if err != nil { @@ -85,6 +90,8 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error { return ioutil.WriteFile(f.path(hash), blob, 0644) } +// PutSD stores the sd blob on the disk or errors with any IO error. func (f *FileBlobStore) PutSD(hash string, blob []byte) error { + //Todo - need to handle when streaming hash is not present. return f.Put(hash, blob) } diff --git a/store/memory.go b/store/memory.go index 5a81dd4..0a52e0c 100644 --- a/store/memory.go +++ b/store/memory.go @@ -2,10 +2,12 @@ package store import "github.com/lbryio/lbry.go/errors" +// MemoryBlobStore is an in memory only blob store with no persistence. type MemoryBlobStore struct { blobs map[string][]byte } +// Has returns T/F if the blob is currently stored. It will never error. func (m *MemoryBlobStore) Has(hash string) (bool, error) { if m.blobs == nil { m.blobs = make(map[string][]byte) @@ -14,6 +16,7 @@ func (m *MemoryBlobStore) Has(hash string) (bool, error) { return ok, nil } +// Get returns the blob byte slice if present and errors if the blob is not found. func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { if m.blobs == nil { m.blobs = make(map[string][]byte) @@ -25,6 +28,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { return blob, nil } +// Put stores the blob in memory. It will never error. func (m *MemoryBlobStore) Put(hash string, blob []byte) error { if m.blobs == nil { m.blobs = make(map[string][]byte) @@ -33,6 +37,8 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error { return nil } +// PutSD stores the sd blob in memory. It will never error. func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { + //ToDo - need to handle when stream is not present. return m.Put(hash, blob) } diff --git a/store/memory_test.go b/store/memory_test.go index c2bbfc6..58d47a3 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -20,7 +20,9 @@ func TestMemoryBlobStore_Get(t *testing.T) { s := MemoryBlobStore{} hash := "abc" blob := []byte("abcdefg") - s.Put(hash, blob) + if err := s.Put(hash, blob); err != nil { + t.Error("error getting memory blob - ", err) + } gotBlob, err := s.Get(hash) if err != nil { diff --git a/store/s3.go b/store/s3.go index 0eac518..abec192 100644 --- a/store/s3.go +++ b/store/s3.go @@ -16,6 +16,7 @@ import ( log "github.com/sirupsen/logrus" ) +// S3BlobStore is an S3 store type S3BlobStore struct { awsID string awsSecret string @@ -25,6 +26,7 @@ type S3BlobStore struct { session *session.Session } +// NewS3BlobStore returns an initialized S3 store pointer. func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { return &S3BlobStore{ awsID: awsID, @@ -51,6 +53,7 @@ func (s *S3BlobStore) initOnce() error { return nil } +// Has returns T/F or Error ( from S3 ) if the store contains the blob. func (s *S3BlobStore) Has(hash string) (bool, error) { err := s.initOnce() if err != nil { @@ -71,7 +74,9 @@ func (s *S3BlobStore) Has(hash string) (bool, error) { return true, nil } +// Get returns the blob slice if present or errors on S3. func (s *S3BlobStore) Get(hash string) ([]byte, error) { + //Todo-Need to handle error for blob doesn't exist for consistency. err := s.initOnce() if err != nil { return []byte{}, err @@ -102,6 +107,7 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) { return buf.Bytes(), nil } +// Put stores the blob on S3 or errors if S3 connection errors. func (s *S3BlobStore) Put(hash string, blob []byte) error { err := s.initOnce() if err != nil { @@ -122,6 +128,8 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error { return err } +// PutSD stores the sd blob on S3 or errors if S3 connection errors. func (s *S3BlobStore) PutSD(hash string, blob []byte) error { + //Todo - handle missing stream for consistency return s.Put(hash, blob) } diff --git a/store/store.go b/store/store.go index f9258bc..580ae7b 100644 --- a/store/store.go +++ b/store/store.go @@ -2,6 +2,7 @@ package store import "github.com/lbryio/lbry.go/errors" +// BlobStore is an interface with methods for consistently handling blob storage. type BlobStore interface { Has(string) (bool, error) Get(string) ([]byte, error) @@ -9,4 +10,5 @@ type BlobStore interface { PutSD(string, []byte) error } +//ErrBlobNotFound is a standard error when a blob is not found in the store. var ErrBlobNotFound = errors.Base("blob not found") diff --git a/types/types.go b/types/types.go index 8e582e6..f63e560 100644 --- a/types/types.go +++ b/types/types.go @@ -1,5 +1,6 @@ package types +// SdBlob is an instance of specialized blob that contains information on the rest of the blobs it is associated with. type SdBlob struct { StreamName string `json:"stream_name"` Blobs []struct {