From c26f9ac257a48cefe1676bd20b77a7fe8f36b393 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 14 Jul 2021 05:22:45 -0400 Subject: [PATCH] subscribe headers with zmq --- go.mod | 3 +- go.sum | 7 ++++ main.go | 23 ++++++++++++ protobuf/definitions/hub.proto | 1 + protobuf/go/hub.pb.go | 56 ++++++++++++++++------------- protobuf/go/hub_grpc.pb.go | 66 +++++++++++++++++++++++++++++++++- server/block.go | 36 ++++++++++++++++++- server/server.go | 1 + 8 files changed, 165 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index 9cd9a68..f00b890 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,11 @@ go 1.16 require ( github.com/akamensky/argparse v1.2.2 github.com/btcsuite/btcutil v1.0.2 + github.com/go-zeromq/zmq4 v0.13.0 github.com/golang/protobuf v1.5.2 github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 - github.com/ybbus/jsonrpc/v2 v2.1.6 // indirect + github.com/ybbus/jsonrpc/v2 v2.1.6 golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea // indirect golang.org/x/text v0.3.6 diff --git a/go.sum b/go.sum index 6fffc53..7f57558 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,10 @@ github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWE github.com/go-ini/ini v1.48.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U= +github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE= +github.com/go-zeromq/zmq4 v0.13.0 h1:XUWXLyeRsPsv4KlKMXnv/cEm//Vew2RLuNmDFQnZQXU= +github.com/go-zeromq/zmq4 v0.13.0/go.mod h1:TrFwdPHMSLG7Rhp8OVhQBkb4bSajfucWv8rwoEFIgSY= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -104,6 +108,7 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -170,6 +175,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -248,6 +254,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 8e3d76e..e3f99b2 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "io" "log" "net" "os" @@ -66,6 +67,7 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv searchCmd := parser.NewCommand("search", "claim search") getblockCmd := parser.NewCommand("getblock", "get block") getblockHeaderCmd := parser.NewCommand("getblockheader", "get block header") + subscribeHeaderCmd := parser.NewCommand("subscribeheader", "get block header") host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort}) @@ -130,6 +132,9 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv } else if getblockHeaderCmd.Happened() { args.CmdType = server.GetblockHeaderCmd blockReq.Verbose = true + } else if subscribeHeaderCmd.Happened() { + args.CmdType = server.SubscribeHeaderCmd + blockReq.Verbose = true } if *text != "" { @@ -168,6 +173,7 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv } func main() { + searchRequest := &pb.SearchRequest{} blockReq := &pb.BlockRequest{} @@ -230,5 +236,22 @@ func main() { log.Fatal(err) } log.Println(r) + } else if args.CmdType == server.SubscribeHeaderCmd { + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Hour) + defer cancel2() + header, err := c.SubscribeHeaders(ctx2, blockReq) + if err != nil { + log.Fatal(err) + } + for { + x, err := header.Recv() + log.Println(x) + if err == io.EOF { + break + } + if err != nil { + log.Fatalln(err) + } + } } } diff --git a/protobuf/definitions/hub.proto b/protobuf/definitions/hub.proto index 8c187c9..8b0a8a2 100644 --- a/protobuf/definitions/hub.proto +++ b/protobuf/definitions/hub.proto @@ -7,6 +7,7 @@ import "result.proto"; package pb; service Hub { + rpc SubscribeHeaders (BlockRequest) returns (stream BlockHeaderOutput) {} rpc Search (SearchRequest) returns (Outputs) {} rpc GetBlock (BlockRequest) returns (BlockOutput) {} rpc GetBlockHeader (BlockRequest) returns (BlockHeaderOutput) {} diff --git a/protobuf/go/hub.pb.go b/protobuf/go/hub.pb.go index deb11e0..10cc63d 100644 --- a/protobuf/go/hub.pb.go +++ b/protobuf/go/hub.pb.go @@ -1006,20 +1006,24 @@ var file_hub_proto_rawDesc = []byte{ 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x68, 0x61, 0x73, 0x68, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x32, 0x9f, 0x01, 0x0a, 0x03, 0x48, 0x75, - 0x62, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, - 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, - 0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x12, 0x2f, 0x0a, - 0x08, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x70, 0x62, - 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x12, 0x3b, - 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f, - 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, - 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x32, 0xe0, 0x01, 0x0a, 0x03, 0x48, 0x75, + 0x62, 0x12, 0x3f, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, + 0x30, 0x01, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, + 0x62, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x12, 0x2f, + 0x0a, 0x08, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x70, + 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x12, + 0x3b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, + 0x6f, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, + 0x6f, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1044,9 +1048,9 @@ var file_hub_proto_goTypes = []interface{}{ (*BlockRequest)(nil), // 4: pb.BlockRequest (*wrapperspb.Int32Value)(nil), // 5: google.protobuf.Int32Value (*wrapperspb.BoolValue)(nil), // 6: google.protobuf.BoolValue - (*Outputs)(nil), // 7: pb.Outputs - (*BlockOutput)(nil), // 8: pb.BlockOutput - (*BlockHeaderOutput)(nil), // 9: pb.BlockHeaderOutput + (*BlockHeaderOutput)(nil), // 7: pb.BlockHeaderOutput + (*Outputs)(nil), // 8: pb.Outputs + (*BlockOutput)(nil), // 9: pb.BlockOutput } var file_hub_proto_depIdxs = []int32{ 0, // 0: pb.RangeField.op:type_name -> pb.RangeField.Op @@ -1084,14 +1088,16 @@ var file_hub_proto_depIdxs = []int32{ 5, // 32: pb.SearchRequest.limit_claims_per_channel:type_name -> google.protobuf.Int32Value 6, // 33: pb.SearchRequest.remove_duplicates:type_name -> google.protobuf.BoolValue 6, // 34: pb.SearchRequest.no_totals:type_name -> google.protobuf.BoolValue - 3, // 35: pb.Hub.Search:input_type -> pb.SearchRequest - 4, // 36: pb.Hub.GetBlock:input_type -> pb.BlockRequest - 4, // 37: pb.Hub.GetBlockHeader:input_type -> pb.BlockRequest - 7, // 38: pb.Hub.Search:output_type -> pb.Outputs - 8, // 39: pb.Hub.GetBlock:output_type -> pb.BlockOutput - 9, // 40: pb.Hub.GetBlockHeader:output_type -> pb.BlockHeaderOutput - 38, // [38:41] is the sub-list for method output_type - 35, // [35:38] is the sub-list for method input_type + 4, // 35: pb.Hub.SubscribeHeaders:input_type -> pb.BlockRequest + 3, // 36: pb.Hub.Search:input_type -> pb.SearchRequest + 4, // 37: pb.Hub.GetBlock:input_type -> pb.BlockRequest + 4, // 38: pb.Hub.GetBlockHeader:input_type -> pb.BlockRequest + 7, // 39: pb.Hub.SubscribeHeaders:output_type -> pb.BlockHeaderOutput + 8, // 40: pb.Hub.Search:output_type -> pb.Outputs + 9, // 41: pb.Hub.GetBlock:output_type -> pb.BlockOutput + 7, // 42: pb.Hub.GetBlockHeader:output_type -> pb.BlockHeaderOutput + 39, // [39:43] is the sub-list for method output_type + 35, // [35:39] is the sub-list for method input_type 35, // [35:35] is the sub-list for extension type_name 35, // [35:35] is the sub-list for extension extendee 0, // [0:35] is the sub-list for field type_name diff --git a/protobuf/go/hub_grpc.pb.go b/protobuf/go/hub_grpc.pb.go index 39430f1..7456a66 100644 --- a/protobuf/go/hub_grpc.pb.go +++ b/protobuf/go/hub_grpc.pb.go @@ -18,6 +18,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type HubClient interface { + SubscribeHeaders(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (Hub_SubscribeHeadersClient, error) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*Outputs, error) GetBlock(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (*BlockOutput, error) GetBlockHeader(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (*BlockHeaderOutput, error) @@ -31,6 +32,38 @@ func NewHubClient(cc grpc.ClientConnInterface) HubClient { return &hubClient{cc} } +func (c *hubClient) SubscribeHeaders(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (Hub_SubscribeHeadersClient, error) { + stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], "/pb.Hub/SubscribeHeaders", opts...) + if err != nil { + return nil, err + } + x := &hubSubscribeHeadersClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Hub_SubscribeHeadersClient interface { + Recv() (*BlockHeaderOutput, error) + grpc.ClientStream +} + +type hubSubscribeHeadersClient struct { + grpc.ClientStream +} + +func (x *hubSubscribeHeadersClient) Recv() (*BlockHeaderOutput, error) { + m := new(BlockHeaderOutput) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *hubClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*Outputs, error) { out := new(Outputs) err := c.cc.Invoke(ctx, "/pb.Hub/Search", in, out, opts...) @@ -62,6 +95,7 @@ func (c *hubClient) GetBlockHeader(ctx context.Context, in *BlockRequest, opts . // All implementations must embed UnimplementedHubServer // for forward compatibility type HubServer interface { + SubscribeHeaders(*BlockRequest, Hub_SubscribeHeadersServer) error Search(context.Context, *SearchRequest) (*Outputs, error) GetBlock(context.Context, *BlockRequest) (*BlockOutput, error) GetBlockHeader(context.Context, *BlockRequest) (*BlockHeaderOutput, error) @@ -72,6 +106,9 @@ type HubServer interface { type UnimplementedHubServer struct { } +func (UnimplementedHubServer) SubscribeHeaders(*BlockRequest, Hub_SubscribeHeadersServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeHeaders not implemented") +} func (UnimplementedHubServer) Search(context.Context, *SearchRequest) (*Outputs, error) { return nil, status.Errorf(codes.Unimplemented, "method Search not implemented") } @@ -94,6 +131,27 @@ func RegisterHubServer(s grpc.ServiceRegistrar, srv HubServer) { s.RegisterService(&Hub_ServiceDesc, srv) } +func _Hub_SubscribeHeaders_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(BlockRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HubServer).SubscribeHeaders(m, &hubSubscribeHeadersServer{stream}) +} + +type Hub_SubscribeHeadersServer interface { + Send(*BlockHeaderOutput) error + grpc.ServerStream +} + +type hubSubscribeHeadersServer struct { + grpc.ServerStream +} + +func (x *hubSubscribeHeadersServer) Send(m *BlockHeaderOutput) error { + return x.ServerStream.SendMsg(m) +} + func _Hub_Search_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SearchRequest) if err := dec(in); err != nil { @@ -168,6 +226,12 @@ var Hub_ServiceDesc = grpc.ServiceDesc{ Handler: _Hub_GetBlockHeader_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeHeaders", + Handler: _Hub_SubscribeHeaders_Handler, + ServerStreams: true, + }, + }, Metadata: "hub.proto", } diff --git a/server/block.go b/server/block.go index 127962f..f078da6 100644 --- a/server/block.go +++ b/server/block.go @@ -3,13 +3,47 @@ package server import ( "context" "encoding/base64" + "encoding/binary" + "encoding/hex" pb "github.com/lbryio/hub/protobuf/go" "log" + zmq "github.com/go-zeromq/zmq4" //"net/rpc/jsonrpc" "github.com/ybbus/jsonrpc/v2" ) +func (s *Server) SubscribeHeaders(request *pb.BlockRequest, stream pb.Hub_SubscribeHeadersServer) error { + //log.SetPrefix("psenvsub: ") + + // Prepare our subscriber + log.Println("asdf") + sub := zmq.NewSub(context.Background()) + defer sub.Close() + + err := sub.Dial("tcp://localhost:28333") + if err != nil { + log.Fatalf("could not dial: %v", err) + } + + err = sub.SetOption(zmq.OptionSubscribe, "hashblockheader") + //err = sub.SetOption(zmq.OptionSubscribe, "hashblock") + if err != nil { + log.Fatalf("could not subscribe: %v", err) + } + + for { + // Read envelope + msg, err := sub.Recv() + if err != nil { + log.Fatalf("could not receive message: %v", err) + } + hash := hex.EncodeToString(msg.Frames[1][0:32]) + var height uint32 = binary.LittleEndian.Uint32(msg.Frames[1][32:]) + log.Printf("[%s] %s\n", msg.Frames[0], hash) + stream.Send(&pb.BlockHeaderOutput{Hash: hash, Height: int64(height)}) + } +} func (s *Server) GetBlock(ctx context.Context, blockReq *pb.BlockRequest) (*pb.BlockOutput, error) { log.Println("In GetBlock") @@ -47,7 +81,7 @@ func (s *Server) GetBlockHeader(ctx context.Context, blockReq *pb.BlockRequest) log.Println("Making call ...") var r pb.BlockHeaderOutput - res, err := rpcClient.Call("getblock", blockReq.Blockhash) + res, err := rpcClient.Call("getblockheader", blockReq.Blockhash) if err != nil { log.Println(err) return &pb.BlockHeaderOutput{Hash: "", Confirmations: 0}, err diff --git a/server/server.go b/server/server.go index 6088c51..69ed160 100644 --- a/server/server.go +++ b/server/server.go @@ -23,6 +23,7 @@ const ( SearchCmd = iota GetblockCmd = iota GetblockHeaderCmd = iota + SubscribeHeaderCmd = iota ) type Args struct {