diff --git a/peer/http3/client.go b/peer/http3/client.go index 41c40fb..ad5c741 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "sync" "time" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -80,18 +81,47 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { return nil, errors.Err(err) } defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode) return nil, errors.Err(store.ErrBlobNotFound) - } - if resp.StatusCode != http.StatusOK { + } else if resp.StatusCode != http.StatusOK { return nil, errors.Err("non 200 status code returned: %d", resp.StatusCode) } - body := &bytes.Buffer{} - _, err = io.Copy(body, resp.Body) + + tmp := getBuffer() + defer putBuffer(tmp) + + written, err := io.Copy(tmp, resp.Body) if err != nil { return nil, errors.Err(err) } - metrics.MtrInBytesUdp.Add(float64(len(body.Bytes()))) - return body.Bytes(), nil + + blob := make([]byte, written) + copy(blob, tmp.Bytes()) + + metrics.MtrInBytesUdp.Add(float64(len(blob))) + + return blob, nil +} + +// buffer pool to reduce GC +// https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha +var buffers = sync.Pool{ + // New is called when a new instance is needed + New: func() interface{} { + buf := make([]byte, 0, stream.MaxBlobSize) + return bytes.NewBuffer(buf) + }, +} + +// getBuffer fetches a buffer from the pool +func getBuffer() *bytes.Buffer { + return buffers.Get().(*bytes.Buffer) +} + +// putBuffer returns a buffer to the pool +func putBuffer(buf *bytes.Buffer) { + buf.Reset() + buffers.Put(buf) } diff --git a/store/disk.go b/store/disk.go index 4ac2a0f..2234971 100644 --- a/store/disk.go +++ b/store/disk.go @@ -58,17 +58,15 @@ func (d *DiskStore) Get(hash string) (stream.Blob, error) { return nil, err } - file, err := os.Open(d.path(hash)) + blob, err := ioutil.ReadFile(d.path(hash)) if err != nil { if os.IsNotExist(err) { return nil, errors.Err(ErrBlobNotFound) } - return nil, err + return nil, errors.Err(err) } - defer file.Close() - blob, err := ioutil.ReadAll(file) - return blob, errors.Err(err) + return blob, nil } // Put stores the blob on disk diff --git a/store/disk_test.go b/store/disk_test.go new file mode 100644 index 0000000..3bc088a --- /dev/null +++ b/store/disk_test.go @@ -0,0 +1,45 @@ +package store + +import ( + "io/ioutil" + "os" + "path" + "path/filepath" + "testing" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiskStore_Get(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + hash := "1234567890" + data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf") + + expectedPath := path.Join(tmpDir, hash[:2], hash) + err = os.MkdirAll(filepath.Dir(expectedPath), os.ModePerm) + require.NoError(t, err) + err = ioutil.WriteFile(expectedPath, data, os.ModePerm) + require.NoError(t, err) + + blob, err := d.Get(hash) + assert.NoError(t, err) + assert.EqualValues(t, data, blob) +} + +func TestDiskStore_GetNonexistentBlob(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + blob, err := d.Get("nonexistent") + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound)) +}