fixed a bunch of errors in wallet node

This commit is contained in:
Alex Grintsvayg 2019-01-16 11:41:58 -05:00
parent d77f5d17f3
commit 6acb99c74d
2 changed files with 55 additions and 26 deletions

View file

@ -21,6 +21,7 @@ import (
const blocklistURL = "https://api.lbry.io/file/list_blocked" const blocklistURL = "https://api.lbry.io/file/list_blocked"
func (s *Server) enableBlocklist(b store.Blocklister) { func (s *Server) enableBlocklist(b store.Blocklister) {
// TODO: updateBlocklist should be killed when server is shutting down
updateBlocklist(b) updateBlocklist(b)
t := time.NewTicker(12 * time.Hour) t := time.NewTicker(12 * time.Hour)
for { for {
@ -93,6 +94,7 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
values := make(map[string]valOrErr) values := make(map[string]valOrErr)
node := wallet.NewNode() node := wallet.NewNode()
defer node.Shutdown()
err := node.Connect([]string{ err := node.Connect([]string{
"victor.lbry.tech:50001", "victor.lbry.tech:50001",
//"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix //"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix

View file

@ -5,14 +5,15 @@ package wallet
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
"time" "time"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/stop" "github.com/lbryio/lbry.go/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/uber-go/atomic" "github.com/uber-go/atomic"
) )
@ -23,18 +24,28 @@ const (
) )
var ( var (
ErrNotImplemented = errors.New("not implemented") ErrNotImplemented = errors.Base("not implemented")
ErrNodeConnected = errors.New("node already connected") ErrNodeConnected = errors.Base("node already connected")
ErrConnectFailed = errors.New("failed to connect") ErrConnectFailed = errors.Base("failed to connect")
ErrTimeout = errors.Base("timeout")
) )
type responseError struct {
Code int `json:"code"`
Message string `json:"message"`
}
func (r responseError) Error() string { return fmt.Sprintf("%d: %s", r.Code, r.Message) }
type resp struct {
data []byte
err error
}
type response struct { type response struct {
Id uint32 `json:"id"` Id uint32 `json:"id"`
Method string `json:"method"` Method string `json:"method"`
Error struct { Error responseError `json:"error"`
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
} }
type request struct { type request struct {
@ -49,20 +60,23 @@ type Node struct {
grp *stop.Group grp *stop.Group
handlersMu *sync.RWMutex handlersMu *sync.RWMutex
handlers map[uint32]chan []byte handlers map[uint32]chan resp
pushHandlersMu *sync.RWMutex pushHandlersMu *sync.RWMutex
pushHandlers map[string][]chan []byte pushHandlers map[string][]chan resp
timeout time.Duration
} }
// NewNode creates a new node. // NewNode creates a new node.
func NewNode() *Node { func NewNode() *Node {
return &Node{ return &Node{
handlers: make(map[uint32]chan []byte), handlers: make(map[uint32]chan resp),
pushHandlers: make(map[string][]chan []byte), pushHandlers: make(map[string][]chan resp),
handlersMu: &sync.RWMutex{}, handlersMu: &sync.RWMutex{},
pushHandlersMu: &sync.RWMutex{}, pushHandlersMu: &sync.RWMutex{},
grp: stop.New(), grp: stop.New(),
timeout: 1 * time.Second,
} }
} }
@ -151,10 +165,14 @@ func (n *Node) listen() {
n.err(err) n.err(err)
continue continue
} }
r := resp{}
if len(msg.Error.Message) > 0 { if len(msg.Error.Message) > 0 {
n.err(fmt.Errorf("error from server: %#v", msg.Error.Message)) r.err = msg.Error
continue } else {
r.data = bytes
} }
if len(msg.Method) > 0 { if len(msg.Method) > 0 {
n.pushHandlersMu.RLock() n.pushHandlersMu.RLock()
handlers := n.pushHandlers[msg.Method] handlers := n.pushHandlers[msg.Method]
@ -162,7 +180,7 @@ func (n *Node) listen() {
for _, handler := range handlers { for _, handler := range handlers {
select { select {
case handler <- bytes: case handler <- r:
default: default:
} }
} }
@ -172,7 +190,7 @@ func (n *Node) listen() {
c, ok := n.handlers[msg.Id] c, ok := n.handlers[msg.Id]
n.handlersMu.RUnlock() n.handlersMu.RUnlock()
if ok { if ok {
c <- bytes c <- r
} }
} }
} }
@ -202,22 +220,31 @@ func (n *Node) request(method string, params []string, v interface{}) error {
} }
bytes = append(bytes, delimiter) bytes = append(bytes, delimiter)
err = n.transport.Send(bytes) c := make(chan resp, 1)
if err != nil {
return err
}
c := make(chan []byte, 1)
n.handlersMu.Lock() n.handlersMu.Lock()
n.handlers[msg.Id] = c n.handlers[msg.Id] = c
n.handlersMu.Unlock() n.handlersMu.Unlock()
resp := <-c err = n.transport.Send(bytes)
if err != nil {
return err
}
var r resp
select {
case r = <-c:
case <-time.After(n.timeout):
r = resp{err: errors.Err(ErrTimeout)}
}
n.handlersMu.Lock() n.handlersMu.Lock()
delete(n.handlers, msg.Id) delete(n.handlers, msg.Id)
n.handlersMu.Unlock() n.handlersMu.Unlock()
return json.Unmarshal(resp, v) if r.err != nil {
return r.err
}
return json.Unmarshal(r.data, v)
} }