From 6acb99c74dd7f705f48f6b514fe54eedb66c14c8 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 16 Jan 2019 11:41:58 -0500 Subject: [PATCH] fixed a bunch of errors in wallet node --- reflector/blocklist.go | 2 ++ wallet/network.go | 79 ++++++++++++++++++++++++++++-------------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 927fd78..2b06d43 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -21,6 +21,7 @@ import ( const blocklistURL = "https://api.lbry.io/file/list_blocked" func (s *Server) enableBlocklist(b store.Blocklister) { + // TODO: updateBlocklist should be killed when server is shutting down updateBlocklist(b) t := time.NewTicker(12 * time.Hour) for { @@ -93,6 +94,7 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { values := make(map[string]valOrErr) node := wallet.NewNode() + defer node.Shutdown() err := node.Connect([]string{ "victor.lbry.tech:50001", //"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix diff --git a/wallet/network.go b/wallet/network.go index 65cb299..66a85c7 100644 --- a/wallet/network.go +++ b/wallet/network.go @@ -5,14 +5,15 @@ package wallet import ( "crypto/tls" "encoding/json" - "errors" "fmt" "math/rand" "net" "sync" "time" + "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/stop" + log "github.com/sirupsen/logrus" "github.com/uber-go/atomic" ) @@ -23,18 +24,28 @@ const ( ) var ( - ErrNotImplemented = errors.New("not implemented") - ErrNodeConnected = errors.New("node already connected") - ErrConnectFailed = errors.New("failed to connect") + ErrNotImplemented = errors.Base("not implemented") + ErrNodeConnected = errors.Base("node already connected") + ErrConnectFailed = errors.Base("failed to connect") + ErrTimeout = errors.Base("timeout") ) +type responseError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (r responseError) Error() string { return fmt.Sprintf("%d: %s", r.Code, r.Message) } + +type resp struct { + data []byte + err error +} + type response struct { - Id uint32 `json:"id"` - Method string `json:"method"` - Error struct { - Code int `json:"code"` - Message string `json:"message"` - } `json:"error"` + Id uint32 `json:"id"` + Method string `json:"method"` + Error responseError `json:"error"` } type request struct { @@ -49,20 +60,23 @@ type Node struct { grp *stop.Group handlersMu *sync.RWMutex - handlers map[uint32]chan []byte + handlers map[uint32]chan resp pushHandlersMu *sync.RWMutex - pushHandlers map[string][]chan []byte + pushHandlers map[string][]chan resp + + timeout time.Duration } // NewNode creates a new node. func NewNode() *Node { return &Node{ - handlers: make(map[uint32]chan []byte), - pushHandlers: make(map[string][]chan []byte), + handlers: make(map[uint32]chan resp), + pushHandlers: make(map[string][]chan resp), handlersMu: &sync.RWMutex{}, pushHandlersMu: &sync.RWMutex{}, grp: stop.New(), + timeout: 1 * time.Second, } } @@ -151,10 +165,14 @@ func (n *Node) listen() { n.err(err) continue } + + r := resp{} if len(msg.Error.Message) > 0 { - n.err(fmt.Errorf("error from server: %#v", msg.Error.Message)) - continue + r.err = msg.Error + } else { + r.data = bytes } + if len(msg.Method) > 0 { n.pushHandlersMu.RLock() handlers := n.pushHandlers[msg.Method] @@ -162,7 +180,7 @@ func (n *Node) listen() { for _, handler := range handlers { select { - case handler <- bytes: + case handler <- r: default: } } @@ -172,7 +190,7 @@ func (n *Node) listen() { c, ok := n.handlers[msg.Id] n.handlersMu.RUnlock() if ok { - c <- bytes + c <- r } } } @@ -202,22 +220,31 @@ func (n *Node) request(method string, params []string, v interface{}) error { } bytes = append(bytes, delimiter) - err = n.transport.Send(bytes) - if err != nil { - return err - } - - c := make(chan []byte, 1) + c := make(chan resp, 1) n.handlersMu.Lock() n.handlers[msg.Id] = c n.handlersMu.Unlock() - resp := <-c + err = n.transport.Send(bytes) + if err != nil { + return err + } + + var r resp + select { + case r = <-c: + case <-time.After(n.timeout): + r = resp{err: errors.Err(ErrTimeout)} + } n.handlersMu.Lock() delete(n.handlers, msg.Id) n.handlersMu.Unlock() - return json.Unmarshal(resp, v) + if r.err != nil { + return r.err + } + + return json.Unmarshal(r.data, v) }