Storage API matured, docs, tests & more

This commit is contained in:
Jimmy Zelinskie 2013-07-05 06:50:52 -04:00
parent 279c78192f
commit 5cb4e2fabb
11 changed files with 199 additions and 157 deletions

View file

@ -2,12 +2,11 @@
chihaya is a high-performance [BitTorrent tracker](http://en.wikipedia.org/wiki/BitTorrent_tracker) written in the Go programming language. It isn't quite ready for prime-time just yet, but these are the features that it targets: chihaya is a high-performance [BitTorrent tracker](http://en.wikipedia.org/wiki/BitTorrent_tracker) written in the Go programming language. It isn't quite ready for prime-time just yet, but these are the features that it targets:
- Requests are multiplexed over all available threads - Requests are multiplexed over all available threads (1 goroutine per request)
- Low processing and memory footprint - Low processing and memory footprint
- IPv6 support - IPv6 support
- Generic storage interface that can be easily adapted to use any data store - Generic storage interface that can be easily adapted to use any data store
- Scaling properties that directly correlate with the chosen data store's scaling properties - Scaling properties that directly correlate with the chosen data store's scaling properties
- Redis data storage driver
##installing ##installing
@ -18,14 +17,21 @@ $ go install github.com/pushrax/chihaya
##configuring ##configuring
Configuration is done in a JSON formatted file specified with the `-config` flag. One can start with [`example/config.json`](https://github.com/pushrax/chihaya/blob/master/example/config.json) as a base. Configuration is done in a JSON formatted file specified with the `-config` flag. An example configuration can be seen in the `exampleConfig` variable of [`config/config_test.go`](https://github.com/pushrax/chihaya/blob/master/config/config_test.go).
##out of the box drivers
Chihaya currently supports the following drivers out of the box:
* [redis](http://redis.io)
##implementing custom storage ##implementing custom storage
The [`storage`](http://godoc.org/github.com/pushrax/chihaya/storage) package works similar to the standard library's [`database/sql`](http://godoc.org/database/sql) package. To write a new storage backend, create a new Go package that has an implementation of both the [`Conn`](http://godoc.org/github.com/pushrax/chihaya/storage#Conn) and the [`Driver`](http://godoc.org/github.com/pushrax/chihaya/storage#Driver) interfaces. Within your package define an [`init()`](http://golang.org/ref/spec#Program_execution) function that calls [`storage.Register(driverName, &yourDriver{})`](http://godoc.org/github.com/pushrax/chihaya/storage#Register). Your driver **must** be thread-safe. After that, all you have to do is remember to add `import _ path/to/your/library` to the top of `main.go` and now config files will recognize your driver by name. If you're writing a driver for a popular data store, consider contributing it. The [`storage`](http://godoc.org/github.com/pushrax/chihaya/storage) package is heavily inspired by the standard library's [`database/sql`](http://godoc.org/database/sql) package. To write a new storage backend, create a new Go package that has an implementation of the [`DS`](http://godoc.org/github.com/pushrax/chihaya/storage#DS), [`Tx`](http://godoc.org/github.com/pushrax/chihaya/storage#Tx), and [`Driver`](http://godoc.org/github.com/pushrax/chihaya/storage#Driver) interfaces. Within that package, you must also define an [`func init()`](http://golang.org/ref/spec#Program_execution) that calls [`storage.Register("driverName", &myDriver{})`](http://godoc.org/github.com/pushrax/chihaya/storage#Register). Please read the documentation and understand these interfaces as there are assumptions about thread-safety. After you've implemented a new driver, all you have to do is remember to add `import _ path/to/your/library` to the top of any file (preferably `main.go`) and the side effects from `func init()` will globally register your driver so that config files will recognize your driver by name. If you're writing a driver for a popular data store, consider contributing it.
##contributing ##contributing
If you're interested in contributing, please contact us in **#chihaya on [freenode](http://freenode.net/)**([webchat](http://webchat.freenode.net?channels=chihaya)) or post to the issue tracker. Please don't offer massive pull requests with no prior communication attempts (unannounced small changes are fine), as it will most likely lead to confusion and time wasted for everyone. And remember: good gophers always use gofmt! If you're interested in contributing, please contact us in **#chihaya on [freenode](http://freenode.net/)**([webchat](http://webchat.freenode.net?channels=chihaya)) or post to the issue tracker. Please don't offer massive pull requests with no prior communication attempts as it will most likely lead to confusion and time wasted for everyone. However, small unannounced fixes are welcome.
And remember: good gophers always use gofmt!

View file

@ -7,6 +7,7 @@ package config
import ( import (
"encoding/json" "encoding/json"
"io"
"os" "os"
"time" "time"
) )
@ -26,11 +27,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
return err return err
} }
type Client struct { // Storage represents the configuration for any storage.DS.
Name string `json:"name"`
PeerID string `json:"peer_id"`
}
type Storage struct { type Storage struct {
Driver string `json:"driver"` Driver string `json:"driver"`
Network string `json:"network` Network string `json:"network`
@ -41,11 +38,12 @@ type Storage struct {
Encoding string `json:"encoding,omitempty"` Encoding string `json:"encoding,omitempty"`
Prefix string `json:"prefix,omitempty"` Prefix string `json:"prefix,omitempty"`
ConnectTimeout *Duration `json:"conn_timeout,omitempty"` MaxIdleConn int `json:"max_idle_conn"`
ReadTimeout *Duration `json:"read_timeout,omitempty"` IdleTimeout *Duration `json:"idle_timeout"`
WriteTimeout *Duration `json:"write_timeout,omitempty"` ConnTimeout *Duration `json:"conn_timeout"`
} }
// Config represents a configuration for a server.Server.
type Config struct { type Config struct {
Addr string `json:"addr"` Addr string `json:"addr"`
Storage Storage `json:"storage"` Storage Storage `json:"storage"`
@ -57,11 +55,11 @@ type Config struct {
MinAnnounce Duration `json:"min_announce"` MinAnnounce Duration `json:"min_announce"`
ReadTimeout Duration `json:"read_timeout"` ReadTimeout Duration `json:"read_timeout"`
DefaultNumWant int `json:"default_num_want"` DefaultNumWant int `json:"default_num_want"`
Whitelist []Client `json:"whitelist"`
} }
func New(path string) (*Config, error) { // Open is a shortcut to open a file, read it, and generate a Config.
// It supports relative and absolute paths.
func Open(path string) (*Config, error) {
expandedPath := os.ExpandEnv(path) expandedPath := os.ExpandEnv(path)
f, err := os.Open(expandedPath) f, err := os.Open(expandedPath)
if err != nil { if err != nil {
@ -69,29 +67,19 @@ func New(path string) (*Config, error) {
} }
defer f.Close() defer f.Close()
conf := &Config{} conf, err := New(f)
err = json.NewDecoder(f).Decode(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return conf, nil return conf, nil
} }
func (c *Config) ClientWhitelisted(peerID string) (matched bool) { // New decodes JSON from a Reader into a Config.
for _, client := range c.Whitelist { func New(raw io.Reader) (*Config, error) {
length := len(client.PeerID) conf := &Config{}
if length <= len(peerID) { err := json.NewDecoder(raw).Decode(conf)
matched = true if err != nil {
for i := 0; i < length; i++ { return nil, err
if peerID[i] != client.PeerID[i] {
matched = false
break
}
}
if matched {
return true
}
}
} }
return false return conf, nil
} }

42
config/config_test.go Normal file
View file

@ -0,0 +1,42 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package config
import (
"strings"
"testing"
)
var exampleConfig = `{
"network": "tcp",
"addr": ":34000",
"storage": {
"driver": "redis",
"addr": "127.0.0.1:6379",
"user": "root",
"pass": "",
"prefix": "test:",
"max_idle_conn": 3,
"idle_timeout": "240s",
"conn_timeout": "5s"
},
"private": true,
"freeleech": false,
"announce": "30m",
"min_announce": "15m",
"read_timeout": "20s",
"default_num_want": 50
}`
func TestNew(t *testing.T) {
if _, err := New(strings.NewReader(exampleConfig)); err != nil {
t.Error(err)
}
}

View file

@ -1,31 +0,0 @@
{
"network": "tcp",
"addr": ":34000",
"storage": {
"driver": "redis",
"addr": "127.0.0.1:6379",
"user": "root",
"pass": "",
"prefix": "test:",
"conn_timeout": "5s",
"read_timeout": "5s",
"write_timeout": "5s"
},
"private": true,
"freeleech": false,
"announce": "30m",
"min_announce": "15m",
"read_timeout": "20s",
"default_num_want": 50,
"whitelist": [
{ "name": "Azureus 2.5.x", "peer_id": "-AZ25" },
{ "name": "Azureus 3.0.x", "peer_id": "-AZ30" },
{ "name": "btgdaemon 0.9", "peer_id": "-BG09" }
]
}

View file

@ -43,7 +43,7 @@ func main() {
if configPath == "" { if configPath == "" {
log.Fatalf("Must specify a configuration file") log.Fatalf("Must specify a configuration file")
} }
conf, err := config.New(configPath) conf, err := config.Open(configPath)
if err != nil { if err != nil {
log.Fatalf("Failed to parse configuration file: %s\n", err) log.Fatalf("Failed to parse configuration file: %s\n", err)
} }

View file

@ -14,11 +14,8 @@ import (
) )
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
conn := s.connPool.Get()
defer conn.Close()
passkey, _ := path.Split(r.URL.Path) passkey, _ := path.Split(r.URL.Path)
_, err := validatePasskey(passkey, conn) _, err := s.validatePasskey(passkey)
if err != nil { if err != nil {
fail(err, w, r) fail(err, w, r)
return return
@ -42,12 +39,16 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
if !s.conf.ClientWhitelisted(pq.params["peer_id"]) { ok, err := s.dataStore.ClientWhitelisted(pq.params["peer_id"])
if err != nil {
log.Panicf("server: %s", err)
}
if !ok {
fail(errors.New("Your client is not approved"), w, r) fail(errors.New("Your client is not approved"), w, r)
return return
} }
torrent, exists, err := conn.FindTorrent(pq.params["infohash"]) torrent, exists, err := s.dataStore.FindTorrent(pq.params["infohash"])
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -56,7 +57,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
tx, err := conn.NewTx() tx, err := s.dataStore.Begin()
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }

View file

@ -6,7 +6,6 @@ package server
import ( import (
"errors" "errors"
"github.com/pushrax/chihaya/config"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"

View file

@ -19,7 +19,7 @@ import (
func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
passkey, _ := path.Split(r.URL.Path) passkey, _ := path.Split(r.URL.Path)
_, err := validatePasskey(passkey, s.storage) _, err := s.validatePasskey(passkey)
if err != nil { if err != nil {
fail(err, w, r) fail(err, w, r)
return return
@ -35,7 +35,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
bencode(w, "files") bencode(w, "files")
if pq.infohashes != nil { if pq.infohashes != nil {
for _, infohash := range pq.infohashes { for _, infohash := range pq.infohashes {
torrent, exists, err := s.storage.FindTorrent(infohash) torrent, exists, err := s.dataStore.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -45,7 +45,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
} }
} }
} else if infohash, exists := pq.params["info_hash"]; exists { } else if infohash, exists := pq.params["info_hash"]; exists {
torrent, exists, err := s.storage.FindTorrent(infohash) torrent, exists, err := s.dataStore.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }

View file

@ -23,9 +23,9 @@ import (
) )
type Server struct { type Server struct {
conf *config.Config conf *config.Config
listener net.Listener listener net.Listener
connPool storage.Pool dataStore storage.DS
serving bool serving bool
startTime time.Time startTime time.Time
@ -39,14 +39,14 @@ type Server struct {
} }
func New(conf *config.Config) (*Server, error) { func New(conf *config.Config) (*Server, error) {
pool, err := storage.Open(&conf.Storage) ds, err := storage.Open(&conf.Storage)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s := &Server{ s := &Server{
conf: conf, conf: conf,
storage: pool, dataStore: ds,
Server: http.Server{ Server: http.Server{
Addr: conf.Addr, Addr: conf.Addr,
ReadTimeout: conf.ReadTimeout.Duration, ReadTimeout: conf.ReadTimeout.Duration,
@ -76,7 +76,7 @@ func (s *Server) ListenAndServe() error {
func (s *Server) Stop() error { func (s *Server) Stop() error {
s.serving = false s.serving = false
s.waitgroup.Wait() s.waitgroup.Wait()
err := s.storage.Close() err := s.dataStore.Close()
if err != nil { if err != nil {
return err return err
} }
@ -129,13 +129,13 @@ func fail(err error, w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
func validatePasskey(dir string, s storage.Conn) (*storage.User, error) { func (s *Server) validatePasskey(dir string) (*storage.User, error) {
if len(dir) != 34 { if len(dir) != 34 {
return nil, errors.New("Your passkey is invalid") return nil, errors.New("Passkey is invalid")
} }
passkey := dir[1:33] passkey := dir[1:33]
user, exists, err := s.FindUser(passkey) user, exists, err := s.dataStore.FindUser(passkey)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }

View file

@ -2,6 +2,7 @@
// Use of this source code is governed by the BSD 2-Clause license, // Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file. // which can be found in the LICENSE file.
// Package redis implements the storage interface for a BitTorrent tracker.
package redis package redis
import ( import (
@ -15,12 +16,12 @@ import (
type driver struct{} type driver struct{}
func (d *driver) New(conf *config.Storage) storage.Pool { func (d *driver) New(conf *config.Storage) storage.DS {
return &Pool{ return &DS{
conf: conf, conf: conf,
pool: redis.Pool{ Pool: redis.Pool{
MaxIdle: 3, MaxIdle: conf.MaxIdleConn,
IdleTimeout: 240 * time.Second, IdleTimeout: conf.IdleTimeout.Duration,
Dial: makeDialFunc(conf), Dial: makeDialFunc(conf),
TestOnBorrow: testOnBorrow, TestOnBorrow: testOnBorrow,
}, },
@ -34,16 +35,13 @@ func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) {
err error err error
) )
if conf.ConnectTimeout != nil && if conf.ConnTimeout != nil {
conf.ReadTimeout != nil &&
conf.WriteTimeout != nil {
conn, err = redis.DialTimeout( conn, err = redis.DialTimeout(
conf.Network, conf.Network,
conf.Addr, conf.Addr,
conf.ConnectTimeout.Duration, conf.ConnTimeout.Duration, // Connect Timeout
conf.ReadTimeout.Duration, conf.ConnTimeout.Duration, // Read Timeout
conf.WriteTimeout.Duration, conf.ConnTimeout.Duration, // Write Timeout
) )
} else { } else {
conn, err = redis.Dial(conf.Network, conf.Addr) conn, err = redis.Dial(conf.Network, conf.Addr)
@ -60,42 +58,26 @@ func testOnBorrow(c redis.Conn, t time.Time) error {
return err return err
} }
type Pool struct { type DS struct {
conf *config.Storage conf *config.Storage
pool redis.Pool redis.Pool
} }
func (p *Pool) Get() storage.Conn { func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) {
return &Conn{ conn := ds.Get()
conf: p.conf, defer conn.Close()
Conn: p.pool.Get(),
}
}
func (p *Pool) Close() error { key := ds.conf.Prefix + "user:" + passkey
return p.pool.Close() reply, err := redis.Values(conn.Do("HGETALL", key))
}
type Conn struct {
conf *config.Storage
redis.Conn
}
func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
key := c.conf.Prefix + "User:" + passkey
exists, err := redis.Bool(c.Do("EXISTS", key))
if err != nil {
return nil, false, err
}
if !exists {
return nil, false, nil
}
reply, err := redis.Values(c.Do("HGETALL", key))
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
// If we get nothing back, the user isn't found.
if len(reply) == 0 {
return nil, false, nil
}
user := &storage.User{} user := &storage.User{}
err = redis.ScanStruct(reply, user) err = redis.ScanStruct(reply, user)
if err != nil { if err != nil {
@ -104,21 +86,21 @@ func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
return user, true, nil return user, true, nil
} }
func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) { func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
key := c.conf.Prefix + "Torrent:" + infohash conn := ds.Get()
defer conn.Close()
exists, err := redis.Bool(c.Do("EXISTS", key)) key := ds.conf.Prefix + "torrent:" + infohash
reply, err := redis.Values(conn.Do("HGETALL", key))
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
if !exists {
// If we get nothing back, the torrent isn't found.
if len(reply) == 0 {
return nil, false, nil return nil, false, nil
} }
reply, err := redis.Values(c.Do("HGETALL", key))
if err != nil {
return nil, true, err
}
torrent := &storage.Torrent{} torrent := &storage.Torrent{}
err = redis.ScanStruct(reply, torrent) err = redis.ScanStruct(reply, torrent)
if err != nil { if err != nil {
@ -127,21 +109,50 @@ func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
return torrent, true, nil return torrent, true, nil
} }
type Tx struct { func (ds *DS) ClientWhitelisted(peerID string) (bool, error) {
conn *Conn conn := ds.Get()
defer conn.Close()
key := ds.conf.Prefix + "whitelist:" + peerID
exists, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil {
return false, err
}
return exists, nil
} }
func (c *Conn) NewTx() (storage.Tx, error) { type Tx struct {
err := c.Send("MULTI") conf *config.Storage
done bool
redis.Conn
}
func (ds *DS) Begin() (storage.Tx, error) {
conn := ds.Get()
err := conn.Send("MULTI")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Tx{c}, nil return &Tx{
conf: ds.conf,
Conn: conn,
}, nil
}
func (t *Tx) Close() {
if t.done {
panic("redis: transaction closed twice")
}
t.done = true
t.Conn.Close()
} }
func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error { func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error {
key := t.conn.conf.Prefix + "Torrent:" + torrent.Infohash if t.done {
err := t.conn.Send("HSET " + key + " Status 0") return storage.ErrTxDone
}
key := t.conf.Prefix + "torrent:" + torrent.Infohash
err := t.Send("HSET " + key + " Status 0")
if err != nil { if err != nil {
return err return err
} }
@ -149,10 +160,24 @@ func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error {
} }
func (t *Tx) Commit() error { func (t *Tx) Commit() error {
_, err := t.conn.Do("EXEC") if t.done {
return storage.ErrTxDone
}
_, err := t.Do("EXEC")
if err != nil { if err != nil {
return err return err
} }
t.Close()
return nil
}
// Redis doesn't need to rollback. Exec is atomic.
func (t *Tx) Rollback() error {
if t.done {
return storage.ErrTxDone
}
t.Close()
return nil return nil
} }

View file

@ -7,17 +7,24 @@
package storage package storage
import ( import (
"errors"
"fmt" "fmt"
"github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/config"
) )
var drivers = make(map[string]Driver) var (
drivers = make(map[string]Driver)
ErrTxDone = errors.New("storage: Transaction has already been committed or rolled back")
)
type Driver interface { type Driver interface {
New(*config.Storage) Pool New(*config.Storage) DS
} }
// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, driver Driver) { func Register(name string, driver Driver) {
if driver == nil { if driver == nil {
panic("storage: Register driver is nil") panic("storage: Register driver is nil")
@ -28,7 +35,8 @@ func Register(name string, driver Driver) {
drivers[name] = driver drivers[name] = driver
} }
func Open(conf *config.Storage) (Pool, error) { // Open opens a data store specified by a storage configuration.
func Open(conf *config.Storage) (DS, error) {
driver, ok := drivers[conf.Driver] driver, ok := drivers[conf.Driver]
if !ok { if !ok {
return nil, fmt.Errorf( return nil, fmt.Errorf(
@ -40,25 +48,29 @@ func Open(conf *config.Storage) (Pool, error) {
return pool, nil return pool, nil
} }
// ConnPool represents a pool of connections to the data store. // DS represents a data store handle. It's expected to be safe for concurrent
type Pool interface { // use by multiple goroutines.
Close() error //
Get() Conn // A pool of connections or a database/sql.DB is a great concrete type to
} // implement the DS interface.
type DS interface {
// Conn represents a single connection to the data store.
type Conn interface {
Close() error Close() error
NewTx() (Tx, error) Begin() (Tx, error)
FindUser(passkey string) (*User, bool, error) FindUser(passkey string) (*User, bool, error)
FindTorrent(infohash string) (*Torrent, bool, error) FindTorrent(infohash string) (*Torrent, bool, error)
ClientWhitelisted(peerID string) (bool, error)
} }
// Tx represents a data store transaction. // Tx represents an in-progress data store transaction.
// A transaction must end with a call to Commit or Rollback.
//
// After a call to Commit or Rollback, all operations on the
// transaction must fail with ErrTxDone.
type Tx interface { type Tx interface {
Commit() error Commit() error
Rollback() error
UnpruneTorrent(torrent *Torrent) error UnpruneTorrent(torrent *Torrent) error
} }