From 79db1be087261b4ef35d21df8886436d33951293 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Tue, 8 Jun 2021 22:07:59 -0400 Subject: [PATCH] implement remove duplicates --- protobuf/definitions/hub.proto | 1 + protobuf/go/hub.pb.go | 44 +++++++---- server/search.go | 130 +++++++++++++++++---------------- 3 files changed, 99 insertions(+), 76 deletions(-) diff --git a/protobuf/definitions/hub.proto b/protobuf/definitions/hub.proto index 28dd352..68ddec1 100644 --- a/protobuf/definitions/hub.proto +++ b/protobuf/definitions/hub.proto @@ -90,4 +90,5 @@ message SearchRequest { .google.protobuf.Int32Value limit_claims_per_channel = 73; repeated string any_languages = 74; repeated string all_languages = 75; + .google.protobuf.BoolValue remove_duplicates = 76; } \ No newline at end of file diff --git a/protobuf/go/hub.pb.go b/protobuf/go/hub.pb.go index 4698224..b3dd928 100644 --- a/protobuf/go/hub.pb.go +++ b/protobuf/go/hub.pb.go @@ -253,6 +253,7 @@ type SearchRequest struct { LimitClaimsPerChannel *wrapperspb.Int32Value `protobuf:"bytes,73,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` AnyLanguages []string `protobuf:"bytes,74,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AllLanguages []string `protobuf:"bytes,75,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` + RemoveDuplicates *wrapperspb.BoolValue `protobuf:"bytes,76,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"` } func (x *SearchRequest) Reset() { @@ -721,6 +722,13 @@ func (x *SearchRequest) GetAllLanguages() []string { return nil } +func (x *SearchRequest) GetRemoveDuplicates() *wrapperspb.BoolValue { + if x != nil { + return x.RemoveDuplicates + } + return nil +} + var File_hub_proto protoreflect.FileDescriptor var file_hub_proto_rawDesc = []byte{ @@ -739,7 +747,7 @@ var file_hub_proto_rawDesc = []byte{ 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x2e, 0x0a, 0x02, 0x4f, 0x70, 
0x12, 0x06, 0x0a, 0x02, 0x45, 0x51, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x54, 0x45, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x47, 0x54, 0x45, 0x10, 0x02, 0x12, 0x06, 0x0a, 0x02, 0x4c, 0x54, 0x10, 0x03, 0x12, 0x06, 0x0a, - 0x02, 0x47, 0x54, 0x10, 0x04, 0x22, 0xd0, 0x15, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, + 0x02, 0x47, 0x54, 0x10, 0x04, 0x22, 0x99, 0x16, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, @@ -912,13 +920,18 @@ var file_hub_proto_rawDesc = []byte{ 0x4a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6c, 0x6c, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x18, 0x4b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6c, 0x6c, 0x4c, - 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x32, 0x31, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, - 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x53, - 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, 0x2e, 0x70, - 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f, - 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, - 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x12, 0x47, 0x0a, 0x11, 0x72, 0x65, 0x6d, 0x6f, + 0x76, 0x65, 0x5f, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x73, 0x18, 0x4c, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 
0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, + 0x10, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, + 0x73, 0x32, 0x31, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, + 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, + 0x74, 0x73, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -978,13 +991,14 @@ var file_hub_proto_depIdxs = []int32{ 5, // 30: pb.SearchRequest.has_channel_signature:type_name -> google.protobuf.BoolValue 5, // 31: pb.SearchRequest.has_source:type_name -> google.protobuf.BoolValue 4, // 32: pb.SearchRequest.limit_claims_per_channel:type_name -> google.protobuf.Int32Value - 3, // 33: pb.Hub.Search:input_type -> pb.SearchRequest - 6, // 34: pb.Hub.Search:output_type -> pb.Outputs - 34, // [34:35] is the sub-list for method output_type - 33, // [33:34] is the sub-list for method input_type - 33, // [33:33] is the sub-list for extension type_name - 33, // [33:33] is the sub-list for extension extendee - 0, // [0:33] is the sub-list for field type_name + 5, // 33: pb.SearchRequest.remove_duplicates:type_name -> google.protobuf.BoolValue + 3, // 34: pb.Hub.Search:input_type -> pb.SearchRequest + 6, // 35: pb.Hub.Search:output_type -> pb.Outputs + 35, // [35:36] is the sub-list for method output_type + 34, // [34:35] is the sub-list for method input_type + 34, // [34:34] is the sub-list for extension type_name + 34, // [34:34] is the sub-list for extension extendee + 0, // [0:34] is the sub-list for field type_name } 
func init() { file_hub_proto_init() } diff --git a/server/search.go b/server/search.go index 6fb1619..056a67a 100644 --- a/server/search.go +++ b/server/search.go @@ -22,11 +22,13 @@ import ( ) type record struct { - Txid string `json:"tx_id"` - Nout uint32 `json:"tx_nout"` - Height uint32 `json:"height"` - ClaimId string `json:"claim_id"` - ChannelId string `json:"channel_id"` + Txid string `json:"tx_id"` + Nout uint32 `json:"tx_nout"` + Height uint32 `json:"height"` + ClaimId string `json:"claim_id"` + ChannelId string `json:"channel_id"` + CreationHeight uint32 `json:"creation_height"` + RepostedClaimId string `json:"reposted_claim_id"` } type compareFunc func(r1, r2 **record, invert bool) int @@ -506,7 +508,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, j = j + 1 } - fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") + fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id") log.Printf("from: %d, size: %d\n", from, size) search := client.Search(). Index(searchIndices...). 
@@ -577,32 +579,30 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, txos = append(txos, res) } } + // + //for _, rec := range records { + // log.Println(*rec) + //} + // + //log.Println("#########################") + // - var finalRecords []*record - for _, rec := range records { - log.Println(*rec) + if in.RemoveDuplicates != nil { + records = removeDuplicates(records) } - - log.Println("#########################") - - if in.LimitClaimsPerChannel != nil { - finalRecords = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) - for _, rec := range finalRecords { - log.Println(*rec) - } - } else { - finalRecords = records + records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) + //for _, rec := range records { + // log.Println(*rec) + //} } - finalLength := int(math.Min(float64(len(finalRecords)), float64(pageSize))) - // var start int = from + finalLength := int(math.Min(float64(len(records)), float64(pageSize))) txos = make([]*pb.Output, 0, finalLength) - //for i, t := range finalRecords { j = 0 - for i := from; i < from + finalLength && i < len(finalRecords) && j < finalLength; i++ { - t := finalRecords[i] + for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ { + t := records[i] res := &pb.Output{ TxHash: util.ToHash(t.Txid), Nout: t.Nout, @@ -612,7 +612,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, j += 1 } - // or if you want more control + //// or if you want more control //for _, hit := range searchResult.Hits.Hits { // // hit.Index contains the name of the index // @@ -641,40 +641,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, }, nil } -/* def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int): - reordered_hits = [] - channel_counters = Counter() - next_page_hits_maybe_check_later = deque() - while search_hits or next_page_hits_maybe_check_later: - 
if reordered_hits and len(reordered_hits) % page_size == 0:
-                channel_counters.clear()
-            elif not reordered_hits:
-                pass
-            else:
-                break  # means last page was incomplete and we are left with bad replacements
-            for _ in range(len(next_page_hits_maybe_check_later)):
-                claim_id, channel_id = next_page_hits_maybe_check_later.popleft()
-                if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page:
-                    reordered_hits.append((claim_id, channel_id))
-                    channel_counters[channel_id] += 1
-                else:
-                    next_page_hits_maybe_check_later.append((claim_id, channel_id))
-            while search_hits:
-                hit = search_hits.popleft()
-                hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id']
-                if hit_channel_id is None or per_channel_per_page <= 0:
-                    reordered_hits.append((hit_id, hit_channel_id))
-                elif channel_counters[hit_channel_id] < per_channel_per_page:
-                    reordered_hits.append((hit_id, hit_channel_id))
-                    channel_counters[hit_channel_id] += 1
-                    if len(reordered_hits) % page_size == 0:
-                        break
-                else:
-                    next_page_hits_maybe_check_later.append((hit_id, hit_channel_id))
-        return reordered_hits
-
- */
-
 func sumCounters(channelCounters map[string]int) int {
 	var sum int = 0
@@ -702,8 +668,6 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
 			break
 		}
 
