diff --git a/db/db.go b/db/db.go
index c6793d9..f93cdcd 100644
--- a/db/db.go
+++ b/db/db.go
@@ -371,9 +371,7 @@ CREATE TABLE blob_ (
 	hash char(96) NOT NULL,
 	is_stored TINYINT(1) NOT NULL DEFAULT 0,
 	length bigint(20) unsigned DEFAULT NULL,
-	last_announced_at datetime DEFAULT NULL,
-	PRIMARY KEY (hash),
-	KEY last_announced_at_idx (last_announced_at)
+	PRIMARY KEY (hash)
 );
 
 CREATE TABLE stream (
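With last_announced_at dropped, announce times are no longer persisted in MySQL; the new announcer in dht/dht_announce.go (further down in this patch) tracks them in memory instead. A rough sketch of that bookkeeping, using names from this patch (the due helper is illustrative and not part of the code):

package dht

import (
	"time"

	"github.com/lbryio/reflector.go/dht/bits"
)

// hashAndTime mirrors the per-hash record kept by runAnnouncer: a zero
// lastAnnounce means the hash has never been announced.
type hashAndTime struct {
	hash         bits.Bitmap
	lastAnnounce time.Time
}

// due is a hypothetical helper showing when a hash would be re-announced.
func due(ht hashAndTime, reannounceTime time.Duration) bool {
	return ht.lastAnnounce.IsZero() || !ht.lastAnnounce.Add(reannounceTime).After(time.Now())
}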
diff --git a/dht/config.go b/dht/config.go
new file mode 100644
index 0000000..294a5bc
--- /dev/null
+++ b/dht/config.go
@@ -0,0 +1,76 @@
+package dht
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/lbryio/reflector.go/dht/bits"
+	peerproto "github.com/lbryio/reflector.go/peer"
+)
+
+const (
+	Network     = "udp4"
+	DefaultPort = 4444
+
+	DefaultAnnounceRate   = 10
+	DefaultAnnounceBurst  = 1
+	DefaultReannounceTime = 50 * time.Minute
+
+	// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
+	// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
+	alpha           = 3             // this is the constant alpha in the spec
+	bucketSize      = 8             // this is the constant k in the spec
+	nodeIDLength    = bits.NumBytes // bytes. this is the constant B in the spec
+	messageIDLength = 20            // bytes.
+
+	udpRetry            = 1
+	udpTimeout          = 5 * time.Second
+	udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024
+	// scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer
+
+	maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
+	//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
+	tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
+	//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
+	//tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
+
+	compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
+
+	tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated
+)
+
+// Config represents the configure of dht.
+type Config struct {
+	// this node's address. format is `ip:port`
+	Address string
+	// the seed nodes through which we can join in dht network
+	SeedNodes []string
+	// the hex-encoded node id for this node. if string is empty, a random id will be generated
+	NodeID string
+	// print the state of the dht every X time
+	PrintState time.Duration
+	// the port that clients can use to download blobs using the LBRY peer protocol
+	PeerProtocolPort int
+	// the time after which the original publisher must reannounce a key/value pair
+	ReannounceTime time.Duration
+	// send at most this many announces per second
+	AnnounceRate int
+	// allow bursts of up to this many times the announce rate
+	AnnounceBurst int
+}
+
+// NewStandardConfig returns a Config pointer with default values.
+func NewStandardConfig() *Config {
+	return &Config{
+		Address: "0.0.0.0:" + strconv.Itoa(DefaultPort),
+		SeedNodes: []string{
+			"lbrynet1.lbry.io:4444",
+			"lbrynet2.lbry.io:4444",
+			"lbrynet3.lbry.io:4444",
+		},
+		PeerProtocolPort: peerproto.DefaultPort,
+		ReannounceTime:   DefaultReannounceTime,
+		AnnounceRate:     DefaultAnnounceRate,
+		AnnounceBurst:    DefaultAnnounceBurst,
+	}
+}
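A usage sketch (not part of the patch) showing how a caller might override the new announce settings. The field names and defaults come from the Config above; the literal values are only examples:

package main

import (
	"time"

	"github.com/lbryio/reflector.go/dht"
)

func main() {
	conf := dht.NewStandardConfig()
	conf.Address = "0.0.0.0:4444"          // listen address for the DHT node
	conf.AnnounceRate = 20                 // at most 20 announces per second
	conf.AnnounceBurst = 2                 // allow bursts of up to 2x the announce rate
	conf.ReannounceTime = 30 * time.Minute // re-announce each stored hash every 30 minutes

	_ = dht.New(conf) // passing nil instead uses the defaults above
}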
diff --git a/dht/dht.go b/dht/dht.go
index a92d373..9e5d3cd 100644
--- a/dht/dht.go
+++ b/dht/dht.go
@@ -3,13 +3,10 @@ package dht
 import (
 	"fmt"
 	"net"
-	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/lbryio/reflector.go/dht/bits"
-	peerproto "github.com/lbryio/reflector.go/peer"
 
 	"github.com/lbryio/lbry.go/errors"
 	"github.com/lbryio/lbry.go/stop"
@@ -30,61 +27,6 @@ func init() {
 	//log.SetLevel(log.DebugLevel)
 }
 
-const (
-	Network     = "udp4"
-	DefaultPort = 4444
-
-	// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
-	// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
-	alpha           = 3             // this is the constant alpha in the spec
-	bucketSize      = 8             // this is the constant k in the spec
-	nodeIDLength    = bits.NumBytes // bytes. this is the constant B in the spec
-	messageIDLength = 20            // bytes.
-
-	udpRetry            = 1
-	udpTimeout          = 5 * time.Second
-	udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024
-	// scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer
-
-	maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
-	//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
-	tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair
-	tRefresh    = 1 * time.Hour    // the time after which an otherwise unaccessed bucket must be refreshed
-	//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
-	//tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
-
-	compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
-
-	tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated
-)
-
-// Config represents the configure of dht.
-type Config struct {
-	// this node's address. format is `ip:port`
-	Address string
-	// the seed nodes through which we can join in dht network
-	SeedNodes []string
-	// the hex-encoded node id for this node. if string is empty, a random id will be generated
-	NodeID string
-	// print the state of the dht every X time
-	PrintState time.Duration
-	// the port that clients can use to download blobs using the LBRY peer protocol
-	PeerProtocolPort int
-}
-
-// NewStandardConfig returns a Config pointer with default values.
-func NewStandardConfig() *Config {
-	return &Config{
-		Address: "0.0.0.0:" + strconv.Itoa(DefaultPort),
-		SeedNodes: []string{
-			"lbrynet1.lbry.io:4444",
-			"lbrynet2.lbry.io:4444",
-			"lbrynet3.lbry.io:4444",
-		},
-		PeerProtocolPort: peerproto.DefaultPort,
-	}
-}
-
 // DHT represents a DHT node.
 type DHT struct {
 	// config
@@ -97,12 +39,10 @@ type DHT struct {
 	grp *stop.Group
 	// channel is closed when DHT joins network
 	joined chan struct{}
-	// lock for announced list
-	lock *sync.RWMutex
-	// list of bitmaps that need to be reannounced periodically
-	announced map[bits.Bitmap]bool
 	// cache for store tokens
 	tokenCache *tokenCache
+	// hashes that need to be put into the announce queue or removed from the queue
+	announceAddRemove chan queueEdit
 }
 
 // New returns a DHT pointer. If config is nil, then config will be set to the default config.
@@ -112,11 +52,10 @@ func New(config *Config) *DHT {
 	}
 
 	d := &DHT{
-		conf:      config,
-		grp:       stop.New(),
-		joined:    make(chan struct{}),
-		lock:      &sync.RWMutex{},
-		announced: make(map[bits.Bitmap]bool),
+		conf:              config,
+		grp:               stop.New(),
+		joined:            make(chan struct{}),
+		announceAddRemove: make(chan queueEdit),
 	}
 	return d
 }
@@ -155,7 +94,11 @@ func (dht *DHT) Start() error {
 	log.Infof("[%s] DHT ready on %s (%d nodes found during join)",
 		dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
 
-	go dht.startReannouncer()
+	dht.grp.Add(1)
+	go func() {
+		dht.runAnnouncer()
+		dht.grp.Done()
+	}()
 
 	return nil
 }
@@ -238,113 +181,6 @@ func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
 	return nil, nil
 }
 
-// Add adds the hash to the list of hashes this node has
-func (dht *DHT) Add(hash bits.Bitmap) {
-	// TODO: calling Add several times quickly could cause it to be announced multiple times before dht.announced[hash] is set to true
-	dht.lock.RLock()
-	exists := dht.announced[hash]
-	dht.lock.RUnlock()
-	if exists {
-		return
-	}
-
-	dht.grp.Add(1)
-	go func() {
-		defer dht.grp.Done()
-		err := dht.announce(hash)
-		if err != nil {
-			log.Error(errors.Prefix("error announcing bitmap", err))
-		}
-	}()
-}
-
-// Announce announces to the DHT that this node has the blob for the given hash
-func (dht *DHT) announce(hash bits.Bitmap) error {
-	contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
-	if err != nil {
-		return err
-	}
-
-	// if we found less than K contacts, or current node is closer than farthest contact
-	if len(contacts) < bucketSize {
-		// append self to contacts, and self-store
-		contacts = append(contacts, dht.contact)
-	} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
-		// pop last contact, and self-store instead
-		contacts[bucketSize-1] = dht.contact
-	}
-
-	wg := &sync.WaitGroup{}
-	for _, c := range contacts {
-		wg.Add(1)
-		go func(c Contact) {
-			dht.storeOnNode(hash, c)
-			wg.Done()
-		}(c)
-	}
-
-	wg.Wait()
-
-	dht.lock.Lock()
-	dht.announced[hash] = true
-	dht.lock.Unlock()
-
-	return nil
-}
-
-func (dht *DHT) startReannouncer() {
-	tick := time.NewTicker(tReannounce)
-	for {
-		select {
-		case <-dht.grp.Ch():
-			return
-		case <-tick.C:
-			dht.lock.RLock()
-			for h := range dht.announced {
-				dht.grp.Add(1)
-				go func(bm bits.Bitmap) {
-					defer dht.grp.Done()
-					err := dht.announce(bm)
-					if err != nil {
-						log.Error("error re-announcing bitmap - ", err)
-					}
-				}(h)
-			}
-			dht.lock.RUnlock()
-		}
-	}
-}
-
-func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
-	// self-store
-	if dht.contact.ID == c.ID {
-		c.PeerPort = dht.conf.PeerProtocolPort
-		dht.node.Store(hash, c)
-		return
-	}
-
-	token := dht.tokenCache.Get(c, hash, dht.grp.Ch())
-
-	resCh := dht.node.SendAsync(c, Request{
-		Method: storeMethod,
-		StoreArgs: &storeArgs{
-			BlobHash: hash,
-			Value: storeArgsValue{
-				Token:  token,
-				LbryID: dht.contact.ID,
-				Port:   dht.conf.PeerProtocolPort,
-			},
-		},
-	})
-
-	go func() {
-		select {
-		case <-resCh:
-		case <-dht.grp.Ch():
-		}
-	}()
-}
-
 // PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well
 // as current bucket information.
 func (dht *DHT) PrintState() {
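With the reannouncer removed from dht.go, callers interact with the announce queue through the channel-based Add and Remove defined in dht/dht_announce.go below. A hedged lifecycle sketch (the hash value is illustrative, not part of the patch):

package main

import (
	"github.com/lbryio/reflector.go/dht"
	"github.com/lbryio/reflector.go/dht/bits"
)

func main() {
	d := dht.New(nil) // nil config falls back to the standard config
	if err := d.Start(); err != nil {
		panic(err)
	}
	defer d.Shutdown()

	hash := bits.Rand() // stand-in for the hash of a blob this node stores
	d.Add(hash)         // queue the hash for announcement and periodic re-announcement
	// ... serve the blob for a while ...
	d.Remove(hash) // stop re-announcing once the blob is no longer stored
}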
diff --git a/dht/dht_announce.go b/dht/dht_announce.go
new file mode 100644
index 0000000..456b3d1
--- /dev/null
+++ b/dht/dht_announce.go
@@ -0,0 +1,148 @@
+package dht
+
+import (
+	"container/ring"
+	"context"
+	"sync"
+	"time"
+
+	"github.com/lbryio/lbry.go/errors"
+	"github.com/lbryio/reflector.go/dht/bits"
+	"golang.org/x/time/rate"
+)
+
+type queueEdit struct {
+	hash bits.Bitmap
+	add  bool
+}
+
+// Add adds the hash to the list of hashes this node is announcing
+func (dht *DHT) Add(hash bits.Bitmap) {
+	dht.announceAddRemove <- queueEdit{hash: hash, add: true}
+}
+
+// Remove removes the hash from the list of hashes this node is announcing
+func (dht *DHT) Remove(hash bits.Bitmap) {
+	dht.announceAddRemove <- queueEdit{hash: hash, add: false}
+}
+
+func (dht *DHT) runAnnouncer() {
+	type hashAndTime struct {
+		hash         bits.Bitmap
+		lastAnnounce time.Time
+	}
+
+	queue := ring.New(0)
+	hashes := make(map[bits.Bitmap]*ring.Ring)
+	limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate*dht.conf.AnnounceBurst)
+
+	var announceNextHash <-chan time.Time
+	timer := time.NewTimer(0)
+	closedCh := make(chan time.Time)
+	close(closedCh)
+
+	for {
+		select {
+		case <-dht.grp.Ch():
+			return
+
+		case change := <-dht.announceAddRemove:
+			if change.add {
+				r := ring.New(1)
+				r.Value = hashAndTime{hash: change.hash}
+				queue.Prev().Link(r)
+				queue = r
+				hashes[change.hash] = r
+				announceNextHash = closedCh // don't wait to announce next hash
+			} else {
+				if r, exists := hashes[change.hash]; exists {
+					delete(hashes, change.hash)
+					if len(hashes) == 0 {
+						queue = ring.New(0)
+						announceNextHash = make(chan time.Time) // no hashes to announce, wait indefinitely
+					} else {
+						if r == queue {
+							queue = queue.Next() // don't lose our pointer
+						}
+						r.Prev().Link(r.Next())
+					}
+				}
+			}
+
+		case <-announceNextHash:
+			limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow
+			dht.grp.Add(1)
+			ht := queue.Value.(hashAndTime)
+
+			if !ht.lastAnnounce.IsZero() {
+				nextAnnounce := ht.lastAnnounce.Add(dht.conf.ReannounceTime)
+				if nextAnnounce.After(time.Now()) {
+					timer.Reset(time.Until(nextAnnounce))
+					announceNextHash = timer.C // wait until next hash should be announced
+					continue
+				}
+			}
+
+			go func(hash bits.Bitmap) {
+				defer dht.grp.Done()
+				err := dht.announce(hash)
+				if err != nil {
+					log.Error(errors.Prefix("announce", err))
+				}
+			}(ht.hash)
+
+			queue.Value = hashAndTime{hash: ht.hash, lastAnnounce: time.Now()}
+			queue = queue.Next()
+			announceNextHash = closedCh // don't wait to announce next hash
+		}
+	}
+}
+
+// Announce announces to the DHT that this node has the blob for the given hash
+func (dht *DHT) announce(hash bits.Bitmap) error {
+	contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
+	if err != nil {
+		return err
+	}
+
+	// self-store if we found less than K contacts, or we're closer than the farthest contact
+	if len(contacts) < bucketSize {
+		contacts = append(contacts, dht.contact)
+	} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
+		contacts[bucketSize-1] = dht.contact
+	}
+
+	wg := &sync.WaitGroup{}
+	for _, c := range contacts {
+		wg.Add(1)
+		go func(c Contact) {
+			dht.store(hash, c)
+			wg.Done()
+		}(c)
+	}
+
+	wg.Wait()
+
+	return nil
+}
+
+func (dht *DHT) store(hash bits.Bitmap, c Contact) {
+	if dht.contact.ID == c.ID {
+		// self-store
+		c.PeerPort = dht.conf.PeerProtocolPort
+		dht.node.Store(hash, c)
+		return
+	}
+
+	dht.node.SendAsync(c, Request{
+		Method: storeMethod,
+		StoreArgs: &storeArgs{
+			BlobHash: hash,
+			Value: storeArgsValue{
+				Token:  dht.tokenCache.Get(c, hash, dht.grp.Ch()),
+				LbryID: dht.contact.ID,
+				Port:   dht.conf.PeerProtocolPort,
+			},
+		},
+	})
+}
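An isolated sketch of the container/ring bookkeeping runAnnouncer uses for its queue: a newly added hash is linked in just before the current head and then becomes the head, so fresh hashes are announced ahead of ones waiting to be re-announced. Not part of the patch; the string values stand in for bits.Bitmap hashes.

package main

import (
	"container/ring"
	"fmt"
)

func main() {
	// an existing single-entry queue
	queue := ring.New(1)
	queue.Value = "old-hash"

	// a newly added hash is linked in just before the current head...
	r := ring.New(1)
	r.Value = "new-hash"
	queue.Prev().Link(r)

	// ...and then becomes the head, so it is announced next
	queue = r

	queue.Do(func(v interface{}) { fmt.Println(v) }) // prints: new-hash, old-hash
}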
diff --git a/dht/dht_announce_test.go b/dht/dht_announce_test.go
new file mode 100644
index 0000000..a238af3
--- /dev/null
+++ b/dht/dht_announce_test.go
@@ -0,0 +1,29 @@
+package dht
+
+import (
+	"testing"
+)
+
+func TestDHT_Announce(t *testing.T) {
+	t.Skip("NEED SOME TESTS FOR ANNOUNCING")
+
+	// tests
+	// - max rate
+	// - new announces get ahead of old announces
+	// - announcer blocks correctly (when nothing to announce, when next announce time is in the future) and unblocks correctly (when waiting to announce next and a new hash is added)
+	// thought: what happens when you're waiting to announce a hash and it gets removed? probably nothing, since later hashes will be announced later. but still good to test this
+	//
+
+	//bs, dhts := TestingCreateNetwork(t, 2, true, true)
+	//defer func() {
+	//	for _, d := range dhts {
+	//		go d.Shutdown()
+	//	}
+	//	bs.Shutdown()
+	//	time.Sleep(1 * time.Second)
+	//}()
+	//
+	//announcer := dhts[0]
+	//receiver := dhts[1]
+
+}
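A hedged sketch of the first TODO above (max rate): rather than standing up a network, it exercises the same limiter configuration runAnnouncer uses and checks that n waits cannot finish faster than the configured rate allows. The test name and thresholds are illustrative, and the test takes a couple of seconds to run.

package dht

import (
	"context"
	"testing"
	"time"

	"golang.org/x/time/rate"
)

func TestAnnounceRateLimit(t *testing.T) {
	announceRate := 10 // DefaultAnnounceRate
	burst := 1         // DefaultAnnounceBurst

	limiter := rate.NewLimiter(rate.Limit(announceRate), announceRate*burst)

	n := 30
	start := time.Now()
	for i := 0; i < n; i++ {
		if err := limiter.Wait(context.Background()); err != nil {
			t.Fatal(err)
		}
	}
	elapsed := time.Since(start)

	// the first announceRate*burst waits can pass immediately; the rest are paced
	minExpected := time.Duration(n-announceRate*burst) * time.Second / time.Duration(announceRate)
	if elapsed < minExpected {
		t.Errorf("%d rate-limited waits took %s; expected at least %s at %d/s", n, elapsed, minExpected, announceRate)
	}
}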
diff --git a/dht/dht_test.go b/dht/dht_test.go
index 918c2db..6079314 100644
--- a/dht/dht_test.go
+++ b/dht/dht_test.go
@@ -14,7 +14,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
 		t.Skip("skipping slow nodeFinder test")
 	}
 
-	bs, dhts := TestingCreateDHT(t, 3, true, false)
+	bs, dhts := TestingCreateNetwork(t, 3, true, false)
 	defer func() {
 		for i := range dhts {
 			dhts[i].Shutdown()
@@ -63,7 +63,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
 }
 
 func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
-	_, dhts := TestingCreateDHT(t, 3, false, false)
+	_, dhts := TestingCreateNetwork(t, 3, false, false)
 	defer func() {
 		for i := range dhts {
 			dhts[i].Shutdown()
@@ -81,7 +81,7 @@ func TestNodeFinder_FindValue(t *testing.T) {
 		t.Skip("skipping slow nodeFinder test")
 	}
 
-	bs, dhts := TestingCreateDHT(t, 3, true, false)
+	bs, dhts := TestingCreateNetwork(t, 3, true, false)
 	defer func() {
 		for i := range dhts {
 			dhts[i].Shutdown()
@@ -117,7 +117,7 @@ func TestDHT_LargeDHT(t *testing.T) {
 	}
 
 	nodes := 100
-	bs, dhts := TestingCreateDHT(t, nodes, true, true)
+	bs, dhts := TestingCreateNetwork(t, nodes, true, true)
 	defer func() {
 		for _, d := range dhts {
 			go d.Shutdown()
diff --git a/dht/testing.go b/dht/testing.go
index cda1071..2836d4c 100644
--- a/dht/testing.go
+++ b/dht/testing.go
@@ -14,8 +14,8 @@ import (
 var testingDHTIP = "127.0.0.1"
 var testingDHTFirstPort = 21000
 
-// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
-func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
+// TestingCreateNetwork initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
+func TestingCreateNetwork(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
 	var bootstrapNode *BootstrapNode
 	var seeds []string
 
@@ -42,7 +42,11 @@ func TestingCreateNetwork(t *testing.T, numNodes int, bootstrap, concurrent bool) (*
 
 	dhts := make([]*DHT, numNodes)
 	for i := 0; i < numNodes; i++ {
-		dht := New(&Config{Address: testingDHTIP + ":" + strconv.Itoa(firstPort+i), NodeID: bits.Rand().Hex(), SeedNodes: seeds})
+		c := NewStandardConfig()
+		c.NodeID = bits.Rand().Hex()
+		c.Address = testingDHTIP + ":" + strconv.Itoa(firstPort+i)
+		c.SeedNodes = seeds
+		dht := New(c)
 
 		go func() {
 			err := dht.Start()
diff --git a/prism/prism.go b/prism/prism.go
index ab83a67..190fbee 100644
--- a/prism/prism.go
+++ b/prism/prism.go
@@ -141,8 +141,6 @@ func (p *Prism) Shutdown() {
 
 // AnnounceRange announces the `n`th interval of hashes, out of a total of `total` intervals
 func (p *Prism) AnnounceRange(n, total int) {
-	// TODO: if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time
-
 	// num and total are 1-indexed
 	if n < 1 {
 		log.Errorf("%s: n must be >= 1", p.dht.ID().HexShort())