From cd236708b0e71d01c2d806fdcfbe8ca63a617193 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 31 Aug 2018 19:50:09 -0400 Subject: [PATCH] lbryumx client --- lbryumx/client.go | 43 +++++++++++ lbryumx/network.go | 186 +++++++++++++++++++++++++++++++++++++++++++++ lbryumx/tcp.go | 74 ++++++++++++++++++ 3 files changed, 303 insertions(+) create mode 100644 lbryumx/client.go create mode 100644 lbryumx/network.go create mode 100644 lbryumx/tcp.go diff --git a/lbryumx/client.go b/lbryumx/client.go new file mode 100644 index 0000000..80253a6 --- /dev/null +++ b/lbryumx/client.go @@ -0,0 +1,43 @@ +package lbryumx + +// ServerVersion returns the server's version. +// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#server-version +func (n *Node) ServerVersion() (string, error) { + resp := &struct { + Result []string `json:"result"` + }{} + err := n.request("server.version", []string{"reflector.go", ProtocolVersion}, resp) + + var v string + if len(resp.Result) >= 2 { + v = resp.Result[1] + } + + return v, err +} + +type GetClaimsInTxResp struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Result []struct { + Name string `json:"name"` + ClaimID string `json:"claim_id"` + Txid string `json:"txid"` + Nout int `json:"nout"` + Amount int `json:"amount"` + Depth int `json:"depth"` + Height int `json:"height"` + Value string `json:"value"` + ClaimSequence int `json:"claim_sequence"` + Address string `json:"address"` + Supports []interface{} `json:"supports"` // TODO: finish me + EffectiveAmount int `json:"effective_amount"` + ValidAtHeight int `json:"valid_at_height"` + } `json:"result"` +} + +func (n *Node) GetClaimsInTx(txid string) (*GetClaimsInTxResp, error) { + var resp GetClaimsInTxResp + err := n.request("blockchain.claimtrie.getclaimsintx", []string{txid}, &resp) + return &resp, err +} diff --git a/lbryumx/network.go b/lbryumx/network.go new file mode 100644 index 0000000..1301004 --- /dev/null +++ b/lbryumx/network.go @@ -0,0 +1,186 @@ +package lbryumx + +// copied from https://github.com/d4l3k/go-electrum + +import ( + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "log" + "sync" +) + +const ( + ClientVersion = "0.0.1" + ProtocolVersion = "1.0" +) + +var ( + ErrNotImplemented = errors.New("not implemented") + ErrNodeConnected = errors.New("node already connected") +) + +type Transport interface { + SendMessage([]byte) error + Responses() <-chan []byte + Errors() <-chan error +} + +type respMetadata struct { + Id int `json:"id"` + Method string `json:"method"` + Error struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` +} + +type request struct { + Id int `json:"id"` + Method string `json:"method"` + Params []string `json:"params"` +} + +type Node struct { + Address string + + transport Transport + handlers map[int]chan []byte + handlersLock sync.RWMutex + + pushHandlers map[string][]chan []byte + pushHandlersLock sync.RWMutex + + nextId int +} + +// NewNode creates a new node. +func NewNode() *Node { + n := &Node{ + handlers: make(map[int]chan []byte), + pushHandlers: make(map[string][]chan []byte), + } + return n +} + +// ConnectTCP creates a new TCP connection to the specified address. +func (n *Node) ConnectTCP(addr string) error { + if n.transport != nil { + return ErrNodeConnected + } + n.Address = addr + transport, err := NewTCPTransport(addr) + if err != nil { + return err + } + n.transport = transport + go n.listen() + return nil +} + +// ConnectSLL creates a new SLL connection to the specified address. +func (n *Node) ConnectSSL(addr string, config *tls.Config) error { + if n.transport != nil { + return ErrNodeConnected + } + n.Address = addr + transport, err := NewSSLTransport(addr, config) + if err != nil { + return err + } + n.transport = transport + go n.listen() + return nil +} + +// err handles errors produced by the foreign node. +func (n *Node) err(err error) { + // TODO: Better error handling. + log.Fatal(err) +} + +// listen processes messages from the server. +func (n *Node) listen() { + for { + select { + case err := <-n.transport.Errors(): + n.err(err) + return + case bytes := <-n.transport.Responses(): + msg := &respMetadata{} + if err := json.Unmarshal(bytes, msg); err != nil { + n.err(err) + return + } + if len(msg.Error.Message) > 0 { + n.err(fmt.Errorf("error from server: %#v", msg.Error.Message)) + return + } + if len(msg.Method) > 0 { + n.pushHandlersLock.RLock() + handlers := n.pushHandlers[msg.Method] + n.pushHandlersLock.RUnlock() + + for _, handler := range handlers { + select { + case handler <- bytes: + default: + } + } + } + + n.handlersLock.RLock() + c, ok := n.handlers[msg.Id] + n.handlersLock.RUnlock() + + if ok { + c <- bytes + } + } + } +} + +// listenPush returns a channel of messages matching the method. +func (n *Node) listenPush(method string) <-chan []byte { + c := make(chan []byte, 1) + n.pushHandlersLock.Lock() + defer n.pushHandlersLock.Unlock() + n.pushHandlers[method] = append(n.pushHandlers[method], c) + return c +} + +// request makes a request to the server and unmarshals the response into v. +func (n *Node) request(method string, params []string, v interface{}) error { + msg := request{ + Id: n.nextId, + Method: method, + Params: params, + } + n.nextId++ + bytes, err := json.Marshal(msg) + if err != nil { + return err + } + bytes = append(bytes, delim) + if err := n.transport.SendMessage(bytes); err != nil { + return err + } + + c := make(chan []byte, 1) + + n.handlersLock.Lock() + n.handlers[msg.Id] = c + n.handlersLock.Unlock() + + resp := <-c + + n.handlersLock.Lock() + defer n.handlersLock.Unlock() + delete(n.handlers, msg.Id) + + if err := json.Unmarshal(resp, v); err != nil { + return nil + } + return nil +} diff --git a/lbryumx/tcp.go b/lbryumx/tcp.go new file mode 100644 index 0000000..8ab0c69 --- /dev/null +++ b/lbryumx/tcp.go @@ -0,0 +1,74 @@ +package lbryumx + +// copied from https://github.com/d4l3k/go-electrum + +import ( + "bufio" + "crypto/tls" + "log" + "net" +) + +type TCPTransport struct { + conn net.Conn + responses chan []byte + errors chan error +} + +func NewTCPTransport(addr string) (*TCPTransport, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + t := &TCPTransport{ + conn: conn, + responses: make(chan []byte), + errors: make(chan error), + } + go t.listen() + return t, nil +} + +func NewSSLTransport(addr string, config *tls.Config) (*TCPTransport, error) { + conn, err := tls.Dial("tcp", addr, config) + if err != nil { + return nil, err + } + t := &TCPTransport{ + conn: conn, + responses: make(chan []byte), + errors: make(chan error), + } + go t.listen() + return t, nil +} + +func (t *TCPTransport) SendMessage(body []byte) error { + log.Printf("%s <- %s", t.conn.RemoteAddr(), body) + _, err := t.conn.Write(body) + return err +} + +const delim = byte('\n') + +func (t *TCPTransport) listen() { + defer t.conn.Close() + reader := bufio.NewReader(t.conn) + for { + line, err := reader.ReadBytes(delim) + if err != nil { + t.errors <- err + log.Printf("error %s", err) + break + } + log.Printf("%s -> %s", t.conn.RemoteAddr(), line) + t.responses <- line + } +} + +func (t *TCPTransport) Responses() <-chan []byte { + return t.responses +} +func (t *TCPTransport) Errors() <-chan error { + return t.errors +}