From 0e0b2aaea30812e062bd1d1f81f2ac448471579a Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 7 Aug 2018 16:51:02 -0400 Subject: [PATCH] drop DB interface, attempt to fix max conn issues using interpolateParams --- db/db.go | 104 +++++++++++++++++++++++----------------------- store/dbbacked.go | 7 ++-- types/types.go | 16 ------- 3 files changed, 54 insertions(+), 73 deletions(-) delete mode 100644 types/types.go diff --git a/db/db.go b/db/db.go index 8eb8dc6..d69e0e0 100644 --- a/db/db.go +++ b/db/db.go @@ -7,23 +7,27 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/querytools" "github.com/lbryio/reflector.go/dht/bits" - "github.com/lbryio/reflector.go/types" - // blank import for db driver - _ "github.com/go-sql-driver/mysql" + + _ "github.com/go-sql-driver/mysql" // blank import for db driver log "github.com/sirupsen/logrus" ) -// DB interface communicates to a backend database with a simple set of methods that supports tracking blobs that are -// used together with a BlobStore. The DB tracks pointers and the BlobStore stores the data. -type DB interface { - Connect(string) error - HasBlob(string) (bool, error) - AddBlob(string, int, bool) error - AddSDBlob(string, int, types.SdBlob) error - HasFullStream(string) (bool, error) +// SdBlob is a special blob that contains information on the rest of the blobs in the stream +type SdBlob struct { + StreamName string `json:"stream_name"` + Blobs []struct { + Length int `json:"length"` + BlobNum int `json:"blob_num"` + BlobHash string `json:"blob_hash,omitempty"` + Iv string `json:"iv"` + } `json:"blobs"` + StreamType string `json:"stream_type"` + Key string `json:"key"` + SuggestedFileName string `json:"suggested_file_name"` + StreamHash string `json:"stream_hash"` } -// SQL is the container for the supporting MySQL database connection. 
+// SQL holds the connection to the MySQL database that tracks blobs and streams type SQL struct { conn *sql.DB } @@ -40,7 +44,10 @@ func logQuery(query string, args ...interface{}) { // Connect will create a connection to the database func (s *SQL) Connect(dsn string) error { var err error - dsn += "?parseTime=1&collation=utf8mb4_unicode_ci" + // interpolateParams is necessary. otherwise uploading a stream with thousands of blobs + // will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all + // opened inside a transaction. closing them manually doesn't seem to help + dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1" s.conn, err = sql.Open("mysql", dsn) if err != nil { return errors.Err(err) @@ -49,7 +56,7 @@ func (s *SQL) Connect(dsn string) error { return errors.Err(s.conn.Ping()) } -// AddBlob adds a blobs information to the database. +// AddBlob adds a blob to the database. func (s *SQL) AddBlob(hash string, length int, isStored bool) error { if s.conn == nil { return errors.Err("not connected") @@ -65,17 +72,10 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { return errors.Err("length must be positive") } - query := "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))" - args := []interface{}{hash, isStored, length} - - logQuery(query, args...) - - stmt, err := tx.Prepare(query) - if err != nil { - return errors.Err(err) - } - - _, err = stmt.Exec(args...) + err := execPrepared(tx, + "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", + []interface{}{hash, isStored, length}, + ) if err != nil { return errors.Err(err) } @@ -187,7 +187,7 @@ func (s *SQL) HasFullStream(sdHash string) (bool, error) { // AddSDBlob takes the SD Hash number of blobs and the set of blobs.
In a single db tx it inserts the sdblob information // into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is // rolled back and error(s) are returned. -func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error { +func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { if s.conn == nil { return errors.Err("not connected") } @@ -200,17 +200,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) er } // insert stream - query := "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)" - args := []interface{}{sdBlob.StreamHash, sdHash} - - logQuery(query, args...) - - stmt, err := tx.Prepare(query) - if err != nil { - return errors.Err(err) - } - - _, err = stmt.Exec(args...) + err = execPrepared(tx, + "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", + []interface{}{sdBlob.StreamHash, sdHash}, + ) if err != nil { return errors.Err(err) } @@ -227,22 +220,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) er return err } - query := "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)" - args := []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum} - - logQuery(query, args...) - - stmt, err := tx.Prepare(query) - if err != nil { - return errors.Err(err) - } - - _, err = stmt.Exec(args...) - if err != nil { - return errors.Err(err) - } - - err = stmt.Close() + err = execPrepared(tx, + "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", + []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}, + ) if err != nil { return errors.Err(err) } @@ -371,6 +352,23 @@ func closeRows(rows *sql.Rows) { } } +func execPrepared(tx *sql.Tx, query string, args []interface{}) error { + logQuery(query, args...) + + stmt, err := tx.Prepare(query) + if err != nil { + return errors.Err(err) + } + + _, err = stmt.Exec(args...) 
+ if err != nil { + return errors.Err(err) + } + + err = stmt.Close() + return errors.Err(err) +} + /* SQL schema CREATE TABLE blob_ ( diff --git a/store/dbbacked.go b/store/dbbacked.go index 916fe68..3e5ac32 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -5,17 +5,16 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/types" ) // DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored. type DBBackedS3Store struct { s3 *S3BlobStore - db db.DB + db *db.SQL } // NewDBBackedS3Store returns an initialized store pointer. -func NewDBBackedS3Store(s3 *S3BlobStore, db db.DB) *DBBackedS3Store { +func NewDBBackedS3Store(s3 *S3BlobStore, db *db.SQL) *DBBackedS3Store { return &DBBackedS3Store{s3: s3, db: db} } @@ -42,7 +41,7 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error { // PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if // there is an error storing the blob information in the DB. func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { - var blobContents types.SdBlob + var blobContents db.SdBlob err := json.Unmarshal(blob, &blobContents) if err != nil { return err diff --git a/types/types.go b/types/types.go deleted file mode 100644 index f63e560..0000000 --- a/types/types.go +++ /dev/null @@ -1,16 +0,0 @@ -package types - -// SdBlob is an instance of specialized blob that contains information on the rest of the blobs it is associated with. -type SdBlob struct { - StreamName string `json:"stream_name"` - Blobs []struct { - Length int `json:"length"` - BlobNum int `json:"blob_num"` - BlobHash string `json:"blob_hash,omitempty"` - Iv string `json:"iv"` - } `json:"blobs"` - StreamType string `json:"stream_type"` - Key string `json:"key"` - SuggestedFileName string `json:"suggested_file_name"` - StreamHash string `json:"stream_hash"` -}