diff --git a/db/db.go b/db/db.go index 8b106a8..a9304a9 100644 --- a/db/db.go +++ b/db/db.go @@ -66,17 +66,15 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error { return errors.Err("not connected") } - return withTx(s.conn, func(tx *sql.Tx) error { - return addBlob(tx, hash, length, isStored) - }) + return addBlob(s.conn, hash, length, isStored) } -func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { +func addBlob(e Executor, hash string, length int, isStored bool) error { if length <= 0 { return errors.Err("length must be positive") } - err := execTx(tx, + err := exec(e, "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", hash, isStored, length, ) @@ -159,15 +157,13 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { // Delete will remove the blob from the db func (s *SQL) Delete(hash string) error { - return withTx(s.conn, func(tx *sql.Tx) error { - err := execTx(tx, "DELETE FROM stream WHERE sd_hash = ?", hash) - if err != nil { - return errors.Err(err) - } - - err = execTx(tx, "DELETE FROM blob_ WHERE hash = ?", hash) + err := exec(s.conn, "DELETE FROM stream WHERE sd_hash = ?", hash) + if err != nil { return errors.Err(err) - }) + } + + err = exec(s.conn, "DELETE FROM blob_ WHERE hash = ?", hash) + return errors.Err(err) } // Block will mark a blob as blocked @@ -259,44 +255,42 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { return errors.Err("not connected") } - return withTx(s.conn, func(tx *sql.Tx) error { - // insert sd blob - err := addBlob(tx, sdHash, sdBlobLength, true) + // insert sd blob + err := addBlob(s.conn, sdHash, sdBlobLength, true) + if err != nil { + return err + } + + // insert stream + err = exec(s.conn, + "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", + sdBlob.StreamHash, sdHash, + ) + if err != nil { + return errors.Err(err) + } + + // insert content blobs and connect them to stream + for _, contentBlob := range sdBlob.Blobs { + if contentBlob.BlobHash == "" { + // null terminator blob + continue + } + + err := addBlob(s.conn, contentBlob.BlobHash, contentBlob.Length, false) if err != nil { return err } - // insert stream - err = execTx(tx, - "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", - sdBlob.StreamHash, sdHash, + err = exec(s.conn, + "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", + sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum, ) if err != nil { return errors.Err(err) } - - // insert content blobs and connect them to stream - for _, contentBlob := range sdBlob.Blobs { - if contentBlob.BlobHash == "" { - // null terminator blob - continue - } - - err := addBlob(tx, contentBlob.BlobHash, contentBlob.Length, false) - if err != nil { - return err - } - - err = execTx(tx, - "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", - sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum, - ) - if err != nil { - return errors.Err(err) - } - } - return nil - }) + } + return nil } // GetHashRange gets the smallest and biggest hashes in the db @@ -414,13 +408,17 @@ func closeRows(rows *sql.Rows) { } } -func execTx(tx *sql.Tx, query string, args ...interface{}) error { +type Executor interface { + Exec(query string, args ...interface{}) (sql.Result, error) +} + +func exec(e Executor, query string, args ...interface{}) error { logQuery(query, args...) attempt, maxAttempts := 0, 3 var err error Retry: attempt++ - _, err = tx.Exec(query, args...) + _, err = e.Exec(query, args...) if e, ok := err.(*mysql.MySQLError); ok && attempt <= maxAttempts && e.Number == 1205 { //Error 1205: Lock wait timeout exceeded; try restarting transaction goto Retry