diff --git a/main.go b/main.go
index 204797f..370d765 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"github.com/lbryio/hub/util"
 	"log"
 	"net"
 	"os"
@@ -166,6 +167,6 @@ func main() {
 	log.Printf("found %d results\n", r.GetTotal())
 
 	for _, t := range r.Txos {
-		fmt.Printf("%s:%d\n", server.FromHash(t.TxHash), t.Nout)
+		fmt.Printf("%s:%d\n", util.FromHash(t.TxHash), t.Nout)
 	}
 }
diff --git a/schema/url.go b/schema/url.go
new file mode 100644
index 0000000..fcce04f
--- /dev/null
+++ b/schema/url.go
@@ -0,0 +1,176 @@
+package schema
+
+import (
+	"fmt"
+	"github.com/lbryio/hub/util"
+	"log"
+	"regexp"
+	"strconv"
+	"strings"
+)
+
+// PathSegment is one component of an lbry:// URL: a claim name plus an
+// optional claim id or amount order.
+type PathSegment struct {
+	Name        string
+	ClaimId     string
+	AmountOrder int
+}
+
+type URL struct {
+	Stream  *PathSegment
+	Channel *PathSegment
+}
+
+func (ps *PathSegment) Normalized() string {
+	return util.Normalize(ps.Name)
+}
+
+func (ps *PathSegment) IsShortID() bool {
+	return len(ps.ClaimId) < 40
+}
+
+func (ps *PathSegment) IsFullID() bool {
+	return len(ps.ClaimId) == 40
+}
+
+func (ps *PathSegment) String() string {
+	if ps == nil {
+		return ""
+	}
+	if ps.ClaimId != "" {
+		return ps.Name + ":" + ps.ClaimId
+	} else if ps.AmountOrder >= 0 {
+		return fmt.Sprintf("%s$%d", ps.Name, ps.AmountOrder)
+	}
+	return ps.Name
+}
+
+func (url *URL) HasChannel() bool {
+	return url.Channel != nil
+}
+
+func (url *URL) HasStream() bool {
+	return url.Stream != nil
+}
+
+func (url *URL) HasStreamInChannel() bool {
+	return url.HasChannel() && url.HasStream()
+}
+
+func (url *URL) GetParts() []*PathSegment {
+	if url.HasStreamInChannel() {
+		return []*PathSegment{url.Channel, url.Stream}
+	}
+	if url.HasChannel() {
+		return []*PathSegment{url.Channel}
+	}
+	return []*PathSegment{url.Stream}
+}
+
+func (url *URL) String() string {
+	parts := url.GetParts()
+	stringParts := make([]string, len(parts))
+	for i, x := range parts {
+		stringParts[i] = x.String()
+	}
+	return "lbry://" + strings.Join(stringParts, "/")
+}
+
+// ParseURL parses an lbry:// URL; returns nil if it does not match.
+func ParseURL(url string) *URL {
+	segmentNames := []string{"channel", "stream", "channel_with_stream", "stream_in_channel"}
+	re := createUrlRegex()
+
+	match := re.FindStringSubmatch(url)
+	// FindStringSubmatch returns nil on no match; indexing it would panic.
+	if match == nil {
+		return nil
+	}
+	parts := make(map[string]string)
+	for i, name := range re.SubexpNames() {
+		if i != 0 && name != "" {
+			parts[name] = match[i]
+		}
+	}
+
+	segments := make(map[string]*PathSegment)
+	var amountOrder int
+	for _, segment := range segmentNames {
+		if res, ok := parts[segment+"_name"]; ok && res != "" {
+			x, ok := parts[segment+"_amount_order"]
+			if ok && x != "" {
+				parsedInt, err := strconv.Atoi(x)
+				if err != nil {
+					log.Fatalln("can't parse amount_order")
+				}
+				amountOrder = parsedInt
+			} else {
+				amountOrder = -1
+			}
+			segments[segment] = &PathSegment{
+				Name:        parts[segment+"_name"],
+				ClaimId:     parts[segment+"_claim_id"],
+				AmountOrder: amountOrder,
+			}
+		}
+	}
+
+	var stream *PathSegment = nil
+	var channel *PathSegment = nil
+	// channel_with_stream / channel capture the "@..." (channel) component
+	// of the regex; stream_in_channel / stream capture the stream component.
+	if _, ok := segments["channel_with_stream"]; ok {
+		channel = segments["channel_with_stream"]
+		stream = segments["stream_in_channel"]
+	} else {
+		channel = segments["channel"]
+		stream = segments["stream"]
+	}
+
+	return &URL{stream, channel}
+}
+
+func createUrlRegex() *regexp.Regexp {
+	//invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "\u0000-\u0020\uD800-\uDFFF\uFFFE-\uFFFF]+"
+	//invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "\u0000-\u0020-\uFFFE-\uFFFF]+"
+	invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "]+"
+
+	named := func(name string, regex string) string {
+		return "(?P<" + name + ">" + regex + ")"
+	}
+
+	group := func(regex string) string {
+		return "(?:" + regex + ")"
+	}
+
+	oneof := func(choices []string) string {
+		return group(strings.Join(choices, "|"))
+	}
+
+	claim := func(name string, prefix string) string {
+		return group(
+			named(name+"_name", prefix+invalidNamesRegex) +
+				oneof(
+					[]string{
+						group("[:#]" + named(name+"_claim_id", "[0-9a-f]{1,40}")),
+						group("\\$" + named(name+"_amount_order", "[1-9][0-9]*")),
+					},
+				) + "?",
+		)
+	}
+
+	finalStr := "^" +
+		named("scheme", "lbry://") + "?" +
+		oneof(
+			[]string{
+				group(claim("channel_with_stream", "@") + "/" + claim("stream_in_channel", "")),
+				claim("channel", "@"),
+				claim("stream", ""),
+			},
+		) +
+		"$"
+
+	re := regexp.MustCompile(finalStr)
+	return re
+}
diff --git a/server/search.go b/server/search.go
index 5310d8f..9b06246 100644
--- a/server/search.go
+++ b/server/search.go
@@ -8,10 +8,11 @@ import (
 	"github.com/btcsuite/btcutil/base58"
 	"github.com/golang/protobuf/ptypes/wrappers"
 	pb "github.com/lbryio/hub/protobuf/go"
+	"github.com/lbryio/hub/schema"
+	"github.com/lbryio/hub/util"
 	"github.com/olivere/elastic/v7"
 	"golang.org/x/text/cases"
 	"golang.org/x/text/language"
-	"golang.org/x/text/unicode/norm"
 	"log"
 	"reflect"
 	"strings"
@@ -21,17 +22,21 @@ type record struct {
 	Txid   string `json:"tx_id"`
 	Nout   uint32 `json:"tx_nout"`
 	Height uint32 `json:"height"`
+	ClaimId string `json:"claim_id"`
 }
 
 type orderField struct {
 	Field  string
 	is_asc bool
 }
-
-func ReverseBytes(s []byte) {
-	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
-		s[i], s[j] = s[j], s[i]
-	}
+const (
+	errorResolution   = iota
+	channelResolution = iota
+	streamResolution  = iota
+)
+type urlResolution struct {
+	resolutionType int
+	value          string
 }
 
 func StrArrToInterface(arr []string) []interface{} {
@@ -104,11 +109,6 @@ func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQuery) {
 	}
 }
 
