subscribe headers with zmq

This commit is contained in:
Jeffrey Picard 2021-07-14 05:22:45 -04:00
parent e5225a5b58
commit c26f9ac257
8 changed files with 165 additions and 28 deletions

3
go.mod
View file

@ -5,10 +5,11 @@ go 1.16
require (
github.com/akamensky/argparse v1.2.2
github.com/btcsuite/btcutil v1.0.2
github.com/go-zeromq/zmq4 v0.13.0
github.com/golang/protobuf v1.5.2
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57
github.com/olivere/elastic/v7 v7.0.24
github.com/ybbus/jsonrpc/v2 v2.1.6 // indirect
github.com/ybbus/jsonrpc/v2 v2.1.6
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea // indirect
golang.org/x/text v0.3.6

7
go.sum
View file

@ -40,6 +40,10 @@ github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWE
github.com/go-ini/ini v1.48.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
github.com/go-zeromq/zmq4 v0.13.0 h1:XUWXLyeRsPsv4KlKMXnv/cEm//Vew2RLuNmDFQnZQXU=
github.com/go-zeromq/zmq4 v0.13.0/go.mod h1:TrFwdPHMSLG7Rhp8OVhQBkb4bSajfucWv8rwoEFIgSY=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -104,6 +108,7 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -170,6 +175,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -248,6 +254,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

23
main.go
View file

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"io"
"log"
"net"
"os"
@ -66,6 +67,7 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv
searchCmd := parser.NewCommand("search", "claim search")
getblockCmd := parser.NewCommand("getblock", "get block")
getblockHeaderCmd := parser.NewCommand("getblockheader", "get block header")
subscribeHeaderCmd := parser.NewCommand("subscribeheader", "get block header")
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort})
@ -130,6 +132,9 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv
} else if getblockHeaderCmd.Happened() {
args.CmdType = server.GetblockHeaderCmd
blockReq.Verbose = true
} else if subscribeHeaderCmd.Happened() {
args.CmdType = server.SubscribeHeaderCmd
blockReq.Verbose = true
}
if *text != "" {
@ -168,6 +173,7 @@ func parseArgs(searchRequest *pb.SearchRequest, blockReq *pb.BlockRequest) *serv
}
func main() {
searchRequest := &pb.SearchRequest{}
blockReq := &pb.BlockRequest{}
@ -230,5 +236,22 @@ func main() {
log.Fatal(err)
}
log.Println(r)
} else if args.CmdType == server.SubscribeHeaderCmd {
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Hour)
defer cancel2()
header, err := c.SubscribeHeaders(ctx2, blockReq)
if err != nil {
log.Fatal(err)
}
for {
x, err := header.Recv()
log.Println(x)
if err == io.EOF {
break
}
if err != nil {
log.Fatalln(err)
}
}
}
}

View file

