diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 131dbe4..0000000 --- a/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -/config.json -/chihaya -/Godeps/_workspace diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index b0999e3..0000000 --- a/.travis.yml +++ /dev/null @@ -1,24 +0,0 @@ -language: go - -go: - - 1.5 - - tip - -sudo: false - -before_install: - - go get github.com/tools/godep - - godep restore - -script: - - go test -v ./... - -notifications: - irc: - channels: - - "irc.freenode.net#chihaya" - use_notice: true - skip_join: true - on_success: always - on_failure: always - email: false diff --git a/chihaya.go b/chihaya.go new file mode 100644 index 0000000..4f23bae --- /dev/null +++ b/chihaya.go @@ -0,0 +1,47 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file.package middleware + +package chihaya + +import ( + "net" + "time" +) + +// AnnounceRequest represents the parsed parameters from an announce request. +type AnnounceRequest map[string]interface{} + +// AnnounceResponse represents the parameters used to create an announce +// response. +type AnnounceResponse struct { + Compact bool + Complete int32 + Incomplete int32 + Interval time.Duration + MinInterval time.Duration + IPv4Peers []Peer + IPv6Peers []Peer +} + +// ScrapeRequest represents the parsed parameters from a scrape request. +type ScrapeRequest map[string]interface{} + +// ScrapeResponse represents the parameters used to create a scrape response. +type ScrapeResponse struct { + Files map[string]Scrape +} + +// Scrape represents the state of a swarm that is returned in a scrape response. +type Scrape struct { + Complete int32 + Incomplete int32 +} + +// Peer represents the connection details of a peer that is returned in an +// announce response. +type Peer struct { + ID string + IP net.IP + Port uint16 +} diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go new file mode 100644 index 0000000..b611e94 --- /dev/null +++ b/cmd/chihaya/main.go @@ -0,0 +1,47 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "syscall" + + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/server" + "github.com/chihaya/chihaya/tracker" +) + +var configPath string + +func init() { + flag.StringVar(&configPath, "config", "", "path to the configuration file") +} + +func main() { + flag.Parse() + + cfg, err := config.Open(configPath) + if err != nil { + log.Fatal("failed to load config: " + err.Error()) + } + + tkr, err := tracker.NewTracker(&cfg.Tracker) + if err != nil { + log.Fatal("failed to create tracker: " + err.Error()) + } + + pool, err := server.StartPool(cfg.Servers, tkr) + if err != nil { + log.Fatal("failed to create server pool: " + err.Error()) + } + + shutdown := make(chan os.Signal) + signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) + <-shutdown + pool.Stop() +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..e7487fd --- /dev/null +++ b/config/config.go @@ -0,0 +1,84 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package config implements the opening and parsing of a chihaya configuration. +package config + +import ( + "io" + "io/ioutil" + "os" + "time" + + "gopkg.in/yaml.v2" +) + +// DefaultConfig is a sane configuration used as a fallback or for testing. +var DefaultConfig = Config{ + Tracker: TrackerConfig{ + AnnounceInterval: 10 * time.Minute, + MinAnnounceInterval: 5 * time.Minute, + AnnounceMiddleware: []string{}, + ScrapeMiddleware: []string{}, + }, + Servers: []ServerConfig{}, +} + +// Config represents the global configuration of a chihaya binary. +type Config struct { + Tracker TrackerConfig `yaml:"tracker"` + Servers []ServerConfig `yaml:"servers"` +} + +// TrackerConfig represents the configuration of the BitTorrent tracker used by +// chihaya. +type TrackerConfig struct { + AnnounceInterval time.Duration `yaml:"announce"` + MinAnnounceInterval time.Duration `yaml:"minAnnounce"` + AnnounceMiddleware []string `yaml:"announceMiddleware"` + ScrapeMiddleware []string `yaml:"scrapeMiddleware"` +} + +// ServerConfig represents the configuration of the servers started by chihaya. +type ServerConfig struct { + Name string `yaml:"name"` + Config interface{} `yaml:"config"` +} + +// Open is a shortcut to open a file, read it, and allocates a new Config. +// It supports relative and absolute paths. Given "", it returns DefaultConfig. +func Open(path string) (*Config, error) { + if path == "" { + return &DefaultConfig, nil + } + + f, err := os.Open(os.ExpandEnv(path)) + if err != nil { + return nil, err + } + defer f.Close() + + cfg, err := Decode(f) + if err != nil { + return nil, err + } + + return cfg, nil +} + +// Decode unmarshals an io.Reader into a newly allocated *Config. +func Decode(r io.Reader) (*Config, error) { + contents, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + cfg := &Config{} + err = yaml.Unmarshal(contents, cfg) + if err != nil { + return nil, err + } + + return cfg, nil +} diff --git a/config/example.yaml b/config/example.yaml new file mode 100644 index 0000000..6b89c63 --- /dev/null +++ b/config/example.yaml @@ -0,0 +1,39 @@ +# Copyright 2016 The Chihaya Authors. All rights reserved. +# Use of this source code is governed by the BSD 2-Clause license, +# which can be found in the LICENSE file. + +chihaya: + tracker: + announce: "10m" + minAnnounce: "5m" + announceMiddleware: + - "prometheus" + - "storeClientValidation" + - "storeCreateOnAnnounce" + scrapeMiddleware: + - "prometheus" + - "storeClientValidation" + + servers: + - name: "store" + config: + addr: "localhost:6880" + requestTimeout: "10s" + readTimeout: "10s" + writeTimeout: "10s" + clientStore: "memory" + peerStore: "memory" + peerStoreConfig: + gcAfter: "30m" + shards: 1 + + - name: "http" + config: + addr: "localhost:6881" + requestTimeout: "10s" + readTimeout: "10s" + writeTimeout: "10s" + + - name: "udp" + config: + addr: "localhost:6882" diff --git a/pkg/bencode/bencode.go b/pkg/bencode/bencode.go new file mode 100644 index 0000000..819d0c8 --- /dev/null +++ b/pkg/bencode/bencode.go @@ -0,0 +1,23 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package bencode implements bencoding of data as defined in BEP 3 using +// type assertion over reflection for performance. +package bencode + +// Dict represents a bencode dictionary. +type Dict map[string]interface{} + +// NewDict allocates the memory for a Dict. +func NewDict() Dict { + return make(Dict) +} + +// List represents a bencode list. +type List []interface{} + +// NewList allocates the memory for a List. +func NewList() List { + return make(List, 0) +} diff --git a/pkg/bencode/decoder.go b/pkg/bencode/decoder.go new file mode 100644 index 0000000..387425b --- /dev/null +++ b/pkg/bencode/decoder.go @@ -0,0 +1,135 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package bencode + +import ( + "bufio" + "bytes" + "errors" + "io" + "strconv" +) + +// A Decoder reads bencoded objects from an input stream. +type Decoder struct { + r *bufio.Reader +} + +// NewDecoder returns a new decoder that reads from r. +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r: bufio.NewReader(r)} +} + +// Decode unmarshals the next bencoded value in the stream. +func (dec *Decoder) Decode() (interface{}, error) { + return unmarshal(dec.r) +} + +// Unmarshal deserializes and returns the bencoded value in buf. +func Unmarshal(buf []byte) (interface{}, error) { + r := bufio.NewReader(bytes.NewBuffer(buf)) + return unmarshal(r) +} + +// unmarshal reads bencoded values from a bufio.Reader +func unmarshal(r *bufio.Reader) (interface{}, error) { + tok, err := r.ReadByte() + if err != nil { + return nil, err + } + + switch tok { + case 'i': + return readTerminatedInt(r, 'e') + + case 'l': + list := NewList() + for { + ok, err := readTerminator(r, 'e') + if err != nil { + return nil, err + } else if ok { + break + } + + v, err := unmarshal(r) + if err != nil { + return nil, err + } + list = append(list, v) + } + return list, nil + + case 'd': + dict := NewDict() + for { + ok, err := readTerminator(r, 'e') + if err != nil { + return nil, err + } else if ok { + break + } + + v, err := unmarshal(r) + if err != nil { + return nil, err + } + + key, ok := v.(string) + if !ok { + return nil, errors.New("bencode: non-string map key") + } + + dict[key], err = unmarshal(r) + if err != nil { + return nil, err + } + } + return dict, nil + + default: + err = r.UnreadByte() + if err != nil { + return nil, err + } + + length, err := readTerminatedInt(r, ':') + if err != nil { + return nil, errors.New("bencode: unknown input sequence") + } + + buf := make([]byte, length) + n, err := r.Read(buf) + + if err != nil { + return nil, err + } else if int64(n) != length { + return nil, errors.New("bencode: short read") + } + + return string(buf), nil + } +} + +func readTerminator(r *bufio.Reader, term byte) (bool, error) { + tok, err := r.ReadByte() + if err != nil { + return false, err + } else if tok == term { + return true, nil + } + return false, r.UnreadByte() +} + +func readTerminatedInt(r *bufio.Reader, term byte) (int64, error) { + buf, err := r.ReadSlice(term) + if err != nil { + return 0, err + } else if len(buf) <= 1 { + return 0, errors.New("bencode: empty integer field") + } + + return strconv.ParseInt(string(buf[:len(buf)-1]), 10, 64) +} diff --git a/pkg/bencode/decoder_test.go b/pkg/bencode/decoder_test.go new file mode 100644 index 0000000..a5c9406 --- /dev/null +++ b/pkg/bencode/decoder_test.go @@ -0,0 +1,86 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package bencode + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +var unmarshalTests = []struct { + input string + expected interface{} +}{ + {"i42e", int64(42)}, + {"i-42e", int64(-42)}, + + {"7:example", "example"}, + + {"l3:one3:twoe", List{"one", "two"}}, + {"le", List{}}, + + {"d3:one2:aa3:two2:bbe", Dict{"one": "aa", "two": "bb"}}, + {"de", Dict{}}, +} + +func TestUnmarshal(t *testing.T) { + for _, tt := range unmarshalTests { + got, err := Unmarshal([]byte(tt.input)) + assert.Nil(t, err, "unmarshal should not fail") + assert.Equal(t, got, tt.expected, "unmarshalled values should match the expected results") + } +} + +type bufferLoop struct { + val string +} + +func (r *bufferLoop) Read(b []byte) (int, error) { + n := copy(b, r.val) + return n, nil +} + +func BenchmarkUnmarshalScalar(b *testing.B) { + d1 := NewDecoder(&bufferLoop{"7:example"}) + d2 := NewDecoder(&bufferLoop{"i42e"}) + + for i := 0; i < b.N; i++ { + d1.Decode() + d2.Decode() + } +} + +func TestUnmarshalLarge(t *testing.T) { + data := Dict{ + "k1": List{"a", "b", "c"}, + "k2": int64(42), + "k3": "val", + "k4": int64(-42), + } + + buf, _ := Marshal(data) + dec := NewDecoder(&bufferLoop{string(buf)}) + + got, err := dec.Decode() + assert.Nil(t, err, "decode should not fail") + assert.Equal(t, got, data, "encoding and decoding should equal the original value") +} + +func BenchmarkUnmarshalLarge(b *testing.B) { + data := map[string]interface{}{ + "k1": []string{"a", "b", "c"}, + "k2": 42, + "k3": "val", + "k4": uint(42), + } + + buf, _ := Marshal(data) + dec := NewDecoder(&bufferLoop{string(buf)}) + + for i := 0; i < b.N; i++ { + dec.Decode() + } +} diff --git a/pkg/bencode/encoder.go b/pkg/bencode/encoder.go new file mode 100644 index 0000000..2f51688 --- /dev/null +++ b/pkg/bencode/encoder.go @@ -0,0 +1,157 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package bencode + +import ( + "bytes" + "fmt" + "io" + "strconv" + "time" +) + +// An Encoder writes bencoded objects to an output stream. +type Encoder struct { + w io.Writer +} + +// NewEncoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: w} +} + +// Encode writes the bencoding of v to the stream. +func (enc *Encoder) Encode(v interface{}) error { + return marshal(enc.w, v) +} + +// Marshal returns the bencoding of v. +func Marshal(v interface{}) ([]byte, error) { + buf := &bytes.Buffer{} + err := marshal(buf, v) + return buf.Bytes(), err +} + +// Marshaler is the interface implemented by objects that can marshal +// themselves. +type Marshaler interface { + MarshalBencode() ([]byte, error) +} + +// marshal writes types bencoded to an io.Writer +func marshal(w io.Writer, data interface{}) error { + switch v := data.(type) { + case Marshaler: + bencoded, err := v.MarshalBencode() + if err != nil { + return err + } + _, err = w.Write(bencoded) + if err != nil { + return err + } + + case string: + marshalString(w, v) + + case int: + marshalInt(w, int64(v)) + + case uint: + marshalUint(w, uint64(v)) + + case int16: + marshalInt(w, int64(v)) + + case uint16: + marshalUint(w, uint64(v)) + + case int64: + marshalInt(w, v) + + case uint64: + marshalUint(w, v) + + case []byte: + marshalBytes(w, v) + + case time.Duration: // Assume seconds + marshalInt(w, int64(v/time.Second)) + + case Dict: + marshal(w, map[string]interface{}(v)) + + case []Dict: + w.Write([]byte{'l'}) + for _, val := range v { + err := marshal(w, val) + if err != nil { + return err + } + } + w.Write([]byte{'e'}) + + case map[string]interface{}: + w.Write([]byte{'d'}) + for key, val := range v { + marshalString(w, key) + err := marshal(w, val) + if err != nil { + return err + } + } + w.Write([]byte{'e'}) + + case []string: + w.Write([]byte{'l'}) + for _, val := range v { + err := marshal(w, val) + if err != nil { + return err + } + } + w.Write([]byte{'e'}) + + case List: + marshal(w, []interface{}(v)) + + case []interface{}: + w.Write([]byte{'l'}) + for _, val := range v { + err := marshal(w, val) + if err != nil { + return err + } + } + w.Write([]byte{'e'}) + + default: + return fmt.Errorf("attempted to marshal unsupported type:\n%t", v) + } + + return nil +} + +func marshalInt(w io.Writer, v int64) { + w.Write([]byte{'i'}) + w.Write([]byte(strconv.FormatInt(v, 10))) + w.Write([]byte{'e'}) +} + +func marshalUint(w io.Writer, v uint64) { + w.Write([]byte{'i'}) + w.Write([]byte(strconv.FormatUint(v, 10))) + w.Write([]byte{'e'}) +} + +func marshalBytes(w io.Writer, v []byte) { + w.Write([]byte(strconv.Itoa(len(v)))) + w.Write([]byte{':'}) + w.Write(v) +} + +func marshalString(w io.Writer, v string) { + marshalBytes(w, []byte(v)) +} diff --git a/pkg/bencode/encoder_test.go b/pkg/bencode/encoder_test.go new file mode 100644 index 0000000..61186d0 --- /dev/null +++ b/pkg/bencode/encoder_test.go @@ -0,0 +1,71 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package bencode + +import ( + "bytes" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var marshalTests = []struct { + input interface{} + expected []string +}{ + {int(42), []string{"i42e"}}, + {int(-42), []string{"i-42e"}}, + {uint(43), []string{"i43e"}}, + {int64(44), []string{"i44e"}}, + {uint64(45), []string{"i45e"}}, + {int16(44), []string{"i44e"}}, + {uint16(45), []string{"i45e"}}, + + {"example", []string{"7:example"}}, + {[]byte("example"), []string{"7:example"}}, + {30 * time.Minute, []string{"i1800e"}}, + + {[]string{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}}, + {[]interface{}{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}}, + {[]string{}, []string{"le"}}, + + {map[string]interface{}{"one": "aa", "two": "bb"}, []string{"d3:one2:aa3:two2:bbe", "d3:two2:bb3:one2:aae"}}, + {map[string]interface{}{}, []string{"de"}}, +} + +func TestMarshal(t *testing.T) { + for _, test := range marshalTests { + got, err := Marshal(test.input) + assert.Nil(t, err, "marshal should not fail") + assert.Contains(t, test.expected, string(got), "the marshaled result should be one of the expected permutations") + } +} + +func BenchmarkMarshalScalar(b *testing.B) { + buf := &bytes.Buffer{} + encoder := NewEncoder(buf) + + for i := 0; i < b.N; i++ { + encoder.Encode("test") + encoder.Encode(123) + } +} + +func BenchmarkMarshalLarge(b *testing.B) { + data := map[string]interface{}{ + "k1": []string{"a", "b", "c"}, + "k2": 42, + "k3": "val", + "k4": uint(42), + } + + buf := &bytes.Buffer{} + encoder := NewEncoder(buf) + + for i := 0; i < b.N; i++ { + encoder.Encode(data) + } +} diff --git a/pkg/clientid/client_id.go b/pkg/clientid/client_id.go new file mode 100644 index 0000000..3f97009 --- /dev/null +++ b/pkg/clientid/client_id.go @@ -0,0 +1,23 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package clientid implements the parsing of BitTorrent ClientIDs from +// BitTorrent PeerIDs. +package clientid + +// New returns the part of a PeerID that identifies a peer's client software. +func New(peerID string) (clientID string) { + length := len(peerID) + if length >= 6 { + if peerID[0] == '-' { + if length >= 7 { + clientID = peerID[1:7] + } + } else { + clientID = peerID[:6] + } + } + + return +} diff --git a/pkg/clientid/client_id_test.go b/pkg/clientid/client_id_test.go new file mode 100644 index 0000000..949ac66 --- /dev/null +++ b/pkg/clientid/client_id_test.go @@ -0,0 +1,62 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package clientid + +import "testing" + +func TestClientID(t *testing.T) { + var clientTable = []struct { + peerID string + clientID string + }{ + {"-AZ3034-6wfG2wk6wWLc", "AZ3034"}, + {"-AZ3042-6ozMq5q6Q3NX", "AZ3042"}, + {"-BS5820-oy4La2MWGEFj", "BS5820"}, + {"-AR6360-6oZyyMWoOOBe", "AR6360"}, + {"-AG2083-s1hiF8vGAAg0", "AG2083"}, + {"-AG3003-lEl2Mm4NEO4n", "AG3003"}, + {"-MR1100-00HS~T7*65rm", "MR1100"}, + {"-LK0140-ATIV~nbEQAMr", "LK0140"}, + {"-KT2210-347143496631", "KT2210"}, + {"-TR0960-6ep6svaa61r4", "TR0960"}, + {"-XX1150-dv220cotgj4d", "XX1150"}, + {"-AZ2504-192gwethivju", "AZ2504"}, + {"-KT4310-3L4UvarKuqIu", "KT4310"}, + {"-AZ2060-0xJQ02d4309O", "AZ2060"}, + {"-BD0300-2nkdf08Jd890", "BD0300"}, + {"-A~0010-a9mn9DFkj39J", "A~0010"}, + {"-UT2300-MNu93JKnm930", "UT2300"}, + {"-UT2300-KT4310KT4301", "UT2300"}, + + {"T03A0----f089kjsdf6e", "T03A0-"}, + {"S58B-----nKl34GoNb75", "S58B--"}, + {"M4-4-0--9aa757Efd5Bl", "M4-4-0"}, + + {"AZ2500BTeYUzyabAfo6U", "AZ2500"}, // BitTyrant + {"exbc0JdSklm834kj9Udf", "exbc0J"}, // Old BitComet + {"FUTB0L84j542mVc84jkd", "FUTB0L"}, // Alt BitComet + {"XBT054d-8602Jn83NnF9", "XBT054"}, // XBT + {"OP1011affbecbfabeefb", "OP1011"}, // Opera + {"-ML2.7.2-kgjjfkd9762", "ML2.7."}, // MLDonkey + {"-BOWA0C-SDLFJWEIORNM", "BOWA0C"}, // Bits on Wheels + {"Q1-0-0--dsn34DFn9083", "Q1-0-0"}, // Queen Bee + {"Q1-10-0-Yoiumn39BDfO", "Q1-10-"}, // Queen Bee Alt + {"346------SDFknl33408", "346---"}, // TorreTopia + {"QVOD0054ABFFEDCCDEDB", "QVOD00"}, // Qvod + + {"", ""}, + {"-", ""}, + {"12345", ""}, + {"-12345", ""}, + {"123456", "123456"}, + {"-123456", "123456"}, + } + + for _, tt := range clientTable { + if parsedID := New(tt.peerID); parsedID != tt.clientID { + t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID) + } + } +} diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 0000000..c9bd626 --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,70 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package event implements type-level constraints for dealing with the events +// communicated via BitTorrent announce. +package event + +import ( + "errors" + "strings" +) + +// ErrUnknownEvent is returned when New fails to return an event. +var ErrUnknownEvent = errors.New("unknown event") + +// Event represents an event done by a BitTorrent client. +type event uint8 + +const ( + // None is the event when a BitTorrent client announces due to time lapsed + // since the previous announce. + None event = iota + + // Started is the event sent by a BitTorrent client when it joins a swarm. + Started + + // Stopped is the event sent by a BitTorrent client when it leaves a swarm. + Stopped + + // Completed is the event sent by a BitTorrent client when it finishes + // downloading all of the required chunks. + Completed +) + +var ( + eventToString = make(map[event]string) + stringToEvent = make(map[string]event) +) + +func init() { + eventToString[None] = "none" + eventToString[Started] = "started" + eventToString[Stopped] = "stopped" + eventToString[Completed] = "completed" + + stringToEvent[""] = None + stringToEvent["none"] = None + stringToEvent["started"] = Started + stringToEvent["stopped"] = Stopped + stringToEvent["completed"] = Completed +} + +// New returns the proper Event given a string. +func New(eventStr string) (event, error) { + if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok { + return e, nil + } + + return None, ErrUnknownEvent +} + +// String implements Stringer for an event. +func (e event) String() string { + if name, ok := eventToString[e]; ok { + return name + } + + panic("event: event has no associated name") +} diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go new file mode 100644 index 0000000..20ee33f --- /dev/null +++ b/pkg/event/event_test.go @@ -0,0 +1,33 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package event + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + var table = []struct { + data string + expected event + expectedErr error + }{ + {"", None, nil}, + {"NONE", None, nil}, + {"none", None, nil}, + {"started", Started, nil}, + {"stopped", Stopped, nil}, + {"completed", Completed, nil}, + {"notAnEvent", None, ErrUnknownEvent}, + } + + for _, tt := range table { + got, err := New(tt.data) + assert.Equal(t, err, tt.expectedErr, "errors should equal the expected value") + assert.Equal(t, got, tt.expected, "events should equal the expected value") + } +} diff --git a/server/http/config.go b/server/http/config.go new file mode 100644 index 0000000..497f9e8 --- /dev/null +++ b/server/http/config.go @@ -0,0 +1,33 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package http + +import ( + "time" + + "gopkg.in/yaml.v2" +) + +type httpConfig struct { + Addr string `yaml:"addr"` + RequestTimeout time.Duration `yaml:"requestTimeout"` + ReadTimeout time.Duration `yaml:"readTimeout"` + WriteTimeout time.Duration `yaml:"writeTimeout"` +} + +func newHTTPConfig(srvcfg interface{}) (*httpConfig, error) { + bytes, err := yaml.Marshal(srvcfg) + if err != nil { + return nil, err + } + + var cfg httpConfig + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} diff --git a/server/http/query/query.go b/server/http/query/query.go new file mode 100644 index 0000000..9c10030 --- /dev/null +++ b/server/http/query/query.go @@ -0,0 +1,239 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package query implements a simple, fast URL parser designed to be used to +// parse parameters sent from BitTorrent clients. The last value of a key wins, +// except for they key "info_hash". +package query + +import ( + "errors" + "net/url" + "strconv" + "strings" + + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/pkg/event" +) + +// ErrKeyNotFound is returned when a provided key has no value associated with +// it. +var ErrKeyNotFound = errors.New("query: value for the provided key does not exist") + +// Query represents a parsed URL.Query. +type Query struct { + query string + infohashes []string + params map[string]string +} + +// New parses a raw URL query. +func New(query string) (*Query, error) { + var ( + keyStart, keyEnd int + valStart, valEnd int + firstInfohash string + + onKey = true + hasInfohash = false + + q = &Query{ + query: query, + infohashes: nil, + params: make(map[string]string), + } + ) + + for i, length := 0, len(query); i < length; i++ { + separator := query[i] == '&' || query[i] == ';' || query[i] == '?' + last := i == length-1 + + if separator || last { + if onKey && !last { + keyStart = i + 1 + continue + } + + if last && !separator && !onKey { + valEnd = i + } + + keyStr, err := url.QueryUnescape(query[keyStart : keyEnd+1]) + if err != nil { + return nil, err + } + + var valStr string + + if valEnd > 0 { + valStr, err = url.QueryUnescape(query[valStart : valEnd+1]) + if err != nil { + return nil, err + } + } + + q.params[strings.ToLower(keyStr)] = valStr + + if keyStr == "info_hash" { + if hasInfohash { + // Multiple infohashes + if q.infohashes == nil { + q.infohashes = []string{firstInfohash} + } + q.infohashes = append(q.infohashes, valStr) + } else { + firstInfohash = valStr + hasInfohash = true + } + } + + valEnd = 0 + onKey = true + keyStart = i + 1 + + } else if query[i] == '=' { + onKey = false + valStart = i + 1 + valEnd = 0 + } else if onKey { + keyEnd = i + } else { + valEnd = i + } + } + + return q, nil +} + +// Infohashes returns a list of requested infohashes. +func (q *Query) Infohashes() ([]string, error) { + if q.infohashes == nil { + infohash, err := q.String("info_hash") + if err != nil { + return nil, err + } + return []string{infohash}, nil + } + return q.infohashes, nil +} + +// String returns a string parsed from a query. Every key can be returned as a +// string because they are encoded in the URL as strings. +func (q *Query) String(key string) (string, error) { + val, exists := q.params[key] + if !exists { + return "", ErrKeyNotFound + } + return val, nil +} + +// Uint64 returns a uint parsed from a query. After being called, it is safe to +// cast the uint64 to your desired length. +func (q *Query) Uint64(key string) (uint64, error) { + str, exists := q.params[key] + if !exists { + return 0, ErrKeyNotFound + } + + val, err := strconv.ParseUint(str, 10, 64) + if err != nil { + return 0, err + } + + return val, nil +} + +// AnnounceRequest generates an chihaya.AnnounceRequest with the parameters +// provided by a query. +func (q *Query) AnnounceRequest() (chihaya.AnnounceRequest, error) { + request := make(chihaya.AnnounceRequest) + + request["query"] = q.query + + eventStr, err := q.String("event") + if err != nil { + return nil, errors.New("failed to parse parameter: event") + } + request["event"], err = event.New(eventStr) + if err != nil { + return nil, errors.New("failed to provide valid client event") + } + + compactStr, err := q.String("compact") + if err != nil { + return nil, errors.New("failed to parse parameter: compact") + } + request["compact"] = compactStr != "0" + + request["info_hash"], err = q.String("info_hash") + if err != nil { + return nil, errors.New("failed to parse parameter: info_hash") + } + + request["peer_id"], err = q.String("peer_id") + if err != nil { + return nil, errors.New("failed to parse parameter: peer_id") + } + + request["left"], err = q.Uint64("left") + if err != nil { + return nil, errors.New("failed to parse parameter: left") + } + + request["downloaded"], err = q.Uint64("downloaded") + if err != nil { + return nil, errors.New("failed to parse parameter: downloaded") + } + + request["uploaded"], err = q.Uint64("uploaded") + if err != nil { + return nil, errors.New("failed to parse parameter: uploaded") + } + + request["numwant"], err = q.String("numwant") + if err != nil { + return nil, errors.New("failed to parse parameter: numwant") + } + + request["ip"], err = q.Uint64("port") + if err != nil { + return nil, errors.New("failed to parse parameter: port") + } + + request["port"], err = q.Uint64("port") + if err != nil { + return nil, errors.New("failed to parse parameter: port") + } + + request["ip"], err = q.String("ip") + if err != nil { + return nil, errors.New("failed to parse parameter: ip") + } + + request["ipv4"], err = q.String("ipv4") + if err != nil { + return nil, errors.New("failed to parse parameter: ipv4") + } + + request["ipv6"], err = q.String("ipv6") + if err != nil { + return nil, errors.New("failed to parse parameter: ipv6") + } + + return request, nil +} + +// ScrapeRequest generates an chihaya.ScrapeRequeset with the parameters +// provided by a query. +func (q *Query) ScrapeRequest() (chihaya.ScrapeRequest, error) { + request := make(chihaya.ScrapeRequest) + + var err error + request["info_hash"], err = q.Infohashes() + if err != nil { + return nil, errors.New("failed to parse parameter: info_hash") + } + + return request, nil +} diff --git a/server/http/server.go b/server/http/server.go new file mode 100644 index 0000000..becda8f --- /dev/null +++ b/server/http/server.go @@ -0,0 +1,147 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package http + +import ( + "errors" + "log" + "net" + "net/http" + + "github.com/julienschmidt/httprouter" + "github.com/tylerb/graceful" + + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/server" + "github.com/chihaya/chihaya/server/http/query" + "github.com/chihaya/chihaya/tracker" +) + +func init() { + server.Register("http", constructor) +} + +func constructor(srvcfg *config.ServerConfig, tkr *tracker.Tracker) (server.Server, error) { + cfg, err := newHTTPConfig(srvcfg) + if err != nil { + return nil, errors.New("http: invalid config: " + err.Error()) + } + + return &httpServer{ + cfg: cfg, + tkr: tkr, + }, nil +} + +type httpServer struct { + cfg *httpConfig + tkr *tracker.Tracker + grace *graceful.Server + stopping bool +} + +func (s *httpServer) Start() { + s.grace = &graceful.Server{ + Server: &http.Server{ + Addr: s.cfg.Addr, + Handler: s.routes(), + ReadTimeout: s.cfg.ReadTimeout, + WriteTimeout: s.cfg.WriteTimeout, + }, + Timeout: s.cfg.RequestTimeout, + NoSignalHandling: true, + ShutdownInitiated: func() { s.stopping = true }, + ConnState: func(conn net.Conn, state http.ConnState) { + switch state { + case http.StateNew: + //stats.RecordEvent(stats.AcceptedConnection) + + case http.StateClosed: + //stats.RecordEvent(stats.ClosedConnection) + + case http.StateHijacked: + panic("http: connection impossibly hijacked") + + // Ignore the following cases. + case http.StateActive, http.StateIdle: + + default: + panic("http: connection transitioned to unknown state") + } + }, + } + s.grace.SetKeepAlivesEnabled(false) + + if err := s.grace.ListenAndServe(); err != nil { + if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") { + log.Printf("Failed to gracefully run HTTP server: %s", err.Error()) + return + } + } +} + +func (s *httpServer) Stop() { + if !s.stopping { + s.grace.Stop(s.grace.Timeout) + } + + s.grace = nil + s.stopping = false +} + +func (s *httpServer) routes() *httprouter.Router { + r := httprouter.New() + r.GET("/announce", s.serveAnnounce) + r.GET("/scrape", s.serveScrape) + return r +} + +func (s *httpServer) serveAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + writer := &writer{w} + + q, err := query.New(r.URL.RawQuery) + if err != nil { + writer.writeError(err) + return + } + + req, err := q.AnnounceRequest() + if err != nil { + writer.writeError(err) + return + } + + resp, err := s.tkr.HandleAnnounce(req) + if err != nil { + writer.writeError(err) + return + } + + writer.writeAnnounceResponse(resp) +} + +func (s *httpServer) serveScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + writer := &writer{w} + + q, err := query.New(r.URL.RawQuery) + if err != nil { + writer.writeError(err) + return + } + + req, err := q.ScrapeRequest() + if err != nil { + writer.writeError(err) + return + } + + resp, err := s.tkr.HandleScrape(req) + if err != nil { + writer.writeError(err) + return + } + + writer.writeScrapeResponse(resp) +} diff --git a/server/http/writer.go b/server/http/writer.go new file mode 100644 index 0000000..743ebd6 --- /dev/null +++ b/server/http/writer.go @@ -0,0 +1,94 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package http + +import ( + "net/http" + + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/pkg/bencode" +) + +type writer struct{ http.ResponseWriter } + +func (w *writer) writeError(err error) error { + return bencode.NewEncoder(w).Encode(bencode.Dict{ + "failure reason": err.Error(), + }) +} + +func (w *writer) writeAnnounceResponse(resp *chihaya.AnnounceResponse) error { + bdict := bencode.Dict{ + "complete": resp.Complete, + "incomplete": resp.Incomplete, + "interval": resp.Interval, + "min interval": resp.MinInterval, + } + + // Add the peers to the dictionary in the compact format. + if resp.Compact { + var IPv4CompactDict, IPv6CompactDict []byte + + // Add the IPv4 peers to the dictionary. + for _, peer := range resp.IPv4Peers { + IPv4CompactDict = append(IPv4CompactDict, compact(peer)...) + } + if len(IPv4CompactDict) > 0 { + bdict["peers"] = IPv4CompactDict + } + + // Add the IPv6 peers to the dictionary. + for _, peer := range resp.IPv6Peers { + IPv6CompactDict = append(IPv6CompactDict, compact(peer)...) + } + if len(IPv6CompactDict) > 0 { + bdict["peers6"] = IPv6CompactDict + } + + return bencode.NewEncoder(w).Encode(bdict) + } + + // Add the peers to the dictionary. + var peers []bencode.Dict + for _, peer := range resp.IPv4Peers { + peers = append(peers, dict(peer)) + } + for _, peer := range resp.IPv6Peers { + peers = append(peers, dict(peer)) + } + bdict["peers"] = peers + + return bencode.NewEncoder(w).Encode(bdict) +} + +func (w *writer) writeScrapeResponse(resp *chihaya.ScrapeResponse) error { + filesDict := bencode.NewDict() + for infohash, scrape := range resp.Files { + filesDict[infohash] = bencode.Dict{ + "complete": scrape.Complete, + "incomplete": scrape.Incomplete, + "downloaded": scrape.Downloaded, + } + } + + return bencode.NewEncoder(w).Encode(bencode.Dict{ + "files": filesDict, + }) +} + +func compact(peer chihaya.Peer) (buf []byte) { + buf = []byte(peer.IP) + buf = append(buf, byte(peer.Port>>8)) + buf = append(buf, byte(peer.Port&0xff)) + return +} + +func dict(peer chihaya.Peer) bencode.Dict { + return bencode.Dict{ + "peer id": peer.ID, + "ip": peer.IP.String(), + "port": peer.Port, + } +} diff --git a/server/http/writer_test.go b/server/http/writer_test.go new file mode 100644 index 0000000..e82e86c --- /dev/null +++ b/server/http/writer_test.go @@ -0,0 +1,30 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package http + +import ( + "errors" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWriteError(t *testing.T) { + var table = []struct { + reason, expected string + }{ + {"hello world", "d14:failure reason11:hello worlde"}, + {"what's up", "d14:failure reason9:what's upe"}, + } + + for _, tt := range table { + r := httptest.NewRecorder() + w := &writer{r} + err := w.writeError(errors.New(tt.reason)) + assert.Nil(t, err, "writeError should not fail with test input") + assert.Equal(t, r.Body.String(), tt.expected, "writer should write the expected value") + } +} diff --git a/server/pool.go b/server/pool.go new file mode 100644 index 0000000..91b3ab5 --- /dev/null +++ b/server/pool.go @@ -0,0 +1,53 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package server + +import ( + "sync" + + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/tracker" +) + +// Pool represents a running pool of servers. +type Pool struct { + servers []Server + wg sync.WaitGroup +} + +// StartPool creates a new pool of servers specified by the provided config and +// runs them. +func StartPool(cfgs []config.ServerConfig, tkr *tracker.Tracker) (*Pool, error) { + var servers []Server + var wg sync.WaitGroup + + for _, cfg := range cfgs { + srv, err := New(&cfg, tkr) + if err != nil { + return nil, err + } + + wg.Add(1) + go func(srv Server) { + defer wg.Done() + srv.Start() + }(srv) + + servers = append(servers, srv) + } + + return &Pool{ + servers: servers, + wg: wg, + }, nil +} + +// Stop safely shuts down a pool of servers. +func (p *Pool) Stop() { + for _, srv := range p.servers { + srv.Stop() + } + p.wg.Wait() +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..1d0fa3e --- /dev/null +++ b/server/server.go @@ -0,0 +1,54 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package server implements an abstraction over servers meant to be run . +// alongside a tracker. +// +// Servers may be implementations of different transport protocols or have their +// own custom behavior. +package server + +import ( + "fmt" + + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/tracker" +) + +var constructors = make(map[string]Constructor) + +// Constructor is a function that creates a new Server. +type Constructor func(*config.ServerConfig, *tracker.Tracker) (Server, error) + +// Register makes a Constructor available by the provided name. +// +// If this function is called twice with the same name or if the Constructor is +// nil, it panics. +func Register(name string, con Constructor) { + if con == nil { + panic("server: could not register nil Constructor") + } + if _, dup := constructors[name]; dup { + panic("server: could not register duplicate Constructor: " + name) + } + constructors[name] = con +} + +// New creates a Server specified by a configuration. +func New(cfg *config.ServerConfig, tkr *tracker.Tracker) (Server, error) { + con, ok := constructors[cfg.Name] + if !ok { + return nil, fmt.Errorf( + "server: unknown Constructor %q (forgotten import?)", + cfg.Name, + ) + } + return con(cfg, tkr) +} + +// Server represents one instance of a server accessing the tracker. +type Server interface { + Start() + Stop() +} diff --git a/server/store/client_store.go b/server/store/client_store.go new file mode 100644 index 0000000..90c8b28 --- /dev/null +++ b/server/store/client_store.go @@ -0,0 +1,49 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package store + +import "fmt" + +var clientStoreDrivers = make(map[string]ClientStoreDriver) + +// ClientStore represents an interface for manipulating clientIDs. +type ClientStore interface { + CreateClient(clientID string) error + FindClient(peerID string) (bool, error) + DeleteClient(clientID string) error +} + +// ClientStoreDriver represents an interface for creating a handle to the +// storage of swarms. +type ClientStoreDriver interface { + New(*Config) (ClientStore, error) +} + +// RegisterClientStoreDriver makes a driver available by the provided name. +// +// If this function is called twice with the same name or if the driver is nil, +// it panics. +func RegisterClientStoreDriver(name string, driver ClientStoreDriver) { + if driver == nil { + panic("store: could not register nil ClientStoreDriver") + } + if _, dup := clientStoreDrivers[name]; dup { + panic("store: could not register duplicate ClientStoreDriver: " + name) + } + clientStoreDrivers[name] = driver +} + +// OpenClientStore returns a ClientStore specified by a configuration. +func OpenClientStore(name string, cfg *Config) (ClientStore, error) { + driver, ok := clientStoreDrivers[name] + if !ok { + return nil, fmt.Errorf( + "store: unknown driver %q (forgotten import?)", + name, + ) + } + + return driver.New(cfg) +} diff --git a/server/store/memory/client_store.go b/server/store/memory/client_store.go new file mode 100644 index 0000000..ac1e3e3 --- /dev/null +++ b/server/store/memory/client_store.go @@ -0,0 +1,59 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package memory + +import ( + "sync" + + "github.com/chihaya/chihaya/pkg/clientid" + "github.com/chihaya/chihaya/server/store" +) + +func init() { + store.RegisterClientStoreDriver("memory", &clientStoreDriver{}) +} + +type clientStoreDriver struct{} + +func (d *clientStoreDriver) New(cfg *store.Config) (store.ClientStore, error) { + return &clientStore{ + clientIDs: make(map[string]struct{}), + }, nil +} + +type clientStore struct { + clientIDs map[string]struct{} + sync.RWMutex +} + +var _ store.ClientStore = &clientStore{} + +func (s *clientStore) CreateClient(clientID string) error { + s.Lock() + defer s.Unlock() + + s.clientIDs[clientID] = struct{}{} + + return nil +} + +func (s *clientStore) FindClient(peerID string) (bool, error) { + clientID := clientid.New(peerID) + s.Lock() + defer s.Unlock() + + _, ok := s.clientIDs[clientID] + + return ok, nil +} + +func (s *clientStore) DeleteClient(clientID string) error { + s.Lock() + defer s.Unlock() + + delete(s.clientIDs, clientID) + + return nil +} diff --git a/server/store/memory/peer_store.go b/server/store/memory/peer_store.go new file mode 100644 index 0000000..699ca1b --- /dev/null +++ b/server/store/memory/peer_store.go @@ -0,0 +1,272 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package memory + +import ( + "hash/fnv" + "runtime" + "sync" + "time" + + "gopkg.in/yaml.v2" + + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/server/store" +) + +func init() { + store.RegisterPeerStoreDriver("memory", &peerStoreDriver{}) +} + +type peerStoreDriver struct{} + +func (d *peerStoreDriver) New(storecfg *store.Config) (store.PeerStore, error) { + cfg, err := newPeerStoreConfig(storecfg) + if err != nil { + return nil, err + } + + return &peerStore{ + shards: make([]*peerShard, cfg.Shards), + }, nil +} + +type peerStoreConfig struct { + Shards int `yaml:"shards"` +} + +func newPeerStoreConfig(storecfg *store.Config) (*peerStoreConfig, error) { + bytes, err := yaml.Marshal(storecfg.PeerStoreConfig) + if err != nil { + return nil, err + } + + var cfg peerStoreConfig + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} + +const seedersSuffix = "-seeders" +const leechersSuffix = "-leechers" + +type peer struct { + chihaya.Peer + LastAction time.Time +} + +type peerShard struct { + peers map[string]map[string]peer + sync.RWMutex +} + +type peerStore struct { + shards []*peerShard +} + +var _ store.PeerStore = &peerStore{} + +func (s *peerStore) shardIndex(infohash string) uint32 { + idx := fnv.New32() + idx.Write([]byte(infohash)) + return idx.Sum32() % uint32(len(s.shards)) +} + +func (s *peerStore) PutSeeder(infohash string, p chihaya.Peer) error { + key := infohash + seedersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.Lock() + defer shard.Unlock() + + if shard.peers[key] == nil { + shard.peers[key] = make(map[string]peer) + } + + shard.peers[key][p.ID] = peer{ + Peer: p, + LastAction: time.Now(), + } + + return nil +} + +func (s *peerStore) DeleteSeeder(infohash, peerID string) error { + key := infohash + seedersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.Lock() + defer shard.Unlock() + + if shard.peers[key] == nil { + return nil + } + + delete(shard.peers[key], peerID) + + if len(shard.peers[key]) == 0 { + shard.peers[key] = nil + } + + return nil +} + +func (s *peerStore) PutLeecher(infohash string, p chihaya.Peer) error { + key := infohash + leechersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.Lock() + defer shard.Unlock() + + if shard.peers[key] == nil { + shard.peers[key] = make(map[string]peer) + } + + shard.peers[key][p.ID] = peer{ + Peer: p, + LastAction: time.Now(), + } + + return nil +} + +func (s *peerStore) DeleteLeecher(infohash, peerID string) error { + key := infohash + leechersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.Lock() + defer shard.Unlock() + + if shard.peers[key] == nil { + return nil + } + + delete(shard.peers[key], peerID) + + if len(shard.peers[key]) == 0 { + shard.peers[key] = nil + } + + return nil +} + +func (s *peerStore) GraduateLeecher(infohash string, p chihaya.Peer) error { + leecherKey := infohash + leechersSuffix + seederKey := infohash + seedersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.Lock() + defer shard.Unlock() + + if shard.peers[leecherKey] != nil { + delete(shard.peers[leecherKey], p.ID) + } + + if shard.peers[seederKey] == nil { + shard.peers[seederKey] = make(map[string]peer) + } + + shard.peers[seederKey][p.ID] = peer{ + Peer: p, + LastAction: time.Now(), + } + + return nil +} + +func (s *peerStore) CollectGarbage(cutoff time.Time) error { + for _, shard := range s.shards { + shard.RLock() + var keys []string + for key := range shard.peers { + keys = append(keys, key) + } + shard.RUnlock() + runtime.Gosched() + + for _, key := range keys { + shard.Lock() + var peersToDelete []string + for peerID, p := range shard.peers[key] { + if p.LastAction.Before(cutoff) { + peersToDelete = append(peersToDelete, peerID) + } + } + + for _, peerID := range peersToDelete { + delete(shard.peers[key], peerID) + } + shard.Unlock() + runtime.Gosched() + } + + runtime.Gosched() + } + + return nil +} + +func (s *peerStore) AnnouncePeers(infohash string, seeder bool, numWant int) (peers, peers6 []chihaya.Peer, err error) { + leecherKey := infohash + leechersSuffix + seederKey := infohash + seedersSuffix + + shard := s.shards[s.shardIndex(infohash)] + shard.RLock() + defer shard.RUnlock() + + if seeder { + // Append leechers as possible. + leechers := shard.peers[leecherKey] + for _, p := range leechers { + if numWant == 0 { + break + } + + if p.IP.To4() == nil { + peers6 = append(peers6, p.Peer) + } else { + peers = append(peers, p.Peer) + } + numWant-- + } + } else { + // Append as many seeders as possible. + seeders := shard.peers[seederKey] + for _, p := range seeders { + if numWant == 0 { + break + } + + if p.IP.To4() == nil { + peers6 = append(peers6, p.Peer) + } else { + peers = append(peers, p.Peer) + } + numWant-- + } + + // Append leechers until we reach numWant. + leechers := shard.peers[leecherKey] + if numWant > 0 { + for _, p := range leechers { + if numWant == 0 { + break + } + + if p.IP.To4() == nil { + peers6 = append(peers6, p.Peer) + } else { + peers = append(peers, p.Peer) + } + numWant-- + } + } + } + + return +} diff --git a/server/store/peer_store.go b/server/store/peer_store.go new file mode 100644 index 0000000..2c6c2a9 --- /dev/null +++ b/server/store/peer_store.go @@ -0,0 +1,62 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package store + +import ( + "fmt" + "time" + + "github.com/chihaya/chihaya" +) + +var peerStoreDrivers = make(map[string]PeerStoreDriver) + +// PeerStore represents an interface for manipulating peers. +type PeerStore interface { + PutSeeder(infohash string, p chihaya.Peer) error + DeleteSeeder(infohash, peerID string) error + + PutLeecher(infohash string, p chihaya.Peer) error + DeleteLeecher(infohash, peerID string) error + + GraduateLeecher(infohash string, p chihaya.Peer) error + AnnouncePeers(infohash string, seeder bool, numWant int) (peers, peers6 []chihaya.Peer, err error) + CollectGarbage(cutoff time.Time) error +} + +// PeerStoreDriver represents an interface for creating a handle to the storage +// of peers. +type PeerStoreDriver interface { + New(*Config) (PeerStore, error) +} + +// RegisterPeerStoreDriver makes a driver available by the provided name. +// +// If this function is called twice with the same name or if the driver is nil, +// it panics. +func RegisterPeerStoreDriver(name string, driver PeerStoreDriver) { + if driver == nil { + panic("storage: could not register nil PeerStoreDriver") + } + + if _, dup := peerStoreDrivers[name]; dup { + panic("storage: could not register duplicate PeerStoreDriver: " + name) + } + + peerStoreDrivers[name] = driver +} + +// OpenPeerStore returns a PeerStore specified by a configuration. +func OpenPeerStore(name string, cfg *Config) (PeerStore, error) { + driver, ok := peerStoreDrivers[name] + if !ok { + return nil, fmt.Errorf( + "storage: unknown driver %q (forgotten import?)", + name, + ) + } + + return driver.New(cfg) +} diff --git a/server/store/store.go b/server/store/store.go new file mode 100644 index 0000000..c50c4d7 --- /dev/null +++ b/server/store/store.go @@ -0,0 +1,95 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package store + +import ( + "errors" + "log" + "sync" + "time" + + "gopkg.in/yaml.v2" + + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/server" + "github.com/chihaya/chihaya/tracker" +) + +var theStore *Store + +func init() { + server.Register("store", constructor) +} + +func constructor(srvcfg *config.ServerConfig, tkr *tracker.Tracker) (server.Server, error) { + if theStore == nil { + cfg, err := newConfig(srvcfg) + if err != nil { + return nil, errors.New("store: invalid store config: " + err.Error()) + } + + theStore = &Store{ + cfg: cfg, + tkr: tkr, + } + } + return theStore, nil +} + +type Config struct { + Addr string `yaml:"addr"` + RequestTimeout time.Duration `yaml:"requestTimeout"` + ReadTimeout time.Duration `yaml:"readTimeout"` + WriteTimeout time.Duration `yaml:"writeTimeout"` + GCAfter time.Duration `yaml:"gcAfter"` + ClientStore string `yaml:"clientStore"` + ClientStoreConfig interface{} `yaml:"clienStoreConfig"` + PeerStore string `yaml:"peerStore"` + PeerStoreConfig interface{} `yaml:"peerStoreConfig"` +} + +func newConfig(srvcfg interface{}) (*Config, error) { + bytes, err := yaml.Marshal(srvcfg) + if err != nil { + return nil, err + } + + var cfg Config + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} + +// MustGetStore is used by middleware to access the store. +// +// This function calls log.Fatal if a server hasn't been already created by +// the server package. +func MustGetStore() *Store { + if theStore == nil { + log.Fatal("store middleware used without store server") + } + return theStore +} + +type Store struct { + cfg *Config + tkr *tracker.Tracker + shutdown chan struct{} + wg sync.WaitGroup + + PeerStore + ClientStore +} + +func (s *Store) Start() { +} + +func (s *Store) Stop() { + close(s.shutdown) + s.wg.Wait() +} diff --git a/tracker/middleware.go b/tracker/middleware.go new file mode 100644 index 0000000..8dd37ca --- /dev/null +++ b/tracker/middleware.go @@ -0,0 +1,110 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file.package middleware + +package tracker + +import ( + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/config" +) + +// AnnounceHandler is a function that operates on an AnnounceResponse before it +// has been delivered to a client. +type AnnounceHandler func(*config.TrackerConfig, chihaya.AnnounceRequest, *chihaya.AnnounceResponse) error + +func (h AnnounceHandler) handleAnnounce(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error { + return h(cfg, req, resp) +} + +// AnnounceMiddleware is higher-order AnnounceHandler used to implement modular +// behavior processing an announce. +type AnnounceMiddleware func(AnnounceHandler) AnnounceHandler + +type announceChain struct{ mw []AnnounceMiddleware } + +func (c *announceChain) Append(mw ...AnnounceMiddleware) { + newMW := make([]AnnounceMiddleware, len(c.mw)+len(mw)) + copy(newMW[:len(c.mw)], c.mw) + copy(newMW[len(c.mw):], mw) + c.mw = newMW +} + +func (c *announceChain) Handler() AnnounceHandler { + final := func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error { + return nil + } + for i := len(c.mw) - 1; i >= 0; i-- { + final = c.mw[i](final) + } + return final +} + +var announceMiddleware = make(map[string]AnnounceMiddleware) + +// RegisterAnnounceMiddleware makes a middleware available to the tracker under +// the provided named. +// +// If this function is called twice with the same name or if the handler is nil, +// it panics. +func RegisterAnnounceMiddleware(name string, mw AnnounceMiddleware) { + if mw == nil { + panic("tracker: could not register nil AnnounceMiddleware") + } + + if _, dup := announceMiddleware[name]; dup { + panic("tracker: could not register duplicate AnnounceMiddleware: " + name) + } + + announceMiddleware[name] = mw +} + +// ScrapeHandler is a middleware function that operates on a ScrapeResponse +// before it has been delivered to a client. +type ScrapeHandler func(*config.TrackerConfig, chihaya.ScrapeRequest, *chihaya.ScrapeResponse) error + +func (h ScrapeHandler) handleScrape(cfg *config.TrackerConfig, req chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) error { + return h(cfg, req, resp) +} + +// ScrapeMiddleware is higher-order ScrapeHandler used to implement modular +// behavior processing a scrape. +type ScrapeMiddleware func(ScrapeHandler) ScrapeHandler + +type scrapeChain struct{ mw []ScrapeMiddleware } + +func (c *scrapeChain) Append(mw ...ScrapeMiddleware) { + newMW := make([]ScrapeMiddleware, len(c.mw)+len(mw)) + copy(newMW[:len(c.mw)], c.mw) + copy(newMW[len(c.mw):], mw) + c.mw = newMW +} + +func (c *scrapeChain) Handler() ScrapeHandler { + final := func(cfg *config.TrackerConfig, req chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) error { + return nil + } + for i := len(c.mw) - 1; i >= 0; i-- { + final = c.mw[i](final) + } + return final +} + +var scrapeMiddleware = make(map[string]ScrapeMiddleware) + +// RegisterScrapeMiddleware makes a middleware available to the tracker under +// the provided named. +// +// If this function is called twice with the same name or if the handler is nil, +// it panics. +func RegisterScrapeMiddleware(name string, mw ScrapeMiddleware) { + if mw == nil { + panic("tracker: could not register nil ScrapeMiddleware") + } + + if _, dup := scrapeMiddleware[name]; dup { + panic("tracker: could not register duplicate ScrapeMiddleware: " + name) + } + + scrapeMiddleware[name] = mw +} diff --git a/tracker/middleware_test.go b/tracker/middleware_test.go new file mode 100644 index 0000000..6582675 --- /dev/null +++ b/tracker/middleware_test.go @@ -0,0 +1,53 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file.package middleware + +package tracker + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/config" +) + +func testAnnounceMW1(next AnnounceHandler) AnnounceHandler { + return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error { + resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{ + Port: 1, + }) + return next(cfg, req, resp) + } +} + +func testAnnounceMW2(next AnnounceHandler) AnnounceHandler { + return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error { + resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{ + Port: 2, + }) + return next(cfg, req, resp) + } +} + +func testAnnounceMW3(next AnnounceHandler) AnnounceHandler { + return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error { + resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{ + Port: 3, + }) + return next(cfg, req, resp) + } +} + +func TestAnnounceChain(t *testing.T) { + var achain announceChain + achain.Append(testAnnounceMW1) + achain.Append(testAnnounceMW2) + achain.Append(testAnnounceMW3) + handler := achain.Handler() + resp := &chihaya.AnnounceResponse{} + err := handler(nil, chihaya.AnnounceRequest{}, resp) + assert.Nil(t, err, "the handler should not return an error") + assert.Equal(t, resp.IPv4Peers, []chihaya.Peer{chihaya.Peer{Port: 1}, chihaya.Peer{Port: 2}, chihaya.Peer{Port: 3}}, "the list of peers added from the middleware should be in the same order.") +} diff --git a/tracker/tracker.go b/tracker/tracker.go new file mode 100644 index 0000000..fa49e4b --- /dev/null +++ b/tracker/tracker.go @@ -0,0 +1,64 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file.package middleware + +package tracker + +import ( + "errors" + + "github.com/chihaya/chihaya" + "github.com/chihaya/chihaya/config" +) + +// Tracker represents a protocol independent, middleware-composed BitTorrent +// tracker. +type Tracker struct { + cfg *config.TrackerConfig + handleAnnounce AnnounceHandler + handleScrape ScrapeHandler +} + +// NewTracker parses a config and generates a Tracker composed by the middleware +// specified in the config. +func NewTracker(cfg *config.TrackerConfig) (*Tracker, error) { + var achain announceChain + for _, mwName := range cfg.AnnounceMiddleware { + mw, ok := announceMiddleware[mwName] + if !ok { + return nil, errors.New("failed to find announce middleware: " + mwName) + } + achain.Append(mw) + } + + var schain scrapeChain + for _, mwName := range cfg.ScrapeMiddleware { + mw, ok := scrapeMiddleware[mwName] + if !ok { + return nil, errors.New("failed to find scrape middleware: " + mwName) + } + schain.Append(mw) + } + + return &Tracker{ + cfg: cfg, + handleAnnounce: achain.Handler(), + handleScrape: schain.Handler(), + }, nil +} + +// HandleAnnounce runs an AnnounceRequest through a Tracker's middleware and +// returns the result. +func (t *Tracker) HandleAnnounce(req chihaya.AnnounceRequest) (*chihaya.AnnounceResponse, error) { + resp := &chihaya.AnnounceResponse{} + err := t.handleAnnounce(t.cfg, req, resp) + return resp, err +} + +// HandleScrape runs a ScrapeRequest through a Tracker's middleware and returns +// the result. +func (t *Tracker) HandleScrape(req chihaya.ScrapeRequest) (*chihaya.ScrapeResponse, error) { + resp := &chihaya.ScrapeResponse{} + err := t.handleScrape(t.cfg, req, resp) + return resp, err +}