From 08df3b167cd5c7f95ac79fb0e4e53dfb7ea6ffe1 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 11 Sep 2019 12:30:01 -0400 Subject: [PATCH] add getstream command to download a stream from a peer --- cmd/getstream.go | 71 ++++++++++++++++++++ cmd/peer.go | 19 ++++-- peer/client.go | 164 +++++++++++++++++++++++++++++++++++++++++++++++ peer/server.go | 6 +- 4 files changed, 252 insertions(+), 8 deletions(-) create mode 100644 cmd/getstream.go create mode 100644 peer/client.go diff --git a/cmd/getstream.go b/cmd/getstream.go new file mode 100644 index 0000000..ca3de6d --- /dev/null +++ b/cmd/getstream.go @@ -0,0 +1,71 @@ +package cmd + +import ( + "io/ioutil" + "os" + + "github.com/lbryio/lbry.go/stream" + + "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/reflector.go/peer" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + var cmd = &cobra.Command{ + Use: "getstream ADDRESS:PORT SDHASH", + Short: "Get a stream from a reflector server", + Args: cobra.ExactArgs(2), + Run: getStreamCmd, + } + rootCmd.AddCommand(cmd) +} + +func getStreamCmd(cmd *cobra.Command, args []string) { + addr := args[0] + sdHash := args[1] + + c := peer.Client{} + err := c.Connect(addr) + if err != nil { + log.Fatal("error connecting client to server: ", err) + } + + s, err := c.GetStream(sdHash) + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + var sd stream.SDBlob + err = sd.FromBlob(s[0]) + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + log.Printf("Downloading %d blobs for %s", len(sd.BlobInfos)-1, sd.SuggestedFileName) + + data, err := s.Data() + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + wd, err := os.Getwd() + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + filename := wd + "/" + sd.SuggestedFileName + err = ioutil.WriteFile(filename, data, 0644) + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + log.Printf("Wrote %d bytes to %s\n", len(data), filename) +} diff --git a/cmd/peer.go b/cmd/peer.go index 07fd71e..7a82eae 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -14,23 +14,32 @@ import ( "github.com/spf13/cobra" ) +var peerNoDB bool + func init() { var cmd = &cobra.Command{ Use: "peer", Short: "Run peer server", Run: peerCmd, } + cmd.Flags().BoolVar(&peerNoDB, "nodb", false, "Don't connect to a db and don't use a db-backed blob store") rootCmd.AddCommand(cmd) } func peerCmd(cmd *cobra.Command, args []string) { - db := new(db.SQL) - err := db.Connect(globalConfig.DBConn) - checkErr(err) + var err error s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - combo := store.NewDBBackedS3Store(s3, db) - peerServer := peer.NewServer(combo) + peerServer := peer.NewServer(s3) + + if !peerNoDB { + db := new(db.SQL) + err = db.Connect(globalConfig.DBConn) + checkErr(err) + + combo := store.NewDBBackedS3Store(s3, db) + peerServer = peer.NewServer(combo) + } err = peerServer.Start(":" + strconv.Itoa(peer.DefaultPort)) if err != nil { diff --git a/peer/client.go b/peer/client.go new file mode 100644 index 0000000..a8e2591 --- /dev/null +++ b/peer/client.go @@ -0,0 +1,164 @@ +package peer + +import ( + "bufio" + "encoding/hex" + "encoding/json" + "io" + "net" + "time" + + "github.com/lbryio/lbry.go/stream" + + "github.com/lbryio/lbry.go/extras/errors" + + log "github.com/sirupsen/logrus" +) + +// ErrBlobExists is a default error for when a blob already exists on the reflector server. +var ErrBlobExists = errors.Base("blob exists on server") + +// Client is an instance of a client connected to a server. +type Client struct { + Timeout time.Duration + conn net.Conn + buf *bufio.Reader + connected bool +} + +// Connect connects to a specific clients and errors if it cannot be contacted. +func (c *Client) Connect(address string) error { + var err error + if c.Timeout == 0 { + c.Timeout = 5 * time.Second + } + c.conn, err = net.Dial("tcp4", address) + if err != nil { + return err + } + c.connected = true + c.buf = bufio.NewReader(c.conn) + return nil +} + +// Close closes the connection with the client. +func (c *Client) Close() error { + c.connected = false + return c.conn.Close() +} + +// GetStream gets a stream +func (c *Client) GetStream(sdHash string) (stream.Stream, error) { + if !c.connected { + return nil, errors.Err("not connected") + } + + var sd stream.SDBlob + + b, err := c.GetBlob(sdHash) + if err != nil { + return nil, err + } + + err = sd.FromBlob(b) + if err != nil { + return nil, err + } + + s := make(stream.Stream, len(sd.BlobInfos)+1-1) // +1 for sd blob, -1 for last null blob + s[0] = b + + for i := 0; i < len(sd.BlobInfos)-1; i++ { + s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + if err != nil { + return nil, err + } + } + + return s, nil +} + +// GetBlob gets a blob +func (c *Client) GetBlob(blobHash string) (stream.Blob, error) { + if !c.connected { + return nil, errors.Err("not connected") + } + + sendRequest, err := json.Marshal(blobRequest{ + RequestedBlob: blobHash, + }) + if err != nil { + return nil, err + } + + err = c.write(sendRequest) + if err != nil { + return nil, err + } + + var resp blobResponse + err = c.read(&resp) + if err != nil { + return nil, err + } + + if resp.IncomingBlob.Error != "" { + return nil, errors.Prefix(blobHash[:8], resp.IncomingBlob.Error) + } + if resp.IncomingBlob.BlobHash != blobHash { + return nil, errors.Prefix(blobHash[:8], "Blob hash in response does not match requested hash") + } + if resp.IncomingBlob.Length <= 0 { + return nil, errors.Prefix(blobHash[:8], "Length reported as <= 0") + } + + log.Println("Receiving blob " + blobHash[:8]) + + blob, err := c.readRawBlob(resp.IncomingBlob.Length) + if err != nil { + return nil, err + } + + return stream.Blob(blob), nil +} + +func (c *Client) read(v interface{}) error { + err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return errors.Err(err) + } + + m, err := readNextMessage(c.buf) + if err != nil { + return err + } + + err = json.Unmarshal(m, v) + return errors.Err(err) +} + +func (c *Client) readRawBlob(blobSize int) ([]byte, error) { + err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return nil, errors.Err(err) + } + + blob := make([]byte, blobSize) + _, err = io.ReadFull(c.buf, blob) + return blob, errors.Err(err) +} + +func (c *Client) write(b []byte) error { + err := c.conn.SetWriteDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return errors.Err(err) + } + + log.Debugf("Writing %d bytes", len(b)) + + n, err := c.conn.Write(b) + if err == nil && n != len(b) { + err = io.ErrShortWrite + } + return errors.Err(err) +} diff --git a/peer/server.go b/peer/server.go index 1741a5c..4d80010 100644 --- a/peer/server.go +++ b/peer/server.go @@ -112,6 +112,7 @@ func (s *Server) handleConnection(conn net.Conn) { }() timeoutDuration := 1 * time.Minute + buf := bufio.NewReader(conn) for { var request []byte @@ -122,7 +123,7 @@ func (s *Server) handleConnection(conn net.Conn) { log.Error(errors.FullTrace(err)) } - request, err = readNextRequest(conn) + request, err = readNextMessage(buf) if err != nil { if err != io.EOF { s.logError(err) @@ -314,10 +315,9 @@ func (s *Server) logError(e error) { //log.Error(errors.FullTrace(e)) } -func readNextRequest(conn net.Conn) ([]byte, error) { +func readNextMessage(buf *bufio.Reader) ([]byte, error) { request := make([]byte, 0) eof := false - buf := bufio.NewReader(conn) for { chunk, err := buf.ReadBytes('}')