From 620a5d7d4894308aa672cf43274558d9c92ca9a0 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 13 Jul 2018 13:31:54 -0400 Subject: [PATCH] handle peer port correctly --- cmd/dht.go | 18 +++++++----- dht/contact.go | 39 +++++++++++++++----------- dht/contact_test.go | 6 ++-- dht/dht.go | 1 + dht/dht_test.go | 12 ++++++++ dht/message.go | 8 +++--- dht/message_test.go | 4 +-- dht/node.go | 2 +- dht/node_test.go | 2 +- dht/routing_table_test.go | 59 ++++++++++++++++++++------------------- 10 files changed, 87 insertions(+), 64 deletions(-) diff --git a/cmd/dht.go b/cmd/dht.go index 2ba46b7..b277af4 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -1,26 +1,28 @@ package cmd import ( + "log" + "math/big" "net" + "net/http" "os" "os/signal" "strconv" "syscall" "time" + "github.com/lbryio/reflector.go/dht" "github.com/lbryio/reflector.go/dht/bits" + "github.com/spf13/cobra" - "log" - "net/http" - "math/big" ) type NodeRPC string type PingArgs struct { - nodeID string + nodeID string address string - port int + port int } type PingResult string @@ -76,8 +78,10 @@ func dhtCmd(cmd *cobra.Command, args []string) { log.Println("started node") node.AddKnownNode( dht.Contact{ - bits.FromHexP("62c8ad9fb40a16062e884a63cd81f47b94604446319663d1334e1734dcefc8874b348ec683225e4852017a846e07d94e"), - net.ParseIP("34.231.152.182"), 4444, + bits.FromHexP("62c8ad9fb40a16062e884a63cd81f47b94604446319663d1334e1734dcefc8874b348ec683225e4852017a846e07d94e"), + net.ParseIP("34.231.152.182"), + 4444, + 3333, }) _, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil) rpcServer := dht.RunRPCServer(":1234", "/", node) diff --git a/dht/contact.go b/dht/contact.go index 32d5881..cd4fb88 100644 --- a/dht/contact.go +++ b/dht/contact.go @@ -3,6 +3,7 @@ package dht import ( "bytes" "net" + "strconv" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/reflector.go/dht/bits" @@ -12,44 +13,47 @@ import ( // TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap -// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg) -// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41 - -// Contact is a type representation of another node that a specific node is in communication with. +// Contact contains information for contacting another node on the network type Contact struct { - ID bits.Bitmap - IP net.IP - Port int + ID bits.Bitmap + IP net.IP + Port int + PeerPort int } -// Equals returns T/F if two contacts are the same. +// Equals returns true if two contacts are the same. func (c Contact) Equals(other Contact, checkID bool) bool { return c.IP.Equal(other.IP) && c.Port == other.Port && (!checkID || c.ID == other.ID) } -// Addr returns the UPD Address of the contact. +// Addr returns the address of the contact. func (c Contact) Addr() *net.UDPAddr { return &net.UDPAddr{IP: c.IP, Port: c.Port} } -// String returns the concatenated short hex encoded string of its ID + @ + string represention of its UPD Address. +// String returns a short string representation of the contact func (c Contact) String() string { - return c.ID.HexShort() + "@" + c.Addr().String() + str := c.ID.HexShort() + "@" + c.Addr().String() + if c.PeerPort != 0 { + str += "(" + strconv.Itoa(c.PeerPort) + ")" + } + return str } -// MarshalCompact returns the compact byte slice representation of a contact. +// MarshalCompact returns a compact byteslice representation of the contact +// NOTE: The compact representation always uses the tcp PeerPort, not the udp Port. This is dumb, but that's how the python daemon does it func (c Contact) MarshalCompact() ([]byte, error) { if c.IP.To4() == nil { return nil, errors.Err("ip not set") } - if c.Port < 0 || c.Port > 65535 { + if c.PeerPort < 0 || c.PeerPort > 65535 { return nil, errors.Err("invalid port") } var buf bytes.Buffer buf.Write(c.IP.To4()) - buf.WriteByte(byte(c.Port >> 8)) - buf.WriteByte(byte(c.Port)) + buf.WriteByte(byte(c.PeerPort >> 8)) + buf.WriteByte(byte(c.PeerPort)) buf.Write(c.ID[:]) if buf.Len() != compactNodeInfoLength { @@ -59,13 +63,14 @@ func (c Contact) MarshalCompact() ([]byte, error) { return buf.Bytes(), nil } -// UnmarshalCompact unmarshals the compact byte slice representation of a contact. +// UnmarshalCompact unmarshals the compact byteslice representation of a contact. +// NOTE: The compact representation always uses the tcp PeerPort, not the udp Port. This is dumb, but that's how the python daemon does it func (c *Contact) UnmarshalCompact(b []byte) error { if len(b) != compactNodeInfoLength { return errors.Err("invalid compact length") } c.IP = net.IPv4(b[0], b[1], b[2], b[3]).To4() - c.Port = int(uint16(b[5]) | uint16(b[4])<<8) + c.PeerPort = int(uint16(b[5]) | uint16(b[4])<<8) c.ID = bits.FromBytesP(b[6:]) return nil } diff --git a/dht/contact_test.go b/dht/contact_test.go index f3c9d93..cfe5abc 100644 --- a/dht/contact_test.go +++ b/dht/contact_test.go @@ -10,9 +10,9 @@ import ( func TestCompactEncoding(t *testing.T) { c := Contact{ - ID: bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41"), - IP: net.ParseIP("1.2.3.4"), - Port: int(55<<8 + 66), + ID: bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41"), + IP: net.ParseIP("1.2.3.4"), + PeerPort: int(55<<8 + 66), } var compact []byte diff --git a/dht/dht.go b/dht/dht.go index f8e328c..a92d373 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -318,6 +318,7 @@ func (dht *DHT) startReannouncer() { func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) { // self-store if dht.contact.ID == c.ID { + c.PeerPort = dht.conf.PeerProtocolPort dht.node.Store(hash, c) return } diff --git a/dht/dht_test.go b/dht/dht_test.go index 7ee2ecf..918c2db 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -10,6 +10,10 @@ import ( ) func TestNodeFinder_FindNodes(t *testing.T) { + if testing.Short() { + t.Skip("skipping slow nodeFinder test") + } + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { @@ -73,6 +77,10 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) { } func TestNodeFinder_FindValue(t *testing.T) { + if testing.Short() { + t.Skip("skipping slow nodeFinder test") + } + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { @@ -104,6 +112,10 @@ func TestNodeFinder_FindValue(t *testing.T) { } func TestDHT_LargeDHT(t *testing.T) { + if testing.Short() { + t.Skip("skipping large DHT test") + } + nodes := 100 bs, dhts := TestingCreateDHT(t, nodes, true, true) defer func() { diff --git a/dht/message.go b/dht/message.go index d10f1d5..31ad475 100644 --- a/dht/message.go +++ b/dht/message.go @@ -44,7 +44,7 @@ const ( protocolVersionField = "protocolVersion" ) -// Message is an extension of the bencode marshalling interface for serialized message passing. +// Message is a DHT message type Message interface { bencode.Marshaler } @@ -82,7 +82,7 @@ func newMessageID() messageID { return m } -// Request represents the structured request from one node to another. +// Request represents a DHT request message type Request struct { ID messageID NodeID bits.Bitmap @@ -261,7 +261,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error { return nil } -// Response represents the structured response one node returns to another. +// Response represents a DHT response message type Response struct { ID messageID NodeID bits.Bitmap @@ -416,7 +416,7 @@ func (r *Response) UnmarshalBencode(b []byte) error { return nil } -// Error represents an error message that is returned from one node to another in communication. +// Error represents a DHT error response type Error struct { ID messageID NodeID bits.Bitmap diff --git a/dht/message_test.go b/dht/message_test.go index 71812eb..040f5a1 100644 --- a/dht/message_test.go +++ b/dht/message_test.go @@ -103,9 +103,9 @@ func TestBencodeFindValueResponse(t *testing.T) { ID: newMessageID(), NodeID: bits.Rand(), FindValueKey: bits.Rand().RawString(), - Token: "arst", + Token: "arstarstarst", Contacts: []Contact{ - {ID: bits.Rand(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678}, + {ID: bits.Rand(), IP: net.IPv4(1, 2, 3, 4).To4(), PeerPort: 8765}, }, } diff --git a/dht/node.go b/dht/node.go index 0c8f95a..c3d3738 100644 --- a/dht/node.go +++ b/dht/node.go @@ -236,7 +236,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { // TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { - n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port}) + n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: addr.Port, PeerPort: request.StoreArgs.Value.Port}) err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) if err != nil { diff --git a/dht/node_test.go b/dht/node_test.go index 26d0cc7..2a167b5 100644 --- a/dht/node_test.go +++ b/dht/node_test.go @@ -289,7 +289,7 @@ func TestFindValueExisting(t *testing.T) { messageID := newMessageID() valueToFind := bits.Rand() - nodeToFind := Contact{ID: bits.Rand(), IP: net.ParseIP("1.2.3.4"), Port: 1286} + nodeToFind := Contact{ID: bits.Rand(), IP: net.ParseIP("1.2.3.4"), PeerPort: 1286} dht.node.store.Upsert(valueToFind, nodeToFind) dht.node.store.Upsert(valueToFind, nodeToFind) dht.node.store.Upsert(valueToFind, nodeToFind) diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 89064fd..2cc25b5 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/lbryio/reflector.go/dht/bits" + "github.com/sebdah/goldie" ) @@ -59,7 +60,7 @@ func TestBucket_Split(t *testing.T) { } for i, testCase := range tests { - rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i}) + rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i, 0}) if len(rt.buckets) != testCase.expectedBucketCount { t.Errorf("failed test case %s. there should be %d buckets, got %d", testCase.name, testCase.expectedBucketCount, len(rt.buckets)) @@ -126,25 +127,25 @@ func TestBucket_Split_KthClosest_DoSplit(t *testing.T) { rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) // add 4 low IDs - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004, 0}) // add 4 high IDs - rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001}) - rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002}) - rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003}) - rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004}) + rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001, 0}) + rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002, 0}) + rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003, 0}) + rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004, 0}) // split the bucket and fill the high bucket - rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005}) - rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006}) - rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007}) - rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008}) + rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005, 0}) + rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006, 0}) + rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007, 0}) + rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008, 0}) // add a high ID. it should split because the high ID is closer than the Kth closest ID - rt.Update(Contact{bits.FromHexP("910000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009}) + rt.Update(Contact{bits.FromHexP("910000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009, 0}) if len(rt.buckets) != 3 { t.Errorf("expected 3 buckets, got %d", len(rt.buckets)) @@ -158,25 +159,25 @@ func TestBucket_Split_KthClosest_DontSplit(t *testing.T) { rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) // add 4 low IDs - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003}) - rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003, 0}) + rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004, 0}) // add 4 high IDs - rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001}) - rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002}) - rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003}) - rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004}) + rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001, 0}) + rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002, 0}) + rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003, 0}) + rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004, 0}) // split the bucket and fill the high bucket - rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005}) - rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006}) - rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007}) - rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008}) + rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005, 0}) + rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006, 0}) + rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007, 0}) + rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008, 0}) // add a really high ID. this should not split because its not closer than the Kth closest ID - rt.Update(Contact{bits.FromHexP("ffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009}) + rt.Update(Contact{bits.FromHexP("ffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009, 0}) if len(rt.buckets) != 2 { t.Errorf("expected 2 buckets, got %d", len(rt.buckets)) @@ -191,8 +192,8 @@ func TestRoutingTable_GetClosest(t *testing.T) { n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") rt := newRoutingTable(n1) - rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001}) - rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002}) + rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001, 0}) + rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002, 0}) contacts := rt.GetClosest(bits.FromHexP("222222220000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1) if len(contacts) != 1 {