use new connections for each blob

This commit is contained in:
Niko Storni 2020-05-13 21:53:57 +02:00
parent e0da2674a1
commit 90997b9918
3 changed files with 21 additions and 26 deletions

View file

@ -59,12 +59,12 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
case "tcp": case "tcp":
blobStore = peer.NewStore(peer.StoreOpts{ blobStore = peer.NewStore(peer.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort, Address: reflectorServerAddress + ":" + reflectorServerPort,
Timeout: 10 * time.Second, Timeout: 30 * time.Second,
}) })
case "udp": case "udp":
blobStore = quic.NewStore(quic.StoreOpts{ blobStore = quic.NewStore(quic.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort, Address: reflectorServerAddress + ":" + reflectorServerPort,
Timeout: 10 * time.Second, Timeout: 30 * time.Second,
}) })
} }
} else { } else {

View file

@ -149,6 +149,9 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
} }
if resp.IncomingBlob.Error != "" { if resp.IncomingBlob.Error != "" {
if resp.IncomingBlob.Error == store.ErrBlobNotFound.Error() {
return nil, errors.Err(store.ErrBlobNotFound)
}
return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error) return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
} }
if resp.IncomingBlob.BlobHash != hash { if resp.IncomingBlob.BlobHash != hash {

View file

@ -10,8 +10,7 @@ import (
// Store is a blob store that gets blobs from a peer. // Store is a blob store that gets blobs from a peer.
// It satisfies the store.BlobStore interface but cannot put or delete blobs. // It satisfies the store.BlobStore interface but cannot put or delete blobs.
type Store struct { type Store struct {
client *Client opts StoreOpts
connErr error
} }
// StoreOpts allows to set options for a new Store. // StoreOpts allows to set options for a new Store.
@ -22,40 +21,33 @@ type StoreOpts struct {
// NewStore makes a new peer store. // NewStore makes a new peer store.
func NewStore(opts StoreOpts) *Store { func NewStore(opts StoreOpts) *Store {
c := &Client{Timeout: opts.Timeout} return &Store{opts: opts}
err := c.Connect(opts.Address)
return &Store{client: c, connErr: err}
} }
// CloseStore closes the client that gets initialized when the store is initialized func (p *Store) getClient() (*Client, error) {
func (p *Store) CloseStore() error { c := &Client{Timeout: p.opts.Timeout}
if p.client != nil && p.client.stream != nil { err := c.Connect(p.opts.Address)
err := p.client.stream.Close() return c, errors.Prefix("connection error", err)
if err != nil {
return errors.Err(err)
}
return p.client.Close()
}
return nil
} }
// Has asks the peer if they have a hash // Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) { func (p *Store) Has(hash string) (bool, error) {
if p.connErr != nil { c, err := p.getClient()
return false, errors.Prefix("connection error", p.connErr) if err != nil {
return false, err
} }
defer c.Close()
return p.client.HasBlob(hash) return c.HasBlob(hash)
} }
// Get downloads the blob from the peer // Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, error) { func (p *Store) Get(hash string) (stream.Blob, error) {
if p.connErr != nil { c, err := p.getClient()
return nil, errors.Prefix("connection error", p.connErr) if err != nil {
return nil, err
} }
defer c.Close()
return p.client.GetBlob(hash) return c.GetBlob(hash)
} }
// Put is not supported // Put is not supported