rewrite getstream command using caching store

Alex Grintsvayg 2019-10-03 16:13:08 -04:00
parent 24f885e268
commit a8230db802
GPG key ID: AEB3F089F86A22B5 (no known key found for this signature in database)
4 changed files with 146 additions and 135 deletions
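At a glance, the new getstream flow composes a caching blob store (a remote peer store wrapped in a local file store) and pulls every blob through it, instead of having the peer client write the stream to disk itself. Below is a minimal sketch of that wiring, using the constructors exactly as they appear in the getstream diff further down; the peer address and sd hash are placeholders, not values from the commit.

// Orientation sketch only, not part of the commit. Constructors and
// signatures are taken from the getstream diff below; the address and
// hash values here are placeholders.
package main

import (
	log "github.com/sirupsen/logrus"

	"github.com/lbryio/reflector.go/peer"
	"github.com/lbryio/reflector.go/store"
)

func main() {
	addr := "example.org:5567"  // placeholder peer address
	sdHash := "0123abcd..."     // placeholder sd hash

	// Reads go through the local file cache; blobs fetched from the remote
	// peer are presumably written back into the cache by the caching store.
	s := store.NewCachingBlobStore(
		peer.NewStore(addr),
		store.NewFileBlobStore("/tmp/lbry_downloaded_blobs", 2),
	)

	blob, err := s.Get(sdHash)
	if err != nil {
		log.Fatal(err)
	}
	log.Infof("got sd blob of %d bytes", len(blob))
}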

Changed file 1 of 4: decode command (package cmd)

@@ -1,9 +1,13 @@
 package cmd
 
 import (
+	"encoding/hex"
+	"fmt"
+
 	"github.com/lbryio/lbryschema.go/claim"
 
 	"github.com/davecgh/go-spew/spew"
+	"github.com/golang/protobuf/jsonpb"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
@@ -24,12 +28,27 @@ func decodeCmd(cmd *cobra.Command, args []string) {
 		log.Fatal(err)
 	}
 
+	m := jsonpb.Marshaler{Indent: " "}
+
 	if stream := c.Claim.GetStream(); stream != nil {
-		spew.Dump(stream)
+		json, err := m.MarshalToString(stream)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Println(json)
+		fmt.Printf("SD hash as hex: %s\n", hex.EncodeToString(stream.GetSource().GetSdHash()))
 	} else if channel := c.Claim.GetChannel(); channel != nil {
-		spew.Dump(channel)
-	} else if repost := c.Claim.GetRepost(); channel != nil {
-		spew.Dump(repost)
+		json, err := m.MarshalToString(channel)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Println(json)
+	} else if repost := c.Claim.GetRepost(); repost != nil {
+		json, err := m.MarshalToString(repost)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Println(json)
 	} else {
 		spew.Dump(c)
 	}

Changed file 2 of 4: getstream command (package cmd)

@@ -1,13 +1,15 @@
 package cmd
 
 import (
+	"encoding/hex"
 	"os"
 
-	"github.com/lbryio/reflector.go/store"
-
-	"github.com/lbryio/lbry.go/extras/errors"
 	"github.com/lbryio/reflector.go/peer"
+	"github.com/lbryio/lbry.go/stream"
+	"github.com/lbryio/reflector.go/store"
 
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
@@ -26,22 +28,48 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
 	addr := args[0]
 	sdHash := args[1]
 
-	c := peer.Client{}
-	err := c.Connect(addr)
-	if err != nil {
-		log.Fatal("error connecting client to server: ", err)
-	}
-
-	cache := store.NewFileBlobStore("/tmp/lbry_downloaded_blobs")
+	s := store.NewCachingBlobStore(
+		peer.NewStore(addr),
+		store.NewFileBlobStore("/tmp/lbry_downloaded_blobs", 2),
+	)
 
 	wd, err := os.Getwd()
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	err = c.WriteStream(sdHash, wd, cache)
-	if err != nil {
-		log.Error(errors.FullTrace(err))
-		return
-	}
+	var sd stream.SDBlob
+
+	sdb, err := s.Get(sdHash)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	err = sd.FromBlob(sdb)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	f, err := os.Create(wd + "/" + sd.SuggestedFileName)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	for i := 0; i < len(sd.BlobInfos)-1; i++ {
+		bb, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		b := stream.Blob(bb)
+		data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		_, err = f.Write(data)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
 }

Changed file 3 of 4: peer protocol client

@@ -6,14 +6,12 @@ import (
 	"encoding/json"
 	"io"
 	"net"
-	"os"
 	"time"
 
 	"github.com/lbryio/reflector.go/store"
 
-	"github.com/lbryio/lbry.go/stream"
-
 	"github.com/lbryio/lbry.go/extras/errors"
+	"github.com/lbryio/lbry.go/stream"
 
 	log "github.com/sirupsen/logrus"
 )
@@ -50,56 +48,6 @@ func (c *Client) Close() error {
 	return c.conn.Close()
 }
 
-// WriteStream downloads and writes a stream to file
-func (c *Client) WriteStream(sdHash, dir string, blobStore store.BlobStore) error {
-	if !c.connected {
-		return errors.Err("not connected")
-	}
-
-	var sd stream.SDBlob
-
-	sdb, err := c.getBlobWithCache(sdHash, blobStore)
-	if err != nil {
-		return err
-	}
-
-	err = sd.FromBlob(sdb)
-	if err != nil {
-		return err
-	}
-
-	info, err := os.Stat(dir)
-	if err != nil {
-		return errors.Prefix("cannot stat "+dir, err)
-	} else if !info.IsDir() {
-		return errors.Err(dir + " must be a directory")
-	}
-
-	f, err := os.Create(dir + "/" + sd.SuggestedFileName)
-	if err != nil {
-		return err
-	}
-
-	for i := 0; i < len(sd.BlobInfos)-1; i++ {
-		b, err := c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobStore)
-		if err != nil {
-			return err
-		}
-
-		data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV)
-		if err != nil {
-			return err
-		}
-
-		_, err = f.Write(data)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 // GetStream gets a stream
 func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
 	if !c.connected {
@@ -108,7 +56,7 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
 
 	var sd stream.SDBlob
 
-	b, err := c.getBlobWithCache(sdHash, blobCache)
+	b, err := c.GetBlob(sdHash)
 	if err != nil {
 		return nil, err
 	}
@@ -122,7 +70,7 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
 	s[0] = b
 
 	for i := 0; i < len(sd.BlobInfos)-1; i++ {
-		s[i+1], err = c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobCache)
+		s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
 		if err != nil {
 			return nil, err
 		}
@@ -131,24 +79,37 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
 	return s, nil
 }
 
-func (c *Client) getBlobWithCache(hash string, blobCache store.BlobStore) (stream.Blob, error) {
-	if blobCache == nil {
-		return c.GetBlob(hash)
-	}
-
-	blob, err := blobCache.Get(hash)
-	if err == nil || !errors.Is(err, store.ErrBlobNotFound) {
-		return blob, err
-	}
-
-	blob, err = c.GetBlob(hash)
-	if err != nil {
-		return nil, err
-	}
-
-	err = blobCache.Put(hash, blob)
-	return blob, err
+// HasBlob checks if the blob is available
+func (c *Client) HasBlob(hash string) (bool, error) {
+	if !c.connected {
+		return false, errors.Err("not connected")
+	}
+
+	sendRequest, err := json.Marshal(availabilityRequest{
+		RequestedBlobs: []string{hash},
+	})
+	if err != nil {
+		return false, err
+	}
+
+	err = c.write(sendRequest)
+	if err != nil {
+		return false, err
+	}
+
+	var resp availabilityResponse
+	err = c.read(&resp)
+	if err != nil {
+		return false, err
+	}
+
+	for _, h := range resp.AvailableBlobs {
+		if h == hash {
+			return true, nil
+		}
+	}
+
+	return false, nil
 }
 
 // GetBlob gets a blob
@@ -185,7 +146,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
 		return nil, errors.Prefix(hash[:8], "Length reported as <= 0")
 	}
 
-	log.Println("Receiving blob " + hash[:8])
+	log.Printf("Receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
 
 	blob, err := c.readRawBlob(resp.IncomingBlob.Length)
 	if err != nil {
@@ -206,6 +167,8 @@ func (c *Client) read(v interface{}) error {
 		return err
 	}
 
+	log.Debugf("Read %d bytes from %s", len(m), c.conn.RemoteAddr())
+
 	err = json.Unmarshal(m, v)
 	return errors.Err(err)
 }
@@ -217,7 +180,8 @@ func (c *Client) readRawBlob(blobSize int) ([]byte, error) {
 	}
 
 	blob := make([]byte, blobSize)
-	_, err = io.ReadFull(c.buf, blob)
+	n, err := io.ReadFull(c.buf, blob)
+	log.Debugf("Read %d bytes from %s", n, c.conn.RemoteAddr())
 
 	return blob, errors.Err(err)
 }
@@ -227,7 +191,7 @@ func (c *Client) write(b []byte) error {
 		return errors.Err(err)
 	}
 
-	log.Debugf("Writing %d bytes", len(b))
+	log.Debugf("Writing %d bytes to %s", len(b), c.conn.RemoteAddr())
 
 	n, err := c.conn.Write(b)
 	if err == nil && n != len(b) {
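The WriteStream and getBlobWithCache helpers deleted above are what the caching store in the new getstream command is assumed to subsume. For reference, here is a sketch of the read-through logic the deleted helper implemented; getWithCache, origin, and cache are illustrative names, not part of this commit or of the store package API.

// Sketch of the read-through caching that the deleted getBlobWithCache
// helper performed, and which store.NewCachingBlobStore presumably provides:
// try the cache, fall back to the origin on a miss, then fill the cache.
package blobcache

import (
	"github.com/lbryio/lbry.go/extras/errors"
	"github.com/lbryio/lbry.go/stream"

	"github.com/lbryio/reflector.go/store"
)

func getWithCache(origin, cache store.BlobStore, hash string) (stream.Blob, error) {
	blob, err := cache.Get(hash)
	if err == nil || !errors.Is(err, store.ErrBlobNotFound) {
		// Cache hit, or a cache failure other than "not found".
		return stream.Blob(blob), err
	}

	blob, err = origin.Get(hash)
	if err != nil {
		return nil, err
	}

	// Fill the cache; like the deleted helper, a failed Put surfaces the error.
	err = cache.Put(hash, blob)
	return stream.Blob(blob), err
}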

Changed file 4 of 4: peer protocol server

@@ -186,45 +186,45 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
 	return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs})
 }
 
-func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) {
-	var request paymentRateRequest
-	err := json.Unmarshal(data, &request)
-	if err != nil {
-		return []byte{}, err
-	}
-
-	offerReply := paymentRateAccepted
-	if request.BlobDataPaymentRate < 0 {
-		offerReply = paymentRateTooLow
-	}
-
-	return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply})
-}
-
-func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
-	var request blobRequest
-	err := json.Unmarshal(data, &request)
-	if err != nil {
-		return []byte{}, err
-	}
-
-	log.Debugln("Sending blob " + request.RequestedBlob[:8])
-
-	blob, err := s.store.Get(request.RequestedBlob)
-	if err != nil {
-		return []byte{}, err
-	}
-
-	response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
-		BlobHash: reflector.BlobHash(blob),
-		Length:   len(blob),
-	}})
-	if err != nil {
-		return []byte{}, err
-	}
-
-	return append(response, blob...), nil
-}
+//func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) {
+//	var request paymentRateRequest
+//	err := json.Unmarshal(data, &request)
+//	if err != nil {
+//		return []byte{}, err
+//	}
+//
+//	offerReply := paymentRateAccepted
+//	if request.BlobDataPaymentRate < 0 {
+//		offerReply = paymentRateTooLow
+//	}
+//
+//	return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply})
+//}
+//
+//func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
+//	var request blobRequest
+//	err := json.Unmarshal(data, &request)
+//	if err != nil {
+//		return []byte{}, err
+//	}
+//
+//	log.Debugln("Sending blob " + request.RequestedBlob[:8])
+//
+//	blob, err := s.store.Get(request.RequestedBlob)
+//	if err != nil {
+//		return []byte{}, err
+//	}
+//
+//	response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
+//		BlobHash: reflector.BlobHash(blob),
+//		Length:   len(blob),
+//	}})
+//	if err != nil {
+//		return []byte{}, err
+//	}
+//
+//	return append(response, blob...), nil
+//}
 
 func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 	var request compositeRequest
@@ -316,7 +316,7 @@ func (s *Server) logError(e error) {
 }
 
 func readNextMessage(buf *bufio.Reader) ([]byte, error) {
-	request := make([]byte, 0)
+	msg := make([]byte, 0)
 	eof := false
 
 	for {
@@ -324,7 +324,7 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) {
 		if err != nil {
 			if err != io.EOF {
 				//log.Errorln("readBytes error:", err) // logged by caller
-				return request, err
+				return msg, err
 			}
 			eof = true
 		}
@@ -333,14 +333,14 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) {
 		//spew.Dump(chunk)
 
 		if len(chunk) > 0 {
-			request = append(request, chunk...)
-			if len(request) > maxRequestSize {
-				return request, errRequestTooLarge
+			msg = append(msg, chunk...)
+			if len(msg) > maxRequestSize {
+				return msg, errRequestTooLarge
 			}
 
 			// yes, this is how the peer protocol knows when the request finishes
-			if reflector.IsValidJSON(request) {
+			if reflector.IsValidJSON(msg) {
 				break
 			}
 		}
@@ -355,11 +355,11 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) {
 	//	spew.Dump(request)
 	//}
 
-	if len(request) == 0 && eof {
-		return []byte{}, io.EOF
+	if len(msg) == 0 && eof {
+		return nil, io.EOF
 	}
 
-	return request, nil
+	return msg, nil
 }
 
 const (