diff --git a/cmd/dht.go b/cmd/dht.go index b277af4..5e0f637 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -33,6 +33,7 @@ func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) erro } var dhtPort int +var rpcPort int func init() { var cmd = &cobra.Command{ @@ -44,6 +45,7 @@ func init() { } cmd.PersistentFlags().StringP("nodeID", "n", "", "nodeID in hex") cmd.PersistentFlags().IntVar(&dhtPort, "port", 4567, "Port to start DHT on") + cmd.PersistentFlags().IntVar(&rpcPort, "rpc_port", 1234, "Port to listen for rpc commands on") rootCmd.AddCommand(cmd) } @@ -70,21 +72,14 @@ func dhtCmd(cmd *cobra.Command, args []string) { } log.Println(nodeID.String()) node := dht.NewBootstrapNode(nodeID, 1*time.Millisecond, 1*time.Minute) - listener, err := net.ListenPacket(dht.Network, "0.0.0.0:"+strconv.Itoa(dhtPort)) + listener, err := net.ListenPacket(dht.Network, "127.0.0.1:"+strconv.Itoa(dhtPort)) checkErr(err) conn := listener.(*net.UDPConn) err = node.Connect(conn) checkErr(err) log.Println("started node") - node.AddKnownNode( - dht.Contact{ - bits.FromHexP("62c8ad9fb40a16062e884a63cd81f47b94604446319663d1334e1734dcefc8874b348ec683225e4852017a846e07d94e"), - net.ParseIP("34.231.152.182"), - 4444, - 3333, - }) _, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil) - rpcServer := dht.RunRPCServer(":1234", "/", node) + rpcServer := dht.RunRPCServer("127.0.0.1:"+strconv.Itoa(rpcPort), "/", node) interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) <-interruptChan diff --git a/cmd/root.go b/cmd/root.go index f9528ef..35d410c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -27,7 +27,7 @@ var verbose []string const ( verboseAll = "all" verboseDHT = "dht" - verboseNodeFinder = "nodefinder" + verboseNodeFinder = "node_finder" ) var conf string diff --git a/dht/contact.go b/dht/contact.go index cd4fb88..64f205b 100644 --- a/dht/contact.go +++ b/dht/contact.go @@ -17,8 +17,8 @@ import ( type Contact struct { ID bits.Bitmap IP net.IP - Port int - PeerPort int + Port int // the udp port used for the dht + PeerPort int // the tcp port a peer can be contacted on for blob requests } // Equals returns true if two contacts are the same. diff --git a/dht/node_rpc.go b/dht/node_rpc.go index 7763492..a04349d 100644 --- a/dht/node_rpc.go +++ b/dht/node_rpc.go @@ -1,10 +1,11 @@ package dht import ( + "errors" "net" "net/http" - "errors" "sync" + "github.com/gorilla/mux" "github.com/gorilla/rpc" "github.com/gorilla/rpc/json" @@ -12,7 +13,7 @@ import ( ) type NodeRPCServer struct { - Wg sync.WaitGroup + Wg sync.WaitGroup Node *BootstrapNode } @@ -23,13 +24,12 @@ type NodeRPC int type PingArgs struct { NodeID string - IP string - Port int + IP string + Port int } type PingResult string - func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error { if rpcServer == nil { return errors.New("no node set up") @@ -48,16 +48,16 @@ func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) erro } type FindArgs struct { - Key string + Key string NodeID string - IP string - Port int + IP string + Port int } type ContactResponse struct { NodeID string - IP string - Port int + IP string + Port int } type FindNodeResult []ContactResponse @@ -75,7 +75,7 @@ func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResu return err } c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} - req := Request{ Arg: &key, Method: "findNode"} + req := Request{Arg: &key, Method: "findNode"} nodeResponse := rpcServer.Node.Send(c, req) contacts := []ContactResponse{} if nodeResponse != nil && nodeResponse.Contacts != nil { @@ -89,7 +89,7 @@ func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResu type FindValueResult struct { Contacts []ContactResponse - Value string + Value string } func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueResult) error { @@ -105,7 +105,7 @@ func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueRe return err } c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} - req := Request{ Arg: &key, Method: "findValue"} + req := Request{Arg: &key, Method: "findValue"} nodeResponse := rpcServer.Node.Send(c, req) contacts := []ContactResponse{} if nodeResponse != nil && nodeResponse.FindValueKey != "" { @@ -126,7 +126,7 @@ type IterativeFindValueArgs struct { } type IterativeFindValueResult struct { - Contacts []ContactResponse + Contacts []ContactResponse FoundValue bool } @@ -149,19 +149,19 @@ func (n *NodeRPC) IterativeFindValue(r *http.Request, args *IterativeFindValueAr } type BucketResponse struct { - Start string - End string - Count int + Start string + End string + Count int Contacts []ContactResponse } type RoutingTableResponse struct { - NodeID string - Count int + NodeID string + Count int Buckets []BucketResponse } -type GetRoutingTableArgs struct {} +type GetRoutingTableArgs struct{} func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, result *RoutingTableResponse) error { if rpcServer == nil { @@ -182,14 +182,27 @@ func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, re return nil } +type AddKnownNodeResponse struct{} + +func (n *NodeRPC) AddKnownNode(r *http.Request, args *ContactResponse, result *AddKnownNodeResponse) error { + if rpcServer == nil { + return errors.New("no node set up") + } + rpcServer.Node.AddKnownNode( + Contact{ + bits.FromHexP(args.NodeID), + net.ParseIP(args.IP), args.Port, 0, + }) + return nil +} + func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer { mut.Lock() defer mut.Unlock() rpcServer = &NodeRPCServer{ - Wg: sync.WaitGroup{}, + Wg: sync.WaitGroup{}, Node: node, } - c := make(chan *http.Server) rpcServer.Wg.Add(1) go func() { s := rpc.NewServer() @@ -202,15 +215,7 @@ func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer { server := &http.Server{Addr: address, Handler: r} log.Println("rpc listening on " + address) server.ListenAndServe() - c <- server - }() - go func() { - rpcServer.Wg.Wait() - close(c) - log.Println("stopped rpc listening on " + address) - for server := range c { - server.Close() - } }() + return *rpcServer }