From d167213d548ba8419848010813703e1996e2b47c Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 8 Feb 2018 13:33:52 -0500 Subject: [PATCH] use serf to track cluster members, update hash range on membership change. closes lbryio/reflector-cluster#47 --- cluster/cluster.go | 30 +++++++++++ cmd/cluster.go | 122 +++++++++++++++++++++++++++++++++++++++++++++ cmd/peer.go | 6 +-- cmd/reflector.go | 6 +-- cmd/root.go | 12 +++++ 5 files changed, 170 insertions(+), 6 deletions(-) create mode 100644 cluster/cluster.go create mode 100644 cmd/cluster.go diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 0000000..b509715 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,30 @@ +package cluster + +import ( + "github.com/lbryio/errors.go" + + "github.com/hashicorp/serf/serf" + log "github.com/sirupsen/logrus" +) + +func Connect(nodeName, addr string, port int) (*serf.Serf, <-chan serf.Event, error) { + conf := serf.DefaultConfig() + conf.MemberlistConfig.BindPort = port + conf.MemberlistConfig.AdvertisePort = port + conf.NodeName = nodeName + + eventCh := make(chan serf.Event) + conf.EventCh = eventCh + + cluster, err := serf.Create(conf) + if err != nil { + return nil, nil, errors.Prefix("couldn't create cluster", err) + } + + _, err = cluster.Join([]string{addr}, true) + if err != nil { + log.Warnf("couldn't join cluster, starting own: %v\n", err) + } + + return cluster, eventCh, nil +} diff --git a/cmd/cluster.go b/cmd/cluster.go new file mode 100644 index 0000000..5fa4ccd --- /dev/null +++ b/cmd/cluster.go @@ -0,0 +1,122 @@ +package cmd + +import ( + "math/rand" + "os" + "os/signal" + "sort" + "strconv" + "sync" + "syscall" + + "github.com/hashicorp/serf/serf" + "github.com/lbryio/internal-apis/app/crypto" + "github.com/lbryio/reflector.go/cluster" + + "github.com/davecgh/go-spew/spew" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +const ( + clusterStart = "start" + clusterJoin = "join" + + clusterPort = 17946 +) + +func init() { + var cmd = &cobra.Command{ + Use: "cluster [start|join]", + Short: "Connect to cluster", + ValidArgs: []string{clusterStart, clusterJoin}, + Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), + Run: clusterCmd, + } + RootCmd.AddCommand(cmd) +} + +func clusterCmd(cmd *cobra.Command, args []string) { + var c *serf.Serf + var eventCh <-chan serf.Event + var err error + + nodeName := crypto.RandString(12) + clusterAddr := "127.0.0.1:" + strconv.Itoa(clusterPort) + if args[0] == clusterStart { + c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort) + } else { + c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort+1+rand.Intn(1000)) + } + if err != nil { + log.Fatal(err) + } + defer c.Leave() + + shutdownCh := make(chan struct{}) + var shutdownWg sync.WaitGroup + + shutdownWg.Add(1) + go func() { + defer shutdownWg.Done() + for { + select { + case event := <-eventCh: + spew.Dump(event) + switch event.EventType() { + case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: + memberEvent := event.(serf.MemberEvent) + if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == nodeName { + // ignore event from my own joining of the cluster + } else { + spew.Dump(c.Members()) + log.Printf("my hash range is now %d\n", getHashRangeStart(nodeName, getAliveMembers(c.Members()))) + // figure out my new hash range based on the start and the number of alive members + // get hashes in that range that need announcing + // announce them + // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time + } + } + case <-shutdownCh: + log.Debugln("shutting down event dumper") + return + } + } + }() + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + <-interruptChan + log.Debugln("received interrupt") + close(shutdownCh) + log.Debugln("waiting for threads to finish") + shutdownWg.Wait() + log.Debugln("shutting down main thread") +} + +func getHashRangeStart(myName string, members []serf.Member) int { + names := []string{} + for _, m := range members { + names = append(names, m.Name) + } + + sort.Strings(names) + i := 1 + for _, n := range names { + if n == myName { + return i + } + i++ + } + return -1 +} + +func getAliveMembers(members []serf.Member) []serf.Member { + alive := []serf.Member{} + for _, m := range members { + if m.Status == serf.StatusAlive { + alive = append(alive, m) + } + } + return alive +} diff --git a/cmd/peer.go b/cmd/peer.go index 5e75c09..ced40bc 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -1,23 +1,23 @@ package cmd import ( - "log" "strconv" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) func init() { - var peerCmd = &cobra.Command{ + var cmd = &cobra.Command{ Use: "peer", Short: "Run peer server", Run: peerCmd, } - RootCmd.AddCommand(peerCmd) + RootCmd.AddCommand(cmd) } func peerCmd(cmd *cobra.Command, args []string) { diff --git a/cmd/reflector.go b/cmd/reflector.go index d314dfe..06765fe 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -1,23 +1,23 @@ package cmd import ( - "log" "strconv" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) func init() { - var reflectorCmd = &cobra.Command{ + var cmd = &cobra.Command{ Use: "reflector", Short: "Run reflector server", Run: reflectorCmd, } - RootCmd.AddCommand(reflectorCmd) + RootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { diff --git a/cmd/root.go b/cmd/root.go index 2f71f8d..95df43d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -40,3 +40,15 @@ func checkErr(err error) { panic(err) } } + +func argFuncs(funcs ...cobra.PositionalArgs) cobra.PositionalArgs { + return func(cmd *cobra.Command, args []string) error { + for _, f := range funcs { + err := f(cmd, args) + if err != nil { + return err + } + } + return nil + } +}