diff --git a/config/config.go b/config/config.go index 1c10ba1..b636efc 100644 --- a/config/config.go +++ b/config/config.go @@ -63,6 +63,10 @@ type StatsConfig struct { MemUpdateInterval Duration `json:"mem_stats_interval"` } +type ShardConfig struct { + TorrentMapShards int `json:"torrent_map_shards"` +} + // Config is a configuration for a Server. type Config struct { Addr string `json:"addr"` @@ -85,6 +89,7 @@ type Config struct { StatsConfig NetConfig + ShardConfig } // DefaultConfig is a configuration that can be used as a fallback value. @@ -124,6 +129,10 @@ var DefaultConfig = Config{ RespectAF: false, }, + ShardConfig: ShardConfig{ + TorrentMapShards: 1, + }, + ClientWhitelistEnabled: false, } diff --git a/tracker/storage.go b/tracker/storage.go index b803fe9..8aa1522 100644 --- a/tracker/storage.go +++ b/tracker/storage.go @@ -5,38 +5,66 @@ package tracker import ( + "hash/fnv" "runtime" "sync" + "sync/atomic" "time" + "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/stats" "github.com/chihaya/chihaya/tracker/models" ) +type Torrents struct { + torrents map[string]*models.Torrent + sync.RWMutex +} + type Storage struct { users map[string]*models.User usersM sync.RWMutex - torrents map[string]*models.Torrent - torrentsM sync.RWMutex + shards []Torrents + size int32 clients map[string]bool clientsM sync.RWMutex } -func NewStorage() *Storage { - return &Storage{ - users: make(map[string]*models.User), - torrents: make(map[string]*models.Torrent), - clients: make(map[string]bool), +func NewStorage(cfg *config.Config) *Storage { + s := &Storage{ + users: make(map[string]*models.User), + shards: make([]Torrents, cfg.ShardConfig.TorrentMapShards), + clients: make(map[string]bool), } + for i := range s.shards { + s.shards[i].torrents = make(map[string]*models.Torrent) + } + return s +} + +func (s *Storage) GetShardIndex(infohash string) uint32 { + idx := fnv.New32() + idx.Write([]byte(infohash)) + return idx.Sum32() % uint32(len(s.shards)) +} + +func (s *Storage) GetTorrentShard(infohash string, readonly bool) *Torrents { + shardindex := s.GetShardIndex(infohash) + if readonly { + s.shards[shardindex].RLock() + } else { + s.shards[shardindex].Lock() + } + return &s.shards[shardindex] } func (s *Storage) TouchTorrent(infohash string) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -47,10 +75,10 @@ func (s *Storage) TouchTorrent(infohash string) error { } func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) { - s.torrentsM.RLock() - defer s.torrentsM.RUnlock() + shard := s.GetTorrentShard(infohash, true) + defer shard.RUnlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return nil, models.ErrTorrentDNE } @@ -59,24 +87,29 @@ func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) { } func (s *Storage) PutTorrent(torrent *models.Torrent) { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(torrent.Infohash, false) + defer shard.Unlock() - s.torrents[torrent.Infohash] = &*torrent + _, exists := shard.torrents[torrent.Infohash] + if !exists { + atomic.AddInt32(&s.size, 1) + } + shard.torrents[torrent.Infohash] = &*torrent } func (s *Storage) DeleteTorrent(infohash string) { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - delete(s.torrents, infohash) + atomic.AddInt32(&s.size, -1) + delete(shard.torrents, infohash) } func (s *Storage) IncrementTorrentSnatches(infohash string) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -87,10 +120,10 @@ func (s *Storage) IncrementTorrentSnatches(infohash string) error { } func (s *Storage) PutLeecher(infohash string, p *models.Peer) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -101,10 +134,10 @@ func (s *Storage) PutLeecher(infohash string, p *models.Peer) error { } func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -115,10 +148,10 @@ func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error { } func (s *Storage) PutSeeder(infohash string, p *models.Peer) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -129,10 +162,10 @@ func (s *Storage) PutSeeder(infohash string, p *models.Peer) error { } func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } @@ -143,16 +176,16 @@ func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error { } func (s *Storage) PurgeInactiveTorrent(infohash string) error { - s.torrentsM.Lock() - defer s.torrentsM.Unlock() + shard := s.GetTorrentShard(infohash, false) + defer shard.Unlock() - torrent, exists := s.torrents[infohash] + torrent, exists := shard.torrents[infohash] if !exists { return models.ErrTorrentDNE } if torrent.PeerCount() == 0 { - delete(s.torrents, infohash) + delete(shard.torrents, infohash) } return nil @@ -162,26 +195,34 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) unixtime := before.Unix() // Build a list of keys to process. - s.torrentsM.RLock() index := 0 - keys := make([]string, len(s.torrents)) - - for infohash := range s.torrents { - keys[index] = infohash - index++ + maxkeys := int(atomic.LoadInt32(&s.size)) + keys := make([]string, maxkeys) + for i := range s.shards { + shard := &s.shards[i] + shard.RLock() + for infohash := range shard.torrents { + keys[index] = infohash + index++ + if index >= maxkeys { + break + } + } + shard.RUnlock() + if index >= maxkeys { + break + } } - s.torrentsM.RUnlock() // Process the keys while allowing other goroutines to run. for _, infohash := range keys { runtime.Gosched() - - s.torrentsM.Lock() - torrent := s.torrents[infohash] + shard := s.GetTorrentShard(infohash, false) + torrent := shard.torrents[infohash] if torrent == nil { // The torrent has already been deleted since keys were computed. - s.torrentsM.Unlock() + shard.Unlock() continue } @@ -189,7 +230,7 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) torrent.Leechers.Purge(unixtime) peers := torrent.PeerCount() - s.torrentsM.Unlock() + shard.Unlock() if purgeEmptyTorrents && peers == 0 { s.PurgeInactiveTorrent(infohash) diff --git a/tracker/tracker.go b/tracker/tracker.go index 6f11c65..e08e2c1 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -35,7 +35,7 @@ func New(cfg *config.Config) (*Tracker, error) { tkr := &Tracker{ Config: cfg, Backend: bc, - Storage: NewStorage(), + Storage: NewStorage(cfg), } go tkr.purgeInactivePeers(