diff --git a/main.go b/main.go index d2428a8..58c7d89 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,6 @@ import ( "github.com/akamensky/argparse" pb "github.com/lbryio/hub/protobuf/go" "github.com/lbryio/hub/server" - "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) diff --git a/server/search.go b/server/search.go index b9838cc..776e37b 100644 --- a/server/search.go +++ b/server/search.go @@ -5,21 +5,25 @@ import ( "encoding/hex" "encoding/json" "fmt" + "log" + "math" + "reflect" + "strings" + + //"github.com/lbryio/hub/schema" + "github.com/btcsuite/btcutil/base58" "github.com/golang/protobuf/ptypes/wrappers" pb "github.com/lbryio/hub/protobuf/go" - "math" - //"github.com/lbryio/hub/schema" "github.com/lbryio/hub/util" "github.com/olivere/elastic/v7" "golang.org/x/text/cases" "golang.org/x/text/language" "gopkg.in/karalabe/cookiejar.v1/collections/deque" - "log" - "reflect" - "strings" ) +const DefaultSearchSize = 1000 + type record struct { Txid string `json:"tx_id"` Nout uint32 `json:"tx_nout"` @@ -60,29 +64,26 @@ func StrArrToInterface(arr []string) []interface{} { return searchVals } -func AddTermsField(arr []string, name string, q *elastic.BoolQuery) *elastic.BoolQuery { - if len(arr) > 0 { - searchVals := StrArrToInterface(arr) - return q.Must(elastic.NewTermsQuery(name, searchVals...)) - } - return q -} - -func AddIndividualTermFields(arr []string, name string, q *elastic.BoolQuery, invert bool) *elastic.BoolQuery { - if len(arr) > 0 { - for _, x := range arr { - if invert { - q = q.MustNot(elastic.NewTermQuery(name, x)) - } else { - q = q.Must(elastic.NewTermQuery(name, x)) - } - } +func AddTermsField(q *elastic.BoolQuery, arr []string, name string) *elastic.BoolQuery { + if len(arr) == 0 { return q } + searchVals := StrArrToInterface(arr) + return q.Must(elastic.NewTermsQuery(name, searchVals...)) +} + +func AddIndividualTermFields(q *elastic.BoolQuery, arr []string, name string, invert bool) *elastic.BoolQuery { + for _, x := range arr { + if invert { + q = q.MustNot(elastic.NewTermQuery(name, x)) + } else { + q = q.Must(elastic.NewTermQuery(name, x)) + } + } return q } -func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elastic.BoolQuery { +func AddRangeField(q *elastic.BoolQuery, rq *pb.RangeField, name string) *elastic.BoolQuery { if rq == nil { return q } @@ -91,7 +92,7 @@ func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elasti if rq.Op != pb.RangeField_EQ { return q } - return AddTermsField(rq.Value, name, q) + return AddTermsField(q, rq.Value, name) } if rq.Op == pb.RangeField_EQ { return q.Must(elastic.NewTermQuery(name, rq.Value[0])) @@ -106,7 +107,7 @@ func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elasti } } -func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQuery) *elastic.BoolQuery { +func AddInvertibleField(q *elastic.BoolQuery, field *pb.InvertibleField, name string) *elastic.BoolQuery { if field == nil { return q } @@ -122,26 +123,6 @@ func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQ } } -func (s *Server) normalizeTag(tag string) string { - c := cases.Lower(language.English) - res := s.MultiSpaceRe.ReplaceAll( - s.WeirdCharsRe.ReplaceAll( - []byte(strings.TrimSpace(strings.Replace(c.String(tag), "'", "", -1))), - []byte(" ")), - []byte(" ")) - - return string(res) -} - - -func (s *Server) cleanTags(tags []string) []string { - cleanedTags := make([]string, len(tags)) - for i, tag := range tags { - cleanedTags[i] = 
s.normalizeTag(tag) - } - return cleanedTags -} - // Search /* // Search logic is as follows: // 1) Setup query with params given @@ -168,6 +149,182 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, client = s.EsClient } + var from = 0 + var pageSize = 10 + var orderBy []orderField + var searchIndices = []string{} + + q := elastic.NewBoolQuery() + + q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy) + + if s.Args.Dev && len(in.SearchIndices) == 0 { + // If we're running in dev mode ignore the mainnet claims index + indices, err := client.IndexNames() + if err != nil { + log.Fatalln(err) + } + var numIndices = len(indices) + searchIndices = make([]string, 0, numIndices) + for i := 0; i < numIndices; i++ { + if indices[i] == "claims" { + continue + } + searchIndices = append(searchIndices, indices[i]) + } + } + + if len(in.SearchIndices) > 0 { + searchIndices = in.SearchIndices + } + + fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") + search := client.Search(). + Index(searchIndices...). + FetchSourceContext(fsc). + Query(q). // specify the query + From(0).Size(DefaultSearchSize) + + + for _, x := range orderBy { + search = search.Sort(x.Field, x.IsAsc) + } + + searchResult, err := search.Do(ctx) // execute + if err != nil { + log.Println(err) + return nil, err + } + + log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) + + txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices) + + if in.NoTotals != nil && !in.NoTotals.Value { + return &pb.Outputs{ + Txos: txos, + ExtraTxos: extraTxos, + Offset: uint32(int64(from) + searchResult.TotalHits()), + Blocked: blocked, + }, nil + } + + var blockedTotal uint32 = 0 + for _, b := range blocked { + blockedTotal += b.Count + } + return &pb.Outputs{ + Txos: txos, + ExtraTxos: extraTxos, + Total: uint32(searchResult.TotalHits()), + Offset: uint32(int64(from) + searchResult.TotalHits()), + Blocked: blocked, + BlockedTotal: blockedTotal, + }, nil +} + +func (s *Server) normalizeTag(tag string) string { + c := cases.Lower(language.English) + res := s.MultiSpaceRe.ReplaceAll( + s.WeirdCharsRe.ReplaceAll( + []byte(strings.TrimSpace(strings.Replace(c.String(tag), "'", "", -1))), + []byte(" ")), + []byte(" ")) + + return string(res) +} + + +func (s *Server) cleanTags(tags []string) []string { + cleanedTags := make([]string, len(tags)) + for i, tag := range tags { + cleanedTags[i] = s.normalizeTag(tag) + } + return cleanedTags +} + +func (s *Server) postProcessResults( + ctx context.Context, + client *elastic.Client, + searchResult *elastic.SearchResult, + in *pb.SearchRequest, + pageSize int, + from int, + searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { + var txos []*pb.Output + var records []*record + var blockedRecords []*record + var blocked []*pb.Blocked + var blockedMap map[string]*pb.Blocked + + records = make([]*record, 0, searchResult.TotalHits()) + + var r record + for _, item := range searchResult.Each(reflect.TypeOf(r)) { + if t, ok := item.(record); ok { + records = append(records, &t) + } + } + + //printJsonFullResults(searchResult) + records, blockedRecords, blockedMap = removeBlocked(records) + + if in.RemoveDuplicates != nil { + records = removeDuplicates(records) + } + + if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 { + records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) + } + + finalLength := 
int(math.Min(float64(len(records)), float64(pageSize)))
+	txos = make([]*pb.Output, 0, finalLength)
+	var j = 0
+	for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
+		t := records[i]
+		res := t.recordToOutput()
+		txos = append(txos, res)
+		j += 1
+	}
+
+	//printJsonFullRecords(blockedRecords)
+
+	//Get claims for reposts
+	repostClaims, repostRecords, repostedMap := getClaimsForReposts(ctx, client, records, searchIndices)
+	//get all unique channels
+	channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndices)
+	//add these to extra txos
+	extraTxos := append(repostClaims, channels...)
+
+	//Fill in channel / repost data for txos and blocked
+	for i, txo := range txos {
+		channel, cOk := channelMap[records[i].ChannelId]
+		repostClaim, rOk := repostedMap[records[i].RepostedClaimId]
+		if cOk {
+			txo.GetClaim().Channel = channel
+		}
+		if rOk {
+			txo.GetClaim().Repost = repostClaim
+		}
+	}
+
+	blocked = make([]*pb.Blocked, 0, len(blockedMap))
+	for k, v := range blockedMap {
+		if channel, ok := channelMap[k]; ok {
+			v.Channel = channel
+		}
+		blocked = append(blocked, v)
+	}
+
+	return txos, extraTxos, blocked
+}
+
+func (s *Server) setupEsQuery(
+	q *elastic.BoolQuery,
+	in *pb.SearchRequest,
+	pageSize *int,
+	from *int,
+	orderBy *[]orderField) *elastic.BoolQuery {
 	claimTypes := map[string]int {
 		"stream": 1,
 		"channel": 2,
@@ -212,21 +369,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 		"tags": true,
 	}
 
-	var from = 0
-	var pageSize = 10
-	var orderBy []orderField
-
-	// Ping the Elasticsearch server to get e.g. the version number
-	//_, code, err := client.Ping("http://127.0.0.1:9200").Do(ctx)
-	//if err != nil {
-	//	return nil, err
-	//}
-	//if code != 200 {
-	//	return nil, errors.New("ping failed")
-	//}
-
-	q := elastic.NewBoolQuery()
-
 	if in.IsControlling != nil {
 		q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
 	}
@@ -238,11 +380,11 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 	}
 
 	if in.Limit != nil {
-		pageSize = int(in.Limit.Value)
+		*pageSize = int(in.Limit.Value)
 	}
 
 	if in.Offset != nil {
-		from = int(in.Offset.Value)
+		*from = int(in.Offset.Value)
 	}
 
 	if len(in.Name) > 0 {
@@ -270,7 +412,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 		if _, ok := textFields[toAppend]; ok {
 			toAppend = toAppend + ".keyword"
 		}
-		orderBy = append(orderBy, orderField{toAppend, isAsc})
+		*orderBy = append(*orderBy, orderField{toAppend, isAsc})
 	}
 }
@@ -349,53 +491,53 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
 		q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value))
 	}
 
-	q = AddTermsField(in.PublicKeyHash, "public_key_hash.keyword", q)
-	q = AddTermsField(in.Author, "author.keyword", q)
-	q = AddTermsField(in.Title, "title.keyword", q)
-	q = AddTermsField(in.CanonicalUrl, "canonical_url.keyword", q)
-	q = AddTermsField(in.ClaimName, "claim_name.keyword", q)
-	q = AddTermsField(in.Description, "description.keyword", q)
-	q = AddTermsField(in.MediaType, "media_type.keyword", q)
-	q = AddTermsField(in.Normalized, "normalized.keyword", q)
-	q = AddTermsField(in.PublicKeyBytes, "public_key_bytes.keyword", q)
-	q = AddTermsField(in.ShortUrl, "short_url.keyword", q)
-	q = AddTermsField(in.Signature, "signature.keyword", q)
-	q = AddTermsField(in.SignatureDigest, "signature_digest.keyword", q)
-	q = AddTermsField(in.TxId, 
"tx_id.keyword", q) - q = AddTermsField(in.FeeCurrency, "fee_currency.keyword", q) - q = AddTermsField(in.RepostedClaimId, "reposted_claim_id.keyword", q) + q = AddTermsField(q, in.PublicKeyHash, "public_key_hash.keyword") + q = AddTermsField(q, in.Author, "author.keyword") + q = AddTermsField(q, in.Title, "title.keyword") + q = AddTermsField(q, in.CanonicalUrl, "canonical_url.keyword") + q = AddTermsField(q, in.ClaimName, "claim_name.keyword") + q = AddTermsField(q, in.Description, "description.keyword") + q = AddTermsField(q, in.MediaType, "media_type.keyword") + q = AddTermsField(q, in.Normalized, "normalized.keyword") + q = AddTermsField(q, in.PublicKeyBytes, "public_key_bytes.keyword") + q = AddTermsField(q, in.ShortUrl, "short_url.keyword") + q = AddTermsField(q, in.Signature, "signature.keyword") + q = AddTermsField(q, in.SignatureDigest, "signature_digest.keyword") + q = AddTermsField(q, in.TxId, "tx_id.keyword") + q = AddTermsField(q, in.FeeCurrency, "fee_currency.keyword") + q = AddTermsField(q, in.RepostedClaimId, "reposted_claim_id.keyword") - q = AddTermsField(s.cleanTags(in.AnyTags), "tags.keyword", q) - q = AddIndividualTermFields(s.cleanTags(in.AllTags), "tags.keyword", q, false) - q = AddIndividualTermFields(s.cleanTags(in.NotTags), "tags.keyword", q, true) - q = AddTermsField(in.AnyLanguages, "languages", q) - q = AddIndividualTermFields(in.AllLanguages, "languages", q, false) + q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword") + q = AddIndividualTermFields(q, s.cleanTags(in.AllTags), "tags.keyword", false) + q = AddIndividualTermFields(q, s.cleanTags(in.NotTags), "tags.keyword", true) + q = AddTermsField(q, in.AnyLanguages, "languages") + q = AddIndividualTermFields(q, in.AllLanguages, "languages", false) - q = AddInvertibleField(in.ChannelId, "channel_id.keyword", q) - q = AddInvertibleField(in.ChannelIds, "channel_id.keyword", q) + q = AddInvertibleField(q, in.ChannelId, "channel_id.keyword") + q = AddInvertibleField(q, in.ChannelIds, "channel_id.keyword") - q = AddRangeField(in.TxPosition, "tx_position", q) - q = AddRangeField(in.Amount, "amount", q) - q = AddRangeField(in.Timestamp, "timestamp", q) - q = AddRangeField(in.CreationTimestamp, "creation_timestamp", q) - q = AddRangeField(in.Height, "height", q) - q = AddRangeField(in.CreationHeight, "creation_height", q) - q = AddRangeField(in.ActivationHeight, "activation_height", q) - q = AddRangeField(in.ExpirationHeight, "expiration_height", q) - q = AddRangeField(in.ReleaseTime, "release_time", q) - q = AddRangeField(in.Reposted, "reposted", q) - q = AddRangeField(in.FeeAmount, "fee_amount", q) - q = AddRangeField(in.Duration, "duration", q) - q = AddRangeField(in.CensorType, "censor_type", q) - q = AddRangeField(in.ChannelJoin, "channel_join", q) - q = AddRangeField(in.EffectiveAmount, "effective_amount", q) - q = AddRangeField(in.SupportAmount, "support_amount", q) - q = AddRangeField(in.TrendingGroup, "trending_group", q) - q = AddRangeField(in.TrendingMixed, "trending_mixed", q) - q = AddRangeField(in.TrendingLocal, "trending_local", q) - q = AddRangeField(in.TrendingGlobal, "trending_global", q) + q = AddRangeField(q, in.TxPosition, "tx_position") + q = AddRangeField(q, in.Amount, "amount") + q = AddRangeField(q, in.Timestamp, "timestamp") + q = AddRangeField(q, in.CreationTimestamp, "creation_timestamp") + q = AddRangeField(q, in.Height, "height") + q = AddRangeField(q, in.CreationHeight, "creation_height") + q = AddRangeField(q, in.ActivationHeight, "activation_height") + q = AddRangeField(q, 
in.ExpirationHeight, "expiration_height") + q = AddRangeField(q, in.ReleaseTime, "release_time") + q = AddRangeField(q, in.Reposted, "reposted") + q = AddRangeField(q, in.FeeAmount, "fee_amount") + q = AddRangeField(q, in.Duration, "duration") + q = AddRangeField(q, in.CensorType, "censor_type") + q = AddRangeField(q, in.ChannelJoin, "channel_join") + q = AddRangeField(q, in.EffectiveAmount, "effective_amount") + q = AddRangeField(q, in.SupportAmount, "support_amount") + q = AddRangeField(q, in.TrendingGroup, "trending_group") + q = AddRangeField(q, in.TrendingMixed, "trending_mixed") + q = AddRangeField(q, in.TrendingLocal, "trending_local") + q = AddRangeField(q, in.TrendingGlobal, "trending_global") if in.Text != "" { textQuery := elastic.NewSimpleQueryStringQuery(in.Text). @@ -409,133 +551,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, q = q.Must(textQuery) } - var searchIndices = []string{} - if s.Args.Dev && len(in.SearchIndices) == 0 { - // If we're running in dev mode ignore the mainnet claims index - indices, err := client.IndexNames() - if err != nil { - log.Fatalln(err) - } - var numIndices = len(indices) - searchIndices = make([]string, 0, numIndices) - for i := 0; i < numIndices; i++ { - if indices[i] == "claims" { - continue - } - searchIndices = append(searchIndices, indices[i]) - } - } - - if len(in.SearchIndices) > 0 { - searchIndices = in.SearchIndices - } - - fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id") - search := client.Search(). - Index(searchIndices...). - FetchSourceContext(fsc). - Query(q). // specify the query - From(0).Size(1000) - - - for _, x := range orderBy { - search = search.Sort(x.Field, x.IsAsc) - } - - searchResult, err := search.Do(ctx) // execute - if err != nil { - log.Println(err) - return nil, err - } - - log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) - - var txos []*pb.Output - var records []*record - var blockedRecords []*record - var blocked []*pb.Blocked - var blockedMap map[string]*pb.Blocked - - records = make([]*record, 0, searchResult.TotalHits()) - - var r record - for _, item := range searchResult.Each(reflect.TypeOf(r)) { - if t, ok := item.(record); ok { - records = append(records, &t) - } - } - - //printJsonFullResults(searchResult) - records, blockedRecords, blockedMap = removeBlocked(records) - - if in.RemoveDuplicates != nil { - records = removeDuplicates(records) - } - - if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 { - records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) - } - - finalLength := int(math.Min(float64(len(records)), float64(pageSize))) - txos = make([]*pb.Output, 0, finalLength) - var j = 0 - for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ { - t := records[i] - res := t.recordToOutput() - txos = append(txos, res) - j += 1 - } - - //printJsonFullRecords(blockedRecords) - - //Get claims for reposts - repostClaims, repostRecords, repostedMap := getClaimsForReposts(records, client, ctx, searchIndices) - //get all unique channels - channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndices) - //add these to extra txos - extraTxos := append(repostClaims, channels...) 
- - //Fill in channel / repost data for txos and blocked - for i, txo := range txos { - channel, cOk := channelMap[records[i].ChannelId] - repostClaim, rOk := repostedMap[records[i].RepostedClaimId] - if cOk { - txo.GetClaim().Channel = channel - } - if rOk { - txo.GetClaim().Repost = repostClaim - } - } - - blocked = make([]*pb.Blocked, 0, len(blockedMap)) - for k, v := range blockedMap { - if channel, ok := channelMap[k]; ok { - v.Channel = channel - } - blocked = append(blocked, v) - } - - if in.NoTotals != nil && !in.NoTotals.Value { - return &pb.Outputs{ - Txos: txos, - ExtraTxos: extraTxos, - Offset: uint32(int64(from) + searchResult.TotalHits()), - Blocked: blocked, - }, nil - } - - var blockedTotal uint32 = 0 - for _, b := range blocked { - blockedTotal += b.Count - } - return &pb.Outputs{ - Txos: txos, - ExtraTxos: extraTxos, - Total: uint32(searchResult.TotalHits()), - Offset: uint32(int64(from) + searchResult.TotalHits()), - Blocked: blocked, - BlockedTotal: blockedTotal, - }, nil + return q } func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, map[string]*pb.Output) { @@ -588,7 +604,7 @@ func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Co return channelTxos, channels } -func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { +func getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) { var totalReposted = 0 var mget = client.Mget()//.StoredFields("_id") diff --git a/server/server.go b/server/server.go index cdbbef5..7d50181 100644 --- a/server/server.go +++ b/server/server.go @@ -1,11 +1,12 @@ package server import ( + "log" + "regexp" + pb "github.com/lbryio/hub/protobuf/go" "github.com/olivere/elastic/v7" "google.golang.org/grpc" - "log" - "regexp" ) type Server struct {
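Usage sketch for the refactored query builders (illustrative only, not part of the diff): every helper now takes the *elastic.BoolQuery as its first argument and returns it, so filters chain left to right and the nil/empty early-returns stay uniform. The snippet below assumes the helpers remain exported from github.com/lbryio/hub/server and is called from a hypothetical main package; the field names are taken from the calls in the diff above.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/lbryio/hub/server"
	"github.com/olivere/elastic/v7"
)

func main() {
	// Build a bool query the same way setupEsQuery does: query first, then values, then field.
	q := elastic.NewBoolQuery()
	q = server.AddTermsField(q, []string{"audio/mpeg", "video/mp4"}, "media_type.keyword") // Must match any listed value
	q = server.AddIndividualTermFields(q, []string{"mature"}, "tags.keyword", true)        // MustNot each value (invert=true)

	// Print the Elasticsearch JSON this produces.
	src, err := q.Source()
	if err != nil {
		panic(err)
	}
	out, _ := json.MarshalIndent(src, "", "  ")
	fmt.Println(string(out))
}

Passing the query first mirrors method-chaining style and keeps all the AddTermsField/AddIndividualTermFields/AddRangeField/AddInvertibleField call sites in setupEsQuery visually aligned.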