From 4c000ed4198f19f77866abb28ceb19e2d28f6853 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 10 Jul 2018 17:30:47 -0400 Subject: [PATCH] grin's cleanup and some WIP --- cmd/dht.go | 5 +- dht/bits/bitmap.go | 5 - dht/bits/range.go | 4 + dht/routing_table.go | 339 ++++++++++++++------------------------ dht/routing_table_test.go | 31 ++-- 5 files changed, 151 insertions(+), 233 deletions(-) diff --git a/cmd/dht.go b/cmd/dht.go index 62be601..d3199bd 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -19,7 +19,7 @@ var dhtPort int func init() { var cmd = &cobra.Command{ - Use: "dht [start|bootstrap]", + Use: "dht [bootstrap|connect]", Short: "Run dht node", ValidArgs: []string{"start", "bootstrap"}, Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), @@ -47,5 +47,8 @@ func dhtCmd(cmd *cobra.Command, args []string) { node.Shutdown() } else { log.Fatal("not implemented") + + // + } } diff --git a/dht/bits/bitmap.go b/dht/bits/bitmap.go index f0d8a26..0605fb6 100644 --- a/dht/bits/bitmap.go +++ b/dht/bits/bitmap.go @@ -344,11 +344,6 @@ func MaxP() Bitmap { return FromHexP(strings.Repeat("f", NumBytes*2)) } -// Min returns a bitmap with all bits set to 0 -func MinP() Bitmap { - return FromHexP(strings.Repeat("0", NumBytes*2)) -} - // Rand generates a cryptographically random bitmap with the confines of the parameters specified. func Rand() Bitmap { var id Bitmap diff --git a/dht/bits/range.go b/dht/bits/range.go index 349e148..083e36c 100644 --- a/dht/bits/range.go +++ b/dht/bits/range.go @@ -61,3 +61,7 @@ func (r Range) intervalStart(n, num int) *big.Int { func (r Range) IntervalSize() *big.Int { return (&big.Int{}).Sub(r.End.Big(), r.Start.Big()) } + +func (r Range) Contains(b Bitmap) bool { + return r.Start.Cmp(b) <= 0 && r.End.Cmp(b) >= 0 +} diff --git a/dht/routing_table.go b/dht/routing_table.go index dd0f69c..3f5d26f 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -3,13 +3,14 @@ package dht import ( "encoding/json" "fmt" - "math/big" "net" "sort" "strconv" "strings" "sync" "time" + + "github.com/davecgh/go-spew/spew" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" @@ -28,7 +29,7 @@ type peer struct { // LastRequested time.Time // LastFailure time.Time // SecondLastFailure time.Time - NumFailures int + NumFailures int //, // @@ -57,10 +58,18 @@ func (p *peer) Fail() { } type bucket struct { - lock *sync.RWMutex - peers []peer - lastUpdate time.Time - bucketRange bits.Range + lock *sync.RWMutex + peers []peer + lastUpdate time.Time + Range bits.Range // capitalized because `range` is a keyword +} + +func newBucket(r bits.Range) *bucket { + return &bucket{ + peers: make([]peer, 0, bucketSize), + lock: &sync.RWMutex{}, + Range: r, + } } // Len returns the number of peers in the bucket @@ -70,6 +79,17 @@ func (b bucket) Len() int { return len(b.peers) } +func (b bucket) Contains(c Contact) bool { + b.lock.RLock() + defer b.lock.RUnlock() + for _, p := range b.peers { + if p.Contact.Equals(c, true) { + return true + } + } + return false +} + // Contacts returns a slice of the bucket's contacts func (b bucket) Contacts() []Contact { b.lock.RLock() @@ -85,22 +105,26 @@ func (b bucket) Contacts() []Contact { func (b *bucket) UpdateContact(c Contact, insertIfNew bool) { b.lock.Lock() defer b.lock.Unlock() + fmt.Printf("updating contact %s\n", c.ID) // TODO: verify the peer is in the bucket key range peerIndex := find(c.ID, b.peers) if peerIndex >= 0 { + fmt.Println("exists, moving to back") b.lastUpdate = time.Now() b.peers[peerIndex].Touch() moveToBack(b.peers, peerIndex) - } else if insertIfNew { + fmt.Println("inserting new") hasRoom := true if len(b.peers) >= bucketSize { + fmt.Println("no room") hasRoom = false for i := range b.peers { if b.peers[i].IsBad(maxPeerFails) { + fmt.Println("dropping bad peer to make room") // TODO: Ping contact first. Only remove if it does not respond b.peers = append(b.peers[:i], b.peers[i+1:]...) hasRoom = true @@ -110,10 +134,13 @@ func (b *bucket) UpdateContact(c Contact, insertIfNew bool) { } if hasRoom { + fmt.Println("actually adding") b.lastUpdate = time.Now() peer := peer{Contact: c} peer.Touch() b.peers = append(b.peers, peer) + } else { + fmt.Println("no room, dropping") } } } @@ -146,37 +173,59 @@ func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool { return time.Since(b.lastUpdate) > refreshInterval } +func (b *bucket) Split() (*bucket, *bucket) { + b.lock.Lock() + defer b.lock.Unlock() + + left := newBucket(b.Range.IntervalP(1, 2)) + right := newBucket(b.Range.IntervalP(2, 2)) + left.lastUpdate = b.lastUpdate + right.lastUpdate = b.lastUpdate + + for _, p := range b.peers { + if left.Range.Contains(p.Contact.ID) { + left.peers = append(left.peers, p) + } else { + right.peers = append(right.peers, p) + } + } + + if len(left.peers) == 0 { + left, right = right.Split() + left.Range.Start = b.Range.Start + } else if len(right.peers) == 0 { + left, right = left.Split() + right.Range.End = b.Range.End + } + + return left, right +} + type routingTable struct { id bits.Bitmap - buckets []bucket - lock *sync.RWMutex + buckets []*bucket + mu *sync.RWMutex // this mutex is write-locked only when CHANGING THE NUMBER OF BUCKETS in the table } func newRoutingTable(id bits.Bitmap) *routingTable { - var rt routingTable - rt.id = id - rt.lock = &sync.RWMutex{} + rt := routingTable{ + id: id, + mu: &sync.RWMutex{}, + } rt.reset() return &rt } func (rt *routingTable) reset() { - rt.Lock() - defer rt.Unlock() - newBucketLock := &sync.RWMutex{} - newBucketLock.Lock() - rt.buckets = []bucket{} - rt.buckets = append(rt.buckets, bucket{ - peers: make([]peer, 0, bucketSize), - lock: newBucketLock, - bucketRange: bits.Range{ - Start: bits.MinP(), - End: bits.MaxP(), - }, - }) + rt.mu.Lock() + defer rt.mu.Unlock() + rt.buckets = []*bucket{newBucket(bits.MaxRange())} } func (rt *routingTable) BucketInfo() string { + rt.mu.RLock() + defer rt.mu.RUnlock() + var bucketInfo []string for i, b := range rt.buckets { if b.Len() > 0 { @@ -196,52 +245,53 @@ func (rt *routingTable) BucketInfo() string { // Update inserts or refreshes a contact func (rt *routingTable) Update(c Contact) { - rt.insertContact(c) + rt.mu.Lock() // write lock, because updates may cause bucket splits + defer rt.mu.Unlock() + + if rt.shouldSplit(c) { + spew.Dump("splitting") + i := rt.bucketNumFor(c.ID) + left, right := rt.buckets[i].Split() + rt.buckets = append(rt.buckets[:i], append([]*bucket{left, right}, rt.buckets[i+1:]...)...) + } else { + spew.Dump("no split") + } + rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true) } // Fresh refreshes a contact if its already in the routing table func (rt *routingTable) Fresh(c Contact) { + rt.mu.RLock() + defer rt.mu.RUnlock() rt.bucketFor(c.ID).UpdateContact(c, false) } // FailContact marks a contact as having failed, and removes it if it failed too many times func (rt *routingTable) Fail(c Contact) { + rt.mu.RLock() + defer rt.mu.RUnlock() rt.bucketFor(c.ID).FailContact(c.ID) } -func (rt *routingTable) getClosestToUs(limit int) []Contact { - contacts := []Contact{} - toSort := []sortedContact{} - rt.lock.RLock() - defer rt.lock.RUnlock() - for _, bucket := range rt.buckets { - toSort = []sortedContact{} - toSort = appendContacts(toSort, bucket, rt.id) - sort.Sort(byXorDistance(toSort)) - for _, sorted := range toSort { - contacts = append(contacts, sorted.contact) - if len(contacts) >= limit { - break - } - } - } - return contacts +// GetClosest returns the closest `limit` contacts from the routing table. +// This is a locking wrapper around getClosest() +func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact { + rt.mu.RLock() + defer rt.mu.RUnlock() + return rt.getClosest(target, limit) } -// GetClosest returns the closest `limit` contacts from the routing table -// It marks each bucket it accesses as having been accessed -func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact { - if target == rt.id { - return rt.getClosestToUs(limit) - } - rt.lock.RLock() - defer rt.lock.RUnlock() - toSort := []sortedContact{} +// getClosest returns the closest `limit` contacts from the routing table +func (rt *routingTable) getClosest(target bits.Bitmap, limit int) []Contact { + var toSort []sortedContact for _, b := range rt.buckets { - toSort = appendContacts(toSort, b, target) + for _, c := range b.Contacts() { + toSort = append(toSort, sortedContact{c, c.ID.Xor(target)}) + } } sort.Sort(byXorDistance(toSort)) - contacts := []Contact{} + + var contacts []Contact for _, sorted := range toSort { contacts = append(contacts, sorted.contact) if len(contacts) >= limit { @@ -251,18 +301,11 @@ func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact { return contacts } -func appendContacts(contacts []sortedContact, b bucket, target bits.Bitmap) []sortedContact { - for _, contact := range b.Contacts() { - contacts = append(contacts, sortedContact{contact, contact.ID.Xor(target)}) - } - return contacts -} - // Count returns the number of contacts in the routing table func (rt *routingTable) Count() int { + rt.mu.RLock() + defer rt.mu.RUnlock() count := 0 - rt.lock.RLock() - defer rt.lock.RUnlock() for _, bucket := range rt.buckets { count += bucket.Len() } @@ -271,32 +314,30 @@ func (rt *routingTable) Count() int { // Len returns the number of buckets in the routing table func (rt *routingTable) Len() int { - rt.lock.RLock() - defer rt.lock.RUnlock() + rt.mu.RLock() + defer rt.mu.RUnlock() return len(rt.buckets) } // BucketRanges returns a slice of ranges, where the `start` of each range is the smallest id that can // go in that bucket, and the `end` is the largest id func (rt *routingTable) BucketRanges() []bits.Range { - rt.lock.RLock() - defer rt.lock.RUnlock() + rt.mu.RLock() + defer rt.mu.RUnlock() ranges := make([]bits.Range, len(rt.buckets)) for i, b := range rt.buckets { - ranges[i] = b.bucketRange + ranges[i] = b.Range } return ranges } func (rt *routingTable) bucketNumFor(target bits.Bitmap) int { - rt.lock.RLock() - defer rt.lock.RUnlock() if rt.id.Equals(target) { panic("routing table does not have a bucket for its own id") } distance := target.Xor(rt.id) for i, b := range rt.buckets { - if b.bucketRange.Start.Cmp(distance) <= 0 && b.bucketRange.End.Cmp(distance) >= 0 { + if b.Range.Contains(distance) { return i } } @@ -304,164 +345,36 @@ func (rt *routingTable) bucketNumFor(target bits.Bitmap) int { } func (rt *routingTable) bucketFor(target bits.Bitmap) *bucket { - bucketIndex := rt.bucketNumFor(target) - rt.lock.RLock() - defer rt.lock.RUnlock() - return &rt.buckets[bucketIndex] + return rt.buckets[rt.bucketNumFor(target)] } -func (rt *routingTable) shouldSplit(target bits.Bitmap) bool { - b := rt.bucketFor(target) +func (rt *routingTable) shouldSplit(c Contact) bool { + b := rt.bucketFor(c.ID) + if b.Contains(c) { + return false + } if b.Len() >= bucketSize { - if b.bucketRange.Start.Equals(bits.MinP()) { // this is the bucket covering our node id + if b.Range.Start.Equals(bits.Bitmap{}) { // this is the bucket covering our node id return true } - kClosest := rt.GetClosest(rt.id, bucketSize) - kthClosest := kClosest[len(kClosest) - 1] - if target.Xor(rt.id).Cmp(kthClosest.ID.Xor(rt.id)) < 0 { - return true // the kth closest contact is further than this one + kClosest := rt.getClosest(rt.id, bucketSize) + kthClosest := kClosest[len(kClosest)-1] + if rt.id.Closer(c.ID, kthClosest.ID) { + return true } } return false } -func (rt *routingTable) insertContact(c Contact) { - bucketIndex := rt.bucketNumFor(c.ID) - peersInBucket :=rt.buckets[bucketIndex].Len() - if peersInBucket < bucketSize { - rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true) - } else if peersInBucket >= bucketSize && rt.shouldSplit(c.ID) { - rt.splitBucket(bucketIndex) - rt.insertContact(c) - rt.popEmptyBuckets() - } -} - -func (rt * routingTable) Lock() { - rt.lock.Lock() - for _, buk := range rt.buckets { - buk.lock.Lock() - } -} - -func (rt * routingTable) Unlock() { - rt.lock.Unlock() - for _, buk := range rt.buckets { - buk.lock.Unlock() - } -} - -func (rt *routingTable) splitBucket(bucketIndex int) { - rt.Lock() - defer rt.Unlock() - - b := rt.buckets[bucketIndex] - min := b.bucketRange.Start.Big() - max := b.bucketRange.End.Big() - midpoint := &big.Int{} - midpoint.Sub(max, min) - midpoint.Div(midpoint, big.NewInt(2)) - midpoint.Add(midpoint, min) - midpointPlusOne := &big.Int{} - midpointPlusOne.Add(midpointPlusOne, min) - midpointPlusOne.Add(midpoint, big.NewInt(1)) - - first_half := rt.buckets[:bucketIndex+1] - second_half := []bucket{} - for i := bucketIndex + 1; i < len(rt.buckets); i++ { - second_half = append(second_half, rt.buckets[i]) - } - - copiedPeers := []peer{} - copy(copiedPeers, b.peers) - b.peers = []peer{} - - rt.buckets = []bucket{} - for _, buk := range first_half { - rt.buckets = append(rt.buckets, buk) - } - newBucketLock := &sync.RWMutex{} - newBucketLock.Lock() // will be unlocked by the deferred rt.Unlock() - newBucket := bucket{ - peers: make([]peer, 0, bucketSize), - lock: newBucketLock, - bucketRange: bits.Range{ - Start: bits.FromBigP(midpointPlusOne), - End: bits.FromBigP(max), - }, - } - rt.buckets = append(rt.buckets, newBucket) - for _, buk := range second_half { - rt.buckets = append(rt.buckets, buk) - } - // re-size the bucket to be split - rt.buckets[bucketIndex].bucketRange.Start = bits.FromBigP(min) - rt.buckets[bucketIndex].bucketRange.End = bits.FromBigP(midpoint) - - // re-insert the contacts that were in the re-sized bucket - for _, p := range copiedPeers { - rt.insertContact(p.Contact) - } -} - func (rt *routingTable) printBucketInfo() { for i, b := range rt.buckets { - fmt.Printf("bucket %d, %d contacts\n", i + 1, len(b.peers)) - fmt.Printf(" start : %s\n", b.bucketRange.Start.String()) - fmt.Printf(" stop : %s\n", b.bucketRange.End.String()) + fmt.Printf("bucket %d, %d contacts\n", i+1, len(b.peers)) + fmt.Printf(" start : %s\n", b.Range.Start.String()) + fmt.Printf(" stop : %s\n", b.Range.End.String()) fmt.Println("") } } -func (rt *routingTable) popBucket(bucketIndex int) { - canGoLower := bucketIndex >= 1 - canGoHigher := len(rt.buckets) - 1 > bucketIndex - - if canGoLower && !canGoHigher { - // raise the end of bucket[bucketIndex-1] - rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.End.Big()) - } else if !canGoLower && canGoHigher { - // lower the start of bucket[bucketIndex+1] - rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.Start.Big()) - } else if canGoLower && canGoHigher { - // raise the end of bucket[bucketIndex-1] and lower the start of bucket[bucketIndex+1] to the - // midpoint of the range covered by bucket[bucketIndex] - midpoint := &big.Int{} - midpoint.Sub(rt.buckets[bucketIndex].bucketRange.End.Big(), rt.buckets[bucketIndex].bucketRange.Start.Big()) - midpoint.Div(midpoint, big.NewInt(2)) - midpointPlusOne := &big.Int{} - midpointPlusOne.Add(midpoint, big.NewInt(1)) - rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(midpoint) - rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(midpointPlusOne) - } else { - return - } - // pop the bucket - rt.buckets = rt.buckets[:bucketIndex+copy(rt.buckets[bucketIndex:], rt.buckets[bucketIndex+1:])] -} - -func (rt *routingTable) popNextEmptyBucket() bool { - for bucketIndex := 0; bucketIndex < len(rt.buckets); bucketIndex += 1 { - if len(rt.buckets[bucketIndex].peers) == 0 { - rt.popBucket(bucketIndex) - return true - } - } - return false -} - -func (rt *routingTable) popEmptyBuckets() { - rt.Lock() - defer rt.Unlock() - - if len(rt.buckets) > 1 { - popBuckets := rt.popNextEmptyBucket() - for popBuckets == true { - popBuckets = rt.popNextEmptyBucket() - } - } -} - func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap { var bitmaps []bits.Bitmap for i, bucket := range rt.buckets { diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 5ab3493..bb72adb 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -2,38 +2,39 @@ package dht import ( "encoding/json" + "fmt" "math/big" "net" "strconv" "strings" "testing" + "github.com/lbryio/reflector.go/dht/bits" "github.com/sebdah/goldie" ) - -func checkBucketCount(rt *routingTable, t *testing.T, correctSize, correctCount, testCaseIndex int) { +func checkBucketCount(t *testing.T, rt *routingTable, correctSize, correctCount, testCaseIndex int) { if len(rt.buckets) != correctSize { - t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex + 1, correctSize, len(rt.buckets)) + t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex+1, correctSize, len(rt.buckets)) } if rt.Count() != correctCount { - t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex + 1, correctCount, rt.Count()) + t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex+1, correctCount, rt.Count()) } } -func checkRangeContinuity(rt *routingTable, t *testing.T) { +func checkRangeContinuity(t *testing.T, rt *routingTable) { position := big.NewInt(0) for i, bucket := range rt.buckets { - bucketStart := bucket.bucketRange.Start.Big() + bucketStart := bucket.Range.Start.Big() if bucketStart.Cmp(position) != 0 { t.Errorf("invalid start of bucket range: %s vs %s", position.String(), bucketStart.String()) } - if bucketStart.Cmp(bucket.bucketRange.End.Big()) != -1 { + if bucketStart.Cmp(bucket.Range.End.Big()) != -1 { t.Error("range start is not less than bucket end") } - position = bucket.bucketRange.End.Big() - if i != len(rt.buckets) - 1 { + position = bucket.Range.End.Big() + if i != len(rt.buckets)-1 { position.Add(position, big.NewInt(1)) } } @@ -52,8 +53,8 @@ func TestSplitBuckets(t *testing.T) { } var tests = []struct { - id bits.Bitmap - expectedBucketCount int + id bits.Bitmap + expectedBucketCount int expectedTotalContacts int }{ //fill first bucket @@ -92,10 +93,13 @@ func TestSplitBuckets(t *testing.T) { {bits.FromHexP("A00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 26}, {bits.FromHexP("B00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 27}, } + for i, testCase := range tests { + fmt.Printf("\n\n\ncase %d\n", i) rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i}) - checkBucketCount(rt, t, testCase.expectedBucketCount, testCase.expectedTotalContacts, i) - checkRangeContinuity(rt, t) + //spew.Dump(rt.buckets) + checkBucketCount(t, rt, testCase.expectedBucketCount, testCase.expectedTotalContacts, i) + checkRangeContinuity(t, rt) } var testRanges = []struct { @@ -204,7 +208,6 @@ func TestRoutingTable_MoveToBack(t *testing.T) { } } - func TestRoutingTable_Save(t *testing.T) { id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41") rt := newRoutingTable(id)