From d15245bd05873cad3a71985f87667170b2bf0dff Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 7 Aug 2018 11:00:04 -0400 Subject: [PATCH] fix rpc server --- Gopkg.lock | 88 ++++++++++--------- cmd/dht.go | 63 +++++--------- dht/bootstrap.go | 6 +- dht/config.go | 2 + dht/dht.go | 8 ++ dht/node.go | 5 ++ dht/node_rpc.go | 221 ----------------------------------------------- dht/rpc.go | 176 +++++++++++++++++++++++++++++++++++++ 8 files changed, 261 insertions(+), 308 deletions(-) delete mode 100644 dht/node_rpc.go create mode 100644 dht/rpc.go diff --git a/Gopkg.lock b/Gopkg.lock index 3d3ba0f..704b144 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,15 +3,15 @@ [[projects]] branch = "master" - digest = "1:436959adf1a11c1ee93ee7cd3b25dfa63f235f9cc283d86f1606626d0b7efbb3" + digest = "1:354e62d5acb9af138e13ec842f78a846d214a8d4a9f80e578698f1f1565e2ef8" name = "github.com/armon/go-metrics" packages = ["."] pruneopts = "" - revision = "783273d703149aaeb9897cf58613d5af48861c25" + revision = "3c58d8115a78a6879e5df75ae900846768d36895" [[projects]] branch = "master" - digest = "1:a18cd84ee5872580c1f19e7935bc5490f4ac6a8deaa78ba00a50aaa2a772c6e3" + digest = "1:b1b9627af19ee54d3ed6b069375f0e91baa4a25267cf3b684e80fdefb17f4719" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -24,6 +24,7 @@ "aws/credentials/ec2rolecreds", "aws/credentials/endpointcreds", "aws/credentials/stscreds", + "aws/csm", "aws/defaults", "aws/ec2metadata", "aws/endpoints", @@ -32,8 +33,11 @@ "aws/signer/v4", "internal/sdkio", "internal/sdkrand", + "internal/sdkuri", "internal/shareddefaults", "private/protocol", + "private/protocol/eventstream", + "private/protocol/eventstream/eventstreamapi", "private/protocol/query", "private/protocol/query/queryutil", "private/protocol/rest", @@ -45,15 +49,15 @@ "service/sts", ] pruneopts = "" - revision = "59a21fcfacca3b32766b385235797649ec791e48" + revision = "c0447dbaaf195bb477fd2d511b8e4665e04b9017" [[projects]] branch = "master" - digest = "1:827862daac23c3e946d7f0800957967e88e5d4b4b8437a303291237db3f565f3" + digest = "1:56b87c786a316d6e9b9c7ba8f3dd64e3199ca3b33a55cc596c633023bed20264" name = "github.com/btcsuite/btcutil" packages = ["base58"] pruneopts = "" - revision = "501929d3d046174c3d39f0ea54ece471aa17238c" + revision = "ab6388e0c60ae4834a1f57511e20c17b5f78be4b" [[projects]] digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" @@ -72,20 +76,20 @@ version = "v1.0.1" [[projects]] - digest = "1:f6d8770900622a855dbeda0aaefb536ad97bc30315e6647a02c4e2044f4a858f" + digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" packages = ["."] pruneopts = "" - revision = "6529cf7c58879c08d927016dde4477f18a0634cb" - version = "v1.36.0" + revision = "358ee7663966325963d4e8b2e1fbd570c5195153" + version = "v1.38.1" [[projects]] branch = "master" - digest = "1:fd1248d2993def2a9ef5b8112b2cb3f4f948e42c69bba544a9208e1bacb5ac8f" + digest = "1:7150b23ba935d63f7b930d6c5ff20b52649ba623d62e0344971c822615fe57a3" name = "github.com/go-sql-driver/mysql" packages = ["."] pruneopts = "" - revision = "3287d94d4c6a48a63e16fffaabf27ab20203af2a" + revision = "99ff426eb706cffe92ff3d058e168b278cabf7c7" [[projects]] digest = "1:dbbeb8ddb0be949954c8157ee8439c2adfd8dc1c9510eb44a6e58cb68c3dce28" @@ -107,8 +111,8 @@ digest = "1:91aaeb45b3c10cc9cb68d1450cbc8ac77d0a677cf34a8ed3d4ef4dacb9df8a50" name = "github.com/gorilla/rpc" packages = [ - ".", - "json", + "v2", + "v2/json", ] pruneopts = "" revision = "22c016f3df3febe0c1f6727598b6389507e03a18" @@ -124,11 +128,11 @@ [[projects]] branch = "master" - digest = "1:304c322b62533a48ac052ffee80f67087fce1bc07186cd4e610a1b0e77765836" + digest = "1:4fe55793760295fbef367890352b720784243e0ad19b5ee242519a4682bb9ef8" name = "github.com/hashicorp/errwrap" packages = ["."] pruneopts = "" - revision = "7554cd9344cec97297fa6649b055a8c98c2a1e55" + revision = "d6c0cd88035724dd42e0f335ae30161c20575ecc" [[projects]] branch = "master" @@ -148,11 +152,11 @@ [[projects]] branch = "master" - digest = "1:b46ef59de1f724e8a2b508ea2b329eaf6cac4d71cbd44ad5e3dbd4e8fd49de9b" + digest = "1:0b5ca7d18e4ded1e4dacbb37ff027cb40a80c0fed969e4e03cf7aff129bc1b44" name = "github.com/hashicorp/go-multierror" packages = ["."] pruneopts = "" - revision = "b7773ae218740a7be65057fc60b366a49b538a44" + revision = "3d5d8f294aa03d8e98859feac328afbdf1ae0703" [[projects]] branch = "master" @@ -180,14 +184,14 @@ [[projects]] branch = "master" - digest = "1:a899b8e21f5ed142d242ce3d413d023346067802390914515cbb091fbfd7d72c" + digest = "1:7b8e4a60bfdacc2a79ba4a4ef21b2e86e98fb1dc99d816179e0b4aee75106051" name = "github.com/hashicorp/serf" packages = [ "coordinate", "serf", ] pruneopts = "" - revision = "80ab48778deee28e4ea2dc4ef1ebb2c5f4063996" + revision = "984a73625de3138f44deb38d00878fab39eb6447" [[projects]] digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" @@ -214,7 +218,7 @@ [[projects]] branch = "master" - digest = "1:3e70054e223de253fcd46190b2b925ef3ad10ba1d7845e097179547579604418" + digest = "1:3e990fec1701f7cd3a301cb0fa824f65e35a37c224ff17f4d842720651d2f2fb" name = "github.com/lbryio/lbry.go" packages = [ "crypto", @@ -225,7 +229,7 @@ "util", ] pruneopts = "" - revision = "821cfb748eb42aa7b6fb345a174950f3ed78dff5" + revision = "e2c96944fc485d3ab5e164da78f8439a94c5aa85" [[projects]] branch = "master" @@ -236,20 +240,20 @@ revision = "b7abd7672df533e627eddbf3a5a529786e8bda7f" [[projects]] - digest = "1:dadcf27115348ca2452e16b50f67b66f61ba9d7cd50c61bf8a6e2611d8dc1c7c" + digest = "1:4c8d8358c45ba11ab7bb15df749d4df8664ff1582daead28bae58cf8cbe49890" name = "github.com/miekg/dns" packages = ["."] pruneopts = "" - revision = "eac804ceef194db2da6ee80c728d7658c8c805ff" - version = "v1.0.6" + revision = "5a2b9fab83ff0f8bfc99684bd5f43a37abe560f1" + version = "v1.0.8" [[projects]] - digest = "1:3cb50c403fa46c85697dbc4e06a95008689e058f33466b7eb8d31ea0eb291ea3" + digest = "1:e6352ff4bd34c601567ad5e274837275f08e2a933e2688354cf5d44595c13ef9" name = "github.com/nlopes/slack" packages = ["."] pruneopts = "" - revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4" - version = "v0.2.0" + revision = "0db1d5eae1116bf7c8ed96c6749acfbf4daaec3e" + version = "v0.3.0" [[projects]] branch = "master" @@ -269,11 +273,11 @@ [[projects]] branch = "master" - digest = "1:5d8eb114a78540fe5217d77bc5d232e171e14406b37ac72bada1cd5fa65d633e" + digest = "1:56de39853758a4b6053a3f71e527305bbed11a0d876156e32e8cc7180d36198b" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "" - revision = "778f2e774c725116edbc3d039dc0dfc1cc62aae8" + revision = "d329d24db4313262a3b0a24d8aeb1dc4bd294fb0" [[projects]] digest = "1:d0b38ba6da419a6d4380700218eeec8623841d44a856bb57369c172fbf692ab4" @@ -285,11 +289,11 @@ [[projects]] branch = "master" - digest = "1:a1403cc8a94b8d7956ee5e9694badef0e7b051af289caad1cf668331e3ffa4f6" + digest = "1:c8f6919ab9f140506fd4ad3f4a9c9c2af9ee7921e190af0c67b2fca2f903083c" name = "github.com/spf13/cobra" packages = ["."] pruneopts = "" - revision = "ef82de70bb3f60c65fb8eebacbb2d122ef517385" + revision = "7c4570c3ebeb8129a1f7456d0908a8b676b6f9f1" [[projects]] digest = "1:8e243c568f36b09031ec18dff5f7d2769dcf5ca4d624ea511c8e3197dc3d352d" @@ -309,7 +313,7 @@ [[projects]] branch = "master" - digest = "1:e231930ac5a059ec3784a0f88a4909dad69f438dfbc78f119db43d2ba97a44b5" + digest = "1:53c4b75f22ea7757dea07eae380ea42de547ae6865a5e3b41866754a8a8219c9" name = "golang.org/x/crypto" packages = [ "ed25519", @@ -319,11 +323,11 @@ "ssh/terminal", ] pruneopts = "" - revision = "94e3fad7f1b4eed4ec147751ad6b4c4d33f00611" + revision = "f027049dab0ad238e394a753dba2d14753473a04" [[projects]] branch = "master" - digest = "1:9e548233d0dc00e74be262e54a9d1bbe7e4c19e5951083520261740e37daeb02" + digest = "1:9f170ebb5ac75debb7e958e0388545441cc77de4d131a0c170530e948f3e857e" name = "golang.org/x/net" packages = [ "bpf", @@ -334,18 +338,18 @@ "ipv6", ] pruneopts = "" - revision = "2491c5de3490fced2f6cff376127c667efeed857" + revision = "19491d39cadbd9cd33f26ca22cc89ba4ba38251c" [[projects]] branch = "master" - digest = "1:b86b91c7728e9caf753e003ca8858099d91ed7053298c2f586657ab4c5537c3a" + digest = "1:309d0f514b3f0dd143089ff4ab91c894d3e3f7e771c89b59d4b015b955cbaa5c" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "d0faeb539838e250bd0a9db4182d48d4a1915181" + revision = "0718ef2ef256118d53a01598f179001ec2af7626" [[projects]] branch = "master" @@ -356,12 +360,12 @@ revision = "fbb02b2291d28baffd63558aa44b4b56f178d650" [[projects]] - digest = "1:934fb8966f303ede63aa405e2c8d7f0a427a05ea8df335dfdc1833dd4d40756f" + digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" name = "google.golang.org/appengine" packages = ["cloudsql"] pruneopts = "" - revision = "150dc57a1b433e64154302bdc40b6bb8aefa313a" - version = "v1.0.0" + revision = "b1f26356af11148e710935ed1ac8a7f5702c7612" + version = "v1.1.0" [[projects]] digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d" @@ -384,8 +388,8 @@ "github.com/davecgh/go-spew/spew", "github.com/go-sql-driver/mysql", "github.com/gorilla/mux", - "github.com/gorilla/rpc", - "github.com/gorilla/rpc/json", + "github.com/gorilla/rpc/v2", + "github.com/gorilla/rpc/v2/json", "github.com/hashicorp/serf/serf", "github.com/lbryio/errors.go", "github.com/lbryio/lbry.go/crypto", diff --git a/cmd/dht.go b/cmd/dht.go index 5e0f637..04d329e 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -2,9 +2,7 @@ package cmd import ( "log" - "math/big" "net" - "net/http" "os" "os/signal" "strconv" @@ -17,35 +15,23 @@ import ( "github.com/spf13/cobra" ) -type NodeRPC string - -type PingArgs struct { - nodeID string - address string - port int -} - -type PingResult string - -func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error { - *result = PingResult("pong") - return nil -} - +var dhtNodeID string var dhtPort int -var rpcPort int +var dhtRpcPort int +var dhtSeeds []string func init() { var cmd = &cobra.Command{ - Use: "dht [bootstrap|connect]", + Use: "dht [connect|bootstrap]", Short: "Run dht node", - ValidArgs: []string{"start", "bootstrap"}, + ValidArgs: []string{"connect", "bootstrap"}, Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), Run: dhtCmd, } - cmd.PersistentFlags().StringP("nodeID", "n", "", "nodeID in hex") + cmd.PersistentFlags().StringVar(&dhtNodeID, "nodeID", "", "nodeID in hex") cmd.PersistentFlags().IntVar(&dhtPort, "port", 4567, "Port to start DHT on") - cmd.PersistentFlags().IntVar(&rpcPort, "rpc_port", 1234, "Port to listen for rpc commands on") + cmd.PersistentFlags().IntVar(&dhtRpcPort, "rpcPort", 0, "Port to listen for rpc commands on") + cmd.PersistentFlags().StringSliceVar(&dhtSeeds, "seeds", []string{}, "Addresses of seed nodes") rootCmd.AddCommand(cmd) } @@ -60,30 +46,27 @@ func dhtCmd(cmd *cobra.Command, args []string) { interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan - log.Printf("shutting down bootstrap node") node.Shutdown() } else { - nodeIDStr := cmd.Flag("nodeID").Value.String() - nodeID := bits.Bitmap{} - if nodeIDStr == "" { - nodeID = bits.Rand() - } else { - nodeID = bits.FromHexP(nodeIDStr) + nodeID := bits.Rand() + if dhtNodeID != "" { + nodeID = bits.FromHexP(dhtNodeID) } log.Println(nodeID.String()) - node := dht.NewBootstrapNode(nodeID, 1*time.Millisecond, 1*time.Minute) - listener, err := net.ListenPacket(dht.Network, "127.0.0.1:"+strconv.Itoa(dhtPort)) - checkErr(err) - conn := listener.(*net.UDPConn) - err = node.Connect(conn) - checkErr(err) - log.Println("started node") - _, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil) - rpcServer := dht.RunRPCServer("127.0.0.1:"+strconv.Itoa(rpcPort), "/", node) + + dhtConf := dht.NewStandardConfig() + dhtConf.Address = "0.0.0.0:" + strconv.Itoa(dhtPort) + dhtConf.RPCPort = dhtRpcPort + if len(dhtSeeds) > 0 { + dhtConf.SeedNodes = dhtSeeds + } + + d := dht.New(dhtConf) + d.Start() + interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) <-interruptChan - rpcServer.Wg.Done() - node.Shutdown() + d.Shutdown() } } diff --git a/dht/bootstrap.go b/dht/bootstrap.go index 20c614d..dd649a7 100644 --- a/dht/bootstrap.go +++ b/dht/bootstrap.go @@ -48,10 +48,6 @@ func (b *BootstrapNode) Add(c Contact) { b.upsert(c) } -func (b *BootstrapNode) AddKnownNode(c Contact) { - b.Node.rt.Update(c) -} - // Connect connects to the given connection and starts any background threads necessary func (b *BootstrapNode) Connect(conn UDPConn) error { err := b.Node.Connect(conn) @@ -59,7 +55,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error { return err } - log.Debugf("[%s] bootstrap: node connected", b.id.HexShort()) + log.Infof("[%s] bootstrap: node connected", b.id.HexShort()) go func() { t := time.NewTicker(b.checkInterval / 5) diff --git a/dht/config.go b/dht/config.go index 3e4a49c..21a1900 100644 --- a/dht/config.go +++ b/dht/config.go @@ -50,6 +50,8 @@ type Config struct { PrintState time.Duration // the port that clients can use to download blobs using the LBRY peer protocol PeerProtocolPort int + // if nonzero, an RPC server will listen to requests on this port and respond to them + RPCPort int // the time after which the original publisher must reannounce a key/value pair ReannounceTime time.Duration // send at most this many announces per second diff --git a/dht/dht.go b/dht/dht.go index 9e5d3cd..b5b2965 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -100,6 +100,14 @@ func (dht *DHT) Start() error { dht.grp.Done() }() + if dht.conf.RPCPort > 0 { + dht.grp.Add(1) + go func() { + dht.runRPCServer(dht.conf.RPCPort) + dht.grp.Done() + }() + } + return nil } diff --git a/dht/node.go b/dht/node.go index c3d3738..55f4724 100644 --- a/dht/node.go +++ b/dht/node.go @@ -467,3 +467,8 @@ func (n *Node) startRoutingTableGrooming() { func (n *Node) Store(hash bits.Bitmap, c Contact) { n.store.Upsert(hash, c) } + +//AddKnownNode adds a known-good node to the routing table +func (n *Node) AddKnownNode(c Contact) { + n.rt.Update(c) +} diff --git a/dht/node_rpc.go b/dht/node_rpc.go deleted file mode 100644 index a04349d..0000000 --- a/dht/node_rpc.go +++ /dev/null @@ -1,221 +0,0 @@ -package dht - -import ( - "errors" - "net" - "net/http" - "sync" - - "github.com/gorilla/mux" - "github.com/gorilla/rpc" - "github.com/gorilla/rpc/json" - "github.com/lbryio/reflector.go/dht/bits" -) - -type NodeRPCServer struct { - Wg sync.WaitGroup - Node *BootstrapNode -} - -var mut sync.Mutex -var rpcServer *NodeRPCServer - -type NodeRPC int - -type PingArgs struct { - NodeID string - IP string - Port int -} - -type PingResult string - -func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error { - if rpcServer == nil { - return errors.New("no node set up") - } - toQuery, err := bits.FromHex(args.NodeID) - if err != nil { - return err - } - c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} - req := Request{Method: "ping"} - nodeResponse := rpcServer.Node.Send(c, req) - if nodeResponse != nil { - *result = PingResult(nodeResponse.Data) - } - return nil -} - -type FindArgs struct { - Key string - NodeID string - IP string - Port int -} - -type ContactResponse struct { - NodeID string - IP string - Port int -} - -type FindNodeResult []ContactResponse - -func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResult) error { - if rpcServer == nil { - return errors.New("no node set up") - } - key, err := bits.FromHex(args.Key) - if err != nil { - return err - } - toQuery, err := bits.FromHex(args.NodeID) - if err != nil { - return err - } - c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} - req := Request{Arg: &key, Method: "findNode"} - nodeResponse := rpcServer.Node.Send(c, req) - contacts := []ContactResponse{} - if nodeResponse != nil && nodeResponse.Contacts != nil { - for _, foundContact := range nodeResponse.Contacts { - contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port}) - } - } - *result = FindNodeResult(contacts) - return nil -} - -type FindValueResult struct { - Contacts []ContactResponse - Value string -} - -func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueResult) error { - if rpcServer == nil { - return errors.New("no node set up") - } - key, err := bits.FromHex(args.Key) - if err != nil { - return err - } - toQuery, err := bits.FromHex(args.NodeID) - if err != nil { - return err - } - c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} - req := Request{Arg: &key, Method: "findValue"} - nodeResponse := rpcServer.Node.Send(c, req) - contacts := []ContactResponse{} - if nodeResponse != nil && nodeResponse.FindValueKey != "" { - *result = FindValueResult{Value: nodeResponse.FindValueKey} - return nil - } else if nodeResponse != nil && nodeResponse.Contacts != nil { - for _, foundContact := range nodeResponse.Contacts { - contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port}) - } - *result = FindValueResult{Contacts: contacts} - return nil - } - return errors.New("not sure what happened") -} - -type IterativeFindValueArgs struct { - Key string -} - -type IterativeFindValueResult struct { - Contacts []ContactResponse - FoundValue bool -} - -func (n *NodeRPC) IterativeFindValue(r *http.Request, args *IterativeFindValueArgs, result *IterativeFindValueResult) error { - if rpcServer == nil { - return errors.New("no node set up") - } - key, err := bits.FromHex(args.Key) - if err != nil { - return err - } - foundContacts, found, err := FindContacts(&rpcServer.Node.Node, key, false, nil) - contacts := []ContactResponse{} - result.FoundValue = found - for _, foundContact := range foundContacts { - contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port}) - } - result.Contacts = contacts - return nil -} - -type BucketResponse struct { - Start string - End string - Count int - Contacts []ContactResponse -} - -type RoutingTableResponse struct { - NodeID string - Count int - Buckets []BucketResponse -} - -type GetRoutingTableArgs struct{} - -func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, result *RoutingTableResponse) error { - if rpcServer == nil { - return errors.New("no node set up") - } - result.NodeID = rpcServer.Node.id.String() - result.Count = len(rpcServer.Node.rt.buckets) - for _, b := range rpcServer.Node.rt.buckets { - bucketInfo := []ContactResponse{} - for _, c := range b.Contacts() { - bucketInfo = append(bucketInfo, ContactResponse{c.ID.String(), c.IP.String(), c.Port}) - } - result.Buckets = append(result.Buckets, BucketResponse{ - Start: b.Range.Start.String(), End: b.Range.End.String(), Contacts: bucketInfo, - Count: b.Len(), - }) - } - return nil -} - -type AddKnownNodeResponse struct{} - -func (n *NodeRPC) AddKnownNode(r *http.Request, args *ContactResponse, result *AddKnownNodeResponse) error { - if rpcServer == nil { - return errors.New("no node set up") - } - rpcServer.Node.AddKnownNode( - Contact{ - bits.FromHexP(args.NodeID), - net.ParseIP(args.IP), args.Port, 0, - }) - return nil -} - -func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer { - mut.Lock() - defer mut.Unlock() - rpcServer = &NodeRPCServer{ - Wg: sync.WaitGroup{}, - Node: node, - } - rpcServer.Wg.Add(1) - go func() { - s := rpc.NewServer() - s.RegisterCodec(json.NewCodec(), "application/json") - s.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") - node := new(NodeRPC) - s.RegisterService(node, "") - r := mux.NewRouter() - r.Handle(rpcPath, s) - server := &http.Server{Addr: address, Handler: r} - log.Println("rpc listening on " + address) - server.ListenAndServe() - }() - - return *rpcServer -} diff --git a/dht/rpc.go b/dht/rpc.go new file mode 100644 index 0000000..2857df8 --- /dev/null +++ b/dht/rpc.go @@ -0,0 +1,176 @@ +package dht + +import ( + "context" + "net" + "net/http" + "strconv" + "sync" + + "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/reflector.go/dht/bits" + + "github.com/gorilla/mux" + rpc2 "github.com/gorilla/rpc/v2" + "github.com/gorilla/rpc/v2/json" +) + +type rpcReceiver struct { + dht *DHT +} + +type RpcPingArgs struct { + Address string +} + +func (rpc *rpcReceiver) Ping(r *http.Request, args *RpcPingArgs, result *string) error { + if args.Address == "" { + return errors.Err("no address given") + } + + err := rpc.dht.Ping(args.Address) + if err != nil { + return err + } + + *result = pingSuccessResponse + return nil +} + +type RpcFindArgs struct { + Key string + NodeID string + IP string + Port int +} + +func (rpc *rpcReceiver) FindNode(r *http.Request, args *RpcFindArgs, result *[]Contact) error { + key, err := bits.FromHex(args.Key) + if err != nil { + return err + } + + toQuery, err := bits.FromHex(args.NodeID) + if err != nil { + return err + } + + c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} + req := Request{Method: findNodeMethod, Arg: &key} + + nodeResponse := rpc.dht.node.Send(c, req) + if nodeResponse != nil && nodeResponse.Contacts != nil { + *result = nodeResponse.Contacts + } + return nil +} + +type RpcFindValueResult struct { + Contacts []Contact + Value string +} + +func (rpc *rpcReceiver) FindValue(r *http.Request, args *RpcFindArgs, result *RpcFindValueResult) error { + key, err := bits.FromHex(args.Key) + if err != nil { + return err + } + toQuery, err := bits.FromHex(args.NodeID) + if err != nil { + return err + } + c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port} + req := Request{Arg: &key, Method: findValueMethod} + + nodeResponse := rpc.dht.node.Send(c, req) + if nodeResponse != nil && nodeResponse.FindValueKey != "" { + *result = RpcFindValueResult{Value: nodeResponse.FindValueKey} + return nil + } + if nodeResponse != nil && nodeResponse.Contacts != nil { + *result = RpcFindValueResult{Contacts: nodeResponse.Contacts} + return nil + } + + return errors.Err("not sure what happened") +} + +type RpcIterativeFindValueArgs struct { + Key string +} + +type RpcIterativeFindValueResult struct { + Contacts []Contact + FoundValue bool +} + +func (rpc *rpcReceiver) IterativeFindValue(r *http.Request, args *RpcIterativeFindValueArgs, result *RpcIterativeFindValueResult) error { + key, err := bits.FromHex(args.Key) + if err != nil { + return err + } + foundContacts, found, err := FindContacts(rpc.dht.node, key, false, nil) + result.Contacts = foundContacts + result.FoundValue = found + return nil +} + +type RpcBucketResponse struct { + Start string + End string + NumContacts int + Contacts []Contact +} + +type RpcRoutingTableResponse struct { + NodeID string + NumBuckets int + Buckets []RpcBucketResponse +} + +func (rpc *rpcReceiver) GetRoutingTable(r *http.Request, args *struct{}, result *RpcRoutingTableResponse) error { + result.NodeID = rpc.dht.node.id.String() + result.NumBuckets = len(rpc.dht.node.rt.buckets) + for _, b := range rpc.dht.node.rt.buckets { + result.Buckets = append(result.Buckets, RpcBucketResponse{ + Start: b.Range.Start.String(), + End: b.Range.End.String(), + NumContacts: b.Len(), + Contacts: b.Contacts(), + }) + } + return nil +} + +func (rpc *rpcReceiver) AddKnownNode(r *http.Request, args *Contact, result *string) error { + rpc.dht.node.AddKnownNode(*args) + return nil +} + +func (dht *DHT) runRPCServer(port int) { + addr := "0.0.0.0:" + strconv.Itoa(port) + + s := rpc2.NewServer() + s.RegisterCodec(json.NewCodec(), "application/json") + s.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8") + s.RegisterService(&rpcReceiver{dht: dht}, "rpc") + + handler := mux.NewRouter() + handler.Handle("/", s) + server := &http.Server{Addr: addr, Handler: handler} + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + log.Printf("RPC server listening on %s", addr) + err := server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.Error(err) + } + }() + + <-dht.grp.Ch() + server.Shutdown(context.Background()) + wg.Wait() +}