mirror of
https://github.com/LBRYFoundation/reflector.go.git
synced 2025-08-23 17:27:25 +00:00
maybe removing this transaction will improve the situation?
This commit is contained in:
parent
6166ff37cf
commit
2047fe6c05
1 changed files with 43 additions and 45 deletions
88
db/db.go
88
db/db.go
|
@ -66,17 +66,15 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
|
||||||
return errors.Err("not connected")
|
return errors.Err("not connected")
|
||||||
}
|
}
|
||||||
|
|
||||||
return withTx(s.conn, func(tx *sql.Tx) error {
|
return addBlob(s.conn, hash, length, isStored)
|
||||||
return addBlob(tx, hash, length, isStored)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
|
func addBlob(e Executor, hash string, length int, isStored bool) error {
|
||||||
if length <= 0 {
|
if length <= 0 {
|
||||||
return errors.Err("length must be positive")
|
return errors.Err("length must be positive")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := execTx(tx,
|
err := exec(e,
|
||||||
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
|
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
|
||||||
hash, isStored, length,
|
hash, isStored, length,
|
||||||
)
|
)
|
||||||
|
@ -159,15 +157,13 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
||||||
|
|
||||||
// Delete will remove the blob from the db
|
// Delete will remove the blob from the db
|
||||||
func (s *SQL) Delete(hash string) error {
|
func (s *SQL) Delete(hash string) error {
|
||||||
return withTx(s.conn, func(tx *sql.Tx) error {
|
err := exec(s.conn, "DELETE FROM stream WHERE sd_hash = ?", hash)
|
||||||
err := execTx(tx, "DELETE FROM stream WHERE sd_hash = ?", hash)
|
if err != nil {
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = execTx(tx, "DELETE FROM blob_ WHERE hash = ?", hash)
|
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
})
|
}
|
||||||
|
|
||||||
|
err = exec(s.conn, "DELETE FROM blob_ WHERE hash = ?", hash)
|
||||||
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block will mark a blob as blocked
|
// Block will mark a blob as blocked
|
||||||
|
@ -259,44 +255,42 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
|
||||||
return errors.Err("not connected")
|
return errors.Err("not connected")
|
||||||
}
|
}
|
||||||
|
|
||||||
return withTx(s.conn, func(tx *sql.Tx) error {
|
// insert sd blob
|
||||||
// insert sd blob
|
err := addBlob(s.conn, sdHash, sdBlobLength, true)
|
||||||
err := addBlob(tx, sdHash, sdBlobLength, true)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert stream
|
||||||
|
err = exec(s.conn,
|
||||||
|
"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
|
||||||
|
sdBlob.StreamHash, sdHash,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert content blobs and connect them to stream
|
||||||
|
for _, contentBlob := range sdBlob.Blobs {
|
||||||
|
if contentBlob.BlobHash == "" {
|
||||||
|
// null terminator blob
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := addBlob(s.conn, contentBlob.BlobHash, contentBlob.Length, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// insert stream
|
err = exec(s.conn,
|
||||||
err = execTx(tx,
|
"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
|
||||||
"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
|
sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
|
||||||
sdBlob.StreamHash, sdHash,
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// insert content blobs and connect them to stream
|
return nil
|
||||||
for _, contentBlob := range sdBlob.Blobs {
|
|
||||||
if contentBlob.BlobHash == "" {
|
|
||||||
// null terminator blob
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err := addBlob(tx, contentBlob.BlobHash, contentBlob.Length, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = execTx(tx,
|
|
||||||
"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
|
|
||||||
sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHashRange gets the smallest and biggest hashes in the db
|
// GetHashRange gets the smallest and biggest hashes in the db
|
||||||
|
@ -414,13 +408,17 @@ func closeRows(rows *sql.Rows) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func execTx(tx *sql.Tx, query string, args ...interface{}) error {
|
type Executor interface {
|
||||||
|
Exec(query string, args ...interface{}) (sql.Result, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func exec(e Executor, query string, args ...interface{}) error {
|
||||||
logQuery(query, args...)
|
logQuery(query, args...)
|
||||||
attempt, maxAttempts := 0, 3
|
attempt, maxAttempts := 0, 3
|
||||||
var err error
|
var err error
|
||||||
Retry:
|
Retry:
|
||||||
attempt++
|
attempt++
|
||||||
_, err = tx.Exec(query, args...)
|
_, err = e.Exec(query, args...)
|
||||||
if e, ok := err.(*mysql.MySQLError); ok && attempt <= maxAttempts && e.Number == 1205 {
|
if e, ok := err.(*mysql.MySQLError); ok && attempt <= maxAttempts && e.Number == 1205 {
|
||||||
//Error 1205: Lock wait timeout exceeded; try restarting transaction
|
//Error 1205: Lock wait timeout exceeded; try restarting transaction
|
||||||
goto Retry
|
goto Retry
|
||||||
|
|
Loading…
Add table
Reference in a new issue