diff --git a/cmd/decode.go b/cmd/decode.go index 730fdb3..86ab619 100644 --- a/cmd/decode.go +++ b/cmd/decode.go @@ -1,9 +1,13 @@ package cmd import ( + "encoding/hex" + "fmt" + "github.com/lbryio/lbryschema.go/claim" "github.com/davecgh/go-spew/spew" + "github.com/golang/protobuf/jsonpb" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -24,12 +28,27 @@ func decodeCmd(cmd *cobra.Command, args []string) { log.Fatal(err) } + m := jsonpb.Marshaler{Indent: " "} + if stream := c.Claim.GetStream(); stream != nil { - spew.Dump(stream) + json, err := m.MarshalToString(stream) + if err != nil { + log.Fatal(err) + } + fmt.Println(json) + fmt.Printf("SD hash as hex: %s\n", hex.EncodeToString(stream.GetSource().GetSdHash())) } else if channel := c.Claim.GetChannel(); channel != nil { - spew.Dump(channel) - } else if repost := c.Claim.GetRepost(); channel != nil { - spew.Dump(repost) + json, err := m.MarshalToString(channel) + if err != nil { + log.Fatal(err) + } + fmt.Println(json) + } else if repost := c.Claim.GetRepost(); repost != nil { + json, err := m.MarshalToString(repost) + if err != nil { + log.Fatal(err) + } + fmt.Println(json) } else { spew.Dump(c) } diff --git a/cmd/getstream.go b/cmd/getstream.go index 6a8c053..5d1f6d6 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -1,13 +1,15 @@ package cmd import ( + "encoding/hex" "os" - "github.com/lbryio/reflector.go/store" - - "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/lbry.go/stream" + + "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -26,22 +28,48 @@ func getStreamCmd(cmd *cobra.Command, args []string) { addr := args[0] sdHash := args[1] - c := peer.Client{} - err := c.Connect(addr) - if err != nil { - log.Fatal("error connecting client to server: ", err) - } - - cache := store.NewFileBlobStore("/tmp/lbry_downloaded_blobs") + s := store.NewCachingBlobStore( + peer.NewStore(addr), + store.NewFileBlobStore("/tmp/lbry_downloaded_blobs", 2), + ) wd, err := os.Getwd() if err != nil { log.Fatal(err) } - err = c.WriteStream(sdHash, wd, cache) + var sd stream.SDBlob + + sdb, err := s.Get(sdHash) if err != nil { - log.Error(errors.FullTrace(err)) - return + log.Fatal(err) + } + + err = sd.FromBlob(sdb) + if err != nil { + log.Fatal(err) + } + + f, err := os.Create(wd + "/" + sd.SuggestedFileName) + if err != nil { + log.Fatal(err) + } + + for i := 0; i < len(sd.BlobInfos)-1; i++ { + bb, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + if err != nil { + log.Fatal(err) + } + b := stream.Blob(bb) + + data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV) + if err != nil { + log.Fatal(err) + } + + _, err = f.Write(data) + if err != nil { + log.Fatal(err) + } } } diff --git a/peer/client.go b/peer/client.go index 8558f1a..1c058eb 100644 --- a/peer/client.go +++ b/peer/client.go @@ -6,14 +6,12 @@ import ( "encoding/json" "io" "net" - "os" "time" "github.com/lbryio/reflector.go/store" - "github.com/lbryio/lbry.go/stream" - "github.com/lbryio/lbry.go/extras/errors" + "github.com/lbryio/lbry.go/stream" log "github.com/sirupsen/logrus" ) @@ -50,56 +48,6 @@ func (c *Client) Close() error { return c.conn.Close() } -// WriteStream downloads and writes a stream to file -func (c *Client) WriteStream(sdHash, dir string, blobStore store.BlobStore) error { - if !c.connected { - return errors.Err("not connected") - } - - var sd stream.SDBlob - - sdb, err := c.getBlobWithCache(sdHash, blobStore) - if err != nil { - return err - } - - err = sd.FromBlob(sdb) - if err != nil { - return err - } - - info, err := os.Stat(dir) - if err != nil { - return errors.Prefix("cannot stat "+dir, err) - } else if !info.IsDir() { - return errors.Err(dir + " must be a directory") - } - - f, err := os.Create(dir + "/" + sd.SuggestedFileName) - if err != nil { - return err - } - - for i := 0; i < len(sd.BlobInfos)-1; i++ { - b, err := c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobStore) - if err != nil { - return err - } - - data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV) - if err != nil { - return err - } - - _, err = f.Write(data) - if err != nil { - return err - } - } - - return nil -} - // GetStream gets a stream func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) { if !c.connected { @@ -108,7 +56,7 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str var sd stream.SDBlob - b, err := c.getBlobWithCache(sdHash, blobCache) + b, err := c.GetBlob(sdHash) if err != nil { return nil, err } @@ -122,7 +70,7 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str s[0] = b for i := 0; i < len(sd.BlobInfos)-1; i++ { - s[i+1], err = c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobCache) + s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) if err != nil { return nil, err } @@ -131,24 +79,37 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str return s, nil } -func (c *Client) getBlobWithCache(hash string, blobCache store.BlobStore) (stream.Blob, error) { - if blobCache == nil { - return c.GetBlob(hash) +// HasBlob checks if the blob is available +func (c *Client) HasBlob(hash string) (bool, error) { + if !c.connected { + return false, errors.Err("not connected") } - blob, err := blobCache.Get(hash) - if err == nil || !errors.Is(err, store.ErrBlobNotFound) { - return blob, err - } - - blob, err = c.GetBlob(hash) + sendRequest, err := json.Marshal(availabilityRequest{ + RequestedBlobs: []string{hash}, + }) if err != nil { - return nil, err + return false, err } - err = blobCache.Put(hash, blob) + err = c.write(sendRequest) + if err != nil { + return false, err + } - return blob, err + var resp availabilityResponse + err = c.read(&resp) + if err != nil { + return false, err + } + + for _, h := range resp.AvailableBlobs { + if h == hash { + return true, nil + } + } + + return false, nil } // GetBlob gets a blob @@ -185,7 +146,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { return nil, errors.Prefix(hash[:8], "Length reported as <= 0") } - log.Println("Receiving blob " + hash[:8]) + log.Printf("Receiving blob %s from %s", hash[:8], c.conn.RemoteAddr()) blob, err := c.readRawBlob(resp.IncomingBlob.Length) if err != nil { @@ -206,6 +167,8 @@ func (c *Client) read(v interface{}) error { return err } + log.Debugf("Read %d bytes from %s", len(m), c.conn.RemoteAddr()) + err = json.Unmarshal(m, v) return errors.Err(err) } @@ -217,7 +180,8 @@ func (c *Client) readRawBlob(blobSize int) ([]byte, error) { } blob := make([]byte, blobSize) - _, err = io.ReadFull(c.buf, blob) + n, err := io.ReadFull(c.buf, blob) + log.Debugf("Read %d bytes from %s", n, c.conn.RemoteAddr()) return blob, errors.Err(err) } @@ -227,7 +191,7 @@ func (c *Client) write(b []byte) error { return errors.Err(err) } - log.Debugf("Writing %d bytes", len(b)) + log.Debugf("Writing %d bytes to %s", len(b), c.conn.RemoteAddr()) n, err := c.conn.Write(b) if err == nil && n != len(b) { diff --git a/peer/server.go b/peer/server.go index 4d80010..90bca99 100644 --- a/peer/server.go +++ b/peer/server.go @@ -186,45 +186,45 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) { return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs}) } -func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) { - var request paymentRateRequest - err := json.Unmarshal(data, &request) - if err != nil { - return []byte{}, err - } - - offerReply := paymentRateAccepted - if request.BlobDataPaymentRate < 0 { - offerReply = paymentRateTooLow - } - - return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply}) -} - -func (s *Server) handleBlobRequest(data []byte) ([]byte, error) { - var request blobRequest - err := json.Unmarshal(data, &request) - if err != nil { - return []byte{}, err - } - - log.Debugln("Sending blob " + request.RequestedBlob[:8]) - - blob, err := s.store.Get(request.RequestedBlob) - if err != nil { - return []byte{}, err - } - - response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{ - BlobHash: reflector.BlobHash(blob), - Length: len(blob), - }}) - if err != nil { - return []byte{}, err - } - - return append(response, blob...), nil -} +//func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) { +// var request paymentRateRequest +// err := json.Unmarshal(data, &request) +// if err != nil { +// return []byte{}, err +// } +// +// offerReply := paymentRateAccepted +// if request.BlobDataPaymentRate < 0 { +// offerReply = paymentRateTooLow +// } +// +// return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply}) +//} +// +//func (s *Server) handleBlobRequest(data []byte) ([]byte, error) { +// var request blobRequest +// err := json.Unmarshal(data, &request) +// if err != nil { +// return []byte{}, err +// } +// +// log.Debugln("Sending blob " + request.RequestedBlob[:8]) +// +// blob, err := s.store.Get(request.RequestedBlob) +// if err != nil { +// return []byte{}, err +// } +// +// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{ +// BlobHash: reflector.BlobHash(blob), +// Length: len(blob), +// }}) +// if err != nil { +// return []byte{}, err +// } +// +// return append(response, blob...), nil +//} func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { var request compositeRequest @@ -316,7 +316,7 @@ func (s *Server) logError(e error) { } func readNextMessage(buf *bufio.Reader) ([]byte, error) { - request := make([]byte, 0) + msg := make([]byte, 0) eof := false for { @@ -324,7 +324,7 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) { if err != nil { if err != io.EOF { //log.Errorln("readBytes error:", err) // logged by caller - return request, err + return msg, err } eof = true } @@ -333,14 +333,14 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) { //spew.Dump(chunk) if len(chunk) > 0 { - request = append(request, chunk...) + msg = append(msg, chunk...) - if len(request) > maxRequestSize { - return request, errRequestTooLarge + if len(msg) > maxRequestSize { + return msg, errRequestTooLarge } // yes, this is how the peer protocol knows when the request finishes - if reflector.IsValidJSON(request) { + if reflector.IsValidJSON(msg) { break } } @@ -355,11 +355,11 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) { // spew.Dump(request) //} - if len(request) == 0 && eof { - return []byte{}, io.EOF + if len(msg) == 0 && eof { + return nil, io.EOF } - return request, nil + return msg, nil } const (