-add rpc_port argument
-run node on localhost for testing
This commit is contained in:
Jack Robison 2018-07-17 17:19:03 -04:00
parent 620a5d7d48
commit c967af4a98
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 43 additions and 43 deletions

View file

@ -33,6 +33,7 @@ func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) erro
} }
var dhtPort int var dhtPort int
var rpcPort int
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
@ -44,6 +45,7 @@ func init() {
} }
cmd.PersistentFlags().StringP("nodeID", "n", "", "nodeID in hex") cmd.PersistentFlags().StringP("nodeID", "n", "", "nodeID in hex")
cmd.PersistentFlags().IntVar(&dhtPort, "port", 4567, "Port to start DHT on") cmd.PersistentFlags().IntVar(&dhtPort, "port", 4567, "Port to start DHT on")
cmd.PersistentFlags().IntVar(&rpcPort, "rpc_port", 1234, "Port to listen for rpc commands on")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
@ -70,21 +72,14 @@ func dhtCmd(cmd *cobra.Command, args []string) {
} }
log.Println(nodeID.String()) log.Println(nodeID.String())
node := dht.NewBootstrapNode(nodeID, 1*time.Millisecond, 1*time.Minute) node := dht.NewBootstrapNode(nodeID, 1*time.Millisecond, 1*time.Minute)
listener, err := net.ListenPacket(dht.Network, "0.0.0.0:"+strconv.Itoa(dhtPort)) listener, err := net.ListenPacket(dht.Network, "127.0.0.1:"+strconv.Itoa(dhtPort))
checkErr(err) checkErr(err)
conn := listener.(*net.UDPConn) conn := listener.(*net.UDPConn)
err = node.Connect(conn) err = node.Connect(conn)
checkErr(err) checkErr(err)
log.Println("started node") log.Println("started node")
node.AddKnownNode(
dht.Contact{
bits.FromHexP("62c8ad9fb40a16062e884a63cd81f47b94604446319663d1334e1734dcefc8874b348ec683225e4852017a846e07d94e"),
net.ParseIP("34.231.152.182"),
4444,
3333,
})
_, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil) _, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil)
rpcServer := dht.RunRPCServer(":1234", "/", node) rpcServer := dht.RunRPCServer("127.0.0.1:"+strconv.Itoa(rpcPort), "/", node)
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
<-interruptChan <-interruptChan

View file

@ -27,7 +27,7 @@ var verbose []string
const ( const (
verboseAll = "all" verboseAll = "all"
verboseDHT = "dht" verboseDHT = "dht"
verboseNodeFinder = "nodefinder" verboseNodeFinder = "node_finder"
) )
var conf string var conf string

View file

