diff --git a/cmd/test.go b/cmd/test.go index f2bdced..fbb03d2 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -1,8 +1,11 @@ package cmd import ( - "github.com/davecgh/go-spew/spew" + "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/store" + + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -16,5 +19,43 @@ func init() { } func testCmd(cmd *cobra.Command, args []string) { - spew.Dump(reflector.BlockedSdHashes()) + db := new(db.SQL) + err := db.Connect(globalConfig.DBConn) + if err != nil { + log.Fatal(err) + } + + s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + combo := store.NewDBBackedS3Store(s3, db) + + values, err := reflector.BlockedSdHashes() + if err != nil { + log.Fatal(err) + } + + for _, v := range values { + if v.Err != nil { + continue + } + + has, err := db.HasBlob(v.Value) + if err != nil { + log.Error(err) + continue + } + + if !has { + continue + } + + err = combo.Delete(v.Value) + if err != nil { + log.Error(err) + } + + err = db.Block(v.Value) + if err != nil { + log.Error(err) + } + } } diff --git a/db/db.go b/db/db.go index b33ad5e..c8f46d0 100644 --- a/db/db.go +++ b/db/db.go @@ -89,7 +89,10 @@ func (s *SQL) HasBlob(hash string) (bool, error) { return false, errors.Err("not connected") } - query := "SELECT EXISTS(SELECT 1 FROM blob_ WHERE hash = ? AND is_stored = ?)" + query := `SELECT EXISTS(SELECT 1 + FROM blob_ b + LEFT JOIN blocked bl ON b.hash = bl.hash + WHERE b.hash = ? AND b.is_stored = ? AND bl.hash IS NULL)` args := []interface{}{hash, true} logQuery(query, args...) @@ -158,6 +161,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { return exists, nil } +// Delete will remove the blob from the db +func (s *SQL) Delete(hash string) error { + args := []interface{}{hash} + + query := "DELETE FROM stream WHERE sd_hash = ?" + logQuery(query, args...) + _, err := s.conn.Exec(query, args...) + if err != nil { + return errors.Err(err) + } + + query = "DELETE FROM blob_ WHERE hash = ?" + logQuery(query, args...) + _, err = s.conn.Exec(query, args...) + return errors.Err(err) +} + +// Block will mark a blob as blocked +func (s *SQL) Block(hash string) error { + query := "INSERT IGNORE INTO blocked SET hash = ?" + args := []interface{}{hash} + logQuery(query, args...) + _, err := s.conn.Exec(query, args...) + return errors.Err(err) +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing @@ -396,6 +425,11 @@ CREATE TABLE stream_blob ( FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE ); +CREATE TABLE blocked ( + hash char(96) NOT NULL, + PRIMARY KEY (hash) +); + could add UNIQUE KEY (stream_hash, num) to stream_blob ... */ diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 0a2d2ee..f1dbdbc 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -25,9 +25,7 @@ type blockListResponse struct { } `json:"data"` } -func BlockedSdHashes() (map[string]string, error) { - blocked := make(map[string]string) - +func BlockedSdHashes() (map[string]ValueResp, error) { resp, err := http.Get(blocklistURL) if err != nil { return nil, errors.Err(err) @@ -43,70 +41,86 @@ func BlockedSdHashes() (map[string]string, error) { return nil, errors.Prefix("list_blocked API call", r.Error) } - for _, outpoint := range r.Data.Outpoints { - sdHash, err := sdHashForOutpoint(outpoint) - if err != nil { - blocked[outpoint] = err.Error() - } else { - blocked[outpoint] = sdHash - } - } - - return blocked, nil + return sdHashesForOutpoints(r.Data.Outpoints) } +type ValueResp struct { + Value string + Err error +} + +// sdHashForOutpoint queries wallet server for the sd hash in a given outpoint func sdHashForOutpoint(outpoint string) (string, error) { - val, err := valueForOutpoint(outpoint) + vals, err := sdHashesForOutpoints([]string{outpoint}) if err != nil { return "", err } - return sdHashForValue(val) + val, ok := vals[outpoint] + if !ok { + return "", errors.Err("outpoint not in response") + } + + return val.Value, val.Err } -// decodeValue decodes a protobuf-encoded claim and returns the sd hash -func sdHashForValue(value []byte) (string, error) { - claim := &types.Claim{} - err := proto.Unmarshal(value, claim) - if err != nil { - return "", errors.Err(err) - } - - if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash { - return "", errors.Err("source is nil or source type is not lbry_sd_hash") - } - - return hex.EncodeToString(claim.GetStream().GetSource().GetSource()), nil -} - -// valueForOutpoint queries wallet server for the value of the claim at the given outpoint -func valueForOutpoint(outpoint string) ([]byte, error) { - parts := strings.Split(outpoint, ":") - if len(parts) != 2 { - return nil, errors.Err("invalid outpoint format") - } - - nout, err := strconv.Atoi(parts[1]) - if err != nil { - return nil, errors.Prefix("invalid nout", err) - } +// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints +func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { + values := make(map[string]ValueResp) node := wallet.NewNode() - err = node.ConnectTCP("victor.lbry.tech:50001") + err := node.ConnectTCP("victor.lbry.tech:50001") if err != nil { return nil, err } - resp, err := node.GetClaimsInTx(parts[0]) - if err != nil { - return nil, err - } - - for _, tx := range resp.Result { - if tx.Nout == nout { - return hex.DecodeString(tx.Value) + for _, outpoint := range outpoints { + parts := strings.Split(outpoint, ":") + if len(parts) != 2 { + values[outpoint] = ValueResp{Err: errors.Err("invalid outpoint format")} + continue } + + nout, err := strconv.Atoi(parts[1]) + if err != nil { + values[outpoint] = ValueResp{Err: errors.Prefix("invalid nout", err)} + continue + } + + resp, err := node.GetClaimsInTx(parts[0]) + if err != nil { + values[outpoint] = ValueResp{Err: err} + continue + } + + var value []byte + for _, tx := range resp.Result { + if tx.Nout != nout { + continue + } + + value, err = hex.DecodeString(tx.Value) + break + } + if err != nil { + values[outpoint] = ValueResp{Err: err} + continue + } + + claim := &types.Claim{} + err = proto.Unmarshal(value, claim) + if err != nil { + values[outpoint] = ValueResp{Err: err} + continue + } + + if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash { + values[outpoint] = ValueResp{Err: errors.Err("source is nil or source type is not lbry_sd_hash")} + continue + } + + values[outpoint] = ValueResp{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())} } - return nil, errors.Err("outpoint not found") + return values, nil } diff --git a/store/dbbacked.go b/store/dbbacked.go index 52d8d5e..b5d8442 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -58,6 +58,15 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { return d.db.AddSDBlob(hash, len(blob), blobContents) } +func (d *DBBackedS3Store) Delete(hash string) error { + err := d.s3.Delete(hash) + if err != nil { + return err + } + + return d.db.Delete(hash) +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing diff --git a/store/file.go b/store/file.go index 0e06beb..6c49573 100644 --- a/store/file.go +++ b/store/file.go @@ -95,3 +95,21 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error { func (f *FileBlobStore) PutSD(hash string, blob []byte) error { return f.Put(hash, blob) } + +// Delete deletes the blob from the store +func (f *FileBlobStore) Delete(hash string) error { + err := f.initOnce() + if err != nil { + return err + } + + has, err := f.Has(hash) + if err != nil { + return err + } + if !has { + return nil + } + + return os.Remove(f.path(hash)) +} diff --git a/store/memory.go b/store/memory.go index 7757bd0..f9d845a 100644 --- a/store/memory.go +++ b/store/memory.go @@ -41,3 +41,9 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error { func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { return m.Put(hash, blob) } + +// Delete deletes the blob from the store +func (m *MemoryBlobStore) Delete(hash string) error { + delete(m.blobs, hash) + return nil +} diff --git a/store/s3.go b/store/s3.go index abec192..0f6f9b5 100644 --- a/store/s3.go +++ b/store/s3.go @@ -133,3 +133,19 @@ func (s *S3BlobStore) PutSD(hash string, blob []byte) error { //Todo - handle missing stream for consistency return s.Put(hash, blob) } + +func (s *S3BlobStore) Delete(hash string) error { + err := s.initOnce() + if err != nil { + return err + } + + log.Debugf("Deleting %s from S3", hash[:8]) + + _, err = s3.New(s.session).DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + }) + + return err +} diff --git a/store/store.go b/store/store.go index 580ae7b..5c879cf 100644 --- a/store/store.go +++ b/store/store.go @@ -8,6 +8,7 @@ type BlobStore interface { Get(string) ([]byte, error) Put(string, []byte) error PutSD(string, []byte) error + Delete(string) error } //ErrBlobNotFound is a standard error when a blob is not found in the store. diff --git a/wallet/tcp.go b/wallet/tcp.go index 0fc06d7..4415761 100644 --- a/wallet/tcp.go +++ b/wallet/tcp.go @@ -5,8 +5,9 @@ package wallet import ( "bufio" "crypto/tls" - "log" "net" + + log "github.com/sirupsen/logrus" ) type TCPTransport struct { @@ -44,7 +45,7 @@ func NewSSLTransport(addr string, config *tls.Config) (*TCPTransport, error) { } func (t *TCPTransport) SendMessage(body []byte) error { - log.Printf("%s <- %s", t.conn.RemoteAddr(), body) + log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) _, err := t.conn.Write(body) return err } @@ -58,10 +59,10 @@ func (t *TCPTransport) listen() { line, err := reader.ReadBytes(delim) if err != nil { t.errors <- err - log.Printf("error %s", err) + log.Error(err) break } - log.Printf("%s -> %s", t.conn.RemoteAddr(), line) + log.Debugf("%s -> %s", t.conn.RemoteAddr(), line) t.responses <- line } }