expose reflector timeout, bump it up

This commit is contained in:
Alex Grintsvayg 2018-08-15 15:50:09 -04:00
parent 4284c3b1f9
commit bd8a35e366
3 changed files with 11 additions and 8 deletions

View file

@ -5,6 +5,7 @@ import (
"os/signal" "os/signal"
"strconv" "strconv"
"syscall" "syscall"
"time"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
@ -35,6 +36,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db) combo := store.NewDBBackedS3Store(s3, db)
reflectorServer := reflector.NewServer(combo) reflectorServer := reflector.NewServer(combo)
reflectorServer.Timeout = 10 * time.Second
err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -32,17 +32,18 @@ const (
// Server is and instance of the reflector server. It houses the blob store and listener. // Server is and instance of the reflector server. It houses the blob store and listener.
type Server struct { type Server struct {
store store.BlobStore Timeout time.Duration // timeout to read or write next message
timeout time.Duration // timeout to read or write next message
grp *stop.Group store store.BlobStore
grp *stop.Group
} }
// NewServer returns an initialized reflector server pointer. // NewServer returns an initialized reflector server pointer.
func NewServer(store store.BlobStore) *Server { func NewServer(store store.BlobStore) *Server {
return &Server{ return &Server{
Timeout: DefaultTimeout,
store: store, store: store,
grp: stop.New(), grp: stop.New(),
timeout: DefaultTimeout,
} }
} }
@ -296,7 +297,7 @@ func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool
} }
func (s *Server) read(conn net.Conn, v interface{}) error { func (s *Server) read(conn net.Conn, v interface{}) error {
err := conn.SetReadDeadline(time.Now().Add(s.timeout)) err := conn.SetReadDeadline(time.Now().Add(s.Timeout))
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
@ -305,7 +306,7 @@ func (s *Server) read(conn net.Conn, v interface{}) error {
} }
func (s *Server) readRawBlob(conn net.Conn, blobSize int) ([]byte, error) { func (s *Server) readRawBlob(conn net.Conn, blobSize int) ([]byte, error) {
err := conn.SetReadDeadline(time.Now().Add(s.timeout)) err := conn.SetReadDeadline(time.Now().Add(s.Timeout))
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -316,7 +317,7 @@ func (s *Server) readRawBlob(conn net.Conn, blobSize int) ([]byte, error) {
} }
func (s *Server) write(conn net.Conn, b []byte) error { func (s *Server) write(conn net.Conn, b []byte) error {
err := conn.SetWriteDeadline(time.Now().Add(s.timeout)) err := conn.SetWriteDeadline(time.Now().Add(s.Timeout))
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }

View file

@ -129,7 +129,7 @@ func TestServer_Timeout(t *testing.T) {
} }
srv := NewServer(&store.MemoryBlobStore{}) srv := NewServer(&store.MemoryBlobStore{})
srv.timeout = testTimeout srv.Timeout = testTimeout
err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)