From 4392c9724262b60ed378cd727c2a2d9b35e4c638 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 13 Apr 2021 00:52:56 +0200 Subject: [PATCH] fix mess with lbry.go --- cmd/root.go | 6 ++ cmd/send.go | 160 +++++++++++++++++++++++++++++++++++++++++++++ cmd/sendblob.go | 6 +- go.mod | 7 +- go.sum | 11 ++-- publish/publish.go | 76 ++++++++++----------- 6 files changed, 211 insertions(+), 55 deletions(-) create mode 100644 cmd/send.go diff --git a/cmd/root.go b/cmd/root.go index ce28f8f..1e422a7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -163,3 +163,9 @@ func mustGetFlagInt64(cmd *cobra.Command, name string) int64 { checkErr(err) return v } + +func mustGetFlagBool(cmd *cobra.Command, name string) bool { + v, err := cmd.Flags().GetBool(name) + checkErr(err) + return v +} diff --git a/cmd/send.go b/cmd/send.go new file mode 100644 index 0000000..7021866 --- /dev/null +++ b/cmd/send.go @@ -0,0 +1,160 @@ +package cmd + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "os/signal" + "path" + "syscall" + + "github.com/lbryio/reflector.go/reflector" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + "github.com/spf13/cobra" +) + +func init() { + var cmd = &cobra.Command{ + Use: "send ADDRESS:PORT PATH", + Short: "Send a file to a reflector", + Args: cobra.ExactArgs(2), + Run: sendCmd, + } + cmd.PersistentFlags().String("sd-cache", "", "path to dir where sd blobs will be cached") + rootCmd.AddCommand(cmd) +} + +// todo: if retrying a large file is slow, we can add the ability to seek ahead in the file so we're not +// re-uploading blobs that already exist + +var hackyReflector reflector.Client + +func sendCmd(cmd *cobra.Command, args []string) { + reflectorAddress := args[0] + err := hackyReflector.Connect(reflectorAddress) + checkErr(err) + defer hackyReflector.Close() + + filePath := args[1] + file, err := os.Open(filePath) + checkErr(err) + defer file.Close() + + sdCachePath := "" + sdCacheDir := mustGetFlagString(cmd, "sd-cache") + if sdCacheDir != "" { + if _, err := os.Stat(sdCacheDir); os.IsNotExist(err) { + err = os.MkdirAll(sdCacheDir, 0777) + checkErr(err) + } + sdCachePath = path.Join(sdCacheDir, filePath+".sdblob") + } + + var enc *stream.Encoder + + if sdCachePath != "" { + if _, err := os.Stat(sdCachePath); !os.IsNotExist(err) { + sdBlob, err := ioutil.ReadFile(sdCachePath) + checkErr(err) + cachedSDBlob := &stream.SDBlob{} + err = cachedSDBlob.FromBlob(sdBlob) + checkErr(err) + enc = stream.NewEncoderFromSD(file, cachedSDBlob) + } + } + if enc == nil { + enc = stream.NewEncoder(file) + } + + exitCode := 0 + + var killed bool + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + go func() { + sig := <-interruptChan + fmt.Printf("caught %s, exiting...\n", sig.String()) + killed = true + exitCode = 1 + }() + + for { + if killed { + break + } + + b, err := enc.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + fmt.Printf("error reading next blob: %v\n", err) + exitCode = 1 + break + } + + err = hackyReflect(b, false) + if err != nil { + fmt.Printf("error reflecting blob %s: %v\n", b.HashHex()[:8], err) + exitCode = 1 + break + } + } + + sd := enc.SDBlob() + //sd.StreamName = filepath.Base(filePath) + //sd.SuggestedFileName = filepath.Base(filePath) + err = ioutil.WriteFile(sdCachePath, sd.ToBlob(), 0666) + if err != nil { + fmt.Printf("error saving sd blob: %v\n", err) + fmt.Println(sd.ToJson()) + exitCode = 1 + } + + if killed { + os.Exit(exitCode) + } + + if reflectorAddress != "" { + err = hackyReflect(sd.ToBlob(), true) + if err != nil { + fmt.Printf("error reflecting sd blob %s: %v\n", sd.HashHex()[:8], err) + exitCode = 1 + } + } + + ret := struct { + SDHash string `json:"sd_hash"` + SourceHash string `json:"source_hash"` + }{ + SDHash: sd.HashHex(), + SourceHash: hex.EncodeToString(enc.SourceHash()), + } + + j, err := json.MarshalIndent(ret, "", " ") + checkErr(err) + fmt.Println(string(j)) + os.Exit(exitCode) +} + +func hackyReflect(b stream.Blob, sd bool) error { + var err error + if sd { + err = hackyReflector.SendSDBlob(b) + } else { + err = hackyReflector.SendBlob(b) + } + + if errors.Is(err, reflector.ErrBlobExists) { + //fmt.Printf("%s already reflected\n", b.HashHex()[:8]) + return nil + } + + return err +} diff --git a/cmd/sendblob.go b/cmd/sendblob.go index 3cd9f05..f37f390 100644 --- a/cmd/sendblob.go +++ b/cmd/sendblob.go @@ -2,7 +2,6 @@ package cmd import ( "crypto/rand" - "io/ioutil" "os" "github.com/lbryio/reflector.go/reflector" @@ -52,9 +51,8 @@ func sendBlobCmd(cmd *cobra.Command, args []string) { file, err := os.Open(path) checkErr(err) - data, err := ioutil.ReadAll(file) - checkErr(err) - s, err := stream.New(data) + defer file.Close() + s, err := stream.New(file) checkErr(err) sdBlob := &stream.SDBlob{} diff --git a/go.mod b/go.mod index ed3ebcb..4ed4f23 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/lbryio/reflector.go replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 +//replace github.com/lbryio/lbry.go/v2 => ../lbry.go + require ( github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v1.16.11 @@ -23,9 +25,9 @@ require ( github.com/karrick/godirwalk v1.16.1 github.com/lbryio/chainquery v1.9.0 github.com/lbryio/lbry.go v1.1.2 // indirect - github.com/lbryio/lbry.go/v2 v2.7.2-0.20210316000044-988178df5011 + github.com/lbryio/lbry.go/v2 v2.7.2-0.20210412222918-ed51ece75c3d github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386 - github.com/lucas-clemente/quic-go v0.20.0 + github.com/lucas-clemente/quic-go v0.20.1 github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.3 github.com/sirupsen/logrus v1.4.2 @@ -36,7 +38,6 @@ require ( github.com/stretchr/testify v1.7.0 github.com/volatiletech/null v8.0.0+incompatible go.uber.org/atomic v1.5.1 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect diff --git a/go.sum b/go.sum index f61f06b..c51b403 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,9 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= -github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= +github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg= +github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWEp4lssFEs= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ini/ini v1.38.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.48.0 h1:TvO60hO/2xgaaTWp2P0wUe4CFxwdMzfbkv3+343Xzqw= @@ -254,8 +255,8 @@ github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpU github.com/lbryio/lbry.go v1.1.1-0.20190825202001-8fa28d3d656f/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU= github.com/lbryio/lbry.go v1.1.2 h1:Dyxc+glT/rVWJwHfIf7vjlPYYbjzrQz5ARmJd5Hp69c= github.com/lbryio/lbry.go v1.1.2/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU= -github.com/lbryio/lbry.go/v2 v2.7.2-0.20210316000044-988178df5011 h1:r1NoX3NQu/Me+/qw4OzJGw8bhOUnTZHUneCbIV6SC+Y= -github.com/lbryio/lbry.go/v2 v2.7.2-0.20210316000044-988178df5011/go.mod h1:sUhhSKqPNkiwgBqvBzJIqfLLzGH8hkDGrrO/HcaXzFc= +github.com/lbryio/lbry.go/v2 v2.7.2-0.20210412222918-ed51ece75c3d h1:VUaOZ3cbCe7gfpycN/srCOk6U2bBS9NZHEz9RiRxd4E= +github.com/lbryio/lbry.go/v2 v2.7.2-0.20210412222918-ed51ece75c3d/go.mod h1:I1q8W9fwU+t0IWNiprPgE1SorWQwcO6ser0nzP3L5Pk= github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 h1:/zWD8dVIl7bV1TdJWqPqy9tpqixzX2Qxgit48h3hQcY= github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo= @@ -265,8 +266,8 @@ github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiV github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386 h1:JOQkGpeCM9FWkEHRx+kRPqySPCXElNW1em1++7tVS4M= github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lucas-clemente/quic-go v0.20.0 h1:FSU3YN5VnLafHR27Ejs1r1CYMS7XMyIVDzRewkDLNBw= -github.com/lucas-clemente/quic-go v0.20.0/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk= +github.com/lucas-clemente/quic-go v0.20.1 h1:hb5m76V8QS/8Nw/suHvXqo3BMHAozvIkcnzpJdpanSk= +github.com/lucas-clemente/quic-go v0.20.1/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0= github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6/go.mod h1:sFlOUpQL1YcjhFVXhg1CG8ZASEs/Mf1oVb6H75JL/zg= diff --git a/publish/publish.go b/publish/publish.go index 417978e..9e585e8 100644 --- a/publish/publish.go +++ b/publish/publish.go @@ -3,7 +3,6 @@ package publish import ( "bytes" "encoding/json" - "io/ioutil" "os" "path/filepath" "sort" @@ -21,7 +20,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/golang/protobuf/proto" - "golang.org/x/crypto/sha3" ) var TODO = ` @@ -43,6 +41,14 @@ var TODO = ` } ` +type Details struct { + Title string + Description string + Author string + Tags []string + ReleaseTime int64 +} + func Publish(client *lbrycrd.Client, path, name, address string, details Details, reflectorAddress string) (*wire.MsgTx, *chainhash.Hash, error) { if name == "" { return nil, nil, errors.Err("name required") @@ -69,11 +75,20 @@ func Publish(client *lbrycrd.Client, path, name, address string, details Details return nil, nil, err } - claim, st, err := makeClaimAndStream(path, details) + st, stPB, err := makeStream(path) if err != nil { return nil, nil, err } + stPB.Author = details.Author + stPB.ReleaseTime = details.ReleaseTime + + claim := &pb.Claim{ + Title: details.Title, + Description: details.Description, + Type: &pb.Claim_Stream{Stream: stPB}, + } + err = addClaimToTx(tx, claim, name, amount, addr) if err != nil { return nil, nil, err @@ -203,50 +218,31 @@ func reflect(st stream.Stream, reflectorAddress string) error { return nil } -type Details struct { - Title string - Description string - Author string - Tags []string - ReleaseTime int64 -} - -func makeClaimAndStream(path string, details Details) (*pb.Claim, stream.Stream, error) { +func makeStream(path string) (stream.Stream, *pb.Stream, error) { file, err := os.Open(path) if err != nil { return nil, nil, errors.Err(err) } - data, err := ioutil.ReadAll(file) - if err != nil { - return nil, nil, errors.Err(err) - } - s, err := stream.New(data) + defer file.Close() + + enc := stream.NewEncoder(file) + + s, err := enc.Stream() if err != nil { return nil, nil, errors.Err(err) } - // make the claim - sdBlob := &stream.SDBlob{} - err = sdBlob.FromBlob(s[0]) - if err != nil { - return nil, nil, errors.Err(err) - } - - filehash := sha3.Sum384(data) - - streamPB := &pb.Stream{ - Author: details.Author, - ReleaseTime: details.ReleaseTime, + streamProto := &pb.Stream{ Source: &pb.Source{ - SdHash: s[0].Hash(), + SdHash: enc.SDBlob().Hash(), Name: filepath.Base(file.Name()), - Size: uint64(len(data)), - Hash: filehash[:], + Size: uint64(enc.SourceLen()), + Hash: enc.SourceHash(), }, } mimeType, category := guessMimeType(filepath.Ext(file.Name())) - streamPB.Source.MediaType = mimeType + streamProto.Source.MediaType = mimeType switch category { case "video": @@ -254,20 +250,14 @@ func makeClaimAndStream(path string, details Details) (*pb.Claim, stream.Stream, //if err != nil { // return nil, nil, err //} - streamPB.Type = &pb.Stream_Video{} + streamProto.Type = &pb.Stream_Video{} case "audio": - streamPB.Type = &pb.Stream_Audio{} + streamProto.Type = &pb.Stream_Audio{} case "image": - streamPB.Type = &pb.Stream_Image{} + streamProto.Type = &pb.Stream_Image{} } - claim := &pb.Claim{ - Title: details.Title, - Description: details.Description, - Type: &pb.Claim_Stream{Stream: streamPB}, - } - - return claim, s, nil + return s, streamProto, nil } func getClaimPayoutScript(name string, value []byte, address btcutil.Address) ([]byte, error) {