From 395e1db489769108fe279b75737213a2b81ba0c4 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Tue, 9 Nov 2021 19:39:13 -0500 Subject: [PATCH] UDPServer / ip address resolution Got the UDPServer ping/pong protocol working internally, only tested against other running go hub servers. Should in theory work with python server and clients, but still need to test that. Also switched to serving udp on the same port as grpc, and taking that into account when pinging other hubs with udp. Unit test for udp ip address lookup. --- server/args.go | 4 -- server/federation.go | 92 +++++++++++++++++++++++++++++++-------- server/federation_test.go | 65 ++++++++++++++++++++++++--- server/server.go | 18 ++++---- server/udp.go | 9 ++-- 5 files changed, 148 insertions(+), 40 deletions(-) diff --git a/server/args.go b/server/args.go index b0374ba..beff48f 100644 --- a/server/args.go +++ b/server/args.go @@ -19,7 +19,6 @@ type Args struct { CmdType int Host string Port string - UDPPort string EsHost string EsPort string PrometheusPort string @@ -39,7 +38,6 @@ type Args struct { const ( DefaultHost = "0.0.0.0" DefaultPort = "50051" - DefaultUdpPort = "41119" DefaultEsHost = "http://localhost" DefaultEsIndex = "claims" DefaultEsPort = "9200" @@ -88,7 +86,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost}) esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort}) - udpPort := parser.String("", "uspport", &argparse.Options{Required: false, Help: "udp ping port", Default: DefaultUdpPort}) prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort}) esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex}) refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta}) @@ -125,7 +122,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { Port: *port, EsHost: *esHost, EsPort: *esPort, - UDPPort: *udpPort, PrometheusPort: *prometheusPort, EsIndex: *esIndex, RefreshDelta: *refreshDelta, diff --git a/server/federation.go b/server/federation.go index e6f31cd..cebb5ba 100644 --- a/server/federation.go +++ b/server/federation.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "log" + "math" "os" "strings" "sync/atomic" @@ -66,6 +67,19 @@ func (s *Server) getNumSubs() int64 { return *s.NumPeerSubs } +// getAndSetExternalIp takes the address of a peer running a UDP server and +// pings it, so we can determine our own external IP address. +func (s *Server) getAndSetExternalIp(msg *pb.ServerMessage) error { + myIp, err := UDPPing(msg.Address, msg.Port) + if err != nil { + return err + } + log.Println("my ip: ", myIp) + s.ExternalIP = myIp + + return nil +} + // loadPeers takes the arguments given to the hub at startup and loads the // previously known peers from disk and verifies their existence before // storing them as known peers. Returns a map of peerKey -> object @@ -73,6 +87,33 @@ func (s *Server) loadPeers() error { peerFile := s.Args.PeerFile port := s.Args.Port + // First we make sure our server has come up, so we can answer back to peers. + var failures = 0 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + +retry: + time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2))) + conn, err := grpc.DialContext(ctx, + "0.0.0.0:"+port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + + if err != nil { + if failures > 3 { + log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers") + return err + } + failures += 1 + goto retry + } + if err = conn.Close(); err != nil { + log.Println(err) + } + cancel() + + f, err := os.Open(peerFile) if err != nil { log.Println(err) @@ -97,19 +138,22 @@ func (s *Server) loadPeers() error { } // If the peer is us, skip log.Println(ipPort) - if ipPort[1] == port && localHosts[ipPort[0]] { + if ipPort[1] == port && + (localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP) { log.Println("Self peer, skipping ...") continue } + srvMsg := &pb.ServerMessage{ Address: ipPort[0], Port: ipPort[1], } log.Printf("pinging peer %+v\n", srvMsg) - err := s.addPeer(srvMsg, true) + err = s.addPeer(srvMsg, true, true) if err != nil { log.Println(err) } + } log.Println("Returning from loadPeers") @@ -133,20 +177,18 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error { defer conn.Close() msg := &pb.ServerMessage{ - Address: s.Args.Host, + Address: s.ExternalIP, Port: s.Args.Port, } c := pb.NewHubClient(conn) - log.Printf("%s:%s subscribing to %+v\n", s.Args.Host, s.Args.Port, peer) + log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) _, err = c.PeerSubscribe(ctx, msg) if err != nil { return err } - s.Subscribed = true - return nil } @@ -175,11 +217,11 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { msg := &pb.HelloMessage{ Port: s.Args.Port, - Host: s.Args.Host, + Host: s.ExternalIP, Servers: []*pb.ServerMessage{}, } - log.Printf("%s:%s saying hello to %+v\n", s.Args.Host, s.Args.Port, server) + log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, server) res, err := c.Hello(ctx, msg) if err != nil { log.Println(err) @@ -282,19 +324,31 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) { // addPeer takes a new peer as a pb.ServerMessage, optionally checks to see // if they're online, and adds them to our list of peer. If we're not currently // subscribed to a peer, it will also subscribe to it. -func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { +func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error { + // First thing we get our external ip if we don't have it, otherwise we + // could end up subscribed to our self, which is silly. + if s.ExternalIP == "" { + err := s.getAndSetExternalIp(msg) + if err != nil { + log.Println(err) + log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host) + } + } + if s.Args.Port == msg.Port && - (localHosts[msg.Address] || msg.Address == s.Args.Host) { - log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.Args.Host, s.Args.Port) + (localHosts[msg.Address] || msg.Address == s.ExternalIP) { + log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) return nil } + k := peerKey(msg) newServer := &FederatedServer{ Address: msg.Address, Port: msg.Port, Ts: time.Now(), } - log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg) + + log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, msg) if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded { if ping { _, err := s.helloPeer(newServer) @@ -312,11 +366,11 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { s.notifyPeerSubs(newServer) // Subscribe to all our peers for now - err := s.subscribeToPeer(newServer) - if err != nil { - return err - } else { - s.Subscribed = true + if subscribe { + err := s.subscribeToPeer(newServer) + if err != nil { + return err + } } } else { oldServer.Ts = time.Now() @@ -328,7 +382,7 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { // peers. func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) { for _, srvMsg := range servers { - err := s.addPeer(srvMsg, false) + err := s.addPeer(srvMsg, false, true) // This shouldn't happen because we're not pinging them. if err != nil { log.Println(err) @@ -352,7 +406,7 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage { return &pb.HelloMessage{ Port: s.Args.Port, - Host: s.Args.Host, + Host: s.ExternalIP, Servers: servers, } } diff --git a/server/federation_test.go b/server/federation_test.go index b6286a8..2719917 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -49,7 +49,6 @@ func makeDefaultArgs() *Args { Port: DefaultPort, EsHost: DefaultEsHost, EsPort: DefaultEsPort, - UDPPort: DefaultUdpPort, PrometheusPort: DefaultPrometheusPort, EsIndex: DefaultEsIndex, RefreshDelta: DefaultRefreshDelta, @@ -89,7 +88,7 @@ func TestAddPeer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - server.Subscribed = true + server.ExternalIP = "0.0.0.0" metrics.PeersKnown.Set(0) for i := 0; i < 10; i++ { @@ -107,7 +106,7 @@ func TestAddPeer(t *testing.T) { } } //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false) + err := server.addPeer(msg, false, false) if err != nil { log.Println(err) } @@ -148,7 +147,7 @@ func TestPeerWriter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - server.Subscribed = true + server.ExternalIP = "0.0.0.0" for i := 0; i < 10; i++ { var msg *pb.ServerMessage @@ -165,7 +164,7 @@ func TestPeerWriter(t *testing.T) { } } //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false) + err := server.addPeer(msg, false, false) if err != nil { log.Println(err) } @@ -425,3 +424,59 @@ func TestAddPeerEndpoint3(t *testing.T) { } } + + +// TestAddPeer tests the ability to add peers +func TestUDPServer(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + args.StartUDP = true + args2 := makeDefaultArgs() + args2.Port = "50052" + args2.StartUDP = true + + tests := []struct { + name string + want string + } { + { + name: "hubs server external ip", + want: "127.0.0.1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T){ + server := MakeHubServer(ctx, args) + server2 := MakeHubServer(ctx, args2) + go server.Run() + go server2.Run() + metrics.PeersKnown.Set(0) + + msg := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50052", + } + + err := server.addPeer(msg, true, true) + if err != nil { + log.Println(err) + } + + server.GrpcServer.GracefulStop() + server2.GrpcServer.GracefulStop() + + got1 := server.ExternalIP + if got1 != tt.want { + t.Errorf("server.ExternalIP = %s, want %s\n", got1, tt.want) + t.Errorf("server.Args.Port = %s\n", server.Args.Port) + } + got2 := server2.ExternalIP + if got2 != tt.want { + t.Errorf("server2.ExternalIP = %s, want %s\n", got2, tt.want) + t.Errorf("server2.Args.Port = %s\n", server2.Args.Port) + } + }) + } + +} diff --git a/server/server.go b/server/server.go index e82ebe2..b59bfb4 100644 --- a/server/server.go +++ b/server/server.go @@ -41,7 +41,7 @@ type Server struct { PeerSubs map[string]*FederatedServer PeerSubsMut sync.RWMutex NumPeerSubs *int64 - Subscribed bool + ExternalIP string pb.UnimplementedHubServer } @@ -202,7 +202,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { PeerSubs: make(map[string]*FederatedServer), PeerSubsMut: sync.RWMutex{}, NumPeerSubs: numSubs, - Subscribed: false, + ExternalIP: "", } // Start up our background services @@ -219,10 +219,12 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { } // Load peers from disk and subscribe to one if there are any if args.LoadPeers { - err = s.loadPeers() - if err != nil { - log.Println(err) - } + go func() { + err := s.loadPeers() + if err != nil { + log.Println(err) + } + }() } return s @@ -251,7 +253,7 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes } log.Println(server) - err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false) + err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false, true) // They just contacted us, so this shouldn't happen if err != nil { log.Println(err) @@ -288,7 +290,7 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) { metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc() var msg = "Success" - err := s.addPeer(args, true) + err := s.addPeer(args, true, true) if err != nil { log.Println(err) msg = "Failed" diff --git a/server/udp.go b/server/udp.go index bf8ad5f..6b5140c 100644 --- a/server/udp.go +++ b/server/udp.go @@ -99,12 +99,12 @@ func decodeSPVPong(data []byte) *SPVPong { parsedProtocalVersion := data[0] flags := data[1] - height := binary.BigEndian.Uint32(data[:2]) + height := binary.BigEndian.Uint32(data[2:]) tip := make([]byte, 32) copy(tip, data[6:38]) srcRawAddr := make([]byte, 4) copy(srcRawAddr, data[38:42]) - country := binary.BigEndian.Uint16(data[:42]) + country := binary.BigEndian.Uint16(data[42:]) return &SPVPong{ protocolVersion: parsedProtocalVersion, flags: flags, @@ -148,7 +148,8 @@ func (pong *SPVPong) DecodeAddress() string { // UDPPing sends a ping over udp to another hub and returns the ip address of // this hub. -func UDPPing(address string) (string, error) { +func UDPPing(ip, port string) (string, error) { + address := ip + ":" + port addr, err := net.ResolveUDPAddr("udp", address) if err != nil { return "", err @@ -192,7 +193,7 @@ func UDPPing(address string) (string, error) { // Ping/Pong protocol to find out about each other without making full TCP // connections. func UDPServer(args *Args) error { - address := ":" + args.UDPPort + address := ":" + args.Port tip := make([]byte, 32) addr, err := net.ResolveUDPAddr("udp", address) if err != nil {