accepts full streams, stores them in db

This commit is contained in:
Alex Grintsvayg 2018-02-02 16:49:20 -05:00
parent ca958c0711
commit 7b3ac43fff
8 changed files with 156 additions and 24 deletions

106
db/db.go
View file

@ -3,6 +3,8 @@ package db
import ( import (
"database/sql" "database/sql"
"github.com/lbryio/reflector.go/types"
"github.com/lbryio/errors.go" "github.com/lbryio/errors.go"
qtools "github.com/lbryio/query.go" qtools "github.com/lbryio/query.go"
@ -12,8 +14,9 @@ import (
type DB interface { type DB interface {
Connect(string) error Connect(string) error
AddBlob(string, int) error
HasBlob(string) (bool, error) HasBlob(string) (bool, error)
AddBlob(string, int, bool) error
AddSDBlob(string, int, types.SdBlob) error
} }
type SQL struct { type SQL struct {
@ -40,7 +43,7 @@ func (s *SQL) Connect(dsn string) error {
return errors.Err(s.conn.Ping()) return errors.Err(s.conn.Ping())
} }
func (s *SQL) AddBlob(hash string, length int) error { func (s *SQL) AddBlob(hash string, length int, stored bool) error {
if s.conn == nil { if s.conn == nil {
return errors.Err("not connected") return errors.Err("not connected")
} }
@ -49,8 +52,8 @@ func (s *SQL) AddBlob(hash string, length int) error {
return errors.Err("length must be positive") return errors.Err("length must be positive")
} }
query := "INSERT IGNORE INTO blobs (hash, length) VALUES (?,?)" query := "INSERT INTO blob_ (hash, stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE stored = (stored or VALUES(stored))"
args := []interface{}{hash, length} args := []interface{}{hash, stored, length}
logQuery(query, args...) logQuery(query, args...)
@ -72,8 +75,8 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
return false, errors.Err("not connected") return false, errors.Err("not connected")
} }
query := "SELECT EXISTS(SELECT 1 FROM blobs WHERE hash = ?)" query := "SELECT EXISTS(SELECT 1 FROM blob_ WHERE hash = ? AND stored = ?)"
args := []interface{}{hash} args := []interface{}{hash, true}
logQuery(query, args...) logQuery(query, args...)
@ -84,3 +87,94 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
return exists, errors.Err(err) return exists, errors.Err(err)
} }
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error {
if s.conn == nil {
return errors.Err("not connected")
}
// TODO: should do all this in transaction
// insert sd blob
err := s.AddBlob(sdHash, sdBlobLength, true)
if err != nil {
return err
}
// insert stream
query := "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)"
args := []interface{}{sdBlob.StreamHash, sdHash}
logQuery(query, args...)
stmt, err := s.conn.Prepare(query)
if err != nil {
return errors.Err(err)
}
_, err = stmt.Exec(args...)
if err != nil {
return errors.Err(err)
}
// insert content blobs and connect them to stream
for _, contentBlob := range sdBlob.Blobs {
if contentBlob.BlobHash == "" {
// null terminator blob
continue
}
err := s.AddBlob(contentBlob.BlobHash, contentBlob.Length, false)
if err != nil {
return err
}
query := "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)"
args := []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}
logQuery(query, args...)
stmt, err := s.conn.Prepare(query)
if err != nil {
return errors.Err(err)
}
_, err = stmt.Exec(args...)
if err != nil {
return errors.Err(err)
}
}
return nil
}
func schema() {
_ = `
CREATE TABLE blob_ (
hash char(96) NOT NULL,
stored TINYINT(1) NOT NULL DEFAULT 0,
length bigint(20) unsigned DEFAULT NULL,
last_announced_at datetime DEFAULT NULL,
PRIMARY KEY (hash),
KEY last_announced_at_idx (last_announced_at)
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE stream (
hash char(96) NOT NULL,
sd_hash char(96) NOT NULL,
PRIMARY KEY (hash),
KEY sd_hash_idx (sd_hash),
FOREIGN KEY stream_sd_hash_blob_hash (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE stream_blob (
stream_hash char(96) NOT NULL,
blob_hash char(96) NOT NULL,
num int NOT NULL,
PRIMARY KEY (stream_hash, blob_hash),
FOREIGN KEY stream_hash_stream_hash (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY blob_hash_blob_hash (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
`
}