@ -17,8 +17,8 @@ import (
type Contact struct { type Contact struct {
ID bits.Bitmap ID bits.Bitmap
IP net.IP IP net.IP
Port int Port int // the udp port used for the dht
PeerPort int PeerPort int // the tcp port a peer can be contacted on for blob requests
} }
// Equals returns true if two contacts are the same. // Equals returns true if two contacts are the same.

View file

@ -1,10 +1,11 @@
package dht package dht
import ( import (
"errors"
"net" "net"
"net/http" "net/http"
"errors"
"sync" "sync"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gorilla/rpc" "github.com/gorilla/rpc"
"github.com/gorilla/rpc/json" "github.com/gorilla/rpc/json"
@ -12,7 +13,7 @@ import (
) )
type NodeRPCServer struct { type NodeRPCServer struct {
Wg sync.WaitGroup Wg sync.WaitGroup
Node *BootstrapNode Node *BootstrapNode
} }
@ -23,13 +24,12 @@ type NodeRPC int
type PingArgs struct { type PingArgs struct {
NodeID string NodeID string
IP string IP string
Port int Port int
} }
type PingResult string type PingResult string
func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error { func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error {
if rpcServer == nil { if rpcServer == nil {
return errors.New("no node set up") return errors.New("no node set up")
@ -48,16 +48,16 @@ func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) erro
} }
type FindArgs struct { type FindArgs struct {
Key string Key string
NodeID string NodeID string
IP string IP string
Port int Port int
} }
type ContactResponse struct { type ContactResponse struct {
NodeID string NodeID string
IP string IP string
Port int Port int
} }
type FindNodeResult []ContactResponse type FindNodeResult []ContactResponse
@ -75,7 +75,7 @@ func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResu
return err return err
} }
c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port}
req := Request{ Arg: &key, Method: "findNode"} req := Request{Arg: &key, Method: "findNode"}
nodeResponse := rpcServer.Node.Send(c, req) nodeResponse := rpcServer.Node.Send(c, req)
contacts := []ContactResponse{} contacts := []ContactResponse{}
if nodeResponse != nil && nodeResponse.Contacts != nil { if nodeResponse != nil && nodeResponse.Contacts != nil {
@ -89,7 +89,7 @@ func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResu
type FindValueResult struct { type FindValueResult struct {
Contacts []ContactResponse Contacts []ContactResponse
Value string Value string
} }
func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueResult) error { func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueResult) error {
@ -105,7 +105,7 @@ func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueRe
return err return err
} }
c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port}
req := Request{ Arg: &key, Method: "findValue"} req := Request{Arg: &key, Method: "findValue"}
nodeResponse := rpcServer.Node.Send(c, req) nodeResponse := rpcServer.Node.Send(c, req)
contacts := []ContactResponse{} contacts := []ContactResponse{}
if nodeResponse != nil && nodeResponse.FindValueKey != "" { if nodeResponse != nil && nodeResponse.FindValueKey != "" {
@ -126,7 +126,7 @@ type IterativeFindValueArgs struct {
} }
type IterativeFindValueResult struct { type IterativeFindValueResult struct {
Contacts []ContactResponse Contacts []ContactResponse
FoundValue bool FoundValue bool
} }
@ -149,19 +149,19 @@ func (n *NodeRPC) IterativeFindValue(r *http.Request, args *IterativeFindValueAr
} }
type BucketResponse struct { type BucketResponse struct {
Start string Start string
End string End string
Count int Count int
Contacts []ContactResponse Contacts []ContactResponse
} }
type RoutingTableResponse struct { type RoutingTableResponse struct {
NodeID string NodeID string
Count int Count int
Buckets []BucketResponse Buckets []BucketResponse
} }
type GetRoutingTableArgs struct {} type GetRoutingTableArgs struct{}
func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, result *RoutingTableResponse) error { func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, result *RoutingTableResponse) error {
if rpcServer == nil { if rpcServer == nil {
@ -182,14 +182,27 @@ func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, re
return nil return nil
} }
type AddKnownNodeResponse struct{}
func (n *NodeRPC) AddKnownNode(r *http.Request, args *ContactResponse, result *AddKnownNodeResponse) error {
if rpcServer == nil {
return errors.New("no node set up")
}
rpcServer.Node.AddKnownNode(
Contact{
bits.FromHexP(args.NodeID),
net.ParseIP(args.IP), args.Port, 0,
})
return nil
}
func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer { func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer {
mut.Lock() mut.Lock()
defer mut.Unlock() defer mut.Unlock()
rpcServer = &NodeRPCServer{ rpcServer = &NodeRPCServer{
Wg: sync.WaitGroup{}, Wg: sync.WaitGroup{},
Node: node, Node: node,
} }
c := make(chan *http.Server)
rpcServer.Wg.Add(1) rpcServer.Wg.Add(1)
go func() { go func() {
s := rpc.NewServer() s := rpc.NewServer()
@ -202,15 +215,7 @@ func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer {
server := &http.Server{Addr: address, Handler: r} server := &http.Server{Addr: address, Handler: r}
log.Println("rpc listening on " + address) log.Println("rpc listening on " + address)
server.ListenAndServe() server.ListenAndServe()
c <- server
}()
go func() {
rpcServer.Wg.Wait()
close(c)
log.Println("stopped rpc listening on " + address)
for server := range c {
server.Close()
}
}() }()
return *rpcServer return *rpcServer
} }