Started work on Go blob primitives; successfully matched Python's blob crypto (excluding canonical JSON).

This commit is contained in:
Alex Grintsvayg 2018-08-20 17:50:39 -04:00
parent 4293e772e2
commit 11ebfb822b
8 changed files with 404 additions and 29 deletions

View file

@ -19,7 +19,7 @@ type SdBlob struct {
Length int `json:"length"` Length int `json:"length"`
BlobNum int `json:"blob_num"` BlobNum int `json:"blob_num"`
BlobHash string `json:"blob_hash,omitempty"` BlobHash string `json:"blob_hash,omitempty"`
Iv string `json:"iv"` IV string `json:"iv"`
} `json:"blobs"` } `json:"blobs"`
StreamType string `json:"stream_type"` StreamType string `json:"stream_type"`
Key string `json:"key"` Key string `json:"key"`

View file

@ -3,9 +3,11 @@ package reflector
import ( import (
"encoding/json" "encoding/json"
"net" "net"
"strconv"
"github.com/lbryio/reflector.go/stream"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -36,20 +38,18 @@ func (c *Client) Close() error {
} }
// SendBlob sends a send blob request to the client. // SendBlob sends a send blob request to the client.
func (c *Client) SendBlob(blob []byte) error { func (c *Client) SendBlob(blob stream.Blob) error {
if !c.connected { if !c.connected {
return errors.Err("not connected") return errors.Err("not connected")
} }
if len(blob) > maxBlobSize { if err := blob.ValidForSend(); err != nil {
return errors.Err("blob must be at most " + strconv.Itoa(maxBlobSize) + " bytes") return errors.Err(err)
} else if len(blob) == 0 {
return errors.Err("blob is empty")
} }
blobHash := BlobHash(blob) blobHash := blob.HashHex()
sendRequest, err := json.Marshal(sendBlobRequest{ sendRequest, err := json.Marshal(sendBlobRequest{
BlobSize: len(blob), BlobSize: blob.Size(),
BlobHash: blobHash, BlobHash: blobHash,
}) })
if err != nil { if err != nil {

View file

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/reflector.go/stream"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
@ -27,7 +28,7 @@ const (
network = "tcp4" network = "tcp4"
protocolVersion1 = 0 protocolVersion1 = 0
protocolVersion2 = 1 protocolVersion2 = 1
maxBlobSize = 2 * 1024 * 1024 maxBlobSize = stream.MaxBlobSize
) )
// Server is an instance of the reflector server. It houses the blob store and listener. // Server is an instance of the reflector server. It houses the blob store and listener.
@ -217,8 +218,8 @@ func (s *Server) receiveBlob(conn net.Conn) error {
return err return err
} }
} else { } else {
// if we can't confirm that we have the full stream, we have to say that the sd blob is // if we can't check for blobs in a stream, we have to say that the sd blob is
// missing. if we say we have it, they wont try to send any content blobs // missing. if we say we have the sd blob, they wont try to send any content blobs
wantsBlob = true wantsBlob = true
} }
} }

View file

