From 57f1108df2f258919e7d7bcf45b4d93bff727285 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 5 Mar 2021 05:39:36 -0300 Subject: [PATCH] fix query being json serializable --- lbry/wallet/server/db/elastic_search.py | 43 +++++++++++++++++++++++-- lbry/wallet/server/session.py | 40 +---------------------- 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 2845a250a..ed7359bb6 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -1,5 +1,7 @@ import asyncio +import json import struct +import zlib from binascii import hexlify, unhexlify from decimal import Decimal from operator import itemgetter @@ -26,6 +28,7 @@ class SearchIndex: self.logger = class_logger(__name__, self.__class__.__name__) self.claim_cache = LRUCache(2 ** 15) # invalidated on touched self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever + self.search_cache = LRUCache(2 ** 17) # fixme: dont let session manager replace it async def start(self): if self.client: @@ -145,6 +148,7 @@ class SearchIndex: await self.client.indices.refresh(self.index) await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32) await self.client.indices.refresh(self.index) + self.search_cache.clear() async def delete_above_height(self, height): await self.client.delete_by_query(self.index, expand_query(height='>'+str(height))) @@ -210,7 +214,14 @@ class SearchIndex: return [], 0, 0 kwargs['channel_id'] = result['claim_id'] try: - result = await self.client.search(expand_query(**kwargs), index=self.index) + expanded = expand_query(**kwargs) + cache_item = ResultCacheItem.from_cache(json.dumps(expanded, sort_keys=True), self.search_cache) + async with cache_item.lock: + if cache_item.result: + result = json.loads(zlib.decompress(cache_item.result)) + else: + result = await self.client.search(expand_query(**kwargs), index=self.index) + cache_item.result = zlib.compress(json.dumps(result).encode(), 1) except NotFoundError: # index has no docs, fixme: log something return [], 0, 0 @@ -408,13 +419,13 @@ def expand_query(**kwargs): operator_length = 2 if value[:2] in ops else 1 operator, value = value[:operator_length], value[operator_length:] if key == 'fee_amount': - value = Decimal(value)*1000 + value = str(Decimal(value)*1000) query['must'].append({"range": {key: {ops[operator]: value}}}) elif many: query['must'].append({"terms": {key: value}}) else: if key == 'fee_amount': - value = Decimal(value)*1000 + value = str(Decimal(value)*1000) query['must'].append({"term": {key: {"value": value}}}) elif key == 'not_channel_ids': for channel_id in value: @@ -516,3 +527,29 @@ def expand_result(results): if inner_hits: return expand_result(inner_hits) return expanded + + +class ResultCacheItem: + __slots__ = '_result', 'lock', 'has_result' + + def __init__(self): + self.has_result = asyncio.Event() + self.lock = asyncio.Lock() + self._result = None + + @property + def result(self) -> str: + return self._result + + @result.setter + def result(self, result: str): + self._result = result + if result is not None: + self.has_result.set() + + @classmethod + def from_cache(cls, cache_key, cache): + cache_item = cache.get(cache_key) + if cache_item is None: + cache_item = cache[cache_key] = ResultCacheItem() + return cache_item diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index fc63f8a1f..595ee56d5 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -811,9 +811,6 @@ class LBRYSessionManager(SessionManager): self.running = False if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) - self.search_cache = self.bp.search_cache - self.search_cache['search'] = LRUCacheWithMetrics(2 ** 14, metric_name='search', namespace=NAMESPACE) - self.search_cache['resolve'] = LRUCacheWithMetrics(2 ** 16, metric_name='resolve', namespace=NAMESPACE) async def process_metrics(self): while self.running: @@ -1008,23 +1005,7 @@ class LBRYElectrumX(SessionBase): async def run_and_cache_query(self, query_name, kwargs): if isinstance(kwargs, dict): kwargs['release_time'] = format_release_time(kwargs.get('release_time')) - metrics = self.get_metrics_or_placeholder_for_api(query_name) - metrics.start() - cache = self.session_mgr.search_cache[query_name] - cache_key = str(kwargs) - cache_item = cache.get(cache_key) - if cache_item is None: - cache_item = cache[cache_key] = ResultCacheItem() - elif cache_item.result is not None: - metrics.cache_response() - return cache_item.result - async with cache_item.lock: - if cache_item.result is None: - cache_item.result = await self.db.search_index.session_query(query_name, kwargs) - else: - metrics = self.get_metrics_or_placeholder_for_api(query_name) - metrics.cache_response() - return cache_item.result + return await self.db.search_index.session_query(query_name, kwargs) async def mempool_compact_histogram(self): return self.mempool.compact_fee_histogram() @@ -1590,25 +1571,6 @@ class LocalRPC(SessionBase): return 'RPC' -class ResultCacheItem: - __slots__ = '_result', 'lock', 'has_result' - - def __init__(self): - self.has_result = asyncio.Event() - self.lock = asyncio.Lock() - self._result = None - - @property - def result(self) -> str: - return self._result - - @result.setter - def result(self, result: str): - self._result = result - if result is not None: - self.has_result.set() - - def get_from_possible_keys(dictionary, *keys): for key in keys: if key in dictionary: