From 60a38f80d650f8dea754368ae11e51e399a848d8 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Fri, 11 Jun 2021 18:18:17 -0400 Subject: [PATCH] extra_txo / channel stuff works now --- server/search.go | 186 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 135 insertions(+), 51 deletions(-) diff --git a/server/search.go b/server/search.go index 830e297..d621f0f 100644 --- a/server/search.go +++ b/server/search.go @@ -32,7 +32,7 @@ type record struct { ShortUrl string `json:"short_url"` CanonicalUrl string `json:"canonical_url"` IsControlling bool `json:"is_controlling"` - TakeOverHeight uint32 `json:"take_over_height"` + TakeOverHeight uint32 `json:"last_take_over_height"` CreationHeight uint32 `json:"creation_height"` ActivationHeight uint32 `json:"activation_height"` ExpirationHeight uint32 `json:"expiration_height"` @@ -451,7 +451,9 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var txos []*pb.Output var records []*record - var blocked []*pb.Blocked = make([]*pb.Blocked, 0) + var blockedRecords []*record + var blocked []*pb.Blocked + var blockedMap map[string]*pb.Blocked records = make([]*record, 0, searchResult.TotalHits()) @@ -462,7 +464,8 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } - records = removeBlocked(records, &blocked) + //printJsonFullResults(searchResult) + records, blockedRecords, blockedMap = removeBlocked(records) if in.RemoveDuplicates != nil { records = removeDuplicates(records) @@ -482,15 +485,39 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, j += 1 } - //printJsonFullResults(searchResult) + //printJsonFullRecords(blockedRecords) + + var searchIndex = "claims" //default + if len(searchIndices) > 0 { + searchIndex = searchIndices[0] + } //Get claims for reposts - repostClaims, repostRecords := getClaimsForReposts(records, client, ctx, searchIndices) + repostClaims, repostRecords, repostedMap := getClaimsForReposts(records, client, ctx, searchIndex) //get all unique channels - channels := getUniqueChannels(append(records, repostRecords...)) + channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndex) //add these to extra txos extraTxos := append(repostClaims, channels...) - //extraTxos := repostClaims + + //Fill in channel / repost data for txos and blocked + for i, txo := range txos { + channel, cOk := channelMap[records[i].ChannelId] + repostClaim, rOk := repostedMap[records[i].RepostedClaimId] + if cOk { + txo.GetClaim().Channel = channel + } + if rOk { + txo.GetClaim().Repost = repostClaim + } + } + + blocked = make([]*pb.Blocked, 0, len(blockedMap)) + for k, v := range blockedMap { + if channel, ok := channelMap[k]; ok { + v.Channel = channel + } + blocked = append(blocked, v) + } if in.NoTotals != nil && !in.NoTotals.Value { return &pb.Outputs{ @@ -515,66 +542,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, }, nil } -func getUniqueChannels(records []*record) []*pb.Output { - channelTxos := make([]*pb.Output, 0, len(records)) - channels := make(map[string]bool) +func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Context, searchIndex string) ([]*pb.Output, map[string]*pb.Output) { + channels := make(map[string]*pb.Output) + channelsSet := make(map[string]bool) + var mget = client.Mget() + var totalChannels = 0 for _, r := range records { - if r.ChannelId != "" && !channels[r.ChannelId] { - //txo := &pb.Output{ - // TxHash: util.ToHash(r.ChannelId), - //} - txo := &pb.Output{ - TxHash: util.ToHash(r.Txid), - //Meta: &pb.Output_Claim{ - // Claim: &pb.ClaimMeta{ - // Channel: r.recordToOutput(), - // }, - //}, - } - channelTxos = append(channelTxos, txo) - channels[r.ChannelId] = true + if r.ChannelId != "" && !channelsSet[r.ChannelId] { + channelsSet[r.ChannelId] = true + nmget := elastic.NewMultiGetItem().Id(r.ChannelId).Index(searchIndex) + mget = mget.Add(nmget) + totalChannels++ + } + if r.CensorType != 0 && !channelsSet[r.CensoringChannelHash] { + channelsSet[r.CensoringChannelHash] = true + nmget := elastic.NewMultiGetItem().Id(r.CensoringChannelHash).Index(searchIndex) + mget = mget.Add(nmget) + totalChannels++ } } - - return channelTxos -} - -func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record) { - - var totalReposted = 0 - var mget = client.Mget() - for _, r := range records { - if r.Reposted > 0 { - var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId) - for _, index := range searchIndices { - nmget = nmget.Index(index) - } - mget = mget.Add(nmget) - totalReposted++ - } + if totalChannels == 0 { + return []*pb.Output{}, make(map[string]*pb.Output) } res, err := mget.Do(ctx) if err != nil { log.Println(err) - return []*pb.Output{}, []*record{} + return []*pb.Output{}, make(map[string]*pb.Output) + } + + channelTxos := make([]*pb.Output, totalChannels) + //repostedRecords := make([]*record, totalReposted) + + log.Println("total channel", totalChannels) + for i, doc := range res.Docs { + var r record + err := json.Unmarshal(doc.Source, &r) + if err != nil { + return []*pb.Output{}, make(map[string]*pb.Output) + } + channelTxos[i] = r.recordToOutput() + channels[r.ClaimId] = channelTxos[i] + //log.Println(r) + //repostedRecords[i] = &r + } + + return channelTxos, channels +} + +func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndex string) ([]*pb.Output, []*record, map[string]*pb.Output) { + + var totalReposted = 0 + var mget = client.Mget()//.StoredFields("_id") + /* + var nmget = elastic.NewMultiGetItem() + for _, index := range searchIndices { + nmget = nmget.Index(index) + } + */ + for _, r := range records { + if r.RepostedClaimId != "" { + var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId).Index(searchIndex) + //nmget = nmget.Id(r.RepostedClaimId) + mget = mget.Add(nmget) + totalReposted++ + } + } + //mget = mget.Add(nmget) + if totalReposted == 0 { + return []*pb.Output{}, []*record{}, make(map[string]*pb.Output) + } + + res, err := mget.Do(ctx) + if err != nil { + log.Println(err) + return []*pb.Output{}, []*record{}, make(map[string]*pb.Output) } claims := make([]*pb.Output, totalReposted) repostedRecords := make([]*record, totalReposted) + respostedMap := make(map[string]*pb.Output) log.Println("reposted records", totalReposted) for i, doc := range res.Docs { var r record err := json.Unmarshal(doc.Source, &r) if err != nil { - return []*pb.Output{}, []*record{} + return []*pb.Output{}, []*record{}, make(map[string]*pb.Output) } claims[i] = r.recordToOutput() repostedRecords[i] = &r + respostedMap[r.ClaimId] = claims[i] } - return claims, repostedRecords + return claims, repostedRecords, respostedMap } func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { @@ -619,6 +680,15 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r return finalHits } +func (r *record) recordToChannelOutput() *pb.Output { + // Don't nee dthe meta for this one + return &pb.Output{ + TxHash: util.ToHash(r.Txid), + Nout: r.Nout, + Height: r.Height, + } +} + func (r *record) recordToOutput() *pb.Output { return &pb.Output{ TxHash: util.ToHash(r.Txid), @@ -692,27 +762,41 @@ func removeDuplicates(searchHits []*record) []*record { return deduped } -func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record { +func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.Blocked) { newHits := make([]*record, 0, len(searchHits)) + blockedHits := make([]*record, 0, len(searchHits)) blockedChannels := make(map[string]*pb.Blocked) for _, r := range searchHits { if r.CensorType != 0 { - if blockedChannels[r.ChannelId] == nil { + if blockedChannels[r.CensoringChannelHash] == nil { blockedObj := &pb.Blocked{ Count: 1, - Channel: r.recordToOutput(), + Channel: nil, } - *blocked = append(*blocked, blockedObj) - blockedChannels[r.ChannelId] = blockedObj + blockedChannels[r.CensoringChannelHash] = blockedObj + blockedHits = append(blockedHits, r) } else { - blockedChannels[r.ChannelId].Count += 1 + blockedChannels[r.CensoringChannelHash].Count += 1 } } else { newHits = append(newHits, r) } } - return newHits + return newHits, blockedHits, blockedChannels +} + +func printJsonFullRecords(records []*record) { + // or if you want more control + for _, r := range records { + // hit.Index contains the name of the index + + b, err := json.MarshalIndent(r, "", " ") + if err != nil { + fmt.Println("error:", err) + } + fmt.Println(string(b)) + } } func printJsonFullResults(searchResult *elastic.SearchResult) {