herald.go/server/server.go
Jeffrey Picard 395e1db489 UDPServer / ip address resolution
Got the UDPServer ping/pong protocol working internally, only tested
against other running go hub servers. Should in theory work with
python server and clients, but still need to test that.

Also switched to serving udp on the same port as grpc, and taking that
into account when pinging other hubs with udp.

Unit test for udp ip address lookup.
2021-12-06 11:26:29 -05:00

311 lines
8.3 KiB
Go

package server
import (
"context"
"crypto/sha256"
"fmt"
"hash"
"log"
"net"
"net/http"
"os"
"regexp"
"sync"
"time"
"github.com/ReneKroon/ttlcache/v2"
"github.com/lbryio/hub/internal/metrics"
"github.com/lbryio/hub/meta"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type Server struct {
GrpcServer *grpc.Server
Args *Args
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
EsClient *elastic.Client
QueryCache *ttlcache.Cache
S256 *hash.Hash
LastRefreshCheck time.Time
RefreshDelta time.Duration
NumESRefreshes int64
PeerServers map[string]*FederatedServer
PeerServersMut sync.RWMutex
NumPeerServers *int64
PeerSubs map[string]*FederatedServer
PeerSubsMut sync.RWMutex
NumPeerSubs *int64
ExternalIP string
pb.UnimplementedHubServer
}
func getVersion() string {
return meta.Version
}
/*
'blockchain.block.get_chunk'
'blockchain.block.get_header'
'blockchain.estimatefee'
'blockchain.relayfee'
'blockchain.scripthash.get_balance'
'blockchain.scripthash.get_history'
'blockchain.scripthash.get_mempool'
'blockchain.scripthash.listunspent'
'blockchain.scripthash.subscribe'
'blockchain.transaction.broadcast'
'blockchain.transaction.get'
'blockchain.transaction.get_batch'
'blockchain.transaction.info'
'blockchain.transaction.get_merkle'
'server.add_peer'
'server.banner'
'server.payment_address'
'server.donation_address'
'server.features'
'server.peers.subscribe'
'server.version'
'blockchain.transaction.get_height'
'blockchain.claimtrie.search'
'blockchain.claimtrie.resolve'
'blockchain.claimtrie.getclaimsbyids'
'blockchain.block.get_server_height'
'mempool.get_fee_histogram'
'blockchain.block.headers'
'server.ping'
'blockchain.headers.subscribe'
'blockchain.address.get_balance'
'blockchain.address.get_history'
'blockchain.address.get_mempool'
'blockchain.address.listunspent'
'blockchain.address.subscribe'
'blockchain.address.unsubscribe'
*/
func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
key := peer.peerKey()
s.PeerSubsMut.RLock()
if actual, ok := s.PeerSubs[key]; ok {
s.PeerSubsMut.RUnlock()
return actual, true
} else {
s.PeerSubsMut.RUnlock()
s.PeerSubsMut.Lock()
s.PeerSubs[key] = peer
s.PeerSubsMut.Unlock()
return peer, false
}
}
func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
key := peer.peerKey()
s.PeerServersMut.RLock()
if actual, ok := s.PeerServers[key]; ok {
s.PeerServersMut.RUnlock()
return actual, true
} else {
s.PeerServersMut.RUnlock()
s.PeerServersMut.Lock()
s.PeerServers[key] = peer
s.PeerServersMut.Unlock()
return peer, false
}
}
func (s *Server) Run() {
l, err := net.Listen("tcp", ":"+s.Args.Port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
pb.RegisterHubServer(s.GrpcServer, s)
reflection.Register(s.GrpcServer)
log.Printf("listening on %s\n", l.Addr().String())
log.Println(s.Args)
if err := s.GrpcServer.Serve(l); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines.
func MakeHubServer(ctx context.Context, args *Args) *Server {
grpcServer := grpc.NewServer(grpc.NumStreamWorkers(10))
multiSpaceRe, err := regexp.Compile(`\s{2,}`)
if err != nil {
log.Fatal(err)
}
weirdCharsRe, err := regexp.Compile("[#!~]")
if err != nil {
log.Fatal(err)
}
var client *elastic.Client = nil
if !args.DisableEs {
esUrl := args.EsHost + ":" + args.EsPort
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
elastic.SetURL(esUrl),
}
if args.Debug {
opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0)))
}
client, err = elastic.NewClient(opts...)
if err != nil {
log.Fatal(err)
}
}
cache := ttlcache.NewCache()
err = cache.SetTTL(time.Duration(args.CacheTTL) * time.Minute)
if err != nil {
log.Fatal(err)
}
s256 := sha256.New()
var refreshDelta = time.Second * time.Duration(args.RefreshDelta)
if args.Debug {
refreshDelta = time.Second * 0
}
numPeers := new(int64)
*numPeers = 0
numSubs := new(int64)
*numSubs = 0
s := &Server{
GrpcServer: grpcServer,
Args: args,
MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe,
EsClient: client,
QueryCache: cache,
S256: &s256,
LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta,
NumESRefreshes: 0,
PeerServers: make(map[string]*FederatedServer),
PeerServersMut: sync.RWMutex{},
NumPeerServers: numPeers,
PeerSubs: make(map[string]*FederatedServer),
PeerSubsMut: sync.RWMutex{},
NumPeerSubs: numSubs,
ExternalIP: "",
}
// Start up our background services
if args.StartPrometheus {
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics")
}
if args.StartUDP {
go func() {
err := UDPServer(args)
if err != nil {
log.Println("UDP Server failed!", err)
}
}()
}
// Load peers from disk and subscribe to one if there are any
if args.LoadPeers {
go func() {
err := s.loadPeers()
if err != nil {
log.Println(err)
}
}()
}
return s
}
// prometheusEndpoint is a goroutine which start up a prometheus endpoint
// for this hub to allow for metric tracking.
func (s *Server) prometheusEndpoint(port string, endpoint string) {
http.Handle("/"+endpoint, promhttp.Handler())
log.Println(fmt.Sprintf("listening on :%s /%s", port, endpoint))
err := http.ListenAndServe(":"+port, nil)
log.Fatalln("Shouldn't happen??!?!", err)
}
// Hello is a grpc endpoint to allow another hub to tell us about itself.
// The passed message includes information about the other hub, and all
// of its peers which are added to the knowledge of this hub.
func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMessage, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc()
port := args.Port
host := args.Host
server := &FederatedServer{
Address: host,
Port: port,
Ts: time.Now(),
}
log.Println(server)
err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false, true)
// They just contacted us, so this shouldn't happen
if err != nil {
log.Println(err)
}
s.mergeFederatedServers(args.Servers)
s.writePeers()
s.notifyPeerSubs(server)
return s.makeHelloMessage(), nil
}
// PeerSubscribe adds a peer hub to the list of subscribers to update about
// new peers.
func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc()
var msg = "Success"
peer := &FederatedServer{
Address: in.Address,
Port: in.Port,
Ts: time.Now(),
}
if _, loaded := s.PeerSubsLoadOrStore(peer); !loaded {
s.incNumSubs()
metrics.PeersSubscribed.Inc()
} else {
msg = "Already subscribed"
}
return &pb.StringValue{Value: msg}, nil
}
// AddPeer is a grpc endpoint to tell this hub about another hub in the network.
func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc()
var msg = "Success"
err := s.addPeer(args, true, true)
if err != nil {
log.Println(err)
msg = "Failed"
}
return &pb.StringValue{Value: msg}, err
}
// Ping is a grpc endpoint that returns a short message.
func (s *Server) Ping(ctx context.Context, args *pb.EmptyMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "ping"}).Inc()
return &pb.StringValue{Value: "Hello, world!"}, nil
}
// Version is a grpc endpoint to get this hub's version.
func (s *Server) Version(ctx context.Context, args *pb.EmptyMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "version"}).Inc()
return &pb.StringValue{Value: getVersion()}, nil
}