diff --git a/.gitignore b/.gitignore index 65444fc..8c1fca8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /.idea /blobs +/config.json diff --git a/main.go b/main.go index a28a4f7..34c84ce 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,16 @@ package main import ( + "encoding/json" "flag" + "io/ioutil" "math/rand" "strconv" "time" + "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" ) @@ -18,39 +23,58 @@ func checkErr(err error) { func main() { rand.Seed(time.Now().UnixNano()) - port := DefaultPort - //address := "52.14.109.125:" + strconv.Itoa(port) - address := "localhost:" + strconv.Itoa(port) - - serve := flag.Bool("server", false, "Run server") - blobDir := flag.String("blobdir", "", "Where blobs will be saved to") + confFile := flag.String("conf", "config.json", "Config file") flag.Parse() - if serve != nil && *serve { - if blobDir == nil || *blobDir == "" { - log.Fatal("-blobdir required") - } - server := NewServer(*blobDir) - log.Fatal(server.ListenAndServe(address)) - return - } - var err error - client := Client{} + conf := loadConfig(*confFile) - log.Println("Connecting to " + address) - err = client.Connect(address) - checkErr(err) + peerAddress := "localhost:" + strconv.Itoa(peer.DefaultPort) + server := peer.NewServer(store.NewS3BlobStore(conf.AwsID, conf.AwsSecret, conf.BucketRegion, conf.BucketName)) + log.Fatal(server.ListenAndServe(peerAddress)) + return - log.Println("Connected") + // + //address := "52.14.109.125:" + strconv.Itoa(port) + //reflectorAddress := "localhost:" + strconv.Itoa(reflector.DefaultPort) + //server := reflector.NewServer(store.NewS3BlobStore(conf.awsID, conf.awsSecret, conf.bucketRegion, conf.bucketName)) + //log.Fatal(server.ListenAndServe(reflectorAddress)) - defer func() { - log.Println("Closing connection") - client.Close() - }() - - blob := make([]byte, 2*1024*1024) - _, err = rand.Read(blob) - checkErr(err) - err = client.SendBlob(blob) - checkErr(err) + // + //var err error + //client := reflector.Client{} + // + //log.Println("Connecting to " + reflectorAddress) + //err = client.Connect(reflectorAddress) + //checkErr(err) + // + //log.Println("Connected") + // + //defer func() { + // log.Println("Closing connection") + // client.Close() + //}() + // + //blob := make([]byte, 2*1024*1024) + //_, err = rand.Read(blob) + //checkErr(err) + //err = client.SendBlob(blob) + //checkErr(err) +} + +type config struct { + AwsID string `json:"aws_id"` + AwsSecret string `json:"aws_secret"` + BucketRegion string `json:"bucket_region"` + BucketName string `json:"bucket_name"` +} + +func loadConfig(path string) config { + raw, err := ioutil.ReadFile(path) + checkErr(err) + + var c config + err = json.Unmarshal(raw, &c) + checkErr(err) + + return c } diff --git a/peer/server.go b/peer/server.go new file mode 100644 index 0000000..4dfeb62 --- /dev/null +++ b/peer/server.go @@ -0,0 +1,189 @@ +package peer + +import ( + "crypto/sha512" + "encoding/hex" + "encoding/json" + "io" + "net" + + "github.com/lbryio/reflector.go/store" + + log "github.com/sirupsen/logrus" +) + +const ( + DefaultPort = 3333 +) + +type Server struct { + store store.BlobStore +} + +func NewServer(store store.BlobStore) *Server { + return &Server{ + store: store, + } +} + +func (s *Server) ListenAndServe(address string) error { + log.Println("Listening on " + address) + l, err := net.Listen("tcp", address) + if err != nil { + return err + } + defer l.Close() + + for { + conn, err := l.Accept() + if err != nil { + log.Error(err) + } else { + go s.handleConn(conn) + } + } +} + +func (s *Server) handleConn(conn net.Conn) { + // TODO: connection should time out eventually + defer conn.Close() + + err := s.doAvailabilityRequest(conn) + if err != nil { + log.Error(err) + return + } + + err = s.doPaymentRateNegotiation(conn) + if err != nil { + log.Error(err) + return + } + + for { + err = s.doBlobRequest(conn) + if err != nil { + if err != io.EOF { + log.Error(err) + } + return + } + } +} + +func (s *Server) doAvailabilityRequest(conn net.Conn) error { + var request availabilityRequest + err := json.NewDecoder(conn).Decode(&request) + if err != nil { + return err + } + + address := "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct" + availableBlobs := []string{} + for _, blobHash := range request.RequestedBlobs { + exists, err := s.store.Has(blobHash) + if err != nil { + return err + } + if exists { + availableBlobs = append(availableBlobs, blobHash) + } + } + + response, err := json.Marshal(availabilityResponse{LbrycrdAddress: address, AvailableBlobs: availableBlobs}) + if err != nil { + return err + } + + _, err = conn.Write(response) + if err != nil { + return err + } + + return nil +} + +func (s *Server) doPaymentRateNegotiation(conn net.Conn) error { + var request paymentRateRequest + err := json.NewDecoder(conn).Decode(&request) + if err != nil { + return err + } + + offerReply := paymentRateAccepted + if request.BlobDataPaymentRate < 0 { + offerReply = paymentRateTooLow + } + + response, err := json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply}) + if err != nil { + return err + } + + _, err = conn.Write(response) + if err != nil { + return err + } + + return nil +} + +func (s *Server) doBlobRequest(conn net.Conn) error { + var request blobRequest + err := json.NewDecoder(conn).Decode(&request) + if err != nil { + return err + } + + log.Println("Sending blob " + request.RequestedBlob[:8]) + + blob, err := s.store.Get(request.RequestedBlob) + if err != nil { + return err + } + + response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{ + BlobHash: getBlobHash(blob), + Length: len(blob), + }}) + if err != nil { + return err + } + + _, err = conn.Write(response) + if err != nil { + return err + } + + _, err = conn.Write(blob) + if err != nil { + return err + } + + return nil +} + +func readAll(conn net.Conn) { + buf := make([]byte, 0, 4096) // big buffer + tmp := make([]byte, 256) // using small tmo buffer for demonstrating + for { + n, err := conn.Read(tmp) + if err != nil { + if err != io.EOF { + log.Println("read error:", err) + } + break + } + log.Println("got", n, "bytes.") + buf = append(buf, tmp[:n]...) + } + log.Println("total size:", len(buf)) + if len(buf) > 0 { + log.Println(string(buf)) + } +} + +func getBlobHash(blob []byte) string { + hashBytes := sha512.Sum384(blob) + return hex.EncodeToString(hashBytes[:]) +} diff --git a/peer/shared.go b/peer/shared.go new file mode 100644 index 0000000..d640dae --- /dev/null +++ b/peer/shared.go @@ -0,0 +1,38 @@ +package peer + +type availabilityRequest struct { + LbrycrdAddress bool `json:"lbrycrd_address"` + RequestedBlobs []string `json:"requested_blobs"` +} + +type availabilityResponse struct { + LbrycrdAddress string `json:"lbrycrd_address"` + AvailableBlobs []string `json:"available_blobs"` +} + +const ( + paymentRateAccepted = "RATE_ACCEPTED" + paymentRateTooLow = "RATE_TOO_LOW" + paymentRateUnset = "RATE_UNSET" +) + +type paymentRateRequest struct { + BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` +} + +type paymentRateResponse struct { + BlobDataPaymentRate string `json:"blob_data_payment_rate"` +} + +type blobRequest struct { + RequestedBlob string `json:"requested_blob"` +} + +type incomingBlob struct { + Error string `json:"error,omitempty"` + BlobHash string `json:"blob_hash"` + Length int `json:"length"` +} +type blobResponse struct { + IncomingBlob incomingBlob `json:"incoming_blob"` +} diff --git a/client.go b/reflector/client.go similarity index 93% rename from client.go rename to reflector/client.go index aac0842..fdec97f 100644 --- a/client.go +++ b/reflector/client.go @@ -1,4 +1,4 @@ -package main +package reflector import ( "encoding/json" @@ -33,8 +33,8 @@ func (c *Client) SendBlob(blob []byte) error { return errors.Err("not connected") } - if len(blob) != BlobSize { - return errors.Err("blob must be exactly " + strconv.Itoa(BlobSize) + " bytes") + if len(blob) != maxBlobSize { + return errors.Err("blob must be exactly " + strconv.Itoa(maxBlobSize) + " bytes") } blobHash := getBlobHash(blob) diff --git a/client_test.go b/reflector/client_test.go similarity index 98% rename from client_test.go rename to reflector/client_test.go index c43d50a..3c1e712 100644 --- a/client_test.go +++ b/reflector/client_test.go @@ -1,4 +1,4 @@ -package main +package reflector import ( "io/ioutil" diff --git a/server.go b/reflector/server.go similarity index 80% rename from server.go rename to reflector/server.go index 331af9e..957d2e5 100644 --- a/server.go +++ b/reflector/server.go @@ -1,37 +1,30 @@ -package main +package reflector import ( "bufio" "encoding/json" "io" - "io/ioutil" "net" - "os" - "path" "strconv" + "github.com/lbryio/reflector.go/store" + "github.com/lbryio/errors.go" log "github.com/sirupsen/logrus" ) type Server struct { - BlobDir string + store store.BlobStore } -func NewServer(blobDir string) *Server { +func NewServer(store store.BlobStore) *Server { return &Server{ - BlobDir: blobDir, + store: store, } } func (s *Server) ListenAndServe(address string) error { - log.Println("Blobs will be saved to " + s.BlobDir) - err := s.ensureBlobDirExists() - if err != nil { - return err - } - log.Println("Listening on " + address) l, err := net.Listen("tcp", address) if err != nil { @@ -42,10 +35,10 @@ func (s *Server) ListenAndServe(address string) error { for { conn, err := l.Accept() if err != nil { - // TODO: dont crash server on error here - return err + log.Error(err) + } else { + go s.handleConn(conn) } - go s.handleConn(conn) } } @@ -93,11 +86,13 @@ func (s *Server) receiveBlob(conn net.Conn) error { } blobExists := false - blobPath := path.Join(s.BlobDir, blobHash) - if !isSdBlob { // we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs - if _, err := os.Stat(blobPath); !os.IsNotExist(err) { - blobExists = true + if !isSdBlob { + // we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs + has, err := s.store.Has(blobHash) + if err != nil { + return err } + blobExists = has } err = s.sendBlobResponse(conn, blobExists, isSdBlob) @@ -122,7 +117,7 @@ func (s *Server) receiveBlob(conn net.Conn) error { } log.Println("Got blob " + blobHash[:8]) - err = ioutil.WriteFile(blobPath, blob, 0644) + err = s.store.Put(blobHash, blob) if err != nil { return err } @@ -167,8 +162,8 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) { var blobSize int isSdBlob := sendRequest.SdBlobHash != "" - if blobSize > BlobSize { - return 0, "", isSdBlob, errors.Err("blob cannot be more than " + strconv.Itoa(BlobSize) + " bytes") + if blobSize > maxBlobSize { + return 0, "", isSdBlob, errors.Err("blob cannot be more than " + strconv.Itoa(maxBlobSize) + " bytes") } if isSdBlob { @@ -222,19 +217,3 @@ func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool } return nil } - -func (s *Server) ensureBlobDirExists() error { - if stat, err := os.Stat(s.BlobDir); err != nil { - if os.IsNotExist(err) { - err2 := os.Mkdir(s.BlobDir, 0755) - if err2 != nil { - return err2 - } - } else { - return err - } - } else if !stat.IsDir() { - return errors.Err("blob dir exists but is not a dir") - } - return nil -} diff --git a/shared.go b/reflector/shared.go similarity index 97% rename from shared.go rename to reflector/shared.go index 8bcc0ef..ad11d38 100644 --- a/shared.go +++ b/reflector/shared.go @@ -1,4 +1,4 @@ -package main +package reflector import ( "crypto/sha512" @@ -10,7 +10,7 @@ import ( const ( DefaultPort = 5566 - BlobSize = 2 * 1024 * 1024 + maxBlobSize = 2 * 1024 * 1024 protocolVersion1 = 0 protocolVersion2 = 1 diff --git a/store/store.go b/store/store.go new file mode 100644 index 0000000..3e2f43c --- /dev/null +++ b/store/store.go @@ -0,0 +1,184 @@ +package store + +import ( + "bytes" + "io/ioutil" + "net/http" + "os" + "path" + + "github.com/lbryio/errors.go" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +type BlobStore interface { + Has(string) (bool, error) + Get(string) ([]byte, error) + Put(string, []byte) error +} + +type FileBlobStore struct { + dir string + + initialized bool +} + +func NewFileBlobStore(dir string) *FileBlobStore { + return &FileBlobStore{dir: dir} +} + +func (f *FileBlobStore) path(hash string) string { + return path.Join(f.dir, hash) +} + +func (f *FileBlobStore) initOnce() error { + if f.initialized { + return nil + } + defer func() { f.initialized = true }() + + if stat, err := os.Stat(f.dir); err != nil { + if os.IsNotExist(err) { + err2 := os.Mkdir(f.dir, 0755) + if err2 != nil { + return err2 + } + } else { + return err + } + } else if !stat.IsDir() { + return errors.Err("blob dir exists but is not a dir") + } + return nil +} + +func (f *FileBlobStore) Has(hash string) (bool, error) { + err := f.initOnce() + if err != nil { + return false, err + } + + _, err = os.Stat(f.path(hash)) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + return true, nil +} + +func (f *FileBlobStore) Get(hash string) ([]byte, error) { + err := f.initOnce() + if err != nil { + return []byte{}, err + } + + file, err := os.Open(f.path(hash)) + if err != nil { + return []byte{}, err + } + + return ioutil.ReadAll(file) +} + +func (f *FileBlobStore) Put(hash string, blob []byte) error { + err := f.initOnce() + if err != nil { + return err + } + + return ioutil.WriteFile(f.path(hash), blob, 0644) +} + +type S3BlobStore struct { + awsID string + awsSecret string + region string + bucket string + + session *session.Session +} + +func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { + return &S3BlobStore{ + awsID: awsID, + awsSecret: awsSecret, + region: region, + bucket: bucket, + } +} + +func (s *S3BlobStore) initOnce() error { + if s.session != nil { + return nil + } + + sess, err := session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), + Region: aws.String(s.region), + }) + if err != nil { + return err + } + + s.session = sess + return nil +} + +func (s *S3BlobStore) Has(hash string) (bool, error) { + err := s.initOnce() + if err != nil { + return false, err + } + + _, err = s3.New(s.session).HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + }) + if err != nil { + if reqFail, ok := err.(s3.RequestFailure); ok && reqFail.StatusCode() == http.StatusNotFound { + return false, nil + } + return false, err + } + + return true, nil +} + +func (s *S3BlobStore) Get(hash string) ([]byte, error) { + err := s.initOnce() + if err != nil { + return []byte{}, err + } + + buf := &aws.WriteAtBuffer{} + _, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + }) + if err != nil { + return buf.Bytes(), err + } + + return buf.Bytes(), nil +} + +func (s *S3BlobStore) Put(hash string, blob []byte) error { + err := s.initOnce() + if err != nil { + return err + } + + _, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + Body: bytes.NewBuffer(blob), + }) + return err +}