-func normalize(s string) string {
-	c := cases.Fold()
-	return c.String(norm.NFD.String(s))
-}
-
 func (s *Server) normalizeTag(tag string) string {
 	c := cases.Lower(language.English)
 	res := s.MultiSpaceRe.ReplaceAll(
@@ -129,12 +129,231 @@ func (s *Server) cleanTags(tags []string) []string {
 		cleanedTags[i] = cleanedTag
 	}
 	return cleanedTags
 }
 
-func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
-	// TODO: reuse elastic client across requests
-	client, err := elastic.NewClient(elastic.SetSniff(false))
-	if err != nil {
-		return nil, err
+func (s *Server) fullIdFromShortId(ctx context.Context, channelName string, claimId string) (string, error) {
+	return "", nil
+}
+
+
+func (s *Server) resolveStream(ctx context.Context, url *schema.URL, channelId string) (string, error) {
+	return "", nil
+}
+
+func (s *Server) resolveChannelId(ctx context.Context, url *schema.URL) (string, error) {
+	if !url.HasChannel() {
+		return "", nil
 	}
+	if url.Channel.IsFullID() {
+		return url.Channel.ClaimId, nil
+	}
+	if url.Channel.IsShortID() {
+		channelId, err := s.fullIdFromShortId(ctx, url.Channel.Name, url.Channel.ClaimId)
+		if err != nil {
+			return "", err
+		}
+		return channelId, nil
+	}
+
+	in := &pb.SearchRequest{}
+	in.Normalized = []string{util.Normalize(url.Channel.Name)}
+	if url.Channel.ClaimId == "" && url.Channel.AmountOrder < 0 {
+		in.IsControlling = &wrappers.BoolValue{Value: true}
+	} else {
+		if url.Channel.AmountOrder > 0 {
+			in.AmountOrder = &wrappers.Int32Value{Value: int32(url.Channel.AmountOrder)}
+		}
+		if url.Channel.ClaimId != "" {
+			in.ClaimId = &pb.InvertibleField{
+				Invert: false,
+				Value:  []string{url.Channel.ClaimId},
+			}
+		}
+	}
+
+	var size = 1
+	var from = 0
+	q := elastic.NewBoolQuery()
+	q = AddTermsField(in.Normalized, "normalized", q)
+
+	if in.IsControlling != nil {
+		q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
+	}
+
+	if in.AmountOrder != nil {
+		// in.Limit is nil on a fresh SearchRequest; assigning through it panics.
+		in.Limit = &wrappers.Int32Value{Value: 1}
+		in.OrderBy = []string{"effective_amount"}
+		in.Offset = &wrappers.Int32Value{Value: in.AmountOrder.Value - 1}
+	}
+
+	if in.Limit != nil {
+		size = int(in.Limit.Value)
+	}
+
+	if in.Offset != nil {
+		from = int(in.Offset.Value)
+	}
+
+	if in.ClaimId != nil {
+		searchVals := StrArrToInterface(in.ClaimId.Value)
+		if len(in.ClaimId.Value) == 1 && len(in.ClaimId.Value[0]) < 20 {
+			if in.ClaimId.Invert {
+				q = q.MustNot(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
+			} else {
+				q = q.Must(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
+			}
+		} else {
+			if in.ClaimId.Invert {
+				q = q.MustNot(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
+			} else {
+				q = q.Must(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
+			}
+		}
+	}
+
+
+	searchResult, err := s.EsClient.Search().
+		Query(q). // specify the query
+		From(from).Size(size).
+		Do(ctx)
+
+	if err != nil {
+		return "", err
+	}
+
+	var r record
+	var channelId string
+	for _, item := range searchResult.Each(reflect.TypeOf(r)) {
+		if t, ok := item.(record); ok {
+			channelId = t.ClaimId
+		}
+	}
+	//matches, err := s.Search(ctx, in)
+	//if err != nil {
+	//	return "", err
+	//}
+
+
+	return channelId, nil
+}
+
+func (s *Server) resolveUrl(ctx context.Context, rawUrl string) *urlResolution {
+	url := schema.ParseURL(rawUrl)
+	if url == nil {
+		return nil
+	}
+
+	channelId, err := s.resolveChannelId(ctx, url)
+	if err != nil {
+		return &urlResolution{
+			resolutionType: errorResolution,
+			value:          fmt.Sprintf("Could not find channel in \"%s\".", url),
+		}
+	}
+
+	stream, _ := s.resolveStream(ctx, url, channelId)
+
+	if url.HasStream() {
+		return &urlResolution{
+			resolutionType: streamResolution,
+			value:          stream,
+		}
+	} else {
+		return &urlResolution{
+			resolutionType: channelResolution,
+			value:          channelId,
+		}
+	}
+}
+/*
+    async def resolve_url(self, raw_url):
+        if raw_url not in self.resolution_cache:
+            self.resolution_cache[raw_url] = await self._resolve_url(raw_url)
+        return self.resolution_cache[raw_url]
+
+    async def _resolve_url(self, raw_url):
+        try:
+            url = URL.parse(raw_url)
+        except ValueError as e:
+            return e
+
+        stream = LookupError(f'Could not find claim at "{raw_url}".')
+
+        channel_id = await self.resolve_channel_id(url)
+        if isinstance(channel_id, LookupError):
+            return channel_id
+        stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream
+        if url.has_stream:
+            return StreamResolution(stream)
+        else:
+            return ChannelResolution(channel_id)
+
+    async def resolve_channel_id(self, url: URL):
+        if not url.has_channel:
+            return
+        if url.channel.is_fullid:
+            return url.channel.claim_id
+        if url.channel.is_shortid:
+            channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
+            if not channel_id:
+                return LookupError(f'Could not find channel in "{url}".')
+            return channel_id
+
+        query = url.channel.to_dict()
+        if set(query) == {'name'}:
+            query['is_controlling'] = True
+        else:
+            query['order_by'] = ['^creation_height']
+        matches, _, _ = await self.search(**query, limit=1)
+        if matches:
+            channel_id = matches[0]['claim_id']
+        else:
+            return LookupError(f'Could not find channel in "{url}".')
+        return channel_id
+
+    async def resolve_stream(self, url: URL, channel_id: str = None):
+        if not url.has_stream:
+            return None
+        if url.has_channel and channel_id is None:
+            return None
+        query = url.stream.to_dict()
+        if url.stream.claim_id is not None:
+            if url.stream.is_fullid:
+                claim_id = url.stream.claim_id
+            else:
+                claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
+            return claim_id
+
+        if channel_id is not None:
+            if set(query) == {'name'}:
+                # temporarily emulate is_controlling for claims in channel
+                query['order_by'] = ['effective_amount', '^height']
+            else:
+                query['order_by'] = ['^channel_join']
+            query['channel_id'] = channel_id
+            query['signature_valid'] = True
+        elif set(query) == {'name'}:
+            query['is_controlling'] = True
+        matches, _, _ = await self.search(**query, limit=1)
+        if matches:
+            return matches[0]['claim_id']
+
+ */
+
+func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
+	var client *elastic.Client = nil
+	if s.EsClient == nil {
+		tmpClient, err := elastic.NewClient(elastic.SetSniff(false))
+		if err != nil {
+			return nil, err
+		}
+		client = tmpClient
+		s.EsClient = client
+	} else {
+		client = s.EsClient
+	}
+
+	res := s.resolveUrl(ctx, "@abc#111")
+	log.Println(res)
 
 	claimTypes := map[string]int {
 		"stream": 1,
@@ -218,7 +436,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 	if len(in.Name) > 0 {
 		normalized := make([]string, len(in.Name))
 		for i := 0; i < len(in.Name); i++ {
-			normalized[i] = normalize(in.Name[i])
+			normalized[i] = util.Normalize(in.Name[i])
 		}
 		in.Normalized = normalized
 	}
@@ -264,7 +482,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 	if len(in.XId) > 0 {
 		searchVals := make([]interface{}, len(in.XId))
 		for i := 0; i < len(in.XId); i++ {
-			ReverseBytes(in.XId[i])
+			util.ReverseBytes(in.XId[i])
 			searchVals[i] = hex.Dump(in.XId[i])
 		}
 		if len(in.XId) == 1 && len(in.XId[0]) < 20 {
@@ -432,7 +650,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 	for i, item := range searchResult.Each(reflect.TypeOf(r)) {
 		if t, ok := item.(record); ok {
 			txos[i] = &pb.Output{
-				TxHash: toHash(t.Txid),
+				TxHash: util.ToHash(t.Txid),
 				Nout:   t.Nout,
 				Height: t.Height,
 			}
@@ -466,32 +684,3 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 		Offset: uint32(int64(from) + searchResult.TotalHits()),
 	}, nil
 }
-
-// convert txid to txHash
-func toHash(txid string) []byte {
-	t, err := hex.DecodeString(txid)
-	if err != nil {
-		return nil
-	}
-
-	// reverse the bytes. thanks, Satoshi 😒
-	for i, j := 0, len(t)-1; i < j; i, j = i+1, j-1 {
-		t[i], t[j] = t[j], t[i]
-	}
-
-	return t
-}
-
-// convert txHash to txid
-func FromHash(txHash []byte) string {
-	t := make([]byte, len(txHash))
-	copy(t, txHash)
-
-	// reverse the bytes. thanks, Satoshi 😒
-	for i, j := 0, len(txHash)-1; i < j; i, j = i+1, j-1 {
-		txHash[i], txHash[j] = txHash[j], txHash[i]
-	}
-
-	return hex.EncodeToString(t)
-
-}
diff --git a/server/server.go b/server/server.go
index 338994f..57af86c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -3,6 +3,7 @@ package server
 import (
 	"context"
 	pb "github.com/lbryio/hub/protobuf/go"
+	"github.com/olivere/elastic/v7"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 	"log"
@@ -14,6 +15,7 @@ type Server struct {
 	Args         *Args
 	MultiSpaceRe *regexp.Regexp
 	WeirdCharsRe *regexp.Regexp
+	EsClient     *elastic.Client
 	pb.UnimplementedHubServer
 }
diff --git a/util/util.go b/util/util.go
new file mode 100644
index 0000000..3117409
--- /dev/null
+++ b/util/util.go
@@ -0,0 +1,47 @@
+package util
+
+import (
+	"encoding/hex"
+	"golang.org/x/text/cases"
+	"golang.org/x/text/unicode/norm"
+)
+
+// Normalize case-folds and NFD-normalizes a claim name for comparison.
+func Normalize(s string) string {
+	c := cases.Fold()
+	return c.String(norm.NFD.String(s))
+}
+
+// ReverseBytes reverses s in place.
+func ReverseBytes(s []byte) {
+	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
+		s[i], s[j] = s[j], s[i]
+	}
+}
+
+// convert txid to txHash
+func ToHash(txid string) []byte {
+	t, err := hex.DecodeString(txid)
+	if err != nil {
+		return nil
+	}
+
+	// reverse the bytes. thanks, Satoshi 😒
+	for i, j := 0, len(t)-1; i < j; i, j = i+1, j-1 {
+		t[i], t[j] = t[j], t[i]
+	}
+
+	return t
+}
+
+// convert txHash to txid; does not mutate txHash
+func FromHash(txHash []byte) string {
+	t := make([]byte, len(txHash))
+	copy(t, txHash)
+
+	// reverse the copy, not the caller's slice. thanks, Satoshi 😒
+	for i, j := 0, len(t)-1; i < j; i, j = i+1, j-1 {
+		t[i], t[j] = t[j], t[i]
+	}
+
+	return hex.EncodeToString(t)
+
+}