@ -9,10 +9,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/lbryio/reflector.go/dht/bits"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/reflector.go/dht/bits" "github.com/davecgh/go-spew/spew"
"github.com/phayes/freeport" "github.com/phayes/freeport"
) )
@ -224,18 +224,6 @@ func TestServer_PartialUpload(t *testing.T) {
} }
} }
//func MakeRandStream(size int) ([]byte, [][]byte) {
// blobs := make([][]byte, int(math.Ceil(float64(size)/maxBlobSize)))
// for i := 0; i < len(blobs); i++ {
// blobs[i] = randBlob(int(math.Min(maxBlobSize, float64(size))))
// size -= maxBlobSize
// }
//
// //TODO: create SD blob for the stream
//
// return nil, blobs
//}
func randBlob(size int) []byte { func randBlob(size int) []byte {
//if size > maxBlobSize { //if size > maxBlobSize {
// panic("blob size too big") // panic("blob size too big")

View file

@ -4,8 +4,9 @@ import (
"encoding/json" "encoding/json"
"sync" "sync"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/lbry.go/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -22,12 +23,12 @@ func NewDBBackedS3Store(s3 *S3BlobStore, db *db.SQL) *DBBackedS3Store {
return &DBBackedS3Store{s3: s3, db: db} return &DBBackedS3Store{s3: s3, db: db}
} }
// Has returns T/F or Error ( if the DB errors ) if store contains the blob. // Has returns true if the blob is in the store
func (d *DBBackedS3Store) Has(hash string) (bool, error) { func (d *DBBackedS3Store) Has(hash string) (bool, error) {
return d.db.HasBlob(hash) return d.db.HasBlob(hash)
} }
// Get returns the byte slice of the blob or an error. // Get gets the blob
func (d *DBBackedS3Store) Get(hash string) ([]byte, error) { func (d *DBBackedS3Store) Get(hash string) ([]byte, error) {
return d.s3.Get(hash) return d.s3.Get(hash)
} }

192
stream/blob.go Normal file
View file

@ -0,0 +1,192 @@
package stream
import (
"bytes"
"crypto/aes"
"crypto/rand"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"strconv"
"github.com/lbryio/lbry.go/errors"
)
// MaxBlobSize is the maximum allowed size of a single blob, in bytes.
const MaxBlobSize = 2 * 1024 * 1024

// Blob is the raw byte content of a single blob in a stream.
type Blob []byte

// ErrBlobTooBig is returned when a blob exceeds MaxBlobSize.
var ErrBlobTooBig = errors.Base("blob must be at most " + strconv.Itoa(MaxBlobSize) + " bytes")

// ErrBlobEmpty is returned when a blob contains no data.
var ErrBlobEmpty = errors.Base("blob is empty")

// Size returns the length of the blob data in bytes.
func (b Blob) Size() int {
	return len(b)
}

// Hash returns the SHA-384 hash of the blob data, or nil for an empty blob.
func (b Blob) Hash() []byte {
	if b.Size() == 0 {
		return nil
	}
	hashBytes := sha512.Sum384(b)
	return hashBytes[:]
}

// HashHex returns the blob hash as a hex string. An empty blob yields "".
func (b Blob) HashHex() string {
	return hex.EncodeToString(b.Hash())
}

// ValidForSend returns nil if the blob size is within the limits, and
// ErrBlobTooBig or ErrBlobEmpty otherwise.
func (b Blob) ValidForSend() error {
	if b.Size() > MaxBlobSize {
		return ErrBlobTooBig
	}
	if b.Size() == 0 {
		return ErrBlobEmpty
	}
	return nil
}
// BlobInfo is the stream descriptor info for a single blob in a stream.
// Encoding to and from JSON is customized to match existing behavior (see json.go in package)
type BlobInfo struct {
	Length   int    `json:"length"`
	BlobNum  int    `json:"blob_num"`
	BlobHash []byte `json:"-"`
	IV       []byte `json:"-"`
}

// Hash returns the hash of the blob info for calculating the stream hash
func (bi BlobInfo) Hash() []byte {
	d := sha512.New384()
	// the hex-encoded blob hash is only fed in for non-empty blobs; the
	// remaining fields are always hashed, in this exact order
	if bi.Length > 0 {
		d.Write([]byte(hex.EncodeToString(bi.BlobHash)))
	}
	for _, part := range []string{
		strconv.Itoa(bi.BlobNum),
		hex.EncodeToString(bi.IV),
		strconv.Itoa(bi.Length),
	} {
		d.Write([]byte(part))
	}
	return d.Sum(nil)
}
// SDBlob contains information about the rest of the blobs in the stream.
// Encoding to and from JSON is customized to match existing behavior (see json.go in package)
type SDBlob struct {
	StreamName        string     `json:"-"`
	BlobInfos         []BlobInfo `json:"blobs"`
	StreamType        string     `json:"stream_type"`
	Key               []byte     `json:"-"`
	SuggestedFileName string     `json:"-"`
	StreamHash        []byte     `json:"-"`
	ivFunc            func() []byte // optional IV generator; nil means random IVs
}

// ToBlob serializes the SDBlob into a normal data Blob
func (s SDBlob) ToBlob() (Blob, error) {
	encoded, err := json.Marshal(s)
	return Blob(encoded), err
}

// FromBlob deserializes a data Blob that should contain SDBlob data
func (s *SDBlob) FromBlob(b Blob) error {
	return json.Unmarshal([]byte(b), s)
}
// NewSdBlob builds a stream descriptor for the given blobs, using a random
// key and a random IV for every blob.
func NewSdBlob(blobs []Blob) *SDBlob {
	return newSdBlob(blobs, nil, nil)
}

// newSdBlob builds a stream descriptor for the given blobs. A nil key or a
// nil IV list is replaced with randomly generated values.
func newSdBlob(blobs []Blob, key []byte, ivs [][]byte) *SDBlob {
	if key == nil {
		key = randIV()
	}
	if ivs == nil {
		ivs = make([][]byte, len(blobs))
		for i := range ivs {
			ivs[i] = randIV()
		}
	}

	sd := &SDBlob{Key: key}
	for i, b := range blobs {
		sd.addBlob(b, ivs[i])
	}
	sd.updateStreamHash()
	return sd
}
// addBlob appends the blob's info to the stream, assigning it the next
// sequential blob number. A nil iv is replaced with one from nextIV.
func (s *SDBlob) addBlob(b Blob, iv []byte) {
	if iv == nil {
		iv = s.nextIV()
	}
	info := BlobInfo{
		BlobNum:  len(s.BlobInfos),
		Length:   b.Size(),
		BlobHash: b.Hash(),
		IV:       iv,
	}
	s.BlobInfos = append(s.BlobInfos, info)
}
// nextIV returns the next IV from ivFunc, falling back to a random IV when
// no ivFunc is set
func (s SDBlob) nextIV() []byte {
	if s.ivFunc == nil {
		return randIV()
	}
	return s.ivFunc()
}
// IsValid returns true if the set StreamHash matches the current hash of the stream data
func (s SDBlob) IsValid() bool {
	return bytes.Equal(s.computeStreamHash(), s.StreamHash)
}

// updateStreamHash recalculates the stream hash and stores it on the blob
func (s *SDBlob) updateStreamHash() {
	s.StreamHash = s.computeStreamHash()
}

// computeStreamHash calculates the stream hash from the stream's current fields and blobs
func (s *SDBlob) computeStreamHash() []byte {
	hexName := hex.EncodeToString([]byte(s.StreamName))
	hexKey := hex.EncodeToString(s.Key)
	hexFileName := hex.EncodeToString([]byte(s.SuggestedFileName))
	return streamHash(hexName, hexKey, hexFileName, s.BlobInfos)
}
// streamHash calculates the stream hash, given the stream's hex-encoded
// fields and its blob infos
func streamHash(hexStreamName, hexKey, hexSuggestedFileName string, blobInfos []BlobInfo) []byte {
	// inner hash: all the blob-info hashes, in order
	blobSum := sha512.New384()
	for _, bi := range blobInfos {
		blobSum.Write(bi.Hash())
	}

	// outer hash: the stream fields followed by the inner hash
	sum := sha512.New384()
	for _, field := range []string{hexStreamName, hexKey, hexSuggestedFileName} {
		sum.Write([]byte(field))
	}
	sum.Write(blobSum.Sum(nil))
	return sum.Sum(nil)
}
// randIV returns a random AES initialization vector. It panics if the
// system's secure random source fails.
func randIV() []byte {
	iv := make([]byte, aes.BlockSize)
	if _, err := rand.Read(iv); err != nil {
		// original message said "random blob" and dropped the error;
		// include the cause so a rare entropy failure is diagnosable
		panic("failed to make random iv: " + err.Error())
	}
	return iv
}
// NullIV returns an IV consisting entirely of zero bytes
func NullIV() []byte {
	return bytes.Repeat([]byte{0}, aes.BlockSize)
}

83
stream/blob_test.go Normal file

File diff suppressed because one or more lines are too long

110
stream/json.go Normal file
View file

@ -0,0 +1,110 @@
package stream
import (
"encoding/hex"
"encoding/json"
"github.com/lbryio/lbry.go/errors"
)
// inspired by https://blog.gopheracademy.com/advent-2016/advanced-encoding-decoding/
// SDBlobAlias has SDBlob's fields but not its MarshalJSON/UnmarshalJSON
// methods, which avoids infinite recursion when (un)marshaling.
type SDBlobAlias SDBlob

// JSONSDBlob is the wire format of an SDBlob: the []byte fields of SDBlob
// are represented here as hex strings.
type JSONSDBlob struct {
	StreamName string `json:"stream_name"`
	SDBlobAlias
	Key string `json:"key"`
	SuggestedFileName string `json:"suggested_file_name"`
	StreamHash string `json:"stream_hash"`
}
// MarshalJSON encodes the SDBlob with its byte-slice and name fields hex-encoded
func (s SDBlob) MarshalJSON() ([]byte, error) {
	wire := JSONSDBlob{
		SDBlobAlias:       SDBlobAlias(s),
		StreamName:        hex.EncodeToString([]byte(s.StreamName)),
		Key:               hex.EncodeToString(s.Key),
		SuggestedFileName: hex.EncodeToString([]byte(s.SuggestedFileName)),
		StreamHash:        hex.EncodeToString(s.StreamHash),
	}
	return json.Marshal(wire)
}
// UnmarshalJSON decodes an SDBlob, converting the hex-string fields back to
// their raw byte/string forms
func (s *SDBlob) UnmarshalJSON(b []byte) error {
	var wire JSONSDBlob
	if err := json.Unmarshal(b, &wire); err != nil {
		return errors.Err(err)
	}
	*s = SDBlob(wire.SDBlobAlias)

	name, err := hex.DecodeString(wire.StreamName)
	if err != nil {
		return errors.Err(err)
	}
	s.StreamName = string(name)

	fileName, err := hex.DecodeString(wire.SuggestedFileName)
	if err != nil {
		return errors.Err(err)
	}
	s.SuggestedFileName = string(fileName)

	if s.StreamHash, err = hex.DecodeString(wire.StreamHash); err != nil {
		return errors.Err(err)
	}
	if s.Key, err = hex.DecodeString(wire.Key); err != nil {
		return errors.Err(err)
	}
	return nil
}
// BlobInfoAlias has BlobInfo's fields but not its MarshalJSON/UnmarshalJSON
// methods, which avoids infinite recursion when (un)marshaling.
type BlobInfoAlias BlobInfo

// JSONBlobInfo is the wire format of a BlobInfo: the hash and IV are
// represented as hex strings, and an empty hash is omitted entirely.
type JSONBlobInfo struct {
	BlobInfoAlias
	BlobHash string `json:"blob_hash,omitempty"`
	IV string `json:"iv"`
}
// MarshalJSON encodes the BlobInfo with its hash and IV hex-encoded. The
// hash field is left empty (and thus omitted) when there is no hash.
func (bi BlobInfo) MarshalJSON() ([]byte, error) {
	wire := JSONBlobInfo{
		BlobInfoAlias: BlobInfoAlias(bi),
		IV:            hex.EncodeToString(bi.IV),
	}
	if len(bi.BlobHash) > 0 {
		wire.BlobHash = hex.EncodeToString(bi.BlobHash)
	}
	return json.Marshal(wire)
}
// UnmarshalJSON decodes a BlobInfo, converting the hex-string hash and IV
// back to raw bytes
func (bi *BlobInfo) UnmarshalJSON(b []byte) error {
	var wire JSONBlobInfo
	if err := json.Unmarshal(b, &wire); err != nil {
		return errors.Err(err)
	}
	*bi = BlobInfo(wire.BlobInfoAlias)

	hash, err := hex.DecodeString(wire.BlobHash)
	if err != nil {
		return errors.Err(err)
	}
	bi.BlobHash = hash

	iv, err := hex.DecodeString(wire.IV)
	if err != nil {
		return errors.Err(err)
	}
	bi.IV = iv
	return nil
}