diff --git a/db/db.go b/db/db.go index 1b6f428..c1acbd5 100644 --- a/db/db.go +++ b/db/db.go @@ -3,6 +3,8 @@ package db import ( "database/sql" + "github.com/lbryio/reflector.go/types" + "github.com/lbryio/errors.go" qtools "github.com/lbryio/query.go" @@ -12,8 +14,9 @@ import ( type DB interface { Connect(string) error - AddBlob(string, int) error HasBlob(string) (bool, error) + AddBlob(string, int, bool) error + AddSDBlob(string, int, types.SdBlob) error } type SQL struct { @@ -40,7 +43,7 @@ func (s *SQL) Connect(dsn string) error { return errors.Err(s.conn.Ping()) } -func (s *SQL) AddBlob(hash string, length int) error { +func (s *SQL) AddBlob(hash string, length int, stored bool) error { if s.conn == nil { return errors.Err("not connected") } @@ -49,8 +52,8 @@ func (s *SQL) AddBlob(hash string, length int) error { return errors.Err("length must be positive") } - query := "INSERT IGNORE INTO blobs (hash, length) VALUES (?,?)" - args := []interface{}{hash, length} + query := "INSERT INTO blob_ (hash, stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE stored = (stored or VALUES(stored))" + args := []interface{}{hash, stored, length} logQuery(query, args...) @@ -72,8 +75,8 @@ func (s *SQL) HasBlob(hash string) (bool, error) { return false, errors.Err("not connected") } - query := "SELECT EXISTS(SELECT 1 FROM blobs WHERE hash = ?)" - args := []interface{}{hash} + query := "SELECT EXISTS(SELECT 1 FROM blob_ WHERE hash = ? AND stored = ?)" + args := []interface{}{hash, true} logQuery(query, args...) @@ -84,3 +87,94 @@ func (s *SQL) HasBlob(hash string) (bool, error) { return exists, errors.Err(err) } + +func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error { + if s.conn == nil { + return errors.Err("not connected") + } + + // TODO: should do all this in transaction + + // insert sd blob + err := s.AddBlob(sdHash, sdBlobLength, true) + if err != nil { + return err + } + + // insert stream + query := "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)" + args := []interface{}{sdBlob.StreamHash, sdHash} + + logQuery(query, args...) + + stmt, err := s.conn.Prepare(query) + if err != nil { + return errors.Err(err) + } + + _, err = stmt.Exec(args...) + if err != nil { + return errors.Err(err) + } + + // insert content blobs and connect them to stream + for _, contentBlob := range sdBlob.Blobs { + if contentBlob.BlobHash == "" { + // null terminator blob + continue + } + + err := s.AddBlob(contentBlob.BlobHash, contentBlob.Length, false) + if err != nil { + return err + } + + query := "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)" + args := []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum} + + logQuery(query, args...) + + stmt, err := s.conn.Prepare(query) + if err != nil { + return errors.Err(err) + } + + _, err = stmt.Exec(args...) + if err != nil { + return errors.Err(err) + } + } + + return nil +} + +func schema() { + _ = ` +CREATE TABLE blob_ ( + hash char(96) NOT NULL, + stored TINYINT(1) NOT NULL DEFAULT 0, + length bigint(20) unsigned DEFAULT NULL, + last_announced_at datetime DEFAULT NULL, + PRIMARY KEY (hash), + KEY last_announced_at_idx (last_announced_at) +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE TABLE stream ( + hash char(96) NOT NULL, + sd_hash char(96) NOT NULL, + PRIMARY KEY (hash), + KEY sd_hash_idx (sd_hash), + FOREIGN KEY stream_sd_hash_blob_hash (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +CREATE TABLE stream_blob ( + stream_hash char(96) NOT NULL, + blob_hash char(96) NOT NULL, + num int NOT NULL, + PRIMARY KEY (stream_hash, blob_hash), + FOREIGN KEY stream_hash_stream_hash (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY blob_hash_blob_hash (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE +) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + +` +} diff --git a/reflector/server.go b/reflector/server.go index 957d2e5..14b47ba 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -115,9 +115,14 @@ func (s *Server) receiveBlob(conn net.Conn) error { return errors.Err("hash of received blob data does not match hash from send request") // this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream } + log.Println("Got blob " + blobHash[:8]) - err = s.store.Put(blobHash, blob) + if isSdBlob { + err = s.store.PutSD(blobHash, blob) + } else { + err = s.store.Put(blobHash, blob) + } if err != nil { return err } diff --git a/reflector/shared.go b/reflector/shared.go index ad11d38..0072cb7 100644 --- a/reflector/shared.go +++ b/reflector/shared.go @@ -54,18 +54,3 @@ func getBlobHash(blob []byte) string { hashBytes := sha512.Sum384(blob) return hex.EncodeToString(hashBytes[:]) } - -// can be used to read the sd blob and then return a list of blobs that are actually missing -type sdBlobContents struct { - StreamName string `json:"stream_name"` - Blobs []struct { - Length int `json:"length"` - BlobNum int `json:"blob_num"` - BlobHash string `json:"blob_hash,omitempty"` - Iv string `json:"iv"` - } `json:"blobs"` - StreamType string `json:"stream_type"` - Key string `json:"key"` - SuggestedFileName string `json:"suggested_file_name"` - StreamHash string `json:"stream_hash"` -} diff --git a/store/dbbacked.go b/store/dbbacked.go index 9ac29b7..b87ea49 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -1,6 +1,12 @@ package store -import "github.com/lbryio/reflector.go/db" +import ( + "encoding/json" + + "github.com/go-errors/errors" + "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/types" +) type DBBackedS3Store struct { s3 *S3BlobStore @@ -25,5 +31,23 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error { return err } - return d.db.AddBlob(hash, len(blob)) + return d.db.AddBlob(hash, len(blob), true) +} + +func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { + var blobContents types.SdBlob + err := json.Unmarshal(blob, &blobContents) + if err != nil { + return err + } + if blobContents.StreamHash == "" { + return errors.New("sd blob is missing stream hash") + } + + err = d.s3.PutSD(hash, blob) + if err != nil { + return err + } + + return d.db.AddSDBlob(hash, len(blob), blobContents) } diff --git a/store/file.go b/store/file.go index 5d48fdd..0396790 100644 --- a/store/file.go +++ b/store/file.go @@ -81,3 +81,7 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error { return ioutil.WriteFile(f.path(hash), blob, 0644) } + +func (f *FileBlobStore) PutSD(hash string, blob []byte) error { + return f.Put(hash, blob) +} diff --git a/store/s3.go b/store/s3.go index b7cc40d..78ff838 100644 --- a/store/s3.go +++ b/store/s3.go @@ -105,3 +105,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error { return err } + +func (s *S3BlobStore) PutSD(hash string, blob []byte) error { + return s.Put(hash, blob) +} diff --git a/store/store.go b/store/store.go index 053ee93..43e76ae 100644 --- a/store/store.go +++ b/store/store.go @@ -4,4 +4,5 @@ type BlobStore interface { Has(string) (bool, error) Get(string) ([]byte, error) Put(string, []byte) error + PutSD(string, []byte) error } diff --git a/types/types.go b/types/types.go new file mode 100644 index 0000000..8e582e6 --- /dev/null +++ b/types/types.go @@ -0,0 +1,15 @@ +package types + +type SdBlob struct { + StreamName string `json:"stream_name"` + Blobs []struct { + Length int `json:"length"` + BlobNum int `json:"blob_num"` + BlobHash string `json:"blob_hash,omitempty"` + Iv string `json:"iv"` + } `json:"blobs"` + StreamType string `json:"stream_type"` + Key string `json:"key"` + SuggestedFileName string `json:"suggested_file_name"` + StreamHash string `json:"stream_hash"` +}