diff --git a/reflector/uploader.go b/reflector/uploader.go index 0c3e6dd..5ad4421 100644 --- a/reflector/uploader.go +++ b/reflector/uploader.go @@ -16,10 +16,12 @@ import ( log "github.com/sirupsen/logrus" ) +type increment int + const ( - sdInc = 1 - blobInc = 2 - errInc = 3 + sdInc increment = iota + 1 + blobInc + errInc ) type Uploader struct { @@ -28,7 +30,7 @@ type Uploader struct { workers int skipExistsCheck bool stopper *stop.Group - countChan chan int + countChan chan increment count struct { total, alreadyStored, sd, blob, err int @@ -42,7 +44,7 @@ func NewUploader(db *db.SQL, store *store.DBBackedS3Store, workers int, skipExis workers: workers, skipExistsCheck: skipExistsCheck, stopper: stop.New(), - countChan: make(chan int), + countChan: make(chan increment), } } @@ -145,7 +147,13 @@ func (u *Uploader) worker(pathChan chan string) { } // uploadBlob uploads a blob -func (u *Uploader) uploadBlob(filepath string) error { +func (u *Uploader) uploadBlob(filepath string) (err error) { + defer func() { + if err != nil { + u.inc(errInc) + } + }() + blob, err := ioutil.ReadFile(filepath) if err != nil { return err @@ -154,11 +162,6 @@ func (u *Uploader) uploadBlob(filepath string) error { hash := BlobHash(blob) if hash != path.Base(filepath) { return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash) - select { - case u.countChan <- errInc: - case <-u.stopper.Ch(): - } - return nil } if IsValidJSON(blob) { @@ -167,20 +170,14 @@ func (u *Uploader) uploadBlob(filepath string) error { if err != nil { return errors.Prefix("Uploading SD blob "+hash, err) } - select { - case u.countChan <- sdInc: - case <-u.stopper.Ch(): - } + u.inc(sdInc) } else { log.Debugf("Uploading blob %s", hash) err = u.store.Put(hash, blob) if err != nil { return errors.Prefix("Uploading blob "+hash, err) } - select { - case u.countChan <- blobInc: - case <-u.stopper.Ch(): - } + u.inc(blobInc) } return nil @@ -195,11 +192,11 @@ func (u *Uploader) counter() { select { case <-u.stopper.Ch(): return - case countType, ok := <-u.countChan: + case incrementType, ok := <-u.countChan: if !ok { return } - switch countType { + switch incrementType { case sdInc: u.count.sd++ case blobInc: @@ -214,6 +211,13 @@ func (u *Uploader) counter() { } } +func (u *Uploader) inc(t increment) { + select { + case u.countChan <- t: + case <-u.stopper.Ch(): + } +} + // getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file, // it returns the full path to that file. for a dir, it returns the paths for all the files in the // dir