diff --git a/cluster/cluster.go b/cluster/cluster.go index 100e4c2..30803b1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,105 +3,127 @@ package cluster import ( "io/ioutil" baselog "log" - "os" - "os/signal" - "strconv" - "sync" - "syscall" + "sort" - "github.com/davecgh/go-spew/spew" "github.com/lbryio/lbry.go/crypto" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/reflector.go/cluster" + "github.com/lbryio/lbry.go/stopOnce" "github.com/hashicorp/serf/serf" log "github.com/sirupsen/logrus" ) +const ( + DefaultClusterPort = 17946 +) + type Cluster struct { + name string + port int + seedAddr string + s *serf.Serf - eventCh <-chan serf.Event + eventCh chan serf.Event + stop *stopOnce.Stopper } -func New() { - c := &Cluster{} +func New(port int, seedAddr string) *Cluster { + return &Cluster{ + name: crypto.RandString(12), + port: port, + seedAddr: seedAddr, + stop: stopOnce.New(), + } +} + +func (c *Cluster) Connect() error { var err error - nodeName := crypto.RandString(12) - clusterAddr := "127.0.0.1:" + strconv.Itoa(clusterPort) - if args[0] == clusterStart { - c.s, c.eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort) - } else { - c.s, c.eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort+1+int(crypto.RandInt64(1000))) - } - if err != nil { - log.Fatal(err) - } - defer c.Leave() - - shutdownCh := make(chan struct{}) - var shutdownWg sync.WaitGroup - - shutdownWg.Add(1) - go func() { - defer shutdownWg.Done() - for { - select { - case event := <-eventCh: - spew.Dump(event) - switch event.EventType() { - case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: - memberEvent := event.(serf.MemberEvent) - if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == nodeName { - // ignore event from my own joining of the cluster - } else { - //spew.Dump(c.Members()) - alive := getAliveMembers(c.Members()) - log.Printf("%s: my hash range is now %d of %d\n", nodeName, getHashRangeStart(nodeName, alive), len(alive)) - // figure out my new hash range based on the start and the number of alive members - // get hashes in that range that need announcing - // announce them - // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time - } - } - case <-shutdownCh: - log.Debugln("shutting down event dumper") - return - } - } - }() - - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - <-interruptChan - log.Debugln("received interrupt") - close(shutdownCh) - log.Debugln("waiting for threads to finish") - shutdownWg.Wait() - log.Debugln("shutting down main thread") -} - -func Connect(nodeName, addr string, port int) (*serf.Serf, <-chan serf.Event, error) { conf := serf.DefaultConfig() - conf.MemberlistConfig.BindPort = port - conf.MemberlistConfig.AdvertisePort = port - conf.NodeName = nodeName + conf.MemberlistConfig.BindPort = c.port + conf.MemberlistConfig.AdvertisePort = c.port + conf.NodeName = c.name nullLogger := baselog.New(ioutil.Discard, "", 0) conf.Logger = nullLogger - eventCh := make(chan serf.Event) - conf.EventCh = eventCh + c.eventCh = make(chan serf.Event) + conf.EventCh = c.eventCh - cluster, err := serf.Create(conf) + c.s, err = serf.Create(conf) if err != nil { - return nil, nil, errors.Prefix("couldn't create cluster", err) + return errors.Prefix("couldn't create cluster", err) } - _, err = cluster.Join([]string{addr}, true) - if err != nil { - log.Warnf("couldn't join cluster, starting own: %v\n", err) + if c.seedAddr != "" { + _, err = c.s.Join([]string{c.seedAddr}, true) + if err != nil { + return err + } } - return cluster, eventCh, nil + c.listen() + return nil +} + +func (c *Cluster) Shutdown() { + c.stop.StopAndWait() + c.s.Leave() +} + +func (c *Cluster) listen() { + c.stop.Add(1) + go func() { + defer c.stop.Done() + for { + select { + case <-c.stop.Ch(): + return + case event := <-c.eventCh: + switch event.EventType() { + case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: + memberEvent := event.(serf.MemberEvent) + if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name { + // ignore event from my own joining of the cluster + continue + } + + //spew.Dump(c.Members()) + alive := getAliveMembers(c.s.Members()) + log.Printf("%s: my hash range is now %d of %d\n", c.name, getHashRangeStart(c.name, alive), len(alive)) + // figure out my new hash range based on the start and the number of alive members + // get hashes in that range that need announcing + // announce them + // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time + } + } + } + }() +} + +func getHashRangeStart(myName string, members []serf.Member) int { + var names []string + for _, m := range members { + names = append(names, m.Name) + } + + sort.Strings(names) + i := 1 + for _, n := range names { + if n == myName { + return i + } + i++ + } + return -1 +} + +func getAliveMembers(members []serf.Member) []serf.Member { + var alive []serf.Member + for _, m := range members { + if m.Status == serf.StatusAlive { + alive = append(alive, m) + } + } + return alive } diff --git a/cmd/cluster.go b/cmd/cluster.go index cb6659f..2f4db81 100644 --- a/cmd/cluster.go +++ b/cmd/cluster.go @@ -3,32 +3,21 @@ package cmd import ( "os" "os/signal" - "sort" "strconv" - "sync" "syscall" "github.com/lbryio/lbry.go/crypto" "github.com/lbryio/reflector.go/cluster" - "github.com/davecgh/go-spew/spew" - "github.com/hashicorp/serf/serf" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) -const ( - clusterStart = "start" - clusterJoin = "join" - - clusterPort = 17946 -) - func init() { var cmd = &cobra.Command{ Use: "cluster [start|join]", Short: "Connect to cluster", - ValidArgs: []string{clusterStart, clusterJoin}, + ValidArgs: []string{"start", "join"}, Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), Run: clusterCmd, } @@ -36,87 +25,21 @@ func init() { } func clusterCmd(cmd *cobra.Command, args []string) { - var c *serf.Serf - var eventCh <-chan serf.Event - var err error - - nodeName := crypto.RandString(12) - clusterAddr := "127.0.0.1:" + strconv.Itoa(clusterPort) - if args[0] == clusterStart { - c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort) + port := 17946 + var c *cluster.Cluster + if args[0] == "start" { + c = cluster.New(port, "") } else { - c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort+1+int(crypto.RandInt64(1000))) + c = cluster.New(port+1+int(crypto.RandInt64(1000)), "127.0.0.1:"+strconv.Itoa(port)) } + + err := c.Connect() if err != nil { log.Fatal(err) } - defer c.Leave() - - shutdownCh := make(chan struct{}) - var shutdownWg sync.WaitGroup - - shutdownWg.Add(1) - go func() { - defer shutdownWg.Done() - for { - select { - case event := <-eventCh: - spew.Dump(event) - switch event.EventType() { - case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: - memberEvent := event.(serf.MemberEvent) - if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == nodeName { - // ignore event from my own joining of the cluster - } else { - //spew.Dump(c.Members()) - alive := getAliveMembers(c.Members()) - log.Printf("%s: my hash range is now %d of %d\n", nodeName, getHashRangeStart(nodeName, alive), len(alive)) - // figure out my new hash range based on the start and the number of alive members - // get hashes in that range that need announcing - // announce them - // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time - } - } - case <-shutdownCh: - log.Debugln("shutting down event dumper") - return - } - } - }() interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan - log.Debugln("received interrupt") - close(shutdownCh) - log.Debugln("waiting for threads to finish") - shutdownWg.Wait() - log.Debugln("shutting down main thread") -} - -func getHashRangeStart(myName string, members []serf.Member) int { - var names []string - for _, m := range members { - names = append(names, m.Name) - } - - sort.Strings(names) - i := 1 - for _, n := range names { - if n == myName { - return i - } - i++ - } - return -1 -} - -func getAliveMembers(members []serf.Member) []serf.Member { - var alive []serf.Member - for _, m := range members { - if m.Status == serf.StatusAlive { - alive = append(alive, m) - } - } - return alive + c.Shutdown() } diff --git a/cmd/root.go b/cmd/root.go index 627fd92..e09d4b1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,7 +5,7 @@ import ( "io/ioutil" "os" - "github.com/lbryio/errors.go" + "github.com/lbryio/lbry.go/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" diff --git a/cmd/start.go b/cmd/start.go new file mode 100644 index 0000000..2a89d2e --- /dev/null +++ b/cmd/start.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "os" + "os/signal" + "syscall" + + "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/store" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + var cmd = &cobra.Command{ + Use: "start [cluster-address]", + Short: "Run prism server", + Run: startCmd, + Args: cobra.RangeArgs(0, 1), + } + RootCmd.AddCommand(cmd) +} + +func startCmd(cmd *cobra.Command, args []string) { + db := new(db.SQL) + err := db.Connect(GlobalConfig.DBConn) + checkErr(err) + + s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName) + comboStore := store.NewDBBackedS3Store(s3, db) + + clusterAddr := "" + if len(args) > 0 { + clusterAddr = args[0] + } + + p := reflector.NewPrism(comboStore, clusterAddr) + err = p.Connect() + if err != nil { + log.Error(err) + return + } + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + <-interruptChan + p.Shutdown() +} diff --git a/peer/server.go b/peer/server.go index 9dbd9e0..c232593 100644 --- a/peer/server.go +++ b/peer/server.go @@ -10,9 +10,10 @@ import ( "strings" "time" - "github.com/davecgh/go-spew/spew" + "github.com/lbryio/lbry.go/errors" "github.com/lbryio/reflector.go/store" + "github.com/davecgh/go-spew/spew" log "github.com/sirupsen/logrus" ) @@ -22,7 +23,9 @@ const ( ) type Server struct { - store store.BlobStore + store store.BlobStore + l net.Listener + closed bool } func NewServer(store store.BlobStore) *Server { @@ -31,6 +34,12 @@ func NewServer(store store.BlobStore) *Server { } } +func (s *Server) Shutdown() { + // TODO: need waitgroup so we can finish whatever we're doing before stopping + s.closed = true + s.l.Close() +} + func (s *Server) ListenAndServe(address string) error { log.Println("Listening on " + address) l, err := net.Listen("tcp", address) @@ -42,6 +51,9 @@ func (s *Server) ListenAndServe(address string) error { for { conn, err := l.Accept() if err != nil { + if s.closed { + return nil + } log.Error(err) } else { go s.handleConnection(conn) @@ -217,3 +229,43 @@ func GetBlobHash(blob []byte) string { hashBytes := sha512.Sum384(blob) return hex.EncodeToString(hashBytes[:]) } + +const ( + maxRequestSize = 64 * (2 ^ 10) // 64kb + paymentRateAccepted = "RATE_ACCEPTED" + paymentRateTooLow = "RATE_TOO_LOW" + paymentRateUnset = "RATE_UNSET" +) + +var errRequestTooLarge = errors.Base("request is too large") + +type availabilityRequest struct { + LbrycrdAddress bool `json:"lbrycrd_address"` + RequestedBlobs []string `json:"requested_blobs"` +} + +type availabilityResponse struct { + LbrycrdAddress string `json:"lbrycrd_address"` + AvailableBlobs []string `json:"available_blobs"` +} + +type paymentRateRequest struct { + BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` +} + +type paymentRateResponse struct { + BlobDataPaymentRate string `json:"blob_data_payment_rate"` +} + +type blobRequest struct { + RequestedBlob string `json:"requested_blob"` +} + +type incomingBlob struct { + Error string `json:"error,omitempty"` + BlobHash string `json:"blob_hash"` + Length int `json:"length"` +} +type blobResponse struct { + IncomingBlob incomingBlob `json:"incoming_blob"` +} diff --git a/peer/shared.go b/peer/shared.go deleted file mode 100644 index 5d6937f..0000000 --- a/peer/shared.go +++ /dev/null @@ -1,44 +0,0 @@ -package peer - -import "github.com/lbryio/lbry.go/errors" - -const maxRequestSize = 64 * (2 ^ 10) // 64kb - -var errRequestTooLarge = errors.Base("request is too large") - -type availabilityRequest struct { - LbrycrdAddress bool `json:"lbrycrd_address"` - RequestedBlobs []string `json:"requested_blobs"` -} - -type availabilityResponse struct { - LbrycrdAddress string `json:"lbrycrd_address"` - AvailableBlobs []string `json:"available_blobs"` -} - -const ( - paymentRateAccepted = "RATE_ACCEPTED" - paymentRateTooLow = "RATE_TOO_LOW" - paymentRateUnset = "RATE_UNSET" -) - -type paymentRateRequest struct { - BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` -} - -type paymentRateResponse struct { - BlobDataPaymentRate string `json:"blob_data_payment_rate"` -} - -type blobRequest struct { - RequestedBlob string `json:"requested_blob"` -} - -type incomingBlob struct { - Error string `json:"error,omitempty"` - BlobHash string `json:"blob_hash"` - Length int `json:"length"` -} -type blobResponse struct { - IncomingBlob incomingBlob `json:"incoming_blob"` -} diff --git a/reflector/prism.go b/reflector/prism.go new file mode 100644 index 0000000..e1f865c --- /dev/null +++ b/reflector/prism.go @@ -0,0 +1,58 @@ +package reflector + +import ( + "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/reflector.go/cluster" + "github.com/lbryio/reflector.go/dht" + "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/store" +) + +type Prism struct { + dht *dht.DHT + peer *peer.Server + reflector *Server + cluster *cluster.Cluster + + stop *stopOnce.Stopper +} + +func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism { + d, err := dht.New(nil) + if err != nil { + panic(err) + } + return &Prism{ + dht: d, + peer: peer.NewServer(store), + reflector: NewServer(store), + cluster: cluster.New(cluster.DefaultClusterPort, clusterSeedAddr), + stop: stopOnce.New(), + } +} + +func (p *Prism) Connect() error { + err := p.dht.Start() + if err != nil { + return err + } + + err = p.cluster.Connect() + if err != nil { + return err + } + + // start peer + + // start reflector + + return nil +} + +func (p *Prism) Shutdown() { + p.stop.StopAndWait() + p.reflector.Shutdown() + p.peer.Shutdown() + p.cluster.Shutdown() + p.dht.Shutdown() +} diff --git a/reflector/server.go b/reflector/server.go index b8f49e8..e5fb7a4 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -15,7 +15,9 @@ import ( ) type Server struct { - store store.BlobStore + store store.BlobStore + l net.Listener + closed bool } func NewServer(store store.BlobStore) *Server { @@ -24,6 +26,12 @@ func NewServer(store store.BlobStore) *Server { } } +func (s *Server) Shutdown() { + // TODO: need waitgroup so we can finish whatever we're doing before stopping + s.closed = true + s.l.Close() +} + func (s *Server) ListenAndServe(address string) error { log.Println("Listening on " + address) l, err := net.Listen("tcp", address) @@ -35,6 +43,9 @@ func (s *Server) ListenAndServe(address string) error { for { conn, err := l.Accept() if err != nil { + if s.closed { + return nil + } log.Error(err) } else { go s.handleConn(conn)