diff --git a/dev.sh b/dev.sh index 78b6117..d96fca7 100755 --- a/dev.sh +++ b/dev.sh @@ -3,4 +3,4 @@ hash reflex 2>/dev/null || go get github.com/cespare/reflex hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; } -reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev" +reflex --decoration=none --start-service=true -- sh -c "go run . serve --debug" diff --git a/go.mod b/go.mod index 7471b86..59d908d 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/lbryio/hub go 1.16 require ( + github.com/ReneKroon/ttlcache/v2 v2.8.1 github.com/akamensky/argparse v1.2.2 - github.com/btcsuite/btcutil v1.0.2 + github.com/btcsuite/btcutil v1.0.2 // indirect github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index cce7b53..6c0e5c7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk= +github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/akamensky/argparse v1.2.2 h1:P17T0ZjlUNJuWTPPJ2A5dM1wxarHgHqfYH+AZTo2xQA= github.com/akamensky/argparse v1.2.2/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= @@ -196,6 +198,8 @@ github.com/ybbus/jsonrpc v0.0.0-20180411222309-2a548b7d822d/go.mod h1:XJrh1eMSzd github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -207,6 +211,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -236,6 +242,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -271,8 +279,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -310,6 +321,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/server/search.go b/server/search.go index 983f775..274c762 100644 --- a/server/search.go +++ b/server/search.go @@ -2,8 +2,10 @@ package server import ( "context" + "encoding/hex" "encoding/json" "fmt" + "github.com/btcsuite/btcutil/base58" "log" "math" "reflect" @@ -18,6 +20,7 @@ import ( "github.com/olivere/elastic/v7" "golang.org/x/text/cases" "golang.org/x/text/language" + "google.golang.org/protobuf/encoding/protojson" "gopkg.in/karalabe/cookiejar.v1/collections/deque" ) @@ -182,19 +185,36 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var searchResult *elastic.SearchResult = nil client := s.EsClient searchIndices = make([]string, 0, 1) - searchIndices = append(searchIndices, s.Args.EsIndex) + //searchIndices = append(searchIndices, s.Args.EsIndex) + + indices, _ := client.IndexNames() + for _, index := range indices { + if index != "claims" { + log.Println(index) + searchIndices = append(searchIndices, index) + } + } + + //cacheHit := false + var records []*record + + cacheKey := s.serializeSearchRequest(in) - cacheHit := false - var cachedRecords []*record /* - TODO: cache based on search request params + cache based on search request params include from value and number of results. When another search request comes in with same search params and same or increased offset (which we currently don't even use?) that will be a cache hit. + FIXME: For now the cache is turned off when in debugging mode + (for unit tests) because it breaks on some of them. + FIXME: Currently the cache just skips the initial search, + the mgets and post processing are still done. There's probably + a more efficient way to store the final result. */ - if !cacheHit { + if val, err := s.QueryCache.Get(cacheKey); err != nil || s.Args.Debug { + q := elastic.NewBoolQuery() err := s.checkQuery(in) @@ -227,13 +247,17 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) - cachedRecords = make([]*record, 0, 0) + records = s.searchResultToRecords(searchResult) + err = s.QueryCache.Set(cacheKey, records) + if err != nil { + //FIXME: Should this be fatal? + log.Println("Error storing records in cache: ", err) + } } else { - //TODO fill cached records here - cachedRecords = make([]*record, 0, 0) + records = val.([]*record) } - txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices, cachedRecords) + txos, extraTxos, blocked := s.postProcessResults(ctx, client, records, in, pageSize, from, searchIndices) t1 := time.Now() @@ -283,34 +307,33 @@ func (s *Server) cleanTags(tags []string) []string { return cleanedTags } +func (s *Server) searchResultToRecords( + searchResult *elastic.SearchResult) []*record { + records := make([]*record, 0, searchResult.TotalHits()) + + var r record + for _, item := range searchResult.Each(reflect.TypeOf(r)) { + if t, ok := item.(record); ok { + records = append(records, &t) + } + } + + return records +} + func (s *Server) postProcessResults( ctx context.Context, client *elastic.Client, - searchResult *elastic.SearchResult, + records []*record, in *pb.SearchRequest, pageSize int, from int, - searchIndices []string, - cachedRecords []*record) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { + searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) { var txos []*pb.Output - var records []*record var blockedRecords []*record var blocked []*pb.Blocked var blockedMap map[string]*pb.Blocked - if len(cachedRecords) < 0 { - records = make([]*record, 0, searchResult.TotalHits()) - - var r record - for _, item := range searchResult.Each(reflect.TypeOf(r)) { - if t, ok := item.(record); ok { - records = append(records, &t) - } - } - } else { - records = cachedRecords - } - //printJsonFullResults(searchResult) records, blockedRecords, blockedMap = removeBlocked(records) @@ -508,7 +531,9 @@ func (s *Server) setupEsQuery( } if in.PublicKeyId != "" { - q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) + value := hex.EncodeToString(base58.Decode(in.PublicKeyId)[1:21]) + q = q.Must(elastic.NewTermQuery("public_key_id.keyword", value)) + // q = q.Must(elastic.NewTermQuery("public_key_id.keyword", in.PublicKeyId)) } if in.HasChannelSignature { @@ -544,7 +569,8 @@ func (s *Server) setupEsQuery( q = AddTermField(q, in.ShortUrl, "short_url.keyword") q = AddTermField(q, in.Signature, "signature.keyword") q = AddTermField(q, in.TxId, "tx_id.keyword") - q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") + // q = AddTermField(q, strings.ToUpper(in.FeeCurrency), "fee_currency.keyword") + q = AddTermField(q, in.FeeCurrency, "fee_currency.keyword") q = AddTermField(q, in.RepostedClaimId, "reposted_claim_id.keyword") q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword") @@ -692,6 +718,20 @@ func (s *Server) getClaimsForReposts(ctx context.Context, client *elastic.Client return claims, repostedRecords, respostedMap } +/* + Takes a search request and serializes into a string for use as a key into the + internal cache for the hub. +*/ +func (s *Server) serializeSearchRequest(request *pb.SearchRequest) string { + bytes, err := protojson.Marshal(request) + if err != nil { + return "" + } + str := string((*s.S256).Sum(bytes)) + log.Println(str) + return str +} + func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record { finalHits := make([]*record, 0, len(searchHits)) var channelCounters map[string]int @@ -826,3 +866,4 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B return newHits, blockedHits, blockedChannels } + diff --git a/server/server.go b/server/server.go index dde6d4e..d6e68f0 100644 --- a/server/server.go +++ b/server/server.go @@ -2,14 +2,16 @@ package server import ( "context" + "crypto/sha256" "fmt" + "hash" "log" + "net/http" "os" "regexp" - - "net/http" "time" + "github.com/ReneKroon/ttlcache/v2" "github.com/lbryio/hub/meta" pb "github.com/lbryio/hub/protobuf/go" "github.com/olivere/elastic/v7" @@ -24,6 +26,8 @@ type Server struct { WeirdCharsRe *regexp.Regexp EsClient *elastic.Client Servers []*FederatedServer + QueryCache *ttlcache.Cache + S256 *hash.Hash pb.UnimplementedHubServer } @@ -123,12 +127,21 @@ func MakeHubServer(args *Args) *Server { if err != nil { log.Fatal(err) } + + cache := ttlcache.NewCache() + err = cache.SetTTL(5 * time.Minute) + if err != nil { + log.Fatal(err) + } + s256 := sha256.New() s := &Server{ GrpcServer: grpcServer, Args: args, MultiSpaceRe: multiSpaceRe, WeirdCharsRe: weirdCharsRe, EsClient: client, + QueryCache: cache, + S256: &s256, } return s