From 61e83d86de7023dd50876b76175f15b3a2ec4947 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 20 Sep 2018 11:24:36 -0400 Subject: [PATCH] actually done now --- Gopkg.lock | 74 +++++++++++------- Gopkg.toml | 4 + cmd/dht.go | 2 +- cmd/reflector.go | 1 + cmd/test.go | 44 +---------- db/db.go | 89 +++++++++++++--------- reflector/blocklist.go | 71 ++++++++++++------ reflector/server.go | 50 +++++++++---- store/dbbacked.go | 97 +++++++++++++++++++++++- store/store.go | 22 ++++-- wallet/network.go | 165 +++++++++++++++++++++++++---------------- wallet/tcp.go | 75 ------------------- wallet/transport.go | 101 +++++++++++++++++++++++++ 13 files changed, 503 insertions(+), 292 deletions(-) delete mode 100644 wallet/tcp.go create mode 100644 wallet/transport.go diff --git a/Gopkg.lock b/Gopkg.lock index 2608733..acfaccb 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,7 +3,7 @@ [[projects]] branch = "master" - digest = "1:d64110a78451e373c5a952d2625323dbbe3bfe41c67f9652ea9668a6ceb4f645" + digest = "1:354e62d5acb9af138e13ec842f78a846d214a8d4a9f80e578698f1f1565e2ef8" name = "github.com/armon/go-metrics" packages = ["."] pruneopts = "" @@ -11,7 +11,7 @@ [[projects]] branch = "master" - digest = "1:c0b6dbbb56a745020d5939bdde2197241a1c6109f226cca57b16f46916be5e94" + digest = "1:b1b9627af19ee54d3ed6b069375f0e91baa4a25267cf3b684e80fdefb17f4719" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -53,14 +53,14 @@ [[projects]] branch = "master" - digest = "1:e250643be8120824556f39df6ef128fc2be490fc96e0cb64b1a8ecf96bbe3ce6" + digest = "1:56b87c786a316d6e9b9c7ba8f3dd64e3199ca3b33a55cc596c633023bed20264" name = "github.com/btcsuite/btcutil" packages = ["base58"] pruneopts = "" revision = "ab6388e0c60ae4834a1f57511e20c17b5f78be4b" [[projects]] - digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5" + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" packages = ["spew"] pruneopts = "" @@ -76,7 +76,7 @@ version = "v1.0.1" [[projects]] - digest = "1:d19c78214e03e297e9e30d2eb11892f731358b2951f2a5c7374658a156373e4c" + digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" packages = ["."] pruneopts = "" @@ -85,12 +85,20 @@ [[projects]] branch = "master" - digest = "1:27b11ca1ad214ead955ff5480e8575e74c5df4e4dc02b04256a8d92131e1d3ad" + digest = "1:7150b23ba935d63f7b930d6c5ff20b52649ba623d62e0344971c822615fe57a3" name = "github.com/go-sql-driver/mysql" packages = ["."] pruneopts = "" revision = "99ff426eb706cffe92ff3d058e168b278cabf7c7" +[[projects]] + digest = "1:3dd078fda7500c341bc26cfbc6c6a34614f295a2457149fc1045cab767cbcf18" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "" + revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5" + version = "v1.2.0" + [[projects]] digest = "1:dbbeb8ddb0be949954c8157ee8439c2adfd8dc1c9510eb44a6e58cb68c3dce28" name = "github.com/gorilla/context" @@ -108,7 +116,7 @@ version = "v1.6.2" [[projects]] - digest = "1:b4817bdb0da3054166de058111943ac58b315aead2fed4ee838625a4e304f74c" + digest = "1:91aaeb45b3c10cc9cb68d1450cbc8ac77d0a677cf34a8ed3d4ef4dacb9df8a50" name = "github.com/gorilla/rpc" packages = [ "v2", @@ -119,7 +127,7 @@ version = "v1.1.0" [[projects]] - digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c" + digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6" name = "github.com/gorilla/websocket" packages = ["."] pruneopts = "" @@ -144,7 +152,7 @@ [[projects]] branch = "master" - digest = "1:6a611e691e739173805cb54019b5c39bb9d46455526dff31e0e6fe3aaca52776" + digest = "1:6396690228a7560bf9247cb90e5ae9c797bd630b01e7d2acab430bbca9a1ecb3" name = "github.com/hashicorp/go-msgpack" packages = ["codec"] pruneopts = "" @@ -152,7 +160,7 @@ [[projects]] branch = "master" - digest = "1:deaebb4a98ca748bbad7eb653f3a675749500020823a086448ffcd7ba6b8b02d" + digest = "1:0b5ca7d18e4ded1e4dacbb37ff027cb40a80c0fed969e4e03cf7aff129bc1b44" name = "github.com/hashicorp/go-multierror" packages = ["."] pruneopts = "" @@ -160,7 +168,7 @@ [[projects]] branch = "master" - digest = "1:74f54e6ef2339f1de1e8c4b6674442118bd89e619b2fbd949ef2337330067994" + digest = "1:fd8ec2359315965bb6b84fd8e45cd5e8b58b80d8430dc96c8c5dfce46d30dbfc" name = "github.com/hashicorp/go-sockaddr" packages = ["."] pruneopts = "" @@ -175,7 +183,7 @@ revision = "0fb14efe8c47ae851c0034ed7a448854d3d34cf3" [[projects]] - digest = "1:d7ce65372f495908f80fc1f80f4dab5d763d9a1de544abd95aa719e4262d0dd5" + digest = "1:d2c45a353b65012162c7ca22c39b1b0bd06d39362fb375cf42b4e48e1104bfc6" name = "github.com/hashicorp/memberlist" packages = ["."] pruneopts = "" @@ -184,7 +192,7 @@ [[projects]] branch = "master" - digest = "1:4fd01ac3766b886665cfd335cc63819ec4e4538dcc1180c05d6edc089619962c" + digest = "1:7b8e4a60bfdacc2a79ba4a4ef21b2e86e98fb1dc99d816179e0b4aee75106051" name = "github.com/hashicorp/serf" packages = [ "coordinate", @@ -195,7 +203,7 @@ [[projects]] branch = "master" - digest = "1:0d37c42156531a07a84812a47c27610947b849710ffab6f62be6e98c5112c140" + digest = "1:32c49a8cbcb20989c4fc0825f792cb1ea079af601c11b5ed0a92a48433171db3" name = "github.com/inconshreveable/go-update" packages = [ ".", @@ -214,7 +222,7 @@ version = "v1.0" [[projects]] - digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52" + digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" name = "github.com/jmespath/go-jmespath" packages = ["."] pruneopts = "" @@ -230,7 +238,7 @@ [[projects]] branch = "master" - digest = "1:132aae3fa5ad407b53f57cf6ebe274e414ec22d1700899023319931fa90b63e2" + digest = "1:7abe0d83ffb4c20fce461c314b1dc858cba274580cf1508f698b5f9fd9e1cde9" name = "github.com/johntdyer/slackrus" packages = ["."] pruneopts = "" @@ -246,7 +254,7 @@ [[projects]] branch = "master" - digest = "1:76fd7507e6014c598a01f1b3d558774d2a3114c438403bc98123870d2aecec62" + digest = "1:3e990fec1701f7cd3a301cb0fa824f65e35a37c224ff17f4d842720651d2f2fb" name = "github.com/lbryio/lbry.go" packages = [ "crypto", @@ -259,6 +267,14 @@ pruneopts = "" revision = "e2c96944fc485d3ab5e164da78f8439a94c5aa85" +[[projects]] + branch = "master" + digest = "1:dbd7fd543a88da4f81fbb849175b400d69b04f1f82de34c8c2efdc5626b80999" + name = "github.com/lbryio/types" + packages = ["go"] + pruneopts = "" + revision = "0a913ba650dd7d72e2a008b86dac117be3d5f075" + [[projects]] branch = "master" digest = "1:cabf2bf5e49edfe0c34cb9c6a256f2a99e6cc8c5e660855c8f3dafe1f81d5dcd" @@ -268,7 +284,7 @@ revision = "b7abd7672df533e627eddbf3a5a529786e8bda7f" [[projects]] - digest = "1:f0bad0fece0fb73c6ea249c18d8e80ffbe86be0457715b04463068f04686cf39" + digest = "1:4c8d8358c45ba11ab7bb15df749d4df8664ff1582daead28bae58cf8cbe49890" name = "github.com/miekg/dns" packages = ["."] pruneopts = "" @@ -276,7 +292,7 @@ version = "v1.0.8" [[projects]] - digest = "1:ba3b2eb0ae6fd3deac4386c02fe9d2279c9520738eb9db2f0667e74d5c7a0a61" + digest = "1:e6352ff4bd34c601567ad5e274837275f08e2a933e2688354cf5d44595c13ef9" name = "github.com/nlopes/slack" packages = ["."] pruneopts = "" @@ -285,7 +301,7 @@ [[projects]] branch = "master" - digest = "1:af967afd3cbc6b0145937f4dcab78bcd93e7b2f2b618fb2bcaf7069ad5c638fa" + digest = "1:d38c630298ac75e214f3caa5c240ea2923c7a089824d175ba4107d0650d56579" name = "github.com/phayes/freeport" packages = ["."] pruneopts = "" @@ -309,7 +325,7 @@ [[projects]] branch = "master" - digest = "1:1747c026a603e3c9f33f238e1d1390df2c8f48876b6bcb7a9c52c7b479e040f4" + digest = "1:56de39853758a4b6053a3f71e527305bbed11a0d876156e32e8cc7180d36198b" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "" @@ -325,7 +341,7 @@ [[projects]] branch = "master" - digest = "1:b5212c335a490d958a6b1b5b48901b46e682b82fa9af3a238fa88df6eaa60873" + digest = "1:c8f6919ab9f140506fd4ad3f4a9c9c2af9ee7921e190af0c67b2fca2f903083c" name = "github.com/spf13/cobra" packages = ["."] pruneopts = "" @@ -341,7 +357,7 @@ [[projects]] branch = "master" - digest = "1:f05efcac20bea32e1fcefde9a3cbefb07e02053666c4a67681ad18c8efc682d3" + digest = "1:b7ef38166f9ee44c54ef992b2754950f73fa09daf30355bea7aa510f224c38a6" name = "github.com/uber-go/atomic" packages = ["."] pruneopts = "" @@ -349,7 +365,7 @@ [[projects]] branch = "master" - digest = "1:16b935c128f178647036048862a21e8bfd66d1e83fb19787a8b356bdcf0de899" + digest = "1:53c4b75f22ea7757dea07eae380ea42de547ae6865a5e3b41866754a8a8219c9" name = "golang.org/x/crypto" packages = [ "ed25519", @@ -363,7 +379,7 @@ [[projects]] branch = "master" - digest = "1:15f2fc8cc79d90b0d4d712f04bd3eb3a3856ff3dd6b610a1425113ead3501610" + digest = "1:9f170ebb5ac75debb7e958e0388545441cc77de4d131a0c170530e948f3e857e" name = "golang.org/x/net" packages = [ "bpf", @@ -378,7 +394,7 @@ [[projects]] branch = "master" - digest = "1:e8f649ecfae7835a0a27ef39fd2180f6d3c12bc422e2ae55cd611f0b283b3e6e" + digest = "1:309d0f514b3f0dd143089ff4ab91c894d3e3f7e771c89b59d4b015b955cbaa5c" name = "golang.org/x/sys" packages = [ "unix", @@ -396,7 +412,7 @@ revision = "fbb02b2291d28baffd63558aa44b4b56f178d650" [[projects]] - digest = "1:eede11c81b63c8f6fd06ef24ba0a640dc077196ec9b7a58ecde03c82eee2f151" + digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" name = "google.golang.org/appengine" packages = ["cloudsql"] pruneopts = "" @@ -404,7 +420,7 @@ version = "v1.1.0" [[projects]] - digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575" + digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d" name = "gopkg.in/nullbio/null.v6" packages = ["convert"] pruneopts = "" @@ -423,6 +439,7 @@ "github.com/aws/aws-sdk-go/service/s3/s3manager", "github.com/davecgh/go-spew/spew", "github.com/go-sql-driver/mysql", + "github.com/golang/protobuf/proto", "github.com/gorilla/mux", "github.com/gorilla/rpc/v2", "github.com/gorilla/rpc/v2/json", @@ -435,6 +452,7 @@ "github.com/lbryio/lbry.go/querytools", "github.com/lbryio/lbry.go/stop", "github.com/lbryio/lbry.go/util", + "github.com/lbryio/types/go", "github.com/lyoshenka/bencode", "github.com/phayes/freeport", "github.com/sebdah/goldie", diff --git a/Gopkg.toml b/Gopkg.toml index c994789..e065e37 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -41,3 +41,7 @@ [[constraint]] branch = "master" name = "github.com/inconshreveable/go-update" + +[[constraint]] + branch = "master" + name = "github.com/lbryio/types" diff --git a/cmd/dht.go b/cmd/dht.go index 18078c7..28d79a0 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -1,7 +1,6 @@ package cmd import ( - "log" "net" "os" "os/signal" @@ -12,6 +11,7 @@ import ( "github.com/lbryio/reflector.go/dht" "github.com/lbryio/reflector.go/dht/bits" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) diff --git a/cmd/reflector.go b/cmd/reflector.go index d497c63..5f4305d 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -42,6 +42,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { reflectorServer.StatLogger = log.StandardLogger() reflectorServer.StatReportFrequency = 1 * time.Hour } + reflectorServer.EnableBlocklist = true err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) if err != nil { diff --git a/cmd/test.go b/cmd/test.go index fbb03d2..1c0932b 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -1,10 +1,6 @@ package cmd import ( - "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/reflector" - "github.com/lbryio/reflector.go/store" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -19,43 +15,5 @@ func init() { } func testCmd(cmd *cobra.Command, args []string) { - db := new(db.SQL) - err := db.Connect(globalConfig.DBConn) - if err != nil { - log.Fatal(err) - } - - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - combo := store.NewDBBackedS3Store(s3, db) - - values, err := reflector.BlockedSdHashes() - if err != nil { - log.Fatal(err) - } - - for _, v := range values { - if v.Err != nil { - continue - } - - has, err := db.HasBlob(v.Value) - if err != nil { - log.Error(err) - continue - } - - if !has { - continue - } - - err = combo.Delete(v.Value) - if err != nil { - log.Error(err) - } - - err = db.Block(v.Value) - if err != nil { - log.Error(err) - } - } + log.Println("test :-)") } diff --git a/db/db.go b/db/db.go index c8f46d0..aa2db28 100644 --- a/db/db.go +++ b/db/db.go @@ -85,24 +85,11 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { // HasBlob checks if the database contains the blob information. func (s *SQL) HasBlob(hash string) (bool, error) { - if s.conn == nil { - return false, errors.Err("not connected") + exists, err := s.HasBlobs([]string{hash}) + if err != nil { + return false, err } - - query := `SELECT EXISTS(SELECT 1 - FROM blob_ b - LEFT JOIN blocked bl ON b.hash = bl.hash - WHERE b.hash = ? AND b.is_stored = ? AND bl.hash IS NULL)` - args := []interface{}{hash, true} - - logQuery(query, args...) - - row := s.conn.QueryRow(query, args...) - - exists := false - err := row.Scan(&exists) - - return exists, errors.Err(err) + return exists[hash], nil } // HasBlobs checks if the database contains the set of blobs and returns a bool map. @@ -133,29 +120,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { logQuery(query, args...) - rows, err := s.conn.Query(query, args...) - if err != nil { - closeRows(rows) - return exists, err - } - - for rows.Next() { - err := rows.Scan(&hash) + err := func() error { + rows, err := s.conn.Query(query, args...) if err != nil { - closeRows(rows) - return exists, err + return errors.Err(err) } - exists[hash] = true - } + defer closeRows(rows) - err = rows.Err() + for rows.Next() { + err := rows.Scan(&hash) + if err != nil { + return errors.Err(err) + } + exists[hash] = true + } + + err = rows.Err() + if err != nil { + return errors.Err(err) + } + + doneIndex += len(batch) + return nil + }() if err != nil { - closeRows(rows) - return exists, err + return nil, err } - - closeRows(rows) - doneIndex += len(batch) } return exists, nil @@ -187,6 +177,35 @@ func (s *SQL) Block(hash string) error { return errors.Err(err) } +// GetBlocked will return a list of blocked hashes +func (s *SQL) GetBlocked() (map[string]bool, error) { + query := "SELECT hash FROM blocked" + logQuery(query) + rows, err := s.conn.Query(query) + if err != nil { + return nil, errors.Err(err) + } + defer closeRows(rows) + + blocked := make(map[string]bool) + + var hash string + for rows.Next() { + err := rows.Scan(&hash) + if err != nil { + return nil, errors.Err(err) + } + blocked[hash] = true + } + + err = rows.Err() + if err != nil { + return nil, errors.Err(err) + } + + return blocked, nil +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing diff --git a/reflector/blocklist.go b/reflector/blocklist.go index f1dbdbc..9712828 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -6,33 +6,67 @@ import ( "net/http" "strconv" "strings" + "time" + "github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/wallet" "github.com/lbryio/lbry.go/errors" types "github.com/lbryio/types/go" "github.com/golang/protobuf/proto" + log "github.com/sirupsen/logrus" ) const blocklistURL = "https://api.lbry.io/file/list_blocked" -type blockListResponse struct { - Success bool `json:"success"` - Error string `json:"error"` - Data struct { - Outpoints []string `json:"outpoints"` - } `json:"data"` +func (s *Server) enableBlocklist(b store.Blocklister) { + updateBlocklist(b) + t := time.NewTicker(12 * time.Hour) + for { + select { + case <-s.grp.Ch(): + return + case <-t.C: + updateBlocklist(b) + } + } } -func BlockedSdHashes() (map[string]ValueResp, error) { +func updateBlocklist(b store.Blocklister) { + values, err := blockedSdHashes() + if err != nil { + log.Error(err) + return + } + + for _, v := range values { + if v.Err != nil { + continue + } + + err = b.Block(v.Value) + if err != nil { + log.Error(err) + } + } +} + +func blockedSdHashes() (map[string]ValueResp, error) { resp, err := http.Get(blocklistURL) if err != nil { return nil, errors.Err(err) } defer resp.Body.Close() - var r blockListResponse + var r struct { + Success bool `json:"success"` + Error string `json:"error"` + Data struct { + Outpoints []string `json:"outpoints"` + } `json:"data"` + } + if err = json.NewDecoder(resp.Body).Decode(&r); err != nil { return nil, errors.Err(err) } @@ -49,27 +83,16 @@ type ValueResp struct { Err error } -// sdHashForOutpoint queries wallet server for the sd hash in a given outpoint -func sdHashForOutpoint(outpoint string) (string, error) { - vals, err := sdHashesForOutpoints([]string{outpoint}) - if err != nil { - return "", err - } - - val, ok := vals[outpoint] - if !ok { - return "", errors.Err("outpoint not in response") - } - - return val.Value, val.Err -} - // sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { values := make(map[string]ValueResp) node := wallet.NewNode() - err := node.ConnectTCP("victor.lbry.tech:50001") + err := node.Connect([]string{ + "victor.lbry.tech:50001", + //"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix + //"lbryumx2.lbry.io:50001", + }, nil) if err != nil { return nil, err } diff --git a/reflector/server.go b/reflector/server.go index d9ada28..0ddbb14 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -37,6 +37,8 @@ type Server struct { StatLogger *log.Logger // logger to log stats StatReportFrequency time.Duration // how often to log stats + EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled + store store.BlobStore grp *stop.Group stats *stats @@ -90,6 +92,19 @@ func (s *Server) Start(address string) error { s.stats.Start() } + if s.EnableBlocklist { + if b, ok := s.store.(store.Blocklister); ok { + s.grp.Add(1) + go func() { + s.enableBlocklist(b) + s.grp.Done() + }() + } else { + //s.Shutdown() + return errors.Err("blocklist is enabled but blob store does not support blocklisting") + } + } + return nil } @@ -174,39 +189,46 @@ func (s *Server) doError(conn net.Conn, err error) error { } func (s *Server) receiveBlob(conn net.Conn) error { - var err error - blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn) if err != nil { return err } - blobExists, err := s.store.Has(blobHash) - if err != nil { - return err + var wantsBlob bool + if bl, ok := s.store.(store.Blocklister); ok { + wantsBlob, err = bl.Wants(blobHash) + if err != nil { + return err + } + } else { + blobExists, err := s.store.Has(blobHash) + if err != nil { + return err + } + wantsBlob = !blobExists } var neededBlobs []string - if isSdBlob && blobExists { - if fsc, ok := s.store.(neededBlobChecker); ok { - neededBlobs, err = fsc.MissingBlobsForKnownStream(blobHash) + if isSdBlob && !wantsBlob { + if nbc, ok := s.store.(neededBlobChecker); ok { + neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash) if err != nil { return err } } else { // if we can't confirm that we have the full stream, we have to say that the sd blob is // missing. if we say we have it, they wont try to send any content blobs - blobExists = false + wantsBlob = true } } - err = s.sendBlobResponse(conn, blobExists, isSdBlob, neededBlobs) + err = s.sendBlobResponse(conn, wantsBlob, isSdBlob, neededBlobs) if err != nil { return err } - if blobExists { + if !wantsBlob { return nil } @@ -296,14 +318,14 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) { return blobSize, blobHash, isSdBlob, nil } -func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool, neededBlobs []string) error { +func (s *Server) sendBlobResponse(conn net.Conn, shouldSendBlob, isSdBlob bool, neededBlobs []string) error { var response []byte var err error if isSdBlob { - response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists, NeededBlobs: neededBlobs}) + response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: shouldSendBlob, NeededBlobs: neededBlobs}) } else { - response, err = json.Marshal(sendBlobResponse{SendBlob: !blobExists}) + response, err = json.Marshal(sendBlobResponse{SendBlob: shouldSendBlob}) } if err != nil { return err diff --git a/store/dbbacked.go b/store/dbbacked.go index b5d8442..b31c7aa 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -2,15 +2,19 @@ package store import ( "encoding/json" + "sync" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/reflector.go/db" + log "github.com/sirupsen/logrus" ) // DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored. type DBBackedS3Store struct { - s3 *S3BlobStore - db *db.SQL + s3 *S3BlobStore + db *db.SQL + blockedMu sync.RWMutex + blocked map[string]bool } // NewDBBackedS3Store returns an initialized store pointer. @@ -67,9 +71,98 @@ func (d *DBBackedS3Store) Delete(hash string) error { return d.db.Delete(hash) } +// Block deletes the blob and prevents it from being uploaded in the future +func (d *DBBackedS3Store) Block(hash string) error { + if blocked, err := d.isBlocked(hash); blocked || err != nil { + return err + } + + log.Debugf("blocking %s", hash) + + err := d.db.Block(hash) + if err != nil { + return err + } + + has, err := d.db.HasBlob(hash) + if err != nil { + return err + } + + if has { + err = d.s3.Delete(hash) + if err != nil { + return err + } + + err = d.db.Delete(hash) + if err != nil { + return err + } + } + + return d.markBlocked(hash) +} + +// Wants returns false if the hash exists or is blocked, true otherwise +func (d *DBBackedS3Store) Wants(hash string) (bool, error) { + has, err := d.Has(hash) + if has || err != nil { + return false, err + } + + blocked, err := d.isBlocked(hash) + return !blocked, err +} + // MissingBlobsForKnownStream returns missing blobs for an existing stream // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks // like no blobs are missing func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) { return d.db.MissingBlobsForKnownStream(sdHash) } + +func (d *DBBackedS3Store) markBlocked(hash string) error { + err := d.initBlocked() + if err != nil { + return err + } + + d.blockedMu.Lock() + defer d.blockedMu.Unlock() + + d.blocked[hash] = true + return nil +} + +func (d *DBBackedS3Store) isBlocked(hash string) (bool, error) { + err := d.initBlocked() + if err != nil { + return false, err + } + + d.blockedMu.RLock() + defer d.blockedMu.RUnlock() + + return d.blocked[hash], nil +} + +func (d *DBBackedS3Store) initBlocked() error { + // first check without blocking since this is the most likely scenario + if d.blocked != nil { + return nil + } + + d.blockedMu.Lock() + defer d.blockedMu.Unlock() + + // check again in case of race condition + if d.blocked != nil { + return nil + } + + var err error + d.blocked, err = d.db.GetBlocked() + + return err +} diff --git a/store/store.go b/store/store.go index 5c879cf..e7b5c2d 100644 --- a/store/store.go +++ b/store/store.go @@ -4,11 +4,23 @@ import "github.com/lbryio/lbry.go/errors" // BlobStore is an interface with methods for consistently handling blob storage. type BlobStore interface { - Has(string) (bool, error) - Get(string) ([]byte, error) - Put(string, []byte) error - PutSD(string, []byte) error - Delete(string) error + // Does blob exist in the store + Has(hash string) (bool, error) + // Get the blob from the store + Get(hase string) ([]byte, error) + // Put the blob into the store + Put(hash string, blob []byte) error + // Put an SD blob into the store + PutSD(hash string, blob []byte) error + // Delete the blob from the store + Delete(hash string) error +} + +type Blocklister interface { + // Block deletes the blob and prevents it from being uploaded in the future + Block(hash string) error + // Wants returns false if the hash exists or is blocked, true otherwise + Wants(hash string) (bool, error) } //ErrBlobNotFound is a standard error when a blob is not found in the store. diff --git a/wallet/network.go b/wallet/network.go index 58ad935..e35257b 100644 --- a/wallet/network.go +++ b/wallet/network.go @@ -7,8 +7,14 @@ import ( "encoding/json" "errors" "fmt" - "log" + "math/rand" + "net" "sync" + "time" + + "github.com/lbryio/lbry.go/stop" + log "github.com/sirupsen/logrus" + "github.com/uber-go/atomic" ) const ( @@ -19,16 +25,11 @@ const ( var ( ErrNotImplemented = errors.New("not implemented") ErrNodeConnected = errors.New("node already connected") + ErrConnectFailed = errors.New("failed to connect") ) -type Transport interface { - SendMessage([]byte) error - Responses() <-chan []byte - Errors() <-chan error -} - -type respMetadata struct { - Id int `json:"id"` +type response struct { + Id uint32 `json:"id"` Method string `json:"method"` Error struct { Code int `json:"code"` @@ -37,90 +38,125 @@ type respMetadata struct { } type request struct { - Id int `json:"id"` + Id uint32 `json:"id"` Method string `json:"method"` Params []string `json:"params"` } type Node struct { - Address string + transport *TCPTransport + nextId atomic.Uint32 + grp *stop.Group - transport Transport - handlers map[int]chan []byte - handlersLock sync.RWMutex + handlersMu sync.RWMutex + handlers map[uint32]chan []byte - pushHandlers map[string][]chan []byte - pushHandlersLock sync.RWMutex - - nextId int + pushHandlersMu sync.RWMutex + pushHandlers map[string][]chan []byte } // NewNode creates a new node. func NewNode() *Node { - n := &Node{ - handlers: make(map[int]chan []byte), + return &Node{ + handlers: make(map[uint32]chan []byte), pushHandlers: make(map[string][]chan []byte), + grp: stop.New(), } - return n } -// ConnectTCP creates a new TCP connection to the specified address. -func (n *Node) ConnectTCP(addr string) error { +// Connect creates a new connection to the specified address. +func (n *Node) Connect(addrs []string, config *tls.Config) error { if n.transport != nil { return ErrNodeConnected } - n.Address = addr - transport, err := NewTCPTransport(addr) - if err != nil { + + // shuffle addresses for load balancing + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) + + var err error + + for _, addr := range addrs { + n.transport, err = NewTransport(addr, config) + if err == nil { + break + } + if e, ok := err.(*net.OpError); ok && e.Err.Error() == "no such host" { + // net.errNoSuchHost is not exported, so we have to string-match + continue + } return err } - n.transport = transport - go n.listen() + + if n.transport == nil { + return ErrConnectFailed + } + + log.Debugf("wallet connected to %s", n.transport.conn.RemoteAddr()) + + n.grp.Add(1) + go func() { + defer n.grp.Done() + <-n.grp.Ch() + n.transport.Shutdown() + }() + + n.grp.Add(1) + go func() { + defer n.grp.Done() + n.handleErrors() + }() + + n.grp.Add(1) + go func() { + defer n.grp.Done() + n.listen() + }() + return nil } -// ConnectSLL creates a new SLL connection to the specified address. -func (n *Node) ConnectSSL(addr string, config *tls.Config) error { - if n.transport != nil { - return ErrNodeConnected +func (n *Node) Shutdown() { + n.grp.StopAndWait() +} + +func (n *Node) handleErrors() { + for { + select { + case <-n.grp.Ch(): + return + case err := <-n.transport.Errors(): + n.err(err) + } } - n.Address = addr - transport, err := NewSSLTransport(addr, config) - if err != nil { - return err - } - n.transport = transport - go n.listen() - return nil } // err handles errors produced by the foreign node. func (n *Node) err(err error) { // TODO: Better error handling. - log.Fatal(err) + log.Error(err) } // listen processes messages from the server. func (n *Node) listen() { for { select { - case err := <-n.transport.Errors(): - n.err(err) + case <-n.grp.Ch(): return case bytes := <-n.transport.Responses(): - msg := &respMetadata{} + msg := &response{} if err := json.Unmarshal(bytes, msg); err != nil { n.err(err) - return + continue } if len(msg.Error.Message) > 0 { n.err(fmt.Errorf("error from server: %#v", msg.Error.Message)) - return + continue } if len(msg.Method) > 0 { - n.pushHandlersLock.RLock() + n.pushHandlersMu.RLock() handlers := n.pushHandlers[msg.Method] - n.pushHandlersLock.RUnlock() + n.pushHandlersMu.RUnlock() for _, handler := range handlers { select { @@ -130,10 +166,9 @@ func (n *Node) listen() { } } - n.handlersLock.RLock() + n.handlersMu.RLock() c, ok := n.handlers[msg.Id] - n.handlersLock.RUnlock() - + n.handlersMu.RUnlock() if ok { c <- bytes } @@ -144,8 +179,8 @@ func (n *Node) listen() { // listenPush returns a channel of messages matching the method. func (n *Node) listenPush(method string) <-chan []byte { c := make(chan []byte, 1) - n.pushHandlersLock.Lock() - defer n.pushHandlersLock.Unlock() + n.pushHandlersMu.Lock() + defer n.pushHandlersMu.Unlock() n.pushHandlers[method] = append(n.pushHandlers[method], c) return c } @@ -153,34 +188,34 @@ func (n *Node) listenPush(method string) <-chan []byte { // request makes a request to the server and unmarshals the response into v. func (n *Node) request(method string, params []string, v interface{}) error { msg := request{ - Id: n.nextId, + Id: n.nextId.Load(), Method: method, Params: params, } - n.nextId++ + n.nextId.Inc() + bytes, err := json.Marshal(msg) if err != nil { return err } - bytes = append(bytes, delim) - if err := n.transport.SendMessage(bytes); err != nil { + bytes = append(bytes, delimiter) + + err = n.transport.Send(bytes) + if err != nil { return err } c := make(chan []byte, 1) - n.handlersLock.Lock() + n.handlersMu.Lock() n.handlers[msg.Id] = c - n.handlersLock.Unlock() + n.handlersMu.Unlock() resp := <-c - n.handlersLock.Lock() - defer n.handlersLock.Unlock() + n.handlersMu.Lock() delete(n.handlers, msg.Id) + n.handlersMu.Unlock() - if err := json.Unmarshal(resp, v); err != nil { - return nil - } - return nil + return json.Unmarshal(resp, v) } diff --git a/wallet/tcp.go b/wallet/tcp.go deleted file mode 100644 index 4415761..0000000 --- a/wallet/tcp.go +++ /dev/null @@ -1,75 +0,0 @@ -package wallet - -// copied from https://github.com/d4l3k/go-electrum - -import ( - "bufio" - "crypto/tls" - "net" - - log "github.com/sirupsen/logrus" -) - -type TCPTransport struct { - conn net.Conn - responses chan []byte - errors chan error -} - -func NewTCPTransport(addr string) (*TCPTransport, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - t := &TCPTransport{ - conn: conn, - responses: make(chan []byte), - errors: make(chan error), - } - go t.listen() - return t, nil -} - -func NewSSLTransport(addr string, config *tls.Config) (*TCPTransport, error) { - conn, err := tls.Dial("tcp", addr, config) - if err != nil { - return nil, err - } - t := &TCPTransport{ - conn: conn, - responses: make(chan []byte), - errors: make(chan error), - } - go t.listen() - return t, nil -} - -func (t *TCPTransport) SendMessage(body []byte) error { - log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) - _, err := t.conn.Write(body) - return err -} - -const delim = byte('\n') - -func (t *TCPTransport) listen() { - defer t.conn.Close() - reader := bufio.NewReader(t.conn) - for { - line, err := reader.ReadBytes(delim) - if err != nil { - t.errors <- err - log.Error(err) - break - } - log.Debugf("%s -> %s", t.conn.RemoteAddr(), line) - t.responses <- line - } -} - -func (t *TCPTransport) Responses() <-chan []byte { - return t.responses -} -func (t *TCPTransport) Errors() <-chan error { - return t.errors -} diff --git a/wallet/transport.go b/wallet/transport.go new file mode 100644 index 0000000..d817742 --- /dev/null +++ b/wallet/transport.go @@ -0,0 +1,101 @@ +package wallet + +// copied from https://github.com/d4l3k/go-electrum + +import ( + "bufio" + "crypto/tls" + "net" + "time" + + "github.com/lbryio/lbry.go/stop" + log "github.com/sirupsen/logrus" +) + +type TCPTransport struct { + conn net.Conn + responses chan []byte + errors chan error + grp *stop.Group +} + +func NewTransport(addr string, config *tls.Config) (*TCPTransport, error) { + var conn net.Conn + var err error + + timeout := 5 * time.Second + if config != nil { + conn, err = tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, config) + } else { + conn, err = net.DialTimeout("tcp", addr, timeout) + } + if err != nil { + return nil, err + } + + t := &TCPTransport{ + conn: conn, + responses: make(chan []byte), + errors: make(chan error), + grp: stop.New(), + } + + t.grp.Add(1) + go func() { + defer t.grp.Done() + <-t.grp.Ch() + t.close() + }() + + t.grp.Add(1) + go func() { + t.grp.Done() + t.listen() + }() + + return t, nil +} + +const delimiter = byte('\n') + +func (t *TCPTransport) listen() { + reader := bufio.NewReader(t.conn) + for { + line, err := reader.ReadBytes(delimiter) + if err != nil { + t.error(err) + return + } + + log.Debugf("%s -> %s", t.conn.RemoteAddr(), line) + + t.responses <- line + } +} + +func (t *TCPTransport) Send(body []byte) error { + log.Debugf("%s <- %s", t.conn.RemoteAddr(), body) + _, err := t.conn.Write(body) + return err +} + +func (t *TCPTransport) error(err error) { + select { + case t.errors <- err: + default: + } +} + +func (t *TCPTransport) Responses() <-chan []byte { return t.responses } +func (t *TCPTransport) Errors() <-chan error { return t.errors } + +func (t *TCPTransport) Shutdown() { + t.grp.StopAndWait() +} + +func (t *TCPTransport) close() { + err := t.conn.Close() + if err != nil { + t.error(err) + } +}