-		// log.Printf("searchHitsQ = %d, nextPageHitsMaybeCheckLater = %d\n", searchHitsQ.Size(), nextPageHitsMaybeCheckLater.Size())
-
 		for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ {
 			rec := nextPageHitsMaybeCheckLater.PopLeft().(*record)
 			if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage {
@@ -728,3 +692,48 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
 	}
 	return finalHits
 }
+
+// getHitId returns the key used for de-duplication: the reposted claim id
+// when this record is a repost, otherwise the record's own claim id.
+func (r *record) getHitId() string {
+	if r.RepostedClaimId != "" {
+		return r.RepostedClaimId
+	}
+	return r.ClaimId
+}
+
+// removeDuplicates keeps, for each hit id, only the record with the lowest
+// creation height (so the original claim wins over later duplicates/reposts)
+// and preserves the input order of the surviving records.
+func removeDuplicates(searchHits []*record) []*record {
+	dropped := make(map[*record]bool)
+	// hit id -> record with the lowest creation height seen so far
+	knownIds := make(map[string]*record)
+
+	for _, hit := range searchHits {
+		// Compare CreationHeight (added to record by this patch for exactly
+		// this purpose); the python original compares creation_height too.
+		hitHeight := hit.CreationHeight
+		hitId := hit.getHitId()
+
+		if knownIds[hitId] == nil {
+			knownIds[hitId] = hit
+		} else {
+			prevHit := knownIds[hitId]
+			if hitHeight < prevHit.CreationHeight {
+				knownIds[hitId] = hit
+				dropped[prevHit] = true
+			} else {
+				dropped[hit] = true
+			}
+		}
+	}
+
+	deduped := make([]*record, 0, len(searchHits)-len(dropped))
+	for _, hit := range searchHits {
+		if !dropped[hit] {
+			deduped = append(deduped, hit)
+		}
+	}
+	return deduped
+}