@ -7,6 +7,7 @@ import "result.proto";
package pb;
service Hub {
rpc SubscribeHeaders (BlockRequest) returns (stream BlockHeaderOutput) {}
rpc Search (SearchRequest) returns (Outputs) {}
rpc GetBlock (BlockRequest) returns (BlockOutput) {}
rpc GetBlockHeader (BlockRequest) returns (BlockHeaderOutput) {}

View file

@ -1006,20 +1006,24 @@ var file_hub_proto_rawDesc = []byte{
0x74, 0x12, 0x1c, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x68, 0x61, 0x73, 0x68, 0x12,
0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08,
0x52, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x32, 0x9f, 0x01, 0x0a, 0x03, 0x48, 0x75,
0x62, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62,
0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b,
0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x12, 0x2f, 0x0a,
0x08, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42,
0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x70, 0x62,
0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x12, 0x3b,
0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72,
0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61,
0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f,
0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f,
0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x52, 0x07, 0x76, 0x65, 0x72, 0x62, 0x6f, 0x73, 0x65, 0x32, 0xe0, 0x01, 0x0a, 0x03, 0x48, 0x75,
0x62, 0x12, 0x3f, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x48, 0x65,
0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f,
0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00,
0x30, 0x01, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70,
0x62, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x12, 0x2f,
0x0a, 0x08, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e,
0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x70,
0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x12,
0x3b, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x61, 0x64, 0x65,
0x72, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65,
0x61, 0x64, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69,
0x6f, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67,
0x6f, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1044,9 +1048,9 @@ var file_hub_proto_goTypes = []interface{}{
(*BlockRequest)(nil), // 4: pb.BlockRequest
(*wrapperspb.Int32Value)(nil), // 5: google.protobuf.Int32Value
(*wrapperspb.BoolValue)(nil), // 6: google.protobuf.BoolValue
(*Outputs)(nil), // 7: pb.Outputs
(*BlockOutput)(nil), // 8: pb.BlockOutput
(*BlockHeaderOutput)(nil), // 9: pb.BlockHeaderOutput
(*BlockHeaderOutput)(nil), // 7: pb.BlockHeaderOutput
(*Outputs)(nil), // 8: pb.Outputs
(*BlockOutput)(nil), // 9: pb.BlockOutput
}
var file_hub_proto_depIdxs = []int32{
0, // 0: pb.RangeField.op:type_name -> pb.RangeField.Op
@ -1084,14 +1088,16 @@ var file_hub_proto_depIdxs = []int32{
5, // 32: pb.SearchRequest.limit_claims_per_channel:type_name -> google.protobuf.Int32Value
6, // 33: pb.SearchRequest.remove_duplicates:type_name -> google.protobuf.BoolValue
6, // 34: pb.SearchRequest.no_totals:type_name -> google.protobuf.BoolValue
3, // 35: pb.Hub.Search:input_type -> pb.SearchRequest
4, // 36: pb.Hub.GetBlock:input_type -> pb.BlockRequest
4, // 37: pb.Hub.GetBlockHeader:input_type -> pb.BlockRequest
7, // 38: pb.Hub.Search:output_type -> pb.Outputs
8, // 39: pb.Hub.GetBlock:output_type -> pb.BlockOutput
9, // 40: pb.Hub.GetBlockHeader:output_type -> pb.BlockHeaderOutput
38, // [38:41] is the sub-list for method output_type
35, // [35:38] is the sub-list for method input_type
4, // 35: pb.Hub.SubscribeHeaders:input_type -> pb.BlockRequest
3, // 36: pb.Hub.Search:input_type -> pb.SearchRequest
4, // 37: pb.Hub.GetBlock:input_type -> pb.BlockRequest
4, // 38: pb.Hub.GetBlockHeader:input_type -> pb.BlockRequest
7, // 39: pb.Hub.SubscribeHeaders:output_type -> pb.BlockHeaderOutput
8, // 40: pb.Hub.Search:output_type -> pb.Outputs
9, // 41: pb.Hub.GetBlock:output_type -> pb.BlockOutput
7, // 42: pb.Hub.GetBlockHeader:output_type -> pb.BlockHeaderOutput
39, // [39:43] is the sub-list for method output_type
35, // [35:39] is the sub-list for method input_type
35, // [35:35] is the sub-list for extension type_name
35, // [35:35] is the sub-list for extension extendee
0, // [0:35] is the sub-list for field type_name

View file

@ -18,6 +18,7 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HubClient interface {
SubscribeHeaders(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (Hub_SubscribeHeadersClient, error)
Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*Outputs, error)
GetBlock(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (*BlockOutput, error)
GetBlockHeader(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (*BlockHeaderOutput, error)
@ -31,6 +32,38 @@ func NewHubClient(cc grpc.ClientConnInterface) HubClient {
return &hubClient{cc}
}
func (c *hubClient) SubscribeHeaders(ctx context.Context, in *BlockRequest, opts ...grpc.CallOption) (Hub_SubscribeHeadersClient, error) {
stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], "/pb.Hub/SubscribeHeaders", opts...)
if err != nil {
return nil, err
}
x := &hubSubscribeHeadersClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Hub_SubscribeHeadersClient interface {
Recv() (*BlockHeaderOutput, error)
grpc.ClientStream
}
type hubSubscribeHeadersClient struct {
grpc.ClientStream
}
func (x *hubSubscribeHeadersClient) Recv() (*BlockHeaderOutput, error) {
m := new(BlockHeaderOutput)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *hubClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*Outputs, error) {
out := new(Outputs)
err := c.cc.Invoke(ctx, "/pb.Hub/Search", in, out, opts...)
@ -62,6 +95,7 @@ func (c *hubClient) GetBlockHeader(ctx context.Context, in *BlockRequest, opts .
// All implementations must embed UnimplementedHubServer
// for forward compatibility
type HubServer interface {
SubscribeHeaders(*BlockRequest, Hub_SubscribeHeadersServer) error
Search(context.Context, *SearchRequest) (*Outputs, error)
GetBlock(context.Context, *BlockRequest) (*BlockOutput, error)
GetBlockHeader(context.Context, *BlockRequest) (*BlockHeaderOutput, error)
@ -72,6 +106,9 @@ type HubServer interface {
type UnimplementedHubServer struct {
}
func (UnimplementedHubServer) SubscribeHeaders(*BlockRequest, Hub_SubscribeHeadersServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeHeaders not implemented")
}
func (UnimplementedHubServer) Search(context.Context, *SearchRequest) (*Outputs, error) {
return nil, status.Errorf(codes.Unimplemented, "method Search not implemented")
}
@ -94,6 +131,27 @@ func RegisterHubServer(s grpc.ServiceRegistrar, srv HubServer) {
s.RegisterService(&Hub_ServiceDesc, srv)
}
func _Hub_SubscribeHeaders_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(BlockRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(HubServer).SubscribeHeaders(m, &hubSubscribeHeadersServer{stream})
}
type Hub_SubscribeHeadersServer interface {
Send(*BlockHeaderOutput) error
grpc.ServerStream
}
type hubSubscribeHeadersServer struct {
grpc.ServerStream
}
func (x *hubSubscribeHeadersServer) Send(m *BlockHeaderOutput) error {
return x.ServerStream.SendMsg(m)
}
func _Hub_Search_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SearchRequest)
if err := dec(in); err != nil {
@ -168,6 +226,12 @@ var Hub_ServiceDesc = grpc.ServiceDesc{
Handler: _Hub_GetBlockHeader_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SubscribeHeaders",
Handler: _Hub_SubscribeHeaders_Handler,
ServerStreams: true,
},
},
Metadata: "hub.proto",
}

View file

@ -3,13 +3,47 @@ package server
import (
"context"
"encoding/base64"
"encoding/binary"
"encoding/hex"
pb "github.com/lbryio/hub/protobuf/go"
"log"
zmq "github.com/go-zeromq/zmq4"
//"net/rpc/jsonrpc"
"github.com/ybbus/jsonrpc/v2"
)
func (s *Server) SubscribeHeaders(request *pb.BlockRequest, stream pb.Hub_SubscribeHeadersServer) error {
//log.SetPrefix("psenvsub: ")
// Prepare our subscriber
log.Println("asdf")
sub := zmq.NewSub(context.Background())
defer sub.Close()
err := sub.Dial("tcp://localhost:28333")
if err != nil {
log.Fatalf("could not dial: %v", err)
}
err = sub.SetOption(zmq.OptionSubscribe, "hashblockheader")
//err = sub.SetOption(zmq.OptionSubscribe, "hashblock")
if err != nil {
log.Fatalf("could not subscribe: %v", err)
}
for {
// Read envelope
msg, err := sub.Recv()
if err != nil {
log.Fatalf("could not receive message: %v", err)
}
hash := hex.EncodeToString(msg.Frames[1][0:32])
var height uint32 = binary.LittleEndian.Uint32(msg.Frames[1][32:])
log.Printf("[%s] %s\n", msg.Frames[0], hash)
stream.Send(&pb.BlockHeaderOutput{Hash: hash, Height: int64(height)})
}
}
func (s *Server) GetBlock(ctx context.Context, blockReq *pb.BlockRequest) (*pb.BlockOutput, error) {
log.Println("In GetBlock")
@ -47,7 +81,7 @@ func (s *Server) GetBlockHeader(ctx context.Context, blockReq *pb.BlockRequest)
log.Println("Making call ...")
var r pb.BlockHeaderOutput
res, err := rpcClient.Call("getblock", blockReq.Blockhash)
res, err := rpcClient.Call("getblockheader", blockReq.Blockhash)
if err != nil {
log.Println(err)
return &pb.BlockHeaderOutput{Hash: "", Confirmations: 0}, err

View file

@ -23,6 +23,7 @@ const (
SearchCmd = iota
GetblockCmd = iota
GetblockHeaderCmd = iota
SubscribeHeaderCmd = iota
)
type Args struct {