From e70b9af3e41662cd0c5dd246b0b097f13acdc84c Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Sat, 21 Nov 2020 11:39:15 -0500 Subject: [PATCH 1/2] dont overallocate ram when reading blobs from disk ReadFile checks the file size and allocates a bit more space than we expect we'll need. ReadAll uses Go's standard resizing algo, which doubles the underlying array each time you hit the end. So ReadAll ends up allocating 4MB for a full blob, while ReadFile allocates slightly over 2MB. --- store/disk.go | 8 +++----- store/disk_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) create mode 100644 store/disk_test.go diff --git a/store/disk.go b/store/disk.go index 4ac2a0f..2234971 100644 --- a/store/disk.go +++ b/store/disk.go @@ -58,17 +58,15 @@ func (d *DiskStore) Get(hash string) (stream.Blob, error) { return nil, err } - file, err := os.Open(d.path(hash)) + blob, err := ioutil.ReadFile(d.path(hash)) if err != nil { if os.IsNotExist(err) { return nil, errors.Err(ErrBlobNotFound) } - return nil, err + return nil, errors.Err(err) } - defer file.Close() - blob, err := ioutil.ReadAll(file) - return blob, errors.Err(err) + return blob, nil } // Put stores the blob on disk diff --git a/store/disk_test.go b/store/disk_test.go new file mode 100644 index 0000000..3bc088a --- /dev/null +++ b/store/disk_test.go @@ -0,0 +1,45 @@ +package store + +import ( + "io/ioutil" + "os" + "path" + "path/filepath" + "testing" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiskStore_Get(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + hash := "1234567890" + data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf") + + expectedPath := path.Join(tmpDir, hash[:2], hash) + err = os.MkdirAll(filepath.Dir(expectedPath), os.ModePerm) + require.NoError(t, err) + err = ioutil.WriteFile(expectedPath, data, os.ModePerm) + require.NoError(t, err) + + blob, err := d.Get(hash) + assert.NoError(t, err) + assert.EqualValues(t, data, blob) +} + +func TestDiskStore_GetNonexistentBlob(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + blob, err := d.Get("nonexistent") + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound)) +} From fb77bf621e5ef6049af217f077acd38bfc1a85bb Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Sat, 21 Nov 2020 12:07:47 -0500 Subject: [PATCH 2/2] dont over-allocate ram when reading blobs via QUIC --- peer/http3/client.go | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/peer/http3/client.go b/peer/http3/client.go index 41c40fb..ad5c741 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "sync" "time" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -80,18 +81,47 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { return nil, errors.Err(err) } defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode) return nil, errors.Err(store.ErrBlobNotFound) - } - if resp.StatusCode != http.StatusOK { + } else if resp.StatusCode != http.StatusOK { return nil, errors.Err("non 200 status code returned: %d", resp.StatusCode) } - body := &bytes.Buffer{} - _, err = io.Copy(body, resp.Body) + + tmp := getBuffer() + defer putBuffer(tmp) + + written, err := io.Copy(tmp, resp.Body) if err != nil { return nil, errors.Err(err) } - metrics.MtrInBytesUdp.Add(float64(len(body.Bytes()))) - return body.Bytes(), nil + + blob := make([]byte, written) + copy(blob, tmp.Bytes()) + + metrics.MtrInBytesUdp.Add(float64(len(blob))) + + return blob, nil +} + +// buffer pool to reduce GC +// https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha +var buffers = sync.Pool{ + // New is called when a new instance is needed + New: func() interface{} { + buf := make([]byte, 0, stream.MaxBlobSize) + return bytes.NewBuffer(buf) + }, +} + +// getBuffer fetches a buffer from the pool +func getBuffer() *bytes.Buffer { + return buffers.Get().(*bytes.Buffer) +} + +// putBuffer returns a buffer to the pool +func putBuffer(buf *bytes.Buffer) { + buf.Reset() + buffers.Put(buf) }