mirror of
https://github.com/LBRYFoundation/tracker.git
synced 2025-08-23 17:47:29 +00:00
Reduce contention on the torrent map
We see the torrent map have quite a bit of contention. Shard the map by a configurable number of shards to reduce the times we may contend on the lock.
This commit is contained in:
parent
b910fdabf5
commit
ad9034da6d
3 changed files with 101 additions and 51 deletions
|
@ -63,6 +63,10 @@ type StatsConfig struct {
|
||||||
MemUpdateInterval Duration `json:"mem_stats_interval"`
|
MemUpdateInterval Duration `json:"mem_stats_interval"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ShardConfig struct {
|
||||||
|
TorrentMapShards int `json:"torrent_map_shards"`
|
||||||
|
}
|
||||||
|
|
||||||
// Config is a configuration for a Server.
|
// Config is a configuration for a Server.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Addr string `json:"addr"`
|
Addr string `json:"addr"`
|
||||||
|
@ -85,6 +89,7 @@ type Config struct {
|
||||||
|
|
||||||
StatsConfig
|
StatsConfig
|
||||||
NetConfig
|
NetConfig
|
||||||
|
ShardConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig is a configuration that can be used as a fallback value.
|
// DefaultConfig is a configuration that can be used as a fallback value.
|
||||||
|
@ -124,6 +129,10 @@ var DefaultConfig = Config{
|
||||||
RespectAF: false,
|
RespectAF: false,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
ShardConfig: ShardConfig{
|
||||||
|
TorrentMapShards: 1,
|
||||||
|
},
|
||||||
|
|
||||||
ClientWhitelistEnabled: false,
|
ClientWhitelistEnabled: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,38 +5,66 @@
|
||||||
package tracker
|
package tracker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"hash/fnv"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/chihaya/chihaya/config"
|
||||||
"github.com/chihaya/chihaya/stats"
|
"github.com/chihaya/chihaya/stats"
|
||||||
"github.com/chihaya/chihaya/tracker/models"
|
"github.com/chihaya/chihaya/tracker/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Torrents struct {
|
||||||
|
torrents map[string]*models.Torrent
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
type Storage struct {
|
type Storage struct {
|
||||||
users map[string]*models.User
|
users map[string]*models.User
|
||||||
usersM sync.RWMutex
|
usersM sync.RWMutex
|
||||||
|
|
||||||
torrents map[string]*models.Torrent
|
shards []Torrents
|
||||||
torrentsM sync.RWMutex
|
size int32
|
||||||
|
|
||||||
clients map[string]bool
|
clients map[string]bool
|
||||||
clientsM sync.RWMutex
|
clientsM sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStorage() *Storage {
|
func NewStorage(cfg *config.Config) *Storage {
|
||||||
return &Storage{
|
s := &Storage{
|
||||||
users: make(map[string]*models.User),
|
users: make(map[string]*models.User),
|
||||||
torrents: make(map[string]*models.Torrent),
|
shards: make([]Torrents, cfg.ShardConfig.TorrentMapShards),
|
||||||
clients: make(map[string]bool),
|
clients: make(map[string]bool),
|
||||||
}
|
}
|
||||||
|
for i := range s.shards {
|
||||||
|
s.shards[i].torrents = make(map[string]*models.Torrent)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Storage) GetShardIndex(infohash string) uint32 {
|
||||||
|
idx := fnv.New32()
|
||||||
|
idx.Write([]byte(infohash))
|
||||||
|
return idx.Sum32() % uint32(len(s.shards))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Storage) GetTorrentShard(infohash string, readonly bool) *Torrents {
|
||||||
|
shardindex := s.GetShardIndex(infohash)
|
||||||
|
if readonly {
|
||||||
|
s.shards[shardindex].RLock()
|
||||||
|
} else {
|
||||||
|
s.shards[shardindex].Lock()
|
||||||
|
}
|
||||||
|
return &s.shards[shardindex]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) TouchTorrent(infohash string) error {
|
func (s *Storage) TouchTorrent(infohash string) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -47,10 +75,10 @@ func (s *Storage) TouchTorrent(infohash string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
||||||
s.torrentsM.RLock()
|
shard := s.GetTorrentShard(infohash, true)
|
||||||
defer s.torrentsM.RUnlock()
|
defer shard.RUnlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, models.ErrTorrentDNE
|
return nil, models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -59,24 +87,29 @@ func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) PutTorrent(torrent *models.Torrent) {
|
func (s *Storage) PutTorrent(torrent *models.Torrent) {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(torrent.Infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
s.torrents[torrent.Infohash] = &*torrent
|
_, exists := shard.torrents[torrent.Infohash]
|
||||||
|
if !exists {
|
||||||
|
atomic.AddInt32(&s.size, 1)
|
||||||
|
}
|
||||||
|
shard.torrents[torrent.Infohash] = &*torrent
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) DeleteTorrent(infohash string) {
|
func (s *Storage) DeleteTorrent(infohash string) {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
delete(s.torrents, infohash)
|
atomic.AddInt32(&s.size, -1)
|
||||||
|
delete(shard.torrents, infohash)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -87,10 +120,10 @@ func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -101,10 +134,10 @@ func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -115,10 +148,10 @@ func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -129,10 +162,10 @@ func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
@ -143,16 +176,16 @@ func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
|
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
|
||||||
s.torrentsM.Lock()
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
defer s.torrentsM.Unlock()
|
defer shard.Unlock()
|
||||||
|
|
||||||
torrent, exists := s.torrents[infohash]
|
torrent, exists := shard.torrents[infohash]
|
||||||
if !exists {
|
if !exists {
|
||||||
return models.ErrTorrentDNE
|
return models.ErrTorrentDNE
|
||||||
}
|
}
|
||||||
|
|
||||||
if torrent.PeerCount() == 0 {
|
if torrent.PeerCount() == 0 {
|
||||||
delete(s.torrents, infohash)
|
delete(shard.torrents, infohash)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -162,26 +195,34 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time)
|
||||||
unixtime := before.Unix()
|
unixtime := before.Unix()
|
||||||
|
|
||||||
// Build a list of keys to process.
|
// Build a list of keys to process.
|
||||||
s.torrentsM.RLock()
|
|
||||||
index := 0
|
index := 0
|
||||||
keys := make([]string, len(s.torrents))
|
maxkeys := int(atomic.LoadInt32(&s.size))
|
||||||
|
keys := make([]string, maxkeys)
|
||||||
for infohash := range s.torrents {
|
for i := range s.shards {
|
||||||
|
shard := &s.shards[i]
|
||||||
|
shard.RLock()
|
||||||
|
for infohash := range shard.torrents {
|
||||||
keys[index] = infohash
|
keys[index] = infohash
|
||||||
index++
|
index++
|
||||||
|
if index >= maxkeys {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.RUnlock()
|
||||||
|
if index >= maxkeys {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.torrentsM.RUnlock()
|
|
||||||
|
|
||||||
// Process the keys while allowing other goroutines to run.
|
// Process the keys while allowing other goroutines to run.
|
||||||
for _, infohash := range keys {
|
for _, infohash := range keys {
|
||||||
runtime.Gosched()
|
runtime.Gosched()
|
||||||
|
shard := s.GetTorrentShard(infohash, false)
|
||||||
s.torrentsM.Lock()
|
torrent := shard.torrents[infohash]
|
||||||
torrent := s.torrents[infohash]
|
|
||||||
|
|
||||||
if torrent == nil {
|
if torrent == nil {
|
||||||
// The torrent has already been deleted since keys were computed.
|
// The torrent has already been deleted since keys were computed.
|
||||||
s.torrentsM.Unlock()
|
shard.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +230,7 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time)
|
||||||
torrent.Leechers.Purge(unixtime)
|
torrent.Leechers.Purge(unixtime)
|
||||||
|
|
||||||
peers := torrent.PeerCount()
|
peers := torrent.PeerCount()
|
||||||
s.torrentsM.Unlock()
|
shard.Unlock()
|
||||||
|
|
||||||
if purgeEmptyTorrents && peers == 0 {
|
if purgeEmptyTorrents && peers == 0 {
|
||||||
s.PurgeInactiveTorrent(infohash)
|
s.PurgeInactiveTorrent(infohash)
|
||||||
|
|
|
@ -35,7 +35,7 @@ func New(cfg *config.Config) (*Tracker, error) {
|
||||||
tkr := &Tracker{
|
tkr := &Tracker{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
Backend: bc,
|
Backend: bc,
|
||||||
Storage: NewStorage(),
|
Storage: NewStorage(cfg),
|
||||||
}
|
}
|
||||||
|
|
||||||
go tkr.purgeInactivePeers(
|
go tkr.purgeInactivePeers(
|
||||||
|
|
Loading…
Add table
Reference in a new issue