From 8f32303d0760c3658572ae31fed676bf761c6763 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 9 Mar 2021 00:19:58 -0300 Subject: [PATCH] apply search timeout --- lbry/wallet/server/db/elastic_search.py | 22 ++++++++++++++-------- lbry/wallet/server/db/writer.py | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index eef8815b2..d25872d97 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -21,10 +21,12 @@ from lbry.wallet.server.util import class_logger class SearchIndex: - def __init__(self, index_prefix: str): + def __init__(self, index_prefix: str, search_timeout=3.0): + self.search_timeout = search_timeout + self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import + self.search_client: Optional[AsyncElasticsearch] = None self.client: Optional[AsyncElasticsearch] = None self.index = index_prefix + 'claims' - self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.logger = class_logger(__name__, self.__class__.__name__) self.claim_cache = LRUCache(2 ** 15) # invalidated on touched self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever @@ -34,6 +36,7 @@ class SearchIndex: if self.client: return self.client = AsyncElasticsearch(timeout=self.sync_timeout) + self.search_client = AsyncElasticsearch(timeout=self.search_timeout) while True: try: await self.client.cluster.health(wait_for_status='yellow') @@ -79,9 +82,9 @@ class SearchIndex: return res.get('acknowledged', False) def stop(self): - client = self.client - self.client = None - return asyncio.ensure_future(client.close()) + clients = [self.client, self.search_client] + self.client, self.search_client = None, None + return asyncio.ensure_future(asyncio.gather(*(client.close() for client in clients))) def delete_index(self): return self.client.indices.delete(self.index, ignore_unavailable=True) @@ -183,8 +186,9 @@ class SearchIndex: async def get_many(self, *claim_ids): missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache] if missing: - results = await self.client.mget(index=self.index, body={"ids": missing}, - _source_excludes=['description', 'title']) + results = await self.search_client.mget( + index=self.index, body={"ids": missing}, _source_excludes=['description', 'title'] + ) results = expand_result(filter(lambda doc: doc['found'], results["docs"])) for result in results: self.claim_cache.set(result['claim_id'], result) @@ -220,7 +224,9 @@ class SearchIndex: if cache_item.result: result = json.loads(zlib.decompress(cache_item.result)) else: - result = await self.client.search(expand_query(**kwargs), index=self.index, track_total_hits=200) + result = await self.search_client.search( + expand_query(**kwargs), index=self.index, track_total_hits=200 + ) cache_item.result = zlib.compress(json.dumps(result).encode(), 1) except NotFoundError: # index has no docs, fixme: log something diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 831038525..3c9abcf63 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -973,7 +973,7 @@ class LBRYLevelDB(LevelDB): ) # Search index - self.search_index = SearchIndex(self.env.es_index_prefix) + self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) def close(self): super().close()