diff --git a/server/args.go b/server/args.go index b0374ba..beff48f 100644 --- a/server/args.go +++ b/server/args.go @@ -19,7 +19,6 @@ type Args struct { CmdType int Host string Port string - UDPPort string EsHost string EsPort string PrometheusPort string @@ -39,7 +38,6 @@ type Args struct { const ( DefaultHost = "0.0.0.0" DefaultPort = "50051" - DefaultUdpPort = "41119" DefaultEsHost = "http://localhost" DefaultEsIndex = "claims" DefaultEsPort = "9200" @@ -88,7 +86,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) - udpPort := parser.String("", "uspport", &argparse.Options{Required: false, Help: "udp ping port", Default: DefaultUdpPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) @@ -125,7 +122,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { Port: *port, EsHost: *esHost, EsPort: *esPort, - UDPPort: *udpPort, PrometheusPort: *prometheusPort, EsIndex: *esIndex, RefreshDelta: *refreshDelta, diff --git a/server/federation.go b/server/federation.go index e6f31cd..cebb5ba 100644 --- a/server/federation.go +++ b/server/federation.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "log" + "math" "os" "strings" "sync/atomic" @@ -66,6 +67,19 @@ func (s *Server) getNumSubs() int64 { return *s.NumPeerSubs } +// getAndSetExternalIp takes the address of a peer running a UDP server and +// pings it, so we can determine our own external IP address. +func (s *Server) getAndSetExternalIp(msg *pb.ServerMessage) error { + myIp, err := UDPPing(msg.Address, msg.Port) + if err != nil { + return err + } + log.Println("my ip: ", myIp) + s.ExternalIP = myIp + + return nil +} + // loadPeers takes the arguments given to the hub at startup and loads the // previously known peers from disk and verifies their existence before // storing them as known peers. Returns a map of peerKey -> object @@ -73,6 +87,33 @@ func (s *Server) loadPeers() error { peerFile := s.Args.PeerFile port := s.Args.Port + // First we make sure our server has come up, so we can answer back to peers. + var failures = 0 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + +retry: + time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2))) + conn, err := grpc.DialContext(ctx, + "0.0.0.0:"+port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + + if err != nil { + if failures > 3 { + log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers") + return err + } + failures += 1 + goto retry + } + if err = conn.Close(); err != nil { + log.Println(err) + } + cancel() + + f, err := os.Open(peerFile) if err != nil { log.Println(err) @@ -97,19 +138,22 @@ func (s *Server) loadPeers() error { } // If the peer is us, skip log.Println(ipPort) - if ipPort[1] == port && localHosts[ipPort[0]] { + if ipPort[1] == port && + (localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP) { log.Println("Self peer, skipping ...") continue } + srvMsg := &pb.ServerMessage{ Address: ipPort[0], Port: ipPort[1], } log.Printf("pinging peer %+v\n", srvMsg) - err := s.addPeer(srvMsg, true) + err = s.addPeer(srvMsg, true, true) if err != nil { log.Println(err) } + } log.Println("Returning from loadPeers") @@ -133,20 +177,18 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error { defer conn.Close() msg := &pb.ServerMessage{ - Address: s.Args.Host, + Address: s.ExternalIP, Port: s.Args.Port, } c := pb.NewHubClient(conn) - log.Printf("%s:%s subscribing to %+v\n", s.Args.Host, s.Args.Port, peer) + log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) _, err = c.PeerSubscribe(ctx, msg) if err != nil { return err } - s.Subscribed = true - return nil } @@ -175,11 +217,11 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { msg := &pb.HelloMessage{ Port: s.Args.Port, - Host: s.Args.Host, + Host: s.ExternalIP, Servers: []*pb.ServerMessage{}, } - log.Printf("%s:%s saying hello to %+v\n", s.Args.Host, s.Args.Port, server) + log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, server) res, err := c.Hello(ctx, msg) if err != nil { log.Println(err) @@ -282,19 +324,31 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) { // addPeer takes a new peer as a pb.ServerMessage, optionally checks to see // if they're online, and adds them to our list of peer. If we're not currently // subscribed to a peer, it will also subscribe to it. -func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { +func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error { + // First thing we get our external ip if we don't have it, otherwise we + // could end up subscribed to our self, which is silly. + if s.ExternalIP == "" { + err := s.getAndSetExternalIp(msg) + if err != nil { + log.Println(err) + log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host) + } + } + if s.Args.Port == msg.Port && - (localHosts[msg.Address] || msg.Address == s.Args.Host) { - log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.Args.Host, s.Args.Port) + (localHosts[msg.Address] || msg.Address == s.ExternalIP) { + log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) return nil } + k := peerKey(msg) newServer := &FederatedServer{ Address: msg.Address, Port: msg.Port, Ts: time.Now(), } - log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg) + + log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, msg) if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded { if ping { _, err := s.helloPeer(newServer) @@ -312,11 +366,11 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { s.notifyPeerSubs(newServer) // Subscribe to all our peers for now - err := s.subscribeToPeer(newServer) - if err != nil { - return err - } else { - s.Subscribed = true + if subscribe { + err := s.subscribeToPeer(newServer) + if err != nil { + return err + } } } else { oldServer.Ts = time.Now() @@ -328,7 +382,7 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { // peers. func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) { for _, srvMsg := range servers { - err := s.addPeer(srvMsg, false) + err := s.addPeer(srvMsg, false, true) // This shouldn't happen because we're not pinging them. if err != nil { log.Println(err) @@ -352,7 +406,7 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage { return &pb.HelloMessage{ Port: s.Args.Port, - Host: s.Args.Host, + Host: s.ExternalIP, Servers: servers, } } diff --git a/server/federation_test.go b/server/federation_test.go index b6286a8..2719917 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -49,7 +49,6 @@ func makeDefaultArgs() *Args { Port: DefaultPort, EsHost: DefaultEsHost, EsPort: DefaultEsPort, - UDPPort: DefaultUdpPort, PrometheusPort: DefaultPrometheusPort, EsIndex: DefaultEsIndex, RefreshDelta: DefaultRefreshDelta, @@ -89,7 +88,7 @@ func TestAddPeer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - server.Subscribed = true + server.ExternalIP = "0.0.0.0" metrics.PeersKnown.Set(0) for i := 0; i < 10; i++ { @@ -107,7 +106,7 @@ func TestAddPeer(t *testing.T) { } } //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false) + err := server.addPeer(msg, false, false) if err != nil { log.Println(err) } @@ -148,7 +147,7 @@ func TestPeerWriter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - server.Subscribed = true + server.ExternalIP = "0.0.0.0" for i := 0; i < 10; i++ { var msg *pb.ServerMessage @@ -165,7 +164,7 @@ func TestPeerWriter(t *testing.T) { } } //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false) + err := server.addPeer(msg, false, false) if err != nil { log.Println(err) } @@ -425,3 +424,59 @@ func TestAddPeerEndpoint3(t *testing.T) { } } + + +// TestAddPeer tests the ability to add peers +func TestUDPServer(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + args.StartUDP = true + args2 := makeDefaultArgs() + args2.Port = "50052" + args2.StartUDP = true + + tests := []struct { + name string + want string + } { + { + name: "hubs server external ip", + want: "127.0.0.1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T){ + server := MakeHubServer(ctx, args) + server2 := MakeHubServer(ctx, args2) + go server.Run() + go server2.Run() + metrics.PeersKnown.Set(0) + + msg := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50052", + } + + err := server.addPeer(msg, true, true) + if err != nil { + log.Println(err) + } + + server.GrpcServer.GracefulStop() + server2.GrpcServer.GracefulStop() + + got1 := server.ExternalIP + if got1 != tt.want { + t.Errorf("server.ExternalIP = %s, want %s\n", got1, tt.want) + t.Errorf("server.Args.Port = %s\n", server.Args.Port) + } + got2 := server2.ExternalIP + if got2 != tt.want { + t.Errorf("server2.ExternalIP = %s, want %s\n", got2, tt.want) + t.Errorf("server2.Args.Port = %s\n", server2.Args.Port) + } + }) + } + +} diff --git a/server/server.go b/server/server.go index e82ebe2..b59bfb4 100644 --- a/server/server.go +++ b/server/server.go @@ -41,7 +41,7 @@ type Server struct { PeerSubs map[string]*FederatedServer PeerSubsMut sync.RWMutex NumPeerSubs *int64 - Subscribed bool + ExternalIP string pb.UnimplementedHubServer } @@ -202,7 +202,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { PeerSubs: make(map[string]*FederatedServer), PeerSubsMut: sync.RWMutex{}, NumPeerSubs: numSubs, - Subscribed: false, + ExternalIP: "", } // Start up our background services @@ -219,10 +219,12 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { } // Load peers from disk and subscribe to one if there are any if args.LoadPeers { - err = s.loadPeers() - if err != nil { - log.Println(err) - } + go func() { + err := s.loadPeers() + if err != nil { + log.Println(err) + } + }() } return s @@ -251,7 +253,7 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes } log.Println(server) - err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false) + err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false, true) // They just contacted us, so this shouldn't happen if err != nil { log.Println(err) @@ -288,7 +290,7 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) { metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc() var msg = "Success" - err := s.addPeer(args, true) + err := s.addPeer(args, true, true) if err != nil { log.Println(err) msg = "Failed" diff --git a/server/udp.go b/server/udp.go index bf8ad5f..6b5140c 100644 --- a/server/udp.go +++ b/server/udp.go @@ -99,12 +99,12 @@ func decodeSPVPong(data []byte) *SPVPong { parsedProtocalVersion := data[0] flags := data[1] - height := binary.BigEndian.Uint32(data[:2]) + height := binary.BigEndian.Uint32(data[2:]) tip := make([]byte, 32) copy(tip, data[6:38]) srcRawAddr := make([]byte, 4) copy(srcRawAddr, data[38:42]) - country := binary.BigEndian.Uint16(data[:42]) + country := binary.BigEndian.Uint16(data[42:]) return &SPVPong{ protocolVersion: parsedProtocalVersion, flags: flags, @@ -148,7 +148,8 @@ func (pong *SPVPong) DecodeAddress() string { // UDPPing sends a ping over udp to another hub and returns the ip address of // this hub. -func UDPPing(address string) (string, error) { +func UDPPing(ip, port string) (string, error) { + address := ip + ":" + port addr, err := net.ResolveUDPAddr("udp", address) if err != nil { return "", err @@ -192,7 +193,7 @@ func UDPPing(address string) (string, error) { // Ping/Pong protocol to find out about each other without making full TCP // connections. func UDPServer(args *Args) error { - address := ":" + args.UDPPort + address := ":" + args.Port tip := make([]byte, 32) addr, err := net.ResolveUDPAddr("udp", address) if err != nil {