diff --git a/Gopkg.lock b/Gopkg.lock index 2608733..acfaccb 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,7 +3,7 @@ [[projects]] branch = "master" - digest = "1:d64110a78451e373c5a952d2625323dbbe3bfe41c67f9652ea9668a6ceb4f645" + digest = "1:354e62d5acb9af138e13ec842f78a846d214a8d4a9f80e578698f1f1565e2ef8" name = "github.com/armon/go-metrics" packages = ["."] pruneopts = "" @@ -11,7 +11,7 @@ [[projects]] branch = "master" - digest = "1:c0b6dbbb56a745020d5939bdde2197241a1c6109f226cca57b16f46916be5e94" + digest = "1:b1b9627af19ee54d3ed6b069375f0e91baa4a25267cf3b684e80fdefb17f4719" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -53,14 +53,14 @@ [[projects]] branch = "master" - digest = "1:e250643be8120824556f39df6ef128fc2be490fc96e0cb64b1a8ecf96bbe3ce6" + digest = "1:56b87c786a316d6e9b9c7ba8f3dd64e3199ca3b33a55cc596c633023bed20264" name = "github.com/btcsuite/btcutil" packages = ["base58"] pruneopts = "" revision = "ab6388e0c60ae4834a1f57511e20c17b5f78be4b" [[projects]] - digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5" + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" packages = ["spew"] pruneopts = "" @@ -76,7 +76,7 @@ version = "v1.0.1" [[projects]] - digest = "1:d19c78214e03e297e9e30d2eb11892f731358b2951f2a5c7374658a156373e4c" + digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" packages = ["."] pruneopts = "" @@ -85,12 +85,20 @@ [[projects]] branch = "master" - digest = "1:27b11ca1ad214ead955ff5480e8575e74c5df4e4dc02b04256a8d92131e1d3ad" + digest = "1:7150b23ba935d63f7b930d6c5ff20b52649ba623d62e0344971c822615fe57a3" name = "github.com/go-sql-driver/mysql" packages = ["."] pruneopts = "" revision = "99ff426eb706cffe92ff3d058e168b278cabf7c7" +[[projects]] + digest = "1:3dd078fda7500c341bc26cfbc6c6a34614f295a2457149fc1045cab767cbcf18" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "" + revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5" + version = "v1.2.0" + [[projects]] digest = "1:dbbeb8ddb0be949954c8157ee8439c2adfd8dc1c9510eb44a6e58cb68c3dce28" name = "github.com/gorilla/context" @@ -108,7 +116,7 @@ version = "v1.6.2" [[projects]] - digest = "1:b4817bdb0da3054166de058111943ac58b315aead2fed4ee838625a4e304f74c" + digest = "1:91aaeb45b3c10cc9cb68d1450cbc8ac77d0a677cf34a8ed3d4ef4dacb9df8a50" name = "github.com/gorilla/rpc" packages = [ "v2", @@ -119,7 +127,7 @@ version = "v1.1.0" [[projects]] - digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c" + digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6" name = "github.com/gorilla/websocket" packages = ["."] pruneopts = "" @@ -144,7 +152,7 @@ [[projects]] branch = "master" - digest = "1:6a611e691e739173805cb54019b5c39bb9d46455526dff31e0e6fe3aaca52776" + digest = "1:6396690228a7560bf9247cb90e5ae9c797bd630b01e7d2acab430bbca9a1ecb3" name = "github.com/hashicorp/go-msgpack" packages = ["codec"] pruneopts = "" @@ -152,7 +160,7 @@ [[projects]] branch = "master" - digest = "1:deaebb4a98ca748bbad7eb653f3a675749500020823a086448ffcd7ba6b8b02d" + digest = "1:0b5ca7d18e4ded1e4dacbb37ff027cb40a80c0fed969e4e03cf7aff129bc1b44" name = "github.com/hashicorp/go-multierror" packages = ["."] pruneopts = "" @@ -160,7 +168,7 @@ [[projects]] branch = "master" - digest = "1:74f54e6ef2339f1de1e8c4b6674442118bd89e619b2fbd949ef2337330067994" + digest = "1:fd8ec2359315965bb6b84fd8e45cd5e8b58b80d8430dc96c8c5dfce46d30dbfc" name = "github.com/hashicorp/go-sockaddr" packages = ["."] pruneopts = "" @@ -175,7 +183,7 @@ revision = "0fb14efe8c47ae851c0034ed7a448854d3d34cf3" [[projects]] - digest = "1:d7ce65372f495908f80fc1f80f4dab5d763d9a1de544abd95aa719e4262d0dd5" + digest = "1:d2c45a353b65012162c7ca22c39b1b0bd06d39362fb375cf42b4e48e1104bfc6" name = "github.com/hashicorp/memberlist" packages = ["."] pruneopts = "" @@ -184,7 +192,7 @@ [[projects]] branch = "master" - digest = "1:4fd01ac3766b886665cfd335cc63819ec4e4538dcc1180c05d6edc089619962c" + digest = "1:7b8e4a60bfdacc2a79ba4a4ef21b2e86e98fb1dc99d816179e0b4aee75106051" name = "github.com/hashicorp/serf" packages = [ "coordinate", @@ -195,7 +203,7 @@ [[projects]] branch = "master" - digest = "1:0d37c42156531a07a84812a47c27610947b849710ffab6f62be6e98c5112c140" + digest = "1:32c49a8cbcb20989c4fc0825f792cb1ea079af601c11b5ed0a92a48433171db3" name = "github.com/inconshreveable/go-update" packages = [ ".", @@ -214,7 +222,7 @@ version = "v1.0" [[projects]] - digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52" + digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" name = "github.com/jmespath/go-jmespath" packages = ["."] pruneopts = "" @@ -230,7 +238,7 @@ [[projects]] branch = "master" - digest = "1:132aae3fa5ad407b53f57cf6ebe274e414ec22d1700899023319931fa90b63e2" + digest = "1:7abe0d83ffb4c20fce461c314b1dc858cba274580cf1508f698b5f9fd9e1cde9" name = "github.com/johntdyer/slackrus" packages = ["."] pruneopts = "" @@ -246,7 +254,7 @@ [[projects]] branch = "master" - digest = "1:76fd7507e6014c598a01f1b3d558774d2a3114c438403bc98123870d2aecec62" + digest = "1:3e990fec1701f7cd3a301cb0fa824f65e35a37c224ff17f4d842720651d2f2fb" name = "github.com/lbryio/lbry.go" packages = [ "crypto", @@ -259,6 +267,14 @@ pruneopts = "" revision = "e2c96944fc485d3ab5e164da78f8439a94c5aa85" +[[projects]] + branch = "master" + digest = "1:dbd7fd543a88da4f81fbb849175b400d69b04f1f82de34c8c2efdc5626b80999" + name = "github.com/lbryio/types" + packages = ["go"] + pruneopts = "" + revision = "0a913ba650dd7d72e2a008b86dac117be3d5f075" + [[projects]] branch = "master" digest = "1:cabf2bf5e49edfe0c34cb9c6a256f2a99e6cc8c5e660855c8f3dafe1f81d5dcd" @@ -268,7 +284,7 @@ revision = "b7abd7672df533e627eddbf3a5a529786e8bda7f" [[projects]] - digest = "1:f0bad0fece0fb73c6ea249c18d8e80ffbe86be0457715b04463068f04686cf39" + digest = "1:4c8d8358c45ba11ab7bb15df749d4df8664ff1582daead28bae58cf8cbe49890" name = "github.com/miekg/dns" packages = ["."] pruneopts = "" @@ -276,7 +292,7 @@ version = "v1.0.8" [[projects]] - digest = "1:ba3b2eb0ae6fd3deac4386c02fe9d2279c9520738eb9db2f0667e74d5c7a0a61" + digest = "1:e6352ff4bd34c601567ad5e274837275f08e2a933e2688354cf5d44595c13ef9" name = "github.com/nlopes/slack" packages = ["."] pruneopts = "" @@ -285,7 +301,7 @@ [[projects]] branch = "master" - digest = "1:af967afd3cbc6b0145937f4dcab78bcd93e7b2f2b618fb2bcaf7069ad5c638fa" + digest = "1:d38c630298ac75e214f3caa5c240ea2923c7a089824d175ba4107d0650d56579" name = "github.com/phayes/freeport" packages = ["."] pruneopts = "" @@ -309,7 +325,7 @@ [[projects]] branch = "master" - digest = "1:1747c026a603e3c9f33f238e1d1390df2c8f48876b6bcb7a9c52c7b479e040f4" + digest = "1:56de39853758a4b6053a3f71e527305bbed11a0d876156e32e8cc7180d36198b" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "" @@ -325,7 +341,7 @@ [[projects]] branch = "master" - digest = "1:b5212c335a490d958a6b1b5b48901b46e682b82fa9af3a238fa88df6eaa60873" + digest = "1:c8f6919ab9f140506fd4ad3f4a9c9c2af9ee7921e190af0c67b2fca2f903083c" name = "github.com/spf13/cobra" packages = ["."] pruneopts = "" @@ -341,7 +357,7 @@ [[projects]] branch = "master" - digest = "1:f05efcac20bea32e1fcefde9a3cbefb07e02053666c4a67681ad18c8efc682d3" + digest = "1:b7ef38166f9ee44c54ef992b2754950f73fa09daf30355bea7aa510f224c38a6" name = "github.com/uber-go/atomic" packages = ["."] pruneopts = "" @@ -349,7 +365,7 @@ [[projects]] branch = "master" - digest = "1:16b935c128f178647036048862a21e8bfd66d1e83fb19787a8b356bdcf0de899" + digest = "1:53c4b75f22ea7757dea07eae380ea42de547ae6865a5e3b41866754a8a8219c9" name = "golang.org/x/crypto" packages = [ "ed25519", @@ -363,7 +379,7 @@ [[projects]] branch = "master" - digest = "1:15f2fc8cc79d90b0d4d712f04bd3eb3a3856ff3dd6b610a1425113ead3501610" + digest = "1:9f170ebb5ac75debb7e958e0388545441cc77de4d131a0c170530e948f3e857e" name = "golang.org/x/net" packages = [ "bpf", @@ -378,7 +394,7 @@ [[projects]] branch = "master" - digest = "1:e8f649ecfae7835a0a27ef39fd2180f6d3c12bc422e2ae55cd611f0b283b3e6e" + digest = "1:309d0f514b3f0dd143089ff4ab91c894d3e3f7e771c89b59d4b015b955cbaa5c" name = "golang.org/x/sys" packages = [ "unix", @@ -396,7 +412,7 @@ revision = "fbb02b2291d28baffd63558aa44b4b56f178d650" [[projects]] - digest = "1:eede11c81b63c8f6fd06ef24ba0a640dc077196ec9b7a58ecde03c82eee2f151" + digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" name = "google.golang.org/appengine" packages = ["cloudsql"] pruneopts = "" @@ -404,7 +420,7 @@ version = "v1.1.0" [[projects]] - digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575" + digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d" name = "gopkg.in/nullbio/null.v6" packages = ["convert"] pruneopts = "" @@ -423,6 +439,7 @@ "github.com/aws/aws-sdk-go/service/s3/s3manager", "github.com/davecgh/go-spew/spew", "github.com/go-sql-driver/mysql", + "github.com/golang/protobuf/proto", "github.com/gorilla/mux", "github.com/gorilla/rpc/v2", "github.com/gorilla/rpc/v2/json", @@ -435,6 +452,7 @@ "github.com/lbryio/lbry.go/querytools", "github.com/lbryio/lbry.go/stop", "github.com/lbryio/lbry.go/util", + "github.com/lbryio/types/go", "github.com/lyoshenka/bencode", "github.com/phayes/freeport", "github.com/sebdah/goldie", diff --git a/Gopkg.toml b/Gopkg.toml index c994789..e065e37 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -41,3 +41,7 @@ [[constraint]] branch = "master" name = "github.com/inconshreveable/go-update" + +[[constraint]] + branch = "master" + name = "github.com/lbryio/types" diff --git a/cmd/dht.go b/cmd/dht.go index 18078c7..28d79a0 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -1,7 +1,6 @@ package cmd import ( - "log" "net" "os" "os/signal" @@ -12,6 +11,7 @@ import ( "github.com/lbryio/reflector.go/dht" "github.com/lbryio/reflector.go/dht/bits" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) diff --git a/cmd/reflector.go b/cmd/reflector.go index d497c63..5f4305d 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -42,6 +42,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { reflectorServer.StatLogger = log.StandardLogger() reflectorServer.StatReportFrequency = 1 * time.Hour } + reflectorServer.EnableBlocklist = true err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) if err != nil { diff --git a/cmd/test.go b/cmd/test.go new file mode 100644 index 0000000..1c0932b --- /dev/null +++ b/cmd/test.go @@ -0,0 +1,19 @@ +package cmd + +import ( + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + var cmd = &cobra.Command{ + Use: "test", + Short: "Test things", + Run: testCmd, + } + rootCmd.AddCommand(cmd) +} + +func testCmd(cmd *cobra.Command, args []string) { + log.Println("test :-)") +} diff --git a/db/db.go b/db/db.go index b33ad5e..c47e6e1 100644 --- a/db/db.go +++ b/db/db.go @@ -74,7 +74,7 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { err := execTx(tx, "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", - []interface{}{hash, isStored, length}, + hash, isStored, length, ) if err != nil { return errors.Err(err) @@ -85,21 +85,11 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { // HasBlob checks if the database contains the blob information. func (s *SQL) HasBlob(hash string) (bool, error) { - if s.conn == nil { - return false, errors.Err("not connected") + exists, err := s.HasBlobs([]string{hash}) + if err != nil { + return false, err } - - query := "SELECT EXISTS(SELECT 1 FROM blob_ WHERE hash = ? AND is_stored = ?)" - args := []interface{}{hash, true} - - logQuery(query, args...) - - row := s.conn.QueryRow(query, args...) - - exists := false - err := row.Scan(&exists) - - return exists, errors.Err(err) + return exists[hash], nil } // HasBlobs checks if the database contains the set of blobs and returns a bool map. @@ -130,34 +120,88 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { logQuery(query, args...) - rows, err := s.conn.Query(query, args...) - if err != nil { - closeRows(rows) - return exists, err - } - - for rows.Next() { - err := rows.Scan(&hash) + err := func() error { + rows, err := s.conn.Query(query, args...) if err != nil { - closeRows(rows) - return exists, err + return errors.Err(err) } - exists[hash] = true - } + defer closeRows(rows) - err = rows.Err() + for rows.Next() { + err := rows.Scan(&hash) + if err != nil { + return errors.Err(err) + } + exists[hash] = true + } + + err = rows.Err() + if err != nil { + return errors.Err(err) + } + + doneIndex += len(batch) + return nil + }() if err != nil { - closeRows(rows) - return exists, err + return nil, err } - - closeRows(rows) - doneIndex += len(batch) } return exists, nil } +// Delete will remove the blob from the db +func (s *SQL) Delete(hash string) error { + return withTx(s.conn, func(tx *sql.Tx) error { + err := execTx(tx, "DELETE FROM stream WHERE sd_hash = ?", hash) + if err != nil { + return errors.Err(err) + } + + err = execTx(tx, "DELETE FROM blob_ WHERE hash = ?", hash) + return errors.Err(err) + }) +} + +// Block will mark a blob as blocked +func (s *SQL) Block(hash string) error { + query := "INSERT IGNORE INTO blocked SET hash = ?" + args := []interface{}{hash} + logQuery(query, args...) + _, err := s.conn.Exec(query, args...) + return errors.Err(err) +} + +// GetBlocked will return a list of blocked hashes +func (s *SQL) GetBlocked() (map[string]bool, error) { + query := "SELECT hash FROM blocked" + logQuery(query) + rows, err := s.conn.Query(query) + if err != nil { + return nil, errors.Err(err) + } + defer closeRows(rows) + + blocked := make(map[string]bool) + + var hash string + for rows.Next() { + err := rows.Scan(&hash) + if err != nil { + return nil, errors.Err(err) + } + blocked[hash] = true + } + + err = rows.Err() + if err != nil { + return nil, errors.Err(err) + } + + return blocked, nil +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing @@ -219,7 +263,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { // insert stream err = execTx(tx, "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", - []interface{}{sdBlob.StreamHash, sdHash}, + sdBlob.StreamHash, sdHash, ) if err != nil { return errors.Err(err) @@ -239,7 +283,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { err = execTx(tx, "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", - []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}, + sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum, ) if err != nil { return errors.Err(err) @@ -364,7 +408,7 @@ func closeRows(rows *sql.Rows) { } } -func execTx(tx *sql.Tx, query string, args []interface{}) error { +func execTx(tx *sql.Tx, query string, args ...interface{}) error { logQuery(query, args...) _, err := tx.Exec(query, args...) return errors.Err(err) @@ -396,6 +440,11 @@ CREATE TABLE stream_blob ( FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE ); +CREATE TABLE blocked ( + hash char(96) NOT NULL, + PRIMARY KEY (hash) +); + could add UNIQUE KEY (stream_hash, num) to stream_blob ... */ diff --git a/reflector/blocklist.go b/reflector/blocklist.go new file mode 100644 index 0000000..2e5f829 --- /dev/null +++ b/reflector/blocklist.go @@ -0,0 +1,149 @@ +package reflector + +import ( + "encoding/hex" + "encoding/json" + "net/http" + "strconv" + "strings" + "time" + + "github.com/lbryio/reflector.go/store" + "github.com/lbryio/reflector.go/wallet" + + "github.com/lbryio/lbry.go/errors" + types "github.com/lbryio/types/go" + + "github.com/golang/protobuf/proto" + log "github.com/sirupsen/logrus" +) + +const blocklistURL = "https://api.lbry.io/file/list_blocked" + +func (s *Server) enableBlocklist(b store.Blocklister) { + updateBlocklist(b) + t := time.NewTicker(12 * time.Hour) + for { + select { + case <-s.grp.Ch(): + return + case <-t.C: + updateBlocklist(b) + } + } +} + +func updateBlocklist(b store.Blocklister) { + values, err := blockedSdHashes() + if err != nil { + log.Error(err) + return + } + + for _, v := range values { + if v.Err != nil { + continue + } + + err = b.Block(v.Value) + if err != nil { + log.Error(err) + } + } +} + +func blockedSdHashes() (map[string]valOrErr, error) { + resp, err := http.Get(blocklistURL) + if err != nil { + return nil, errors.Err(err) + } + defer resp.Body.Close() + + var r struct { + Success bool `json:"success"` + Error string `json:"error"` + Data struct { + Outpoints []string `json:"outpoints"` + } `json:"data"` + } + + if err = json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, errors.Err(err) + } + + if !r.Success { + return nil, errors.Prefix("list_blocked API call", r.Error) + } + + return sdHashesForOutpoints(r.Data.Outpoints) +} + +type valOrErr struct { + Value string + Err error +} + +// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints +func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { + values := make(map[string]valOrErr) + + node := wallet.NewNode() + err := node.Connect([]string{ + "victor.lbry.tech:50001", + //"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix + //"lbryumx2.lbry.io:50001", + }, nil) + if err != nil { + return nil, err + } + + for _, outpoint := range outpoints { + parts := strings.Split(outpoint, ":") + if len(parts) != 2 { + values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")} + continue + } + + nout, err := strconv.Atoi(parts[1]) + if err != nil { + values[outpoint] = valOrErr{Err: errors.Prefix("invalid nout", err)} + continue + } + + resp, err := node.GetClaimsInTx(parts[0]) + if err != nil { + values[outpoint] = valOrErr{Err: err} + continue + } + + var value []byte + for _, tx := range resp.Result { + if tx.Nout != nout { + continue + } + + value, err = hex.DecodeString(tx.Value) + break + } + if err != nil { + values[outpoint] = valOrErr{Err: err} + continue + } + + claim := &types.Claim{} + err = proto.Unmarshal(value, claim) + if err != nil { + values[outpoint] = valOrErr{Err: err} + continue + } + + if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash { + values[outpoint] = valOrErr{Err: errors.Err("source is nil or source type is not lbry_sd_hash")} + continue + } + + values[outpoint] = valOrErr{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())} + } + + return values, nil +} diff --git a/reflector/server.go b/reflector/server.go index d9ada28..0ddbb14 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -37,6 +37,8 @@ type Server struct { StatLogger *log.Logger // logger to log stats StatReportFrequency time.Duration // how often to log stats + EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled + store store.BlobStore grp *stop.Group stats *stats @@ -90,6 +92,19 @@ func (s *Server) Start(address string) error { s.stats.Start() } + if s.EnableBlocklist { + if b, ok := s.store.(store.Blocklister); ok { + s.grp.Add(1) + go func() { + s.enableBlocklist(b) + s.grp.Done() + }() + } else { + //s.Shutdown() + return errors.Err("blocklist is enabled but blob store does not support blocklisting") + } + } + return nil } @@ -174,39 +189,46 @@ func (s *Server) doError(conn net.Conn, err error) error { } func (s *Server) receiveBlob(conn net.Conn) error { - var err error - blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn) if err != nil { return err } - blobExists, err := s.store.Has(blobHash) - if err != nil { - return err + var wantsBlob bool + if bl, ok := s.store.(store.Blocklister); ok { + wantsBlob, err = bl.Wants(blobHash) + if err != nil { + return err + } + } else { + blobExists, err := s.store.Has(blobHash) + if err != nil { + return err + } + wantsBlob = !blobExists } var neededBlobs []string - if isSdBlob && blobExists { - if fsc, ok := s.store.(neededBlobChecker); ok { - neededBlobs, err = fsc.MissingBlobsForKnownStream(blobHash) + if isSdBlob && !wantsBlob { + if nbc, ok := s.store.(neededBlobChecker); ok { + neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash) if err != nil { return err } } else { // if we can't confirm that we have the full stream, we have to say that the sd blob is // missing. if we say we have it, they wont try to send any content blobs - blobExists = false + wantsBlob = true } } - err = s.sendBlobResponse(conn, blobExists, isSdBlob, neededBlobs) + err = s.sendBlobResponse(conn, wantsBlob, isSdBlob, neededBlobs) if err != nil { return err } - if blobExists { + if !wantsBlob { return nil } @@ -296,14 +318,14 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) { return blobSize, blobHash, isSdBlob, nil } -func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool, neededBlobs []string) error { +func (s *Server) sendBlobResponse(conn net.Conn, shouldSendBlob, isSdBlob bool, neededBlobs []string) error { var response []byte var err error if isSdBlob { - response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists, NeededBlobs: neededBlobs}) + response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: shouldSendBlob, NeededBlobs: neededBlobs}) } else { - response, err = json.Marshal(sendBlobResponse{SendBlob: !blobExists}) + response, err = json.Marshal(sendBlobResponse{SendBlob: shouldSendBlob}) } if err != nil { return err diff --git a/reflector/server_test.go b/reflector/server_test.go index 5a28624..58ef6dc 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -2,7 +2,9 @@ package reflector import ( "crypto/rand" + "encoding/json" "io" + "sort" "strconv" "testing" "time" @@ -10,10 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lbryio/reflector.go/store" - "encoding/json" - - "sort" - "github.com/lbryio/reflector.go/dht/bits" "github.com/phayes/freeport" ) diff --git a/store/dbbacked.go b/store/dbbacked.go index 52d8d5e..b31c7aa 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -2,15 +2,19 @@ package store import ( "encoding/json" + "sync" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/reflector.go/db" + log "github.com/sirupsen/logrus" ) // DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored. type DBBackedS3Store struct { - s3 *S3BlobStore - db *db.SQL + s3 *S3BlobStore + db *db.SQL + blockedMu sync.RWMutex + blocked map[string]bool } // NewDBBackedS3Store returns an initialized store pointer. @@ -58,9 +62,107 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { return d.db.AddSDBlob(hash, len(blob), blobContents) } +func (d *DBBackedS3Store) Delete(hash string) error { + err := d.s3.Delete(hash) + if err != nil { + return err + } + + return d.db.Delete(hash) +} + +// Block deletes the blob and prevents it from being uploaded in the future +func (d *DBBackedS3Store) Block(hash string) error { + if blocked, err := d.isBlocked(hash); blocked || err != nil { + return err + } + + log.Debugf("blocking %s", hash) + + err := d.db.Block(hash) + if err != nil { + return err + } + + has, err := d.db.HasBlob(hash) + if err != nil { + return err + } + + if has { + err = d.s3.Delete(hash) + if err != nil { + return err + } + + err = d.db.Delete(hash) + if err != nil { + return err + } + } + + return d.markBlocked(hash) +} + +// Wants returns false if the hash exists or is blocked, true otherwise +func (d *DBBackedS3Store) Wants(hash string) (bool, error) { + has, err := d.Has(hash) + if has || err != nil { + return false, err + } + + blocked, err := d.isBlocked(hash) + return !blocked, err +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) { return d.db.MissingBlobsForKnownStream(sdHash) } + +func (d *DBBackedS3Store) markBlocked(hash string) error { + err := d.initBlocked() + if err != nil { + return err + } + + d.blockedMu.Lock() + defer d.blockedMu.Unlock() + + d.blocked[hash] = true + return nil +} + +func (d *DBBackedS3Store) isBlocked(hash string) (bool, error) { + err := d.initBlocked() + if err != nil { + return false, err + } + + d.blockedMu.RLock() + defer d.blockedMu.RUnlock() + + return d.blocked[hash], nil +} + +func (d *DBBackedS3Store) initBlocked() error { + // first check without blocking since this is the most likely scenario + if d.blocked != nil { + return nil + } + + d.blockedMu.Lock() + defer d.blockedMu.Unlock() + + // check again in case of race condition + if d.blocked != nil { + return nil + } + + var err error + d.blocked, err = d.db.GetBlocked() + + return err +} diff --git a/store/file.go b/store/file.go index 0e06beb..6c49573 100644 --- a/store/file.go +++ b/store/file.go @@ -95,3 +95,21 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error { func (f *FileBlobStore) PutSD(hash string, blob []byte) error { return f.Put(hash, blob) } + +// Delete deletes the blob from the store +func (f *FileBlobStore) Delete(hash string) error { + err := f.initOnce() + if err != nil { + return err + } + + has, err := f.Has(hash) + if err != nil { + return err + } + if !has { + return nil + } + + return os.Remove(f.path(hash)) +} diff --git a/store/memory.go b/store/memory.go index 7757bd0..f9d845a 100644 --- a/store/memory.go +++ b/store/memory.go @@ -41,3 +41,9 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error { func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { return m.Put(hash, blob) } + +// Delete deletes the blob from the store +func (m *MemoryBlobStore) Delete(hash string) error { + delete(m.blobs, hash) + return nil +} diff --git a/store/s3.go b/store/s3.go index abec192..0f6f9b5 100644 --- a/store/s3.go +++ b/store/s3.go @@ -133,3 +133,19 @@ func (s *S3BlobStore) PutSD(hash string, blob []byte) error { //Todo - handle missing stream for consistency return s.Put(hash, blob) } + +func (s *S3BlobStore) Delete(hash string) error { + err := s.initOnce() + if err != nil { + return err + } + + log.Debugf("Deleting %s from S3", hash[:8]) + + _, err = s3.New(s.session).DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(hash), + }) + + return err +} diff --git a/store/store.go b/store/store.go index 580ae7b..e7b5c2d 100644 --- a/store/store.go +++ b/store/store.go @@ -4,10 +4,23 @@ import "github.com/lbryio/lbry.go/errors" // BlobStore is an interface with methods for consistently handling blob storage. type BlobStore interface { - Has(string) (bool, error) - Get(string) ([]byte, error) - Put(string, []byte) error - PutSD(string, []byte) error + // Does blob exist in the store + Has(hash string) (bool, error) + // Get the blob from the store + Get(hase string) ([]byte, error) + // Put the blob into the store + Put(hash string, blob []byte) error + // Put an SD blob into the store + PutSD(hash string, blob []byte) error + // Delete the blob from the store + Delete(hash string) error +} + +type Blocklister interface { + // Block deletes the blob and prevents it from being uploaded in the future + Block(hash string) error + // Wants returns false if the hash exists or is blocked, true otherwise + Wants(hash string) (bool, error) } //ErrBlobNotFound is a standard error when a blob is not found in the store. diff --git a/wallet/client.go b/wallet/client.go new file mode 100644 index 0000000..856dfcc --- /dev/null +++ b/wallet/client.go @@ -0,0 +1,43 @@ +package wallet + +// ServerVersion returns the server's version. +// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#server-version +func (n *Node) ServerVersion() (string, error) { + resp := &struct { + Result []string `json:"result"` + }{} + err := n.request("server.version", []string{"reflector.go", ProtocolVersion}, resp) + + var v string + if len(resp.Result) >= 2 { + v = resp.Result[1] + } + + return v, err +} + +type GetClaimsInTxResp struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Result []struct { + Name string `json:"name"` + ClaimID string `json:"claim_id"` + Txid string `json:"txid"` + Nout int `json:"nout"` + Amount int `json:"amount"` + Depth int `json:"depth"` + Height int `json:"height"` + Value string `json:"value"` + ClaimSequence int `json:"claim_sequence"` + Address string `json:"address"` + Supports []interface{} `json:"supports"` // TODO: finish me + EffectiveAmount int `json:"effective_amount"` + ValidAtHeight int `json:"valid_at_height"` + } `json:"result"` +} + +func (n *Node) GetClaimsInTx(txid string) (*GetClaimsInTxResp, error) { + var resp GetClaimsInTxResp + err := n.request("blockchain.claimtrie.getclaimsintx", []string{txid}, &resp) + return &resp, err +} diff --git a/wallet/network.go b/wallet/network.go new file mode 100644 index 0000000..32e514c --- /dev/null +++ b/wallet/network.go @@ -0,0 +1,223 @@ +package wallet + +// copied from https://github.com/d4l3k/go-electrum + +import ( + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "math/rand" + "net" + "sync" + "time" + + "github.com/lbryio/lbry.go/stop" + log "github.com/sirupsen/logrus" + "github.com/uber-go/atomic" +) + +const ( + ClientVersion = "0.0.1" + ProtocolVersion = "1.0" +) + +var ( + ErrNotImplemented = errors.New("not implemented") + ErrNodeConnected = errors.New("node already connected") + ErrConnectFailed = errors.New("failed to connect") +) + +type response struct { + Id uint32 `json:"id"` + Method string `json:"method"` + Error struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` +} + +type request struct { + Id uint32 `json:"id"` + Method string `json:"method"` + Params []string `json:"params"` +} + +type Node struct { + transport *TCPTransport + nextId atomic.Uint32 + grp *stop.Group + + handlersMu *sync.RWMutex + handlers map[uint32]chan []byte + + pushHandlersMu *sync.RWMutex + pushHandlers map[string][]chan []byte +} + +// NewNode creates a new node. +func NewNode() *Node { + return &Node{ + handlers: make(map[uint32]chan []byte), + pushHandlers: make(map[string][]chan []byte), + handlersMu: &sync.RWMutex{}, + pushHandlersMu: &sync.RWMutex{}, + grp: stop.New(), + } +} + +// Connect creates a new connection to the specified address. +func (n *Node) Connect(addrs []string, config *tls.Config) error { + if n.transport != nil { + return ErrNodeConnected + } + + // shuffle addresses for load balancing + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) + + var err error + + for _, addr := range addrs { + n.transport, err = NewTransport(addr, config) + if err == nil { + break + } + if e, ok := err.(*net.OpError); ok && e.Err.Error() == "no such host" { + // net.errNoSuchHost is not exported, so we have to string-match + continue + } + return err + } + + if n.transport == nil { + return ErrConnectFailed + } + + log.Debugf("wallet connected to %s", n.transport.conn.RemoteAddr()) + + n.grp.Add(1) + go func() { + defer n.grp.Done() + <-n.grp.Ch() + n.transport.Shutdown() + }() + + n.grp.Add(1) + go func() { + defer n.grp.Done() + n.handleErrors() + }() + + n.grp.Add(1) + go func() { + defer n.grp.Done() + n.listen() + }() + + return nil +} + +func (n *Node) Shutdown() { + n.grp.StopAndWait() +} + +func (n *Node) handleErrors() { + for { + select { + case <-n.grp.Ch(): + return + case err := <-n.transport.Errors(): + n.err(err) + } + } +} + +// err handles errors produced by the foreign node. +func (n *Node) err(err error) { + // TODO: Better error handling. + log.Error(err) +} + +// listen processes messages from the server. +func (n *Node) listen() { + for { + select { + case <-n.grp.Ch(): + return + case bytes := <-n.transport.Responses(): + msg := &response{} + if err := json.Unmarshal(bytes, msg); err != nil { + n.err(err) + continue + } + if len(msg.Error.Message) > 0 { + n.err(fmt.Errorf("error from server: %#v", msg.Error.Message)) + continue + } + if len(msg.Method) > 0 { + n.pushHandlersMu.RLock() + handlers := n.pushHandlers[msg.Method] + n.pushHandlersMu.RUnlock() + + for _, handler := range handlers { + select { + case handler <- bytes: + default: + } + } + } + + n.handlersMu.RLock() + c, ok := n.handlers[msg.Id] + n.handlersMu.RUnlock() + if ok { + c <- bytes + } + } + } +} + +// listenPush returns a channel of messages matching the method. +func (n *Node) listenPush(method string) <-chan []byte { + c := make(chan []byte, 1) + n.pushHandlersMu.Lock() + defer n.pushHandlersMu.Unlock() + n.pushHandlers[method] = append(n.pushHandlers[method], c) + return c +} + +// request makes a request to the server and unmarshals the response into v. +func (n *Node) request(method string, params []string, v interface{}) error { + msg := request{ + Id: n.nextId.Load(), + Method: method, + Params: params, + } + n.nextId.Inc() + + bytes, err := json.Marshal(msg) + if err != nil { + return err + } + bytes = append(bytes, delimiter) + + err = n.transport.Send(bytes) + if err != nil { + return err + } + + c := make(chan []byte, 1) + + n.handlersMu.Lock() + n.handlers[msg.Id] = c + n.handlersMu.Unlock() + + resp := <-c + + n.handlersMu.Lock() + delete(n.handlers, msg.Id) + n.handlersMu.Unlock() + + return json.Unmarshal(resp, v) +} diff --git a/wallet/transport.go b/wallet/transport.go new file mode 100644 index 0000000..d817742 --- /dev/null +++ b/wallet/transport.go @@ -0,0 +1,101 @@ +package wallet + +// copied from https://github.com/d4l3k/go-electrum + +import ( + "bufio" + "crypto/tls" + "net" + "time" + + "github.com/lbryio/lbry.go/stop" + log "github.com/sirupsen/logrus" +) + +type TCPTransport struct { + conn net.Conn + responses chan []byte + errors chan error + grp *stop.Group +} + +func NewTransport(addr string, config *tls.Config) (*TCPTransport, error) { + var conn net.Conn + var err error + + timeout := 5 * time.Second + if config != nil { + conn, err = tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, config) + } else { + conn, err = net.DialTimeout("tcp", addr, timeout) + } + if err != nil { + return nil, err + } + + t := &TCPTransport{ + conn: conn, + responses: make(chan []byte), + errors: make(chan error), + grp: stop.New(), + } + + t.grp.Add(1) + go func() { + defer t.grp.Done() + <-t.grp.Ch() + t.close() + }() + + t.grp.Add(1) + go func() { + t.grp.Done() + t.listen() + }() + + return t, nil +} + +const delimiter = byte('\n') + +func (t *TCPTransport) listen() { + reader := bufio.NewReader(t.conn) + for { + line, err := reader.ReadBytes(delimiter) + if err != nil { + t.error(err) + return + } + + log.Debugf("%s -> %s", t.conn.RemoteAddr(), line) + + t.responses <- line + } +} + +func (t *TCPTransport) Send(body []byte) error { + log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) + _, err := t.conn.Write(body) + return err +} + +func (t *TCPTransport) error(err error) { + select { + case t.errors <- err: + default: + } +} + +func (t *TCPTransport) Responses() <-chan []byte { return t.responses } +func (t *TCPTransport) Errors() <-chan error { return t.errors } + +func (t *TCPTransport) Shutdown() { + t.grp.StopAndWait() +} + +func (t *TCPTransport) close() { + err := t.conn.Close() + if err != nil { + t.error(err) + } +}