View file

@ -115,9 +115,14 @@ func (s *Server) receiveBlob(conn net.Conn) error {
return errors.Err("hash of received blob data does not match hash from send request") return errors.Err("hash of received blob data does not match hash from send request")
// this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream // this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream
} }
log.Println("Got blob " + blobHash[:8]) log.Println("Got blob " + blobHash[:8])
err = s.store.Put(blobHash, blob) if isSdBlob {
err = s.store.PutSD(blobHash, blob)
} else {
err = s.store.Put(blobHash, blob)
}
if err != nil { if err != nil {
return err return err
} }

View file

@ -54,18 +54,3 @@ func getBlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob) hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:]) return hex.EncodeToString(hashBytes[:])
} }
// can be used to read the sd blob and then return a list of blobs that are actually missing
type sdBlobContents struct {
StreamName string `json:"stream_name"`
Blobs []struct {
Length int `json:"length"`
BlobNum int `json:"blob_num"`
BlobHash string `json:"blob_hash,omitempty"`
Iv string `json:"iv"`
} `json:"blobs"`
StreamType string `json:"stream_type"`
Key string `json:"key"`
SuggestedFileName string `json:"suggested_file_name"`
StreamHash string `json:"stream_hash"`
}

View file

@ -1,6 +1,12 @@
package store package store
import "github.com/lbryio/reflector.go/db" import (
"encoding/json"
"github.com/go-errors/errors"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/types"
)
type DBBackedS3Store struct { type DBBackedS3Store struct {
s3 *S3BlobStore s3 *S3BlobStore
@ -25,5 +31,23 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
return err return err
} }
return d.db.AddBlob(hash, len(blob)) return d.db.AddBlob(hash, len(blob), true)
}
func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
var blobContents types.SdBlob
err := json.Unmarshal(blob, &blobContents)
if err != nil {
return err
}
if blobContents.StreamHash == "" {
return errors.New("sd blob is missing stream hash")
}
err = d.s3.PutSD(hash, blob)
if err != nil {
return err
}
return d.db.AddSDBlob(hash, len(blob), blobContents)
} }

View file

@ -81,3 +81,7 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error {
return ioutil.WriteFile(f.path(hash), blob, 0644) return ioutil.WriteFile(f.path(hash), blob, 0644)
} }
func (f *FileBlobStore) PutSD(hash string, blob []byte) error {
return f.Put(hash, blob)
}

View file

@ -105,3 +105,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error {
return err return err
} }
func (s *S3BlobStore) PutSD(hash string, blob []byte) error {
return s.Put(hash, blob)
}

View file

@ -4,4 +4,5 @@ type BlobStore interface {
Has(string) (bool, error) Has(string) (bool, error)
Get(string) ([]byte, error) Get(string) ([]byte, error)
Put(string, []byte) error Put(string, []byte) error
PutSD(string, []byte) error
} }

15
types/types.go Normal file
View file

@ -0,0 +1,15 @@
package types
type SdBlob struct {
StreamName string `json:"stream_name"`
Blobs []struct {
Length int `json:"length"`
BlobNum int `json:"blob_num"`
BlobHash string `json:"blob_hash,omitempty"`
Iv string `json:"iv"`
} `json:"blobs"`
StreamType string `json:"stream_type"`
Key string `json:"key"`
SuggestedFileName string `json:"suggested_file_name"`
StreamHash string `json:"stream_hash"`
}