From 468eefee579b7006e805c39093be6d5f7793044d Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Sun, 1 May 2016 17:56:07 -0400 Subject: [PATCH 1/3] store: add ErrResourceDoesNotExist and update memory implementation --- server/store/ip_store.go | 20 ++++++++++++------ server/store/memory/ip_store.go | 27 ++++++++++++++++-------- server/store/memory/ip_store_test.go | 3 ++- server/store/memory/peer_store.go | 20 ++++++++++++++---- server/store/memory/string_store.go | 4 ++++ server/store/memory/string_store_test.go | 8 ++++--- server/store/peer_store.go | 14 ++++++++---- server/store/store.go | 4 ++++ server/store/string_store.go | 8 +++++++ tracker/tracker.go | 2 +- 10 files changed, 81 insertions(+), 29 deletions(-) diff --git a/server/store/ip_store.go b/server/store/ip_store.go index 3fea3a6..e02afb5 100644 --- a/server/store/ip_store.go +++ b/server/store/ip_store.go @@ -21,30 +21,36 @@ type IPStore interface { AddNetwork(network string) error // HasIP returns whether the given IP address is contained in the IPStore - // or belong to any of the stored networks. + // or belongs to any of the stored networks. HasIP(ip net.IP) (bool, error) - // HasAnyIP returns whether any of the given IP addresses are contained in - // the IPStore or belong to any of the stored networks. + // HasAnyIP returns whether any of the given IP addresses are contained + // in the IPStore or belongs to any of the stored networks. HasAnyIP(ips []net.IP) (bool, error) - // HassAllIPs returns whether all of the given IP addresses are contained in - // the IPStore or belong to any of the stored networks. + // HassAllIPs returns whether all of the given IP addresses are + // contained in the IPStore or belongs to any of the stored networks. HasAllIPs(ips []net.IP) (bool, error) // RemoveIP removes a single IP address from the IPStore. // // This wil not remove the given address from any networks it belongs to // that are stored in the IPStore. + // + // Returns ErrResourceDoesNotExist if the given IP address is not + // contained in the store. RemoveIP(ip net.IP) error - // RemoveNetwork removes a range of IP addresses that was previously added - // through AddNetwork. + // RemoveNetwork removes a range of IP addresses that was previously + // added through AddNetwork. // // The given network must not, as a string, match the previously added // network, but rather denote the same network, e.g. if the network // 192.168.22.255/24 was added, removing the network 192.168.22.123/24 // will succeed. + // + // Returns ErrResourceDoesNotExist if the given network is not + // contained in the store. RemoveNetwork(network string) error } diff --git a/server/store/memory/ip_store.go b/server/store/memory/ip_store.go index 77b0506..dc2f4b8 100644 --- a/server/store/memory/ip_store.go +++ b/server/store/memory/ip_store.go @@ -57,14 +57,14 @@ func key(ip net.IP) [16]byte { } func (s *ipStore) AddNetwork(network string) error { - s.Lock() - defer s.Unlock() - key, length, err := netmatch.ParseNetwork(network) if err != nil { return err } + s.Lock() + defer s.Unlock() + return s.networks.Add(key, length) } @@ -78,9 +78,9 @@ func (s *ipStore) AddIP(ip net.IP) error { } func (s *ipStore) HasIP(ip net.IP) (bool, error) { + key := key(ip) s.RLock() defer s.RUnlock() - key := key(ip) _, ok := s.ips[key] if ok { @@ -138,22 +138,31 @@ func (s *ipStore) HasAllIPs(ips []net.IP) (bool, error) { } func (s *ipStore) RemoveIP(ip net.IP) error { + key := key(ip) s.Lock() defer s.Unlock() - delete(s.ips, key(ip)) + if _, ok := s.ips[key]; !ok { + return store.ErrResourceDoesNotExist + } + + delete(s.ips, key) return nil } func (s *ipStore) RemoveNetwork(network string) error { - s.Lock() - defer s.Unlock() - key, length, err := netmatch.ParseNetwork(network) if err != nil { return err } - return s.networks.Remove(key, length) + s.Lock() + defer s.Unlock() + + err = s.networks.Remove(key, length) + if err != nil && err == netmatch.ErrNotContained { + return store.ErrResourceDoesNotExist + } + return err } diff --git a/server/store/memory/ip_store_test.go b/server/store/memory/ip_store_test.go index e6c83c2..80f7610 100644 --- a/server/store/memory/ip_store_test.go +++ b/server/store/memory/ip_store_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/chihaya/chihaya/server/store" + "github.com/stretchr/testify/assert" ) @@ -64,7 +65,7 @@ func TestIPStore(t *testing.T) { // check removes err = s.RemoveIP(v6) - assert.Nil(t, err) + assert.NotNil(t, err) err = s.RemoveIP(v4s) assert.Nil(t, err) diff --git a/server/store/memory/peer_store.go b/server/store/memory/peer_store.go index 6638940..ef5257f 100644 --- a/server/store/memory/peer_store.go +++ b/server/store/memory/peer_store.go @@ -119,10 +119,16 @@ func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) erro defer shard.Unlock() if shard.peers[key] == nil { - return nil + return store.ErrResourceDoesNotExist } - delete(shard.peers[key], peerKey(p)) + pk := peerKey(p) + + if _, ok := shard.peers[key][pk]; !ok { + return store.ErrResourceDoesNotExist + } + + delete(shard.peers[key], pk) if len(shard.peers[key]) == 0 { delete(shard.peers, key) @@ -156,10 +162,16 @@ func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) err defer shard.Unlock() if shard.peers[key] == nil { - return nil + return store.ErrResourceDoesNotExist } - delete(shard.peers[key], peerKey(p)) + pk := peerKey(p) + + if _, ok := shard.peers[key][pk]; !ok { + return store.ErrResourceDoesNotExist + } + + delete(shard.peers[key], pk) if len(shard.peers[key]) == 0 { delete(shard.peers, key) diff --git a/server/store/memory/string_store.go b/server/store/memory/string_store.go index 2fbba18..85cc4d4 100644 --- a/server/store/memory/string_store.go +++ b/server/store/memory/string_store.go @@ -51,6 +51,10 @@ func (ss *stringStore) RemoveString(s string) error { ss.Lock() defer ss.Unlock() + if _, ok := ss.strings[s]; !ok { + return store.ErrResourceDoesNotExist + } + delete(ss.strings, s) return nil diff --git a/server/store/memory/string_store_test.go b/server/store/memory/string_store_test.go index 39fe802..32618e4 100644 --- a/server/store/memory/string_store_test.go +++ b/server/store/memory/string_store_test.go @@ -5,9 +5,11 @@ package memory import ( - "github.com/chihaya/chihaya/server/store" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chihaya/chihaya/server/store" ) var ( @@ -30,7 +32,7 @@ func TestStringStore(t *testing.T) { assert.False(t, has) err = ss.RemoveString(s1) - assert.Nil(t, err) + assert.NotNil(t, err) err = ss.PutString(s1) assert.Nil(t, err) diff --git a/server/store/peer_store.go b/server/store/peer_store.go index 74289d0..1cb88b5 100644 --- a/server/store/peer_store.go +++ b/server/store/peer_store.go @@ -18,11 +18,17 @@ type PeerStore interface { // PutSeeder adds a seeder for the infoHash to the PeerStore. PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error // DeleteSeeder removes a seeder for the infoHash from the PeerStore. + // + // Returns ErrResourceDoesNotExist if the infoHash or peer does not + // exist. DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error // PutLeecher adds a leecher for the infoHash to the PeerStore. PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error // DeleteLeecher removes a leecher for the infoHash from the PeerStore. + // + // Returns ErrResourceDoesNotExist if the infoHash or peer does not + // exist. DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error // GraduateLeecher promotes a peer from a leecher to a seeder for the @@ -32,11 +38,11 @@ type PeerStore interface { // announce. // // If seeder is true then the peers returned will only be leechers, the - // ammount of leechers returned will be the smaller value of numWant or the - // available leechers. + // ammount of leechers returned will be the smaller value of numWant or + // the available leechers. // If it is false then seeders will be returned up until numWant or the - // available seeders, whichever is smaller. If the available seeders is less - // than numWant then peers are returned until numWant or they run out. + // available seeders, whichever is smaller. If the available seeders is + // less than numWant then peers are returned until numWant or they run out. AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) // CollectGarbage deletes peers from the peerStore which are older than the // cutoff time. diff --git a/server/store/store.go b/server/store/store.go index ff8a811..4625078 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -23,6 +23,10 @@ func init() { server.Register("store", constructor) } +// ErrResourceDoesNotExist is the error returned by all delete methods in the +// store if the requested resource does not exist. +var ErrResourceDoesNotExist = errors.New("resource does not exist") + func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Server, error) { if theStore == nil { cfg, err := newConfig(srvcfg) diff --git a/server/store/string_store.go b/server/store/string_store.go index 8620053..cfaea49 100644 --- a/server/store/string_store.go +++ b/server/store/string_store.go @@ -10,8 +10,16 @@ var stringStoreDrivers = make(map[string]StringStoreDriver) // StringStore represents an interface for manipulating strings. type StringStore interface { + // PutString adds the given string to the StringStore. PutString(s string) error + + // HasString returns whether or not the StringStore contains the given + // string. HasString(s string) (bool, error) + + // RemoveString removes the string from the string store. + // Returns ErrResourceDoesNotExist if the given string is not contained + // in the store. RemoveString(s string) error } diff --git a/tracker/tracker.go b/tracker/tracker.go index c3406c9..a0b8391 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -8,8 +8,8 @@ package tracker import ( "errors" - "fmt" + "github.com/chihaya/chihaya" ) From 9f229c4ab6c55cb6d34558638e38df159eb62ff3 Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Mon, 18 Apr 2016 15:49:36 -0400 Subject: [PATCH 2/3] add helper methods to get IPv4 and IPv6 peer --- chihaya.go | 22 ++++++++++++++++++++ server/store/middleware/response/response.go | 6 ++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/chihaya.go b/chihaya.go index 8a7386b..f8e765e 100644 --- a/chihaya.go +++ b/chihaya.go @@ -34,6 +34,28 @@ type AnnounceRequest struct { Params Params } +// Peer4 returns a Peer using the IPv4 endpoint of the Announce. +// Note that, if the Announce does not contain an IPv4 address, the IP field of +// the returned Peer can be nil. +func (r *AnnounceRequest) Peer4() Peer { + return Peer{ + IP: r.IPv4, + Port: r.Port, + ID: r.PeerID, + } +} + +// Peer6 returns a Peer using the IPv6 endpoint of the Announce. +// Note that, if the Announce does not contain an IPv6 address, the IP field of +// the returned Peer can be nil. +func (r *AnnounceRequest) Peer6() Peer { + return Peer{ + IP: r.IPv6, + Port: r.Port, + ID: r.PeerID, + } +} + // AnnounceResponse represents the parameters used to create an announce // response. type AnnounceResponse struct { diff --git a/server/store/middleware/response/response.go b/server/store/middleware/response/response.go index 1970667..72d8225 100644 --- a/server/store/middleware/response/response.go +++ b/server/store/middleware/response/response.go @@ -27,16 +27,14 @@ func (f FailedToRetrievePeers) Error() string { return string(f) } func responseAnnounceClient(next tracker.AnnounceHandler) tracker.AnnounceHandler { return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) { storage := store.MustGetStore() - peer4 := chihaya.Peer{ID: req.PeerID, IP: req.IPv4, Port: req.Port} - peer6 := chihaya.Peer{ID: req.PeerID, IP: req.IPv6, Port: req.Port} resp.MinInterval = cfg.MinAnnounceInterval resp.Compact = req.Compact resp.Complete = int32(storage.NumSeeders(req.InfoHash)) resp.Incomplete = int32(storage.NumLeechers(req.InfoHash)) - resp.IPv4Peers, resp.IPv6Peers, err = storage.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), peer4, peer6) + resp.IPv4Peers, resp.IPv6Peers, err = storage.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer4(), req.Peer6()) if err != nil { - return err.(FailedToRetrievePeers) + return FailedToRetrievePeers(err.Error()) } return next(cfg, req, resp) From aaf9978df376b3754a9c824f795349f8d89e1148 Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Mon, 18 Apr 2016 15:49:55 -0400 Subject: [PATCH 3/3] middleware: add swarm interaction --- cmd/chihaya/main.go | 1 + server/store/middleware/swarm/README.md | 12 ++++ server/store/middleware/swarm/swarm.go | 75 +++++++++++++++++++++++++ server/store/peer_store.go | 4 ++ 4 files changed, 92 insertions(+) create mode 100644 server/store/middleware/swarm/README.md create mode 100644 server/store/middleware/swarm/swarm.go diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index 4796d54..30e74f6 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -28,6 +28,7 @@ import ( _ "github.com/chihaya/chihaya/server/store/middleware/infohash" _ "github.com/chihaya/chihaya/server/store/middleware/ip" _ "github.com/chihaya/chihaya/server/store/middleware/response" + _ "github.com/chihaya/chihaya/server/store/middleware/swarm" ) var configPath string diff --git a/server/store/middleware/swarm/README.md b/server/store/middleware/swarm/README.md new file mode 100644 index 0000000..60444e7 --- /dev/null +++ b/server/store/middleware/swarm/README.md @@ -0,0 +1,12 @@ +## Swarm Interaction Middleware + +This package provides the announce middleware that modifies peer data stored in the `store` package. + +### `store_swarm_interaction` + +The `store_swarm_interaction` middleware updates the data stored in the `peerStore` based on the announce. + +### Important things to notice + +It is recommended to have this middleware run before the `store_response` middleware. +The `store_response` middleware assumes the store to be already updated by the announce. \ No newline at end of file diff --git a/server/store/middleware/swarm/swarm.go b/server/store/middleware/swarm/swarm.go new file mode 100644 index 0000000..87ff15b --- /dev/null +++ b/server/store/middleware/swarm/swarm.go @@ -0,0 +1,75 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package response + +import ( + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/pkg/event" + "github.com/chihaya/chihaya/server/store" + "github.com/chihaya/chihaya/tracker" +) + +func init() { + tracker.RegisterAnnounceMiddleware("store_swarm_interaction", announceSwarmInteraction) +} + +// FailedSwarmInteraction represents an error that indicates that the +// interaction of a peer with a swarm failed. +type FailedSwarmInteraction string + +// Error satisfies the error interface for FailedSwarmInteraction. +func (f FailedSwarmInteraction) Error() string { return string(f) } + +// announceSwarmInteraction provides a middleware that manages swarm +// interactions for a peer based on the announce. +func announceSwarmInteraction(next tracker.AnnounceHandler) tracker.AnnounceHandler { + return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) { + if req.IPv4 != nil { + err = updatePeerStore(req, req.Peer4()) + if err != nil { + return FailedSwarmInteraction(err.Error()) + } + } + + if req.IPv6 != nil { + err = updatePeerStore(req, req.Peer6()) + if err != nil { + return FailedSwarmInteraction(err.Error()) + } + } + + return next(cfg, req, resp) + } +} + +func updatePeerStore(req *chihaya.AnnounceRequest, peer chihaya.Peer) (err error) { + storage := store.MustGetStore() + + switch { + case req.Event == event.Stopped: + err = storage.DeleteSeeder(req.InfoHash, peer) + if err != nil && err != store.ErrResourceDoesNotExist { + return err + } + + err = storage.DeleteLeecher(req.InfoHash, peer) + if err != nil && err != store.ErrResourceDoesNotExist { + return err + } + + case req.Event == event.Completed || req.Left == 0: + err = storage.GraduateLeecher(req.InfoHash, peer) + if err != nil { + return err + } + default: + err = storage.PutLeecher(req.InfoHash, peer) + if err != nil { + return err + } + } + + return nil +} diff --git a/server/store/peer_store.go b/server/store/peer_store.go index 1cb88b5..aada1cd 100644 --- a/server/store/peer_store.go +++ b/server/store/peer_store.go @@ -33,7 +33,11 @@ type PeerStore interface { // GraduateLeecher promotes a peer from a leecher to a seeder for the // infoHash within the PeerStore. + // + // If the given Peer is not a leecher, it will still be added to the + // list of seeders and no error will be returned. GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error + // AnnouncePeers returns a list of both IPv4, and IPv6 peers for an // announce. //