diff --git a/dht/routing_table.go b/dht/routing_table.go index ec851be..9a109ea 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -3,17 +3,16 @@ package dht import ( "encoding/json" "fmt" + "math/big" "net" "sort" "strconv" "strings" "sync" "time" - "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" - "math/big" ) // TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap @@ -25,7 +24,12 @@ import ( type peer struct { Contact Contact LastActivity time.Time + // LastReplied time.Time + // LastRequested time.Time + // LastFailure time.Time + // SecondLastFailure time.Time NumFailures int + //, // // @@ -56,7 +60,7 @@ type bucket struct { lock *sync.RWMutex peers []peer lastUpdate time.Time - bucketRange *bits.Range + bucketRange bits.Range } // Len returns the number of peers in the bucket @@ -163,7 +167,7 @@ func (rt *routingTable) reset() { rt.buckets = append(rt.buckets, bucket{ peers: make([]peer, 0, bucketSize), lock: &sync.RWMutex{}, - bucketRange: &bits.Range{ + bucketRange: bits.Range{ Start: bits.FromBigP(start), End: bits.FromBigP(end), }, @@ -179,7 +183,7 @@ func (rt *routingTable) BucketInfo() string { for j, c := range contacts { s[j] = c.ID.HexShort() } - bucketInfo = append(bucketInfo, fmt.Sprintf("Bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", "))) + bucketInfo = append(bucketInfo, fmt.Sprintf("bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", "))) } } if len(bucketInfo) == 0 { @@ -242,7 +246,7 @@ func (rt *routingTable) Count() int { func (rt *routingTable) BucketRanges() []bits.Range { ranges := make([]bits.Range, len(rt.buckets)) for i, b := range rt.buckets { - ranges[i] = *b.bucketRange + ranges[i] = b.bucketRange } return ranges } @@ -280,59 +284,122 @@ func (rt *routingTable) shouldSplit(target bits.Bitmap) bool { } func (rt *routingTable) insertContact(c Contact) { - if len(rt.buckets[rt.bucketNumFor(c.ID)].peers) < bucketSize { + bucketIndex := rt.bucketNumFor(c.ID) + peersInBucket := int(len(rt.buckets[bucketIndex].peers)) + if peersInBucket < bucketSize { rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true) - } else if rt.shouldSplit(c.ID) { - rt.recursiveInsertContact(c) + } else if peersInBucket >= bucketSize && rt.shouldSplit(c.ID) { + rt.splitBucket(bucketIndex) + rt.insertContact(c) } + rt.popEmptyBuckets() } -func (rt *routingTable) recursiveInsertContact(c Contact) { - bucketIndex := rt.bucketNumFor(c.ID) +func (rt *routingTable) splitBucket(bucketIndex int) { + b := rt.buckets[bucketIndex] + min := b.bucketRange.Start.Big() max := b.bucketRange.End.Big() - - midpoint := max.Sub(max, min) + midpoint := &big.Int{} + midpoint.Sub(max, min) midpoint.Div(midpoint, big.NewInt(2)) + midpoint.Add(midpoint, min) + midpointPlusOne := &big.Int{} + midpointPlusOne.Add(midpointPlusOne, min) + midpointPlusOne.Add(midpoint, big.NewInt(1)) - // re-size the bucket to be split - b.bucketRange.Start = bits.FromBigP(min) - b.bucketRange.End = bits.FromBigP(midpoint.Sub(midpoint, big.NewInt(1))) + first_half := rt.buckets[:bucketIndex+1] - movedPeers := []peer{} - resizedPeers := []peer{} - - // set the re-sized bucket to only have peers still in range - for _, p := range b.peers { - if rt.bucketNumFor(p.Contact.ID) != bucketIndex { - movedPeers = append(movedPeers, p) - } else { - resizedPeers = append(resizedPeers, p) - } + second_half := []bucket{} + for i := bucketIndex + 1; i < len(rt.buckets); i++ { + second_half = append(second_half, rt.buckets[i]) } - b.peers = resizedPeers - // add the new bucket - insert := bucket{ + copiedPeers := []peer{} + copy(copiedPeers, b.peers) + b.peers = []peer{} + + rt.buckets = []bucket{} + for _, i := range first_half { + rt.buckets = append(rt.buckets, i) + } + newBucket := bucket{ peers: make([]peer, 0, bucketSize), lock: &sync.RWMutex{}, - bucketRange: &bits.Range{ - Start: bits.FromBigP(midpoint), + bucketRange: bits.Range{ + Start: bits.FromBigP(midpointPlusOne), End: bits.FromBigP(max), }, } - rt.buckets = append(rt.buckets[:bucketIndex], append([]bucket{insert}, rt.buckets[bucketIndex:]...)...) + rt.buckets = append(rt.buckets, newBucket) + for _, i := range second_half { + rt.buckets = append(rt.buckets, i) + } + // re-size the bucket to be split + rt.buckets[bucketIndex].bucketRange.Start = bits.FromBigP(min) + rt.buckets[bucketIndex].bucketRange.End = bits.FromBigP(midpoint) - // re-insert the contacts that where out of range of the split bucket - for _, p := range movedPeers { + // re-insert the contacts that were in the re-sized bucket + for _, p := range copiedPeers { rt.insertContact(p.Contact) } - - // insert the new contact - rt.insertContact(c) } +func (rt *routingTable) printBucketInfo() { + for i, b := range rt.buckets { + fmt.Printf("bucket %d, %d contacts\n", i + 1, len(b.peers)) + fmt.Printf(" start : %s\n", b.bucketRange.Start.String()) + fmt.Printf(" stop : %s\n", b.bucketRange.End.String()) + fmt.Println("") + } +} + +func (rt *routingTable) popBucket(bucketIndex int) { + canGoLower := bucketIndex >= 1 + canGoHigher := len(rt.buckets) - 1 > bucketIndex + + if canGoLower && !canGoHigher { + // raise the end of bucket[bucketIndex-1] + rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.End.Big()) + } else if !canGoLower && canGoHigher { + // lower the start of bucket[bucketIndex+1] + rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.Start.Big()) + } else if canGoLower && canGoHigher { + // raise the end of bucket[bucketIndex-1] and lower the start of bucket[bucketIndex+1] to the + // midpoint of the range covered by bucket[bucketIndex] + midpoint := &big.Int{} + midpoint.Sub(rt.buckets[bucketIndex].bucketRange.End.Big(), rt.buckets[bucketIndex].bucketRange.Start.Big()) + midpoint.Div(midpoint, big.NewInt(2)) + midpointPlusOne := &big.Int{} + midpointPlusOne.Add(midpoint, big.NewInt(1)) + rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(midpoint) + rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(midpointPlusOne) + } else { + return + } + // pop the bucket + rt.buckets = rt.buckets[:bucketIndex+copy(rt.buckets[bucketIndex:], rt.buckets[bucketIndex+1:])] +} + +func (rt *routingTable) popNextEmptyBucket() bool { + for bucketIndex := 0; bucketIndex < len(rt.buckets); bucketIndex += 1 { + if len(rt.buckets[bucketIndex].peers) == 0 { + rt.popBucket(bucketIndex) + return true + } + } + return false +} + +func (rt *routingTable) popEmptyBuckets() { + if len(rt.buckets) > 1 { + popBuckets := rt.popNextEmptyBucket() + for popBuckets == true { + popBuckets = rt.popNextEmptyBucket() + } + } +} func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap { var bitmaps []bits.Bitmap diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 2418d06..69cc8f4 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -2,11 +2,11 @@ package dht import ( "encoding/json" + "math/big" "net" "strconv" "strings" "testing" - "github.com/lbryio/reflector.go/dht/bits" "github.com/sebdah/goldie" ) @@ -36,29 +36,92 @@ func TestRoutingTable_bucketFor(t *testing.T) { } } -func TestRoutingTableFillBuckets(t *testing.T) { - n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n4 := bits.FromHexP("111111120000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n5 := bits.FromHexP("111111130000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n6 := bits.FromHexP("111111140000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n7 := bits.FromHexP("111111150000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n8 := bits.FromHexP("111111160000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") - n9 := bits.FromHexP("111111070000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") +func checkBucketCount(rt *routingTable, t *testing.T, correctSize, correctCount, testCaseIndex int) { + if len(rt.buckets) != correctSize { + t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex + 1, correctSize, len(rt.buckets)) + } + if rt.Count() != correctCount { + t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex + 1, correctCount, rt.Count()) + } - rt := newRoutingTable(n1) - rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001}) - rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002}) - rt.Update(Contact{n4, net.ParseIP("127.0.0.1"), 8003}) - rt.Update(Contact{n5, net.ParseIP("127.0.0.1"), 8004}) - rt.Update(Contact{n6, net.ParseIP("127.0.0.1"), 8005}) - rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8006}) - rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8007}) - rt.Update(Contact{n8, net.ParseIP("127.0.0.1"), 8008}) - rt.Update(Contact{n9, net.ParseIP("127.0.0.1"), 8009}) +} - log.Printf(rt.BucketInfo()) +func checkRangeContinuity(rt *routingTable, t *testing.T) { + position := big.NewInt(0) + for i, bucket := range rt.buckets { + bucketStart := bucket.bucketRange.Start.Big() + if bucketStart.Cmp(position) != 0 { + t.Errorf("invalid start of bucket range: %s vs %s", position.String(), bucketStart.String()) + } + if bucketStart.Cmp(bucket.bucketRange.End.Big()) != -1 { + t.Error("range start is not less than bucket end") + } + position = bucket.bucketRange.End.Big() + if i != len(rt.buckets) - 1 { + position.Add(position, big.NewInt(1)) + } + } + if position.Cmp(bits.MaxP().Big()) != 0 { + t.Errorf("range does not cover the whole keyspace, %s vs %s", bits.FromBigP(position).String(), bits.MaxP().String()) + } +} + +func TestSplitBuckets(t *testing.T) { + rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) + if len(rt.buckets) != 1 { + t.Errorf("there should only be one bucket so far") + } + if len(rt.buckets[0].peers) != 0 { + t.Errorf("there should be no contacts yet") + } + + var tests = []struct { + id bits.Bitmap + expectedBucketCount int + expectedTotalContacts int + }{ + //fill first bucket + {bits.FromHexP("F00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 1}, + {bits.FromHexP("FF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 2}, + {bits.FromHexP("FFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 3}, + {bits.FromHexP("FFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 4}, + {bits.FromHexP("FFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 5}, + {bits.FromHexP("FFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 6}, + {bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 7}, + {bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 8}, + + // fill second bucket + {bits.FromHexP("FFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 9}, + {bits.FromHexP("FFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 10}, + {bits.FromHexP("FFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 11}, + {bits.FromHexP("FFFFFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 12}, + {bits.FromHexP("FFFFFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 13}, + {bits.FromHexP("FFFFFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 14}, + {bits.FromHexP("FFFFFFFFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 15}, + {bits.FromHexP("FFFFFFFFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 16}, + + // this should be skipped (no split should occur) + {bits.FromHexP("FFFFFFFFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 16}, + + {bits.FromHexP("100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 17}, + {bits.FromHexP("200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 18}, + {bits.FromHexP("300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 19}, + + {bits.FromHexP("400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 20}, + {bits.FromHexP("500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 21}, + {bits.FromHexP("600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 22}, + {bits.FromHexP("700000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 23}, + {bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 24}, + {bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 25}, + {bits.FromHexP("A00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 26}, + {bits.FromHexP("B00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 27}, + } + for i, testCase := range tests { + rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i}) + checkBucketCount(rt, t, testCase.expectedBucketCount, testCase.expectedTotalContacts, i) + checkRangeContinuity(rt, t) + } + rt.printBucketInfo() } func TestRoutingTable_GetClosest(t *testing.T) { @@ -77,7 +140,6 @@ func TestRoutingTable_GetClosest(t *testing.T) { if !contacts[0].ID.Equals(n3) { t.Error(contacts[0]) } - contacts = rt.GetClosest(n2, 10) if len(contacts) != 2 { t.Error(len(contacts)) @@ -148,7 +210,8 @@ func TestRoutingTable_MoveToBack(t *testing.T) { func TestRoutingTable_InitialBucketRange(t *testing.T) { id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41") - ranges := newRoutingTable(id).BucketRanges() + rt := newRoutingTable(id) + ranges := rt.BucketRanges() bucketRange := ranges[0] if len(ranges) != 1 { t.Error("there should only be one bucket") @@ -169,6 +232,7 @@ func TestRoutingTable_InitialBucketRange(t *testing.T) { if found != 1000 { t.Errorf("%d did not appear in any bucket", found) } + log.Println(rt.Count()) } func TestRoutingTable_Save(t *testing.T) {