mirror of
https://github.com/LBRYFoundation/reflector.go.git
synced 2025-08-23 17:27:25 +00:00
fix error counter
This commit is contained in:
parent
d61f7c892f
commit
2ed1e23228
1 changed files with 25 additions and 21 deletions
|
@ -16,10 +16,12 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type increment int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
sdInc = 1
|
sdInc increment = iota + 1
|
||||||
blobInc = 2
|
blobInc
|
||||||
errInc = 3
|
errInc
|
||||||
)
|
)
|
||||||
|
|
||||||
type Uploader struct {
|
type Uploader struct {
|
||||||
|
@ -28,7 +30,7 @@ type Uploader struct {
|
||||||
workers int
|
workers int
|
||||||
skipExistsCheck bool
|
skipExistsCheck bool
|
||||||
stopper *stop.Group
|
stopper *stop.Group
|
||||||
countChan chan int
|
countChan chan increment
|
||||||
|
|
||||||
count struct {
|
count struct {
|
||||||
total, alreadyStored, sd, blob, err int
|
total, alreadyStored, sd, blob, err int
|
||||||
|
@ -42,7 +44,7 @@ func NewUploader(db *db.SQL, store *store.DBBackedS3Store, workers int, skipExis
|
||||||
workers: workers,
|
workers: workers,
|
||||||
skipExistsCheck: skipExistsCheck,
|
skipExistsCheck: skipExistsCheck,
|
||||||
stopper: stop.New(),
|
stopper: stop.New(),
|
||||||
countChan: make(chan int),
|
countChan: make(chan increment),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +147,13 @@ func (u *Uploader) worker(pathChan chan string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadBlob uploads a blob
|
// uploadBlob uploads a blob
|
||||||
func (u *Uploader) uploadBlob(filepath string) error {
|
func (u *Uploader) uploadBlob(filepath string) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
u.inc(errInc)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
blob, err := ioutil.ReadFile(filepath)
|
blob, err := ioutil.ReadFile(filepath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -154,11 +162,6 @@ func (u *Uploader) uploadBlob(filepath string) error {
|
||||||
hash := BlobHash(blob)
|
hash := BlobHash(blob)
|
||||||
if hash != path.Base(filepath) {
|
if hash != path.Base(filepath) {
|
||||||
return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash)
|
return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash)
|
||||||
select {
|
|
||||||
case u.countChan <- errInc:
|
|
||||||
case <-u.stopper.Ch():
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if IsValidJSON(blob) {
|
if IsValidJSON(blob) {
|
||||||
|
@ -167,20 +170,14 @@ func (u *Uploader) uploadBlob(filepath string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("Uploading SD blob "+hash, err)
|
return errors.Prefix("Uploading SD blob "+hash, err)
|
||||||
}
|
}
|
||||||
select {
|
u.inc(sdInc)
|
||||||
case u.countChan <- sdInc:
|
|
||||||
case <-u.stopper.Ch():
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Uploading blob %s", hash)
|
log.Debugf("Uploading blob %s", hash)
|
||||||
err = u.store.Put(hash, blob)
|
err = u.store.Put(hash, blob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("Uploading blob "+hash, err)
|
return errors.Prefix("Uploading blob "+hash, err)
|
||||||
}
|
}
|
||||||
select {
|
u.inc(blobInc)
|
||||||
case u.countChan <- blobInc:
|
|
||||||
case <-u.stopper.Ch():
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -195,11 +192,11 @@ func (u *Uploader) counter() {
|
||||||
select {
|
select {
|
||||||
case <-u.stopper.Ch():
|
case <-u.stopper.Ch():
|
||||||
return
|
return
|
||||||
case countType, ok := <-u.countChan:
|
case incrementType, ok := <-u.countChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch countType {
|
switch incrementType {
|
||||||
case sdInc:
|
case sdInc:
|
||||||
u.count.sd++
|
u.count.sd++
|
||||||
case blobInc:
|
case blobInc:
|
||||||
|
@ -214,6 +211,13 @@ func (u *Uploader) counter() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (u *Uploader) inc(t increment) {
|
||||||
|
select {
|
||||||
|
case u.countChan <- t:
|
||||||
|
case <-u.stopper.Ch():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file,
|
// getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file,
|
||||||
// it returns the full path to that file. for a dir, it returns the paths for all the files in the
|
// it returns the full path to that file. for a dir, it returns the paths for all the files in the
|
||||||
// dir
|
// dir
|
||||||
|
|
Loading…
Add table
Reference in a new issue