From 75886211b14695d9f90dede6a2fb47d16ef485c7 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Wed, 15 Aug 2018 20:17:02 -0400
Subject: [PATCH] fix partial stream upload

---
 db/db.go                 |  47 +++++++++-----
 reflector/server.go      |  39 +++++++-----
 reflector/server_test.go | 130 +++++++++++++++++++++++++++++++++------
 store/dbbacked.go        |   8 ++-
 4 files changed, 171 insertions(+), 53 deletions(-)

diff --git a/db/db.go b/db/db.go
index 36f1cf7..b33ad5e 100644
--- a/db/db.go
+++ b/db/db.go
@@ -158,30 +158,47 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 	return exists, nil
 }
 
-// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
-func (s *SQL) HasFullStream(sdHash string) (bool, error) {
+// MissingBlobsForKnownStream returns missing blobs for an existing stream
+// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
+// like no blobs are missing
+func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	if s.conn == nil {
-		return false, errors.Err("not connected")
+		return nil, errors.Err("not connected")
 	}
 
-	query := `SELECT EXISTS(
-		SELECT 1 FROM stream s
-		LEFT JOIN stream_blob sb ON s.hash = sb.stream_hash
-		LEFT JOIN blob_ b ON b.hash = sb.blob_hash
-		WHERE s.sd_hash = ?
-		GROUP BY s.sd_hash
-		HAVING min(b.is_stored = 1)
-	);`
+	query := `
+		SELECT b.hash FROM blob_ b
+		INNER JOIN stream_blob sb ON b.hash = sb.blob_hash
+		INNER JOIN stream s ON s.hash = sb.stream_hash AND s.sd_hash = ?
+		WHERE b.is_stored = 0
+	`
 	args := []interface{}{sdHash}
 
 	logQuery(query, args...)
 
-	row := s.conn.QueryRow(query, args...)
+	rows, err := s.conn.Query(query, args...)
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+	defer closeRows(rows)
 
-	exists := false
-	err := row.Scan(&exists)
+	var missingBlobs []string
+	var hash string
 
-	return exists, errors.Err(err)
+	for rows.Next() {
+		err := rows.Scan(&hash)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		missingBlobs = append(missingBlobs, hash)
+	}
+
+	err = rows.Err()
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+
+	return missingBlobs, errors.Err(err)
 }
 
 // AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
diff --git a/reflector/server.go b/reflector/server.go
index 1176ded..b96c0da 100644
--- a/reflector/server.go
+++ b/reflector/server.go
@@ -165,23 +165,27 @@ func (s *Server) receiveBlob(conn net.Conn) error {
 		return err
 	}
 
-	// fullStreamChecker can check if the full stream has been uploaded
-	type fullStreamChecker interface {
-		HasFullStream(string) (bool, error)
-	}
-
-	blobExists := false
-	if fsc, ok := s.store.(fullStreamChecker); ok && isSdBlob {
-		blobExists, err = fsc.HasFullStream(blobHash)
-	} else {
-		// if we can't confirm that we have the full stream, we have to say that the sd blob is missing. if we say we have it, they wont try to send any content blobs
-		blobExists, err = s.store.Has(blobHash)
-	}
+	blobExists, err := s.store.Has(blobHash)
 	if err != nil {
 		return err
 	}
 
-	err = s.sendBlobResponse(conn, blobExists, isSdBlob)
+	var neededBlobs []string
+
+	if isSdBlob && blobExists {
+		if fsc, ok := s.store.(neededBlobChecker); ok {
+			neededBlobs, err = fsc.MissingBlobsForKnownStream(blobHash)
+			if err != nil {
+				return err
+			}
+		} else {
+			// if we can't confirm that we have the full stream, we have to say that the sd blob is
+			// missing. if we say we have it, they won't try to send any content blobs
+			blobExists = false
+		}
+	}
+
+	err = s.sendBlobResponse(conn, blobExists, isSdBlob, neededBlobs)
 	if err != nil {
 		return err
 	}
@@ -264,12 +268,12 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
 	return blobSize, blobHash, isSdBlob, nil
 }
 
-func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool) error {
+func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool, neededBlobs []string) error {
 	var response []byte
 	var err error
 
 	if isSdBlob {
-		response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists})
+		response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists, NeededBlobs: neededBlobs})
 	} else {
 		response, err = json.Marshal(sendBlobResponse{SendBlob: !blobExists})
 	}
@@ -374,3 +378,8 @@ type blobTransferResponse struct {
 type sdBlobTransferResponse struct {
 	ReceivedSdBlob bool `json:"received_sd_blob"`
 }
+
+// neededBlobChecker can check which blobs from a known stream are not uploaded yet
+type neededBlobChecker interface {
+	MissingBlobsForKnownStream(string) ([]string, error)
+}
diff --git a/reflector/server_test.go b/reflector/server_test.go
index d258ef1..d13ea2f 100644
--- a/reflector/server_test.go
+++ b/reflector/server_test.go
@@ -3,6 +3,7 @@ package reflector
 import (
 	"crypto/rand"
 	"io"
+	"math"
 	"strconv"
 	"testing"
 	"time"
@@ -10,6 +11,11 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	"github.com/lbryio/reflector.go/store"
 
+	"encoding/json"
+
+	"sort"
+
+	"github.com/lbryio/reflector.go/dht/bits"
 	"github.com/phayes/freeport"
 )
 
@@ -62,11 +68,7 @@ func TestServer_MediumBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, 1000)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(1000)
 
 	err = c.SendBlob(blob)
 	if err != nil {
@@ -84,11 +86,7 @@ func TestServer_FullBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, maxBlobSize)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(maxBlobSize)
 
 	err = c.SendBlob(blob)
 	if err != nil {
@@ -106,11 +104,7 @@ func TestServer_TooBigBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, maxBlobSize+1)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(maxBlobSize + 1)
 
 	err = c.SendBlob(blob)
 	if err == nil {
@@ -144,11 +138,7 @@ func TestServer_Timeout(t *testing.T) {
 
 	time.Sleep(testTimeout * 2)
 
-	blob := make([]byte, 10)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(10)
 
 	err = c.SendBlob(blob)
 	t.Log(spew.Sdump(err))
@@ -156,3 +146,103 @@ func TestServer_Timeout(t *testing.T) {
 		t.Error("server should have timed out by now")
 	}
 }
+
+type mockPartialStore struct {
+	store.MemoryBlobStore
+	missing []string
+}
+
+func (m mockPartialStore) MissingBlobsForKnownStream(hash string) ([]string, error) {
+	return m.missing, nil
+}
+
+func TestServer_PartialUpload(t *testing.T) {
+	port, err := freeport.GetFreePort()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sdHash := bits.Rand().String()
+	missing := make([]string, 4)
+	for i := range missing {
+		missing[i] = bits.Rand().String()
+	}
+
+	var st store.BlobStore
+	st = &mockPartialStore{missing: missing}
+	if _, ok := st.(neededBlobChecker); !ok {
+		t.Fatal("mock does not implement the relevant interface")
+	}
+	st.Put(sdHash, randBlob(10))
+
+	srv := NewServer(st)
+	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer srv.Shutdown()
+
+	c := Client{}
+	err = c.Connect(":" + strconv.Itoa(port))
+	if err != nil {
+		t.Fatal("error connecting client to server", err)
+	}
+
+	sendRequest, err := json.Marshal(sendBlobRequest{
+		SdBlobHash: sdHash,
+		SdBlobSize: len(sdHash),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = c.conn.Write(sendRequest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var sendResp sendSdBlobResponse
+	err = json.NewDecoder(c.conn).Decode(&sendResp)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if sendResp.SendSdBlob {
+		t.Errorf("expected SendSdBlob = false, got true")
+	}
+
+	if len(sendResp.NeededBlobs) != len(missing) {
+		t.Fatalf("got %d needed blobs, expected %d", len(sendResp.NeededBlobs), len(missing))
+	}
+
+	sort.Strings(sendResp.NeededBlobs)
+	sort.Strings(missing)
+
+	for i := range missing {
+		if missing[i] != sendResp.NeededBlobs[i] {
+			t.Errorf("needed blobs mismatch: %s != %s", missing[i], sendResp.NeededBlobs[i])
+		}
+	}
+}
+
+func MakeRandStream(size int) ([]byte, [][]byte) {
+	blobs := make([][]byte, int(math.Ceil(float64(size)/maxBlobSize)))
+	for i := 0; i < len(blobs); i++ {
+		blobs[i] = randBlob(int(math.Min(maxBlobSize, float64(size))))
+		size -= maxBlobSize
+	}
+
+	return nil, blobs
+}
+
+func randBlob(size int) []byte {
+	//if size > maxBlobSize {
+	//	panic("blob size too big")
+	//}
+	blob := make([]byte, size)
+	_, err := rand.Read(blob)
+	if err != nil {
+		panic("failed to make random blob")
+	}
+	return blob
+}
diff --git a/store/dbbacked.go b/store/dbbacked.go
index 3e5ac32..52d8d5e 100644
--- a/store/dbbacked.go
+++ b/store/dbbacked.go
@@ -58,7 +58,9 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
 	return d.db.AddSDBlob(hash, len(blob), blobContents)
 }
 
-// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
-func (d *DBBackedS3Store) HasFullStream(sdHash string) (bool, error) {
-	return d.db.HasFullStream(sdHash)
+// MissingBlobsForKnownStream returns missing blobs for an existing stream
+// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
+// like no blobs are missing
+func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
+	return d.db.MissingBlobsForKnownStream(sdHash)
 }
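
Note for reviewers: any BlobStore can opt into the partial-upload path simply by providing the method that the new neededBlobChecker interface looks for; stores that don't provide it keep the old behavior of reporting the sd blob as missing. The sketch below is illustrative only and not part of this patch: mapBackedStore and its streams field are hypothetical, it is written as if it lived in package store, and it assumes the MissingBlobsForKnownStream(string) ([]string, error) signature introduced above plus the existing MemoryBlobStore/Has behavior.

// mapBackedStore is a hypothetical in-memory store, shown only to illustrate
// how a BlobStore can satisfy the reflector's neededBlobChecker interface.
type mapBackedStore struct {
	MemoryBlobStore
	streams map[string][]string // sd hash -> hashes of that stream's content blobs
}

// MissingBlobsForKnownStream mirrors the DB-backed implementation: it returns
// the subset of a known stream's blobs that are not stored yet. As with the
// SQL version, an unknown stream yields no hashes, which looks like nothing
// is missing.
func (s *mapBackedStore) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
	var missing []string
	for _, hash := range s.streams[sdHash] {
		has, err := s.Has(hash)
		if err != nil {
			return nil, err
		}
		if !has {
			missing = append(missing, hash)
		}
	}
	return missing, nil
}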
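On the sending side, the NeededBlobs list that now rides along with the sd blob response lets a sender skip blobs the server already has. The helper below is another hypothetical sketch, not part of this patch: blobsToSend and the contentBlobs map are made up for illustration, while sendSdBlobResponse and its SendSdBlob/NeededBlobs fields are the ones exercised by TestServer_PartialUpload above.

// blobsToSend filters a stream's content blobs down to the ones the server
// reported as missing. If the server asked for the sd blob itself
// (SendSdBlob == true), every content blob still needs to go.
func blobsToSend(resp sendSdBlobResponse, contentBlobs map[string][]byte) map[string][]byte {
	if resp.SendSdBlob {
		return contentBlobs
	}

	needed := make(map[string]bool, len(resp.NeededBlobs))
	for _, hash := range resp.NeededBlobs {
		needed[hash] = true
	}

	toSend := make(map[string][]byte)
	for hash, blob := range contentBlobs {
		if needed[hash] {
			toSend[hash] = blob
		}
	}
	return toSend
}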