extra_txo / channel stuff works now

This commit is contained in:
Jeffrey Picard 2021-06-11 18:18:17 -04:00
parent af9b5de49d
commit 60a38f80d6

View file

@ -32,7 +32,7 @@ type record struct {
ShortUrl string `json:"short_url"` ShortUrl string `json:"short_url"`
CanonicalUrl string `json:"canonical_url"` CanonicalUrl string `json:"canonical_url"`
IsControlling bool `json:"is_controlling"` IsControlling bool `json:"is_controlling"`
TakeOverHeight uint32 `json:"take_over_height"` TakeOverHeight uint32 `json:"last_take_over_height"`
CreationHeight uint32 `json:"creation_height"` CreationHeight uint32 `json:"creation_height"`
ActivationHeight uint32 `json:"activation_height"` ActivationHeight uint32 `json:"activation_height"`
ExpirationHeight uint32 `json:"expiration_height"` ExpirationHeight uint32 `json:"expiration_height"`
@ -451,7 +451,9 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
var txos []*pb.Output var txos []*pb.Output
var records []*record var records []*record
var blocked []*pb.Blocked = make([]*pb.Blocked, 0) var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records = make([]*record, 0, searchResult.TotalHits()) records = make([]*record, 0, searchResult.TotalHits())
@ -462,7 +464,8 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
} }
} }
records = removeBlocked(records, &blocked) //printJsonFullResults(searchResult)
records, blockedRecords, blockedMap = removeBlocked(records)
if in.RemoveDuplicates != nil { if in.RemoveDuplicates != nil {
records = removeDuplicates(records) records = removeDuplicates(records)
@ -482,15 +485,39 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
j += 1 j += 1
} }
//printJsonFullResults(searchResult) //printJsonFullRecords(blockedRecords)
var searchIndex = "claims" //default
if len(searchIndices) > 0 {
searchIndex = searchIndices[0]
}
//Get claims for reposts //Get claims for reposts
repostClaims, repostRecords := getClaimsForReposts(records, client, ctx, searchIndices) repostClaims, repostRecords, repostedMap := getClaimsForReposts(records, client, ctx, searchIndex)
//get all unique channels //get all unique channels
channels := getUniqueChannels(append(records, repostRecords...)) channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndex)
//add these to extra txos //add these to extra txos
extraTxos := append(repostClaims, channels...) extraTxos := append(repostClaims, channels...)
//extraTxos := repostClaims
//Fill in channel / repost data for txos and blocked
for i, txo := range txos {
channel, cOk := channelMap[records[i].ChannelId]
repostClaim, rOk := repostedMap[records[i].RepostedClaimId]
if cOk {
txo.GetClaim().Channel = channel
}
if rOk {
txo.GetClaim().Repost = repostClaim
}
}
blocked = make([]*pb.Blocked, 0, len(blockedMap))
for k, v := range blockedMap {
if channel, ok := channelMap[k]; ok {
v.Channel = channel
}
blocked = append(blocked, v)
}
if in.NoTotals != nil && !in.NoTotals.Value { if in.NoTotals != nil && !in.NoTotals.Value {
return &pb.Outputs{ return &pb.Outputs{
@ -515,66 +542,100 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}, nil }, nil
} }
func getUniqueChannels(records []*record) []*pb.Output { func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Context, searchIndex string) ([]*pb.Output, map[string]*pb.Output) {
channelTxos := make([]*pb.Output, 0, len(records)) channels := make(map[string]*pb.Output)
channels := make(map[string]bool) channelsSet := make(map[string]bool)
var mget = client.Mget()
var totalChannels = 0
for _, r := range records { for _, r := range records {
if r.ChannelId != "" && !channels[r.ChannelId] { if r.ChannelId != "" && !channelsSet[r.ChannelId] {
//txo := &pb.Output{ channelsSet[r.ChannelId] = true
// TxHash: util.ToHash(r.ChannelId), nmget := elastic.NewMultiGetItem().Id(r.ChannelId).Index(searchIndex)
//} mget = mget.Add(nmget)
txo := &pb.Output{ totalChannels++
TxHash: util.ToHash(r.Txid), }
//Meta: &pb.Output_Claim{ if r.CensorType != 0 && !channelsSet[r.CensoringChannelHash] {
// Claim: &pb.ClaimMeta{ channelsSet[r.CensoringChannelHash] = true
// Channel: r.recordToOutput(), nmget := elastic.NewMultiGetItem().Id(r.CensoringChannelHash).Index(searchIndex)
// }, mget = mget.Add(nmget)
//}, totalChannels++
}
channelTxos = append(channelTxos, txo)
channels[r.ChannelId] = true
} }
} }
if totalChannels == 0 {
return channelTxos return []*pb.Output{}, make(map[string]*pb.Output)
}
func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record) {
var totalReposted = 0
var mget = client.Mget()
for _, r := range records {
if r.Reposted > 0 {
var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId)
for _, index := range searchIndices {
nmget = nmget.Index(index)
}
mget = mget.Add(nmget)
totalReposted++
}
} }
res, err := mget.Do(ctx) res, err := mget.Do(ctx)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return []*pb.Output{}, []*record{} return []*pb.Output{}, make(map[string]*pb.Output)
}
channelTxos := make([]*pb.Output, totalChannels)
//repostedRecords := make([]*record, totalReposted)
log.Println("total channel", totalChannels)
for i, doc := range res.Docs {
var r record
err := json.Unmarshal(doc.Source, &r)
if err != nil {
return []*pb.Output{}, make(map[string]*pb.Output)
}
channelTxos[i] = r.recordToOutput()
channels[r.ClaimId] = channelTxos[i]
//log.Println(r)
//repostedRecords[i] = &r
}
return channelTxos, channels
}
func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndex string) ([]*pb.Output, []*record, map[string]*pb.Output) {
var totalReposted = 0
var mget = client.Mget()//.StoredFields("_id")
/*
var nmget = elastic.NewMultiGetItem()
for _, index := range searchIndices {
nmget = nmget.Index(index)
}
*/
for _, r := range records {
if r.RepostedClaimId != "" {
var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId).Index(searchIndex)
//nmget = nmget.Id(r.RepostedClaimId)
mget = mget.Add(nmget)
totalReposted++
}
}
//mget = mget.Add(nmget)
if totalReposted == 0 {
return []*pb.Output{}, []*record{}, make(map[string]*pb.Output)
}
res, err := mget.Do(ctx)
if err != nil {
log.Println(err)
return []*pb.Output{}, []*record{}, make(map[string]*pb.Output)
} }
claims := make([]*pb.Output, totalReposted) claims := make([]*pb.Output, totalReposted)
repostedRecords := make([]*record, totalReposted) repostedRecords := make([]*record, totalReposted)
respostedMap := make(map[string]*pb.Output)
log.Println("reposted records", totalReposted) log.Println("reposted records", totalReposted)
for i, doc := range res.Docs { for i, doc := range res.Docs {
var r record var r record
err := json.Unmarshal(doc.Source, &r) err := json.Unmarshal(doc.Source, &r)
if err != nil { if err != nil {
return []*pb.Output{}, []*record{} return []*pb.Output{}, []*record{}, make(map[string]*pb.Output)
} }
claims[i] = r.recordToOutput() claims[i] = r.recordToOutput()
repostedRecords[i] = &r repostedRecords[i] = &r
respostedMap[r.ClaimId] = claims[i]
} }
return claims, repostedRecords return claims, repostedRecords, respostedMap
} }
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
@ -619,6 +680,15 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
return finalHits return finalHits
} }
func (r *record) recordToChannelOutput() *pb.Output {
// Don't nee dthe meta for this one
return &pb.Output{
TxHash: util.ToHash(r.Txid),
Nout: r.Nout,
Height: r.Height,
}
}
func (r *record) recordToOutput() *pb.Output { func (r *record) recordToOutput() *pb.Output {
return &pb.Output{ return &pb.Output{
TxHash: util.ToHash(r.Txid), TxHash: util.ToHash(r.Txid),
@ -692,27 +762,41 @@ func removeDuplicates(searchHits []*record) []*record {
return deduped return deduped
} }
func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record { func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.Blocked) {
newHits := make([]*record, 0, len(searchHits)) newHits := make([]*record, 0, len(searchHits))
blockedHits := make([]*record, 0, len(searchHits))
blockedChannels := make(map[string]*pb.Blocked) blockedChannels := make(map[string]*pb.Blocked)
for _, r := range searchHits { for _, r := range searchHits {
if r.CensorType != 0 { if r.CensorType != 0 {
if blockedChannels[r.ChannelId] == nil { if blockedChannels[r.CensoringChannelHash] == nil {
blockedObj := &pb.Blocked{ blockedObj := &pb.Blocked{
Count: 1, Count: 1,
Channel: r.recordToOutput(), Channel: nil,
} }
*blocked = append(*blocked, blockedObj) blockedChannels[r.CensoringChannelHash] = blockedObj
blockedChannels[r.ChannelId] = blockedObj blockedHits = append(blockedHits, r)
} else { } else {
blockedChannels[r.ChannelId].Count += 1 blockedChannels[r.CensoringChannelHash].Count += 1
} }
} else { } else {
newHits = append(newHits, r) newHits = append(newHits, r)
} }
} }
return newHits return newHits, blockedHits, blockedChannels
}
func printJsonFullRecords(records []*record) {
// or if you want more control
for _, r := range records {
// hit.Index contains the name of the index
b, err := json.MarshalIndent(r, "", " ")
if err != nil {
fmt.Println("error:", err)
}
fmt.Println(string(b))
}
} }
func printJsonFullResults(searchResult *elastic.SearchResult) { func printJsonFullResults(searchResult *elastic.SearchResult) {