minor changes. proto v2 works, but always requests sd blob

This commit is contained in:
Alex Grintsvayg 2018-01-24 11:45:18 -05:00
parent 8ee6b26feb
commit 3c8416b576
4 changed files with 53 additions and 38 deletions

View file

@ -2,10 +2,11 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"log"
"net" "net"
"strconv" "strconv"
"github.com/lbryio/errors.go"
log "github.com/sirupsen/logrus"
) )
type Client struct { type Client struct {
@ -29,11 +30,11 @@ func (c *Client) Close() error {
func (c *Client) SendBlob(blob []byte) error { func (c *Client) SendBlob(blob []byte) error {
if !c.connected { if !c.connected {
return fmt.Errorf("Not connected") return errors.Err("not connected")
} }
if len(blob) != BlobSize { if len(blob) != BlobSize {
return fmt.Errorf("Blob must be exactly " + strconv.Itoa(BlobSize) + " bytes") return errors.Err("blob must be exactly " + strconv.Itoa(BlobSize) + " bytes")
} }
blobHash := getBlobHash(blob) blobHash := getBlobHash(blob)
@ -74,7 +75,7 @@ func (c *Client) SendBlob(blob []byte) error {
} }
if !transferResp.ReceivedBlob { if !transferResp.ReceivedBlob {
return fmt.Errorf("Server did not received blob") return errors.Err("server did not received blob")
} }
return nil return nil
@ -82,7 +83,7 @@ func (c *Client) SendBlob(blob []byte) error {
func (c *Client) doHandshake(version int) error { func (c *Client) doHandshake(version int) error {
if !c.connected { if !c.connected {
return fmt.Errorf("Not connected") return errors.Err("not connected")
} }
handshake, err := json.Marshal(handshakeRequestResponse{Version: version}) handshake, err := json.Marshal(handshakeRequestResponse{Version: version})
@ -101,7 +102,7 @@ func (c *Client) doHandshake(version int) error {
if err != nil { if err != nil {
return err return err
} else if resp.Version != version { } else if resp.Version != version {
return fmt.Errorf("Handshake version mismatch") return errors.Err("handshake version mismatch")
} }
return nil return nil

View file

@ -2,10 +2,11 @@ package main
import ( import (
"flag" "flag"
"log"
"math/rand" "math/rand"
"strconv" "strconv"
"time" "time"
log "github.com/sirupsen/logrus"
) )
func checkErr(err error) { func checkErr(err error) {
@ -15,11 +16,11 @@ func checkErr(err error) {
} }
func main() { func main() {
var err error
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
port := DefaultPort port := DefaultPort
address := "52.14.109.125:" + strconv.Itoa(port) //address := "52.14.109.125:" + strconv.Itoa(port)
address := "localhost:" + strconv.Itoa(port)
serve := flag.Bool("server", false, "Run server") serve := flag.Bool("server", false, "Run server")
blobDir := flag.String("blobdir", "", "Where blobs will be saved to") blobDir := flag.String("blobdir", "", "Where blobs will be saved to")
@ -33,6 +34,7 @@ func main() {
return return
} }
var err error
client := Client{} client := Client{}
log.Println("Connecting to " + address) log.Println("Connecting to " + address)

View file

@ -3,14 +3,16 @@ package main
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"net" "net"
"os" "os"
"path" "path"
"strconv" "strconv"
"github.com/lbryio/errors.go"
log "github.com/sirupsen/logrus"
) )
type Server struct { type Server struct {
@ -63,22 +65,20 @@ func (s *Server) handleConn(conn net.Conn) {
for { for {
err = s.receiveBlob(conn) err = s.receiveBlob(conn)
if err != nil { if err != nil {
if err == io.EOF { if err != io.EOF {
return
}
s.doError(conn, err) s.doError(conn, err)
}
return return
} }
} }
} }
func (s *Server) doError(conn net.Conn, e error) error { func (s *Server) doError(conn net.Conn, err error) error {
log.Println("Error: " + e.Error()) log.Errorln(err)
if e2, ok := e.(*json.SyntaxError); ok { if e2, ok := err.(*json.SyntaxError); ok {
log.Printf("syntax error at byte offset %d", e2.Offset) log.Printf("syntax error at byte offset %d", e2.Offset)
} }
resp, err := json.Marshal(errorResponse{Error: e.Error()}) resp, err := json.Marshal(errorResponse{Error: err.Error()})
if err != nil { if err != nil {
return err return err
} }
@ -117,7 +117,7 @@ func (s *Server) receiveBlob(conn net.Conn) error {
receivedBlobHash := getBlobHash(blob) receivedBlobHash := getBlobHash(blob)
if blobHash != receivedBlobHash { if blobHash != receivedBlobHash {
return fmt.Errorf("Hash of received blob data does not match hash from send request") return errors.Err("hash of received blob data does not match hash from send request")
// this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream // this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream
} }
log.Println("Got blob " + blobHash[:8]) log.Println("Got blob " + blobHash[:8])
@ -132,12 +132,11 @@ func (s *Server) receiveBlob(conn net.Conn) error {
func (s *Server) doHandshake(conn net.Conn) error { func (s *Server) doHandshake(conn net.Conn) error {
var handshake handshakeRequestResponse var handshake handshakeRequestResponse
dec := json.NewDecoder(conn) err := json.NewDecoder(conn).Decode(&handshake)
err := dec.Decode(&handshake)
if err != nil { if err != nil {
return err return err
} else if handshake.Version != protocolVersion1 && handshake.Version != protocolVersion2 { } else if handshake.Version != protocolVersion1 && handshake.Version != protocolVersion2 {
return fmt.Errorf("Protocol version not supported") return errors.Err("protocol version not supported")
} }
resp, err := json.Marshal(handshakeRequestResponse{Version: handshake.Version}) resp, err := json.Marshal(handshakeRequestResponse{Version: handshake.Version})
@ -155,32 +154,29 @@ func (s *Server) doHandshake(conn net.Conn) error {
func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) { func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
var sendRequest sendBlobRequest var sendRequest sendBlobRequest
dec := json.NewDecoder(conn) err := json.NewDecoder(conn).Decode(&sendRequest)
err := dec.Decode(&sendRequest)
if err != nil { if err != nil {
return 0, "", false, err return 0, "", false, err
} }
if sendRequest.SdBlobHash != "" && sendRequest.BlobHash != "" { if sendRequest.SdBlobHash != "" && sendRequest.BlobHash != "" {
return 0, "", false, fmt.Errorf("Invalid request") return 0, "", false, errors.Err("invalid request")
} }
var blobHash string var blobHash string
var blobSize int var blobSize int
isSdBlob := sendRequest.SdBlobHash != "" isSdBlob := sendRequest.SdBlobHash != ""
if blobSize > BlobSize {
return 0, "", isSdBlob, errors.Err("blob cannot be more than " + strconv.Itoa(BlobSize) + " bytes")
}
if isSdBlob { if isSdBlob {
blobSize = sendRequest.SdBlobSize blobSize = sendRequest.SdBlobSize
blobHash = sendRequest.SdBlobHash blobHash = sendRequest.SdBlobHash
if blobSize > BlobSize {
return 0, "", isSdBlob, fmt.Errorf("SD blob cannot be more than " + strconv.Itoa(BlobSize) + " bytes")
}
} else { } else {
blobSize = sendRequest.BlobSize blobSize = sendRequest.BlobSize
blobHash = sendRequest.BlobHash blobHash = sendRequest.BlobHash
if blobSize != BlobSize {
return 0, "", isSdBlob, fmt.Errorf("Blob must be exactly " + strconv.Itoa(BlobSize) + " bytes")
}
} }
return blobSize, blobHash, isSdBlob, nil return blobSize, blobHash, isSdBlob, nil
@ -238,7 +234,7 @@ func (s *Server) ensureBlobDirExists() error {
return err return err
} }
} else if !stat.IsDir() { } else if !stat.IsDir() {
return fmt.Errorf("blob dir exists but is not a dir") return errors.Err("blob dir exists but is not a dir")
} }
return nil return nil
} }

View file

@ -3,7 +3,8 @@ package main
import ( import (
"crypto/sha512" "crypto/sha512"
"encoding/hex" "encoding/hex"
"fmt"
"github.com/lbryio/errors.go"
) )
const ( const (
@ -11,11 +12,11 @@ const (
BlobSize = 2 * 1024 * 1024 BlobSize = 2 * 1024 * 1024
protocolVersion1 = 1 protocolVersion1 = 0
protocolVersion2 = 2 protocolVersion2 = 1
) )
var ErrBlobExists = fmt.Errorf("Blob exists on server") var ErrBlobExists = errors.Base("blob exists on server")
type errorResponse struct { type errorResponse struct {
Error string `json:"error"` Error string `json:"error"`
@ -53,3 +54,18 @@ func getBlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob) hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:]) return hex.EncodeToString(hashBytes[:])
} }
// can be used to read the sd blob and then return a list of blobs that are actually missing
type sdBlobContents struct {
StreamName string `json:"stream_name"`
Blobs []struct {
Length int `json:"length"`
BlobNum int `json:"blob_num"`
BlobHash string `json:"blob_hash,omitempty"`
Iv string `json:"iv"`
} `json:"blobs"`
StreamType string `json:"stream_type"`
Key string `json:"key"`
SuggestedFileName string `json:"suggested_file_name"`
StreamHash string `json:"stream_hash"`
}