diff --git a/build.sh b/build.sh index 8128a1f..4daa308 100755 --- a/build.sh +++ b/build.sh @@ -1,4 +1,4 @@ #!/bin/bash go build . -docker build . -t lbry/hub:latest \ No newline at end of file +sudo docker build . -t lbry/hub:latest diff --git a/go.mod b/go.mod index 8102219..64a690f 100644 --- a/go.mod +++ b/go.mod @@ -13,4 +13,5 @@ require ( google.golang.org/genproto v0.0.0-20210524171403-669157292da3 // indirect google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 + gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c ) diff --git a/go.sum b/go.sum index 5ffd2a0..fb6d6bb 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c h1:4GYkPhjcYLPrPAnoxHVQlH/xcXtWN8pEgqBnHrPAs8c= +gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c/go.mod h1:xd7qpr5uPMNy4hsRJ5JEBXA8tJjTFmUI1soCjlCIgAE= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/server/search.go b/server/search.go index 28259d6..6fb1619 100644 --- a/server/search.go +++ b/server/search.go @@ -7,11 +7,14 @@ import ( "github.com/btcsuite/btcutil/base58" "github.com/golang/protobuf/ptypes/wrappers" pb "github.com/lbryio/hub/protobuf/go" + "math" + //"github.com/lbryio/hub/schema" "github.com/lbryio/hub/util" "github.com/olivere/elastic/v7" "golang.org/x/text/cases" "golang.org/x/text/language" + "gopkg.in/karalabe/cookiejar.v1/collections/deque" "log" "reflect" "sort" @@ -19,26 +22,27 @@ import ( ) type record struct { - Txid string `json:"tx_id"` - Nout uint32 `json:"tx_nout"` - Height uint32 `json:"height"` - ClaimId string `json:"claim_id"` + Txid string `json:"tx_id"` + Nout uint32 `json:"tx_nout"` + Height uint32 `json:"height"` + ClaimId string `json:"claim_id"` + ChannelId string `json:"channel_id"` } -type compareFunc func(r1, r2 *record, invert bool) int +type compareFunc func(r1, r2 **record, invert bool) int type multiSorter struct { - records []record + records []*record compare []compareFunc invert []bool } var compareFuncs = map[string]compareFunc { - "height": func(r1, r2 *record, invert bool) int { + "height": func(r1, r2 **record, invert bool) int { var res = 0 - if r1.Height < r2.Height { + if (*r1).Height < (*r2).Height { res = -1 - } else if r1.Height > r2.Height { + } else if (*r1).Height > (*r2).Height { res = 1 } if invert { @@ -49,7 +53,7 @@ var compareFuncs = map[string]compareFunc { } // Sort sorts the argument slice according to the less functions passed to OrderedBy. -func (ms *multiSorter) Sort(records []record) { +func (ms *multiSorter) Sort(records []*record) { ms.records = records sort.Sort(ms) } @@ -494,7 +498,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } searchIndices := make([]string, numIndices) j := 0 - for i := 0; i < len(indices); i++ { + for i := 0; j < numIndices; i++ { if indices[i] == "claims" { continue } @@ -508,7 +512,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, Index(searchIndices...). FetchSourceContext(fsc). Query(q). // specify the query - From(from).Size(size) + From(0).Size(1000) //if in.LimitClaimsPerChannel != nil { // search = search.Collapse(collapse) //} @@ -525,17 +529,16 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) var txos []*pb.Output - var records []record + var records []*record //if in.LimitClaimsPerChannel == nil { if true { - txos = make([]*pb.Output, searchResult.TotalHits()) - records = make([]record, 0, searchResult.TotalHits()) + records = make([]*record, 0, searchResult.TotalHits()) var r record for _, item := range searchResult.Each(reflect.TypeOf(r)) { if t, ok := item.(record); ok { - records = append(records, t) + records = append(records, &t) //txos[i] = &pb.Output{ // TxHash: util.ToHash(t.Txid), // Nout: t.Nout, @@ -544,7 +547,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } } else { - records = make([]record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value)) + records = make([]*record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value)) txos = make([]*pb.Output, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value)) var i = 0 for _, hit := range searchResult.Hits.Hits { @@ -553,7 +556,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, if i >= size { break } - var t record + var t *record err := json.Unmarshal(hitt.Source, &t) if err != nil { return nil, err @@ -575,12 +578,38 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } - for i, t := range records { - txos[i] = &pb.Output{ + var finalRecords []*record + for _, rec := range records { + log.Println(*rec) + } + + + log.Println("#########################") + + + if in.LimitClaimsPerChannel != nil { + finalRecords = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) + for _, rec := range finalRecords { + log.Println(*rec) + } + } else { + finalRecords = records + } + + finalLength := int(math.Min(float64(len(finalRecords)), float64(pageSize))) + // var start int = from + txos = make([]*pb.Output, 0, finalLength) + //for i, t := range finalRecords { + j = 0 + for i := from; i < from + finalLength && i < len(finalRecords) && j < finalLength; i++ { + t := finalRecords[i] + res := &pb.Output{ TxHash: util.ToHash(t.Txid), Nout: t.Nout, Height: t.Height, } + txos = append(txos, res) + j += 1 } // or if you want more control @@ -611,3 +640,91 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, Offset: uint32(int64(from) + searchResult.TotalHits()), }, nil } + +/* def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int): + reordered_hits = [] + channel_counters = Counter() + next_page_hits_maybe_check_later = deque() + while search_hits or next_page_hits_maybe_check_later: + if reordered_hits and len(reordered_hits) % page_size == 0: + channel_counters.clear() + elif not reordered_hits: + pass + else: + break # means last page was incomplete and we are left with bad replacements + for _ in range(len(next_page_hits_maybe_check_later)): + claim_id, channel_id = next_page_hits_maybe_check_later.popleft() + if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page: + reordered_hits.append((claim_id, channel_id)) + channel_counters[channel_id] += 1 + else: + next_page_hits_maybe_check_later.append((claim_id, channel_id)) + while search_hits: + hit = search_hits.popleft() + hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id'] + if hit_channel_id is None or per_channel_per_page <= 0: + reordered_hits.append((hit_id, hit_channel_id)) + elif channel_counters[hit_channel_id] < per_channel_per_page: + reordered_hits.append((hit_id, hit_channel_id)) + channel_counters[hit_channel_id] += 1 + if len(reordered_hits) % page_size == 0: + break + else: + next_page_hits_maybe_check_later.append((hit_id, hit_channel_id)) + return reordered_hits + + */ + + +func sumCounters(channelCounters map[string]int) int { + var sum int = 0 + for _, v := range channelCounters { + sum += v + } + + return sum +} + +func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { + finalHits := make([]*record, 0 , len(searchHits)) + var channelCounters map[string]int + channelCounters = make(map[string]int) + nextPageHitsMaybeCheckLater := deque.New() + searchHitsQ := deque.New() + for _, rec := range searchHits { + searchHitsQ.PushRight(rec) + } + for !searchHitsQ.Empty() || !nextPageHitsMaybeCheckLater.Empty() { + if len(finalHits) > 0 && len(finalHits) % pageSize == 0 { + channelCounters = make(map[string]int) + } else if len(finalHits) != 0 { + // means last page was incomplete and we are left with bad replacements + break + } + + // log.Printf("searchHitsQ = %d, nextPageHitsMaybeCheckLater = %d\n", searchHitsQ.Size(), nextPageHitsMaybeCheckLater.Size()) + + for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ { + rec := nextPageHitsMaybeCheckLater.PopLeft().(*record) + if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage { + finalHits = append(finalHits, rec) + channelCounters[rec.ChannelId] = channelCounters[rec.ChannelId] + 1 + } + } + for !searchHitsQ.Empty() { + hit := searchHitsQ.PopLeft().(*record) + if hit.ChannelId == "" || perChannelPerPage < 0 { + finalHits = append(finalHits, hit) + } else if channelCounters[hit.ChannelId] < perChannelPerPage { + finalHits = append(finalHits, hit) + channelCounters[hit.ChannelId] = channelCounters[hit.ChannelId] + 1 + if len(finalHits) % pageSize == 0 { + break + } + } else { + nextPageHitsMaybeCheckLater.PushRight(hit) + } + } + } + return finalHits +}