From 2641a9abe548959b084f1fba5d113a44ac021e7d Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 5 Mar 2021 04:32:48 -0300
Subject: [PATCH] make better resolve cache

---
 lbry/schema/url.py                      |  8 +++
 lbry/wallet/server/db/elastic_search.py | 94 +++++++++++++++----------
 2 files changed, 66 insertions(+), 36 deletions(-)

diff --git a/lbry/schema/url.py b/lbry/schema/url.py
index a09b5e78b..a1081b199 100644
--- a/lbry/schema/url.py
+++ b/lbry/schema/url.py
@@ -55,6 +55,14 @@ class PathSegment(NamedTuple):
     def normalized(self):
         return normalize_name(self.name)
 
+    @property
+    def is_shortid(self):
+        return self.claim_id is not None and len(self.claim_id) < 40
+
+    @property
+    def is_fullid(self):
+        return self.claim_id is not None and len(self.claim_id) == 40
+
     def to_dict(self):
         q = {'name': self.name}
         if self.claim_id is not None:
diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py
index 34d3254ed..2845a250a 100644
--- a/lbry/wallet/server/db/elastic_search.py
+++ b/lbry/wallet/server/db/elastic_search.py
@@ -9,7 +9,7 @@ from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
 from elasticsearch.helpers import async_streaming_bulk
 
 from lbry.crypto.base58 import Base58
-from lbry.error import ResolveCensoredError, claim_id
+from lbry.error import ResolveCensoredError, claim_id as parse_claim_id
 from lbry.schema.result import Outputs, Censor
 from lbry.schema.tags import clean_tags
 from lbry.schema.url import URL, normalize_name
@@ -24,8 +24,8 @@ class SearchIndex:
         self.index = index_prefix + 'claims'
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.logger = class_logger(__name__, self.__class__.__name__)
-        self.search_cache = LRUCache(2 ** 16)
-        self.channel_cache = LRUCache(2 ** 16)
+        self.claim_cache = LRUCache(2 ** 15)  # invalidated on touched
+        self.short_id_cache = LRUCache(2 ** 17)  # never invalidated, since short ids are forever
 
     async def start(self):
         if self.client:
@@ -97,11 +97,18 @@ class SearchIndex:
 
     async def claim_consumer(self, claim_producer):
         await self.client.indices.refresh(self.index)
+        touched = set()
         async for ok, item in async_streaming_bulk(self.client, self._consume_claim_producer(claim_producer)):
             if not ok:
                 self.logger.warning("indexing failed for an item: %s", item)
+            else:
+                item = item.popitem()[1]
+                touched.add(item['_id'])
         await self.client.indices.refresh(self.index)
         await self.client.indices.flush(self.index)
+        for claim_id in touched:
+            if claim_id in self.claim_cache:
+                self.claim_cache.pop(claim_id)
         self.logger.info("Indexing done.")
 
     async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
@@ -112,6 +119,9 @@
                 update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
             else:
                 update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
+            for claim_id in blockdict:
+                if claim_id in self.claim_cache:
+                    self.claim_cache.pop(claim_id)
             key = 'channel_id' if channels else 'claim_id'
             update['script'] = {
                 "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_hash=params[ctx._source.{key}]",
@@ -135,8 +145,6 @@
         await self.client.indices.refresh(self.index)
         await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
         await self.client.indices.refresh(self.index)
-        self.search_cache.clear()
-        self.channel_cache.clear()
 
     async def delete_above_height(self, height):
         await self.client.delete_by_query(self.index, expand_query(height='>'+str(height)))
@@ -168,15 +176,32 @@
         return results, censored, censor
 
     async def get_many(self, *claim_ids):
-        cached = {claim_id: self.search_cache.get(claim_id) for claim_id in claim_ids if claim_id in self.search_cache}
-        missing = [claim_id for claim_id in claim_ids if claim_id not in cached]
+        missing = [claim_id for claim_id in claim_ids if claim_id not in self.claim_cache]
         if missing:
             results = await self.client.mget(index=self.index, body={"ids": missing},
                                              _source_excludes=['description', 'title'])
             results = expand_result(filter(lambda doc: doc['found'], results["docs"]))
             for result in results:
-                self.search_cache.set(result['claim_id'], result)
-        return list(filter(None, map(self.search_cache.get, claim_ids)))
+                self.claim_cache.set(result['claim_id'], result)
+        return list(filter(None, map(self.claim_cache.get, claim_ids)))
+
+    async def full_id_from_short_id(self, name, short_id, channel_id=None):
+        key = (channel_id or '') + name + short_id
+        if key not in self.short_id_cache:
+            query = {'name': name, 'claim_id': short_id}
+            if channel_id:
+                query['channel_id'] = channel_id
+                query['order_by'] = ['^channel_join']
+                query['channel_id'] = channel_id
+                query['signature_valid'] = True
+            else:
+                query['order_by'] = '^creation_height'
+            result, _, _ = await self.search(**query, limit=1)
+            if len(result) == 1:
+                result = result[0]['claim_id']
+                self.short_id_cache[key] = result
+        return self.short_id_cache.get(key, None)
+
 
     async def search(self, **kwargs):
         if 'channel' in kwargs:
@@ -217,23 +242,24 @@
     async def resolve_channel_id(self, url: URL):
         if not url.has_channel:
             return
-        key = 'cid:' + str(url.channel)
-        if key in self.channel_cache:
-            return self.channel_cache[key]
+        if url.channel.is_fullid:
+            return url.channel.claim_id
+        if url.channel.is_shortid:
+            channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
+            if not channel_id:
+                return LookupError(f'Could not find channel in "{url}".')
+            return channel_id
+
         query = url.channel.to_dict()
         if set(query) == {'name'}:
             query['is_controlling'] = True
         else:
             query['order_by'] = ['^creation_height']
-        if len(query.get('claim_id', '')) != 40:
-            matches, _, _ = await self.search(**query, limit=1)
-            if matches:
-                channel_id = matches[0]['claim_id']
-            else:
-                return LookupError(f'Could not find channel in "{url}".')
+        matches, _, _ = await self.search(**query, limit=1)
+        if matches:
+            channel_id = matches[0]['claim_id']
         else:
-            channel_id = query['claim_id']
-        self.channel_cache.set(key, channel_id)
+            return LookupError(f'Could not find channel in "{url}".')
         return channel_id
 
     async def resolve_stream(self, url: URL, channel_id: str = None):
@@ -242,14 +268,14 @@
         if url.has_channel and channel_id is None:
             return None
         query = url.stream.to_dict()
-        stream = None
-        if 'claim_id' in query and len(query['claim_id']) == 40:
-            stream = (await self.get_many(query['claim_id']))
-            stream = stream[0] if len(stream) else None
-        else:
-            key = (channel_id or '') + str(url.stream)
-            if key in self.search_cache:
-                return self.search_cache[key]
+        if url.stream.claim_id is not None:
+            if url.stream.is_fullid:
+                claim_id = url.stream.claim_id
+            else:
+                claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
+            stream = await self.get_many(claim_id)
+            return stream[0] if len(stream) else None
+
         if channel_id is not None:
             if set(query) == {'name'}:
                 # temporarily emulate is_controlling for claims in channel
@@ -260,19 +286,15 @@
                 query['signature_valid'] = True
         elif set(query) == {'name'}:
             query['is_controlling'] = True
+
-        if not stream:
-            matches, _, _ = await self.search(**query, limit=1)
-            if matches:
-                stream = matches[0]
-        key = (channel_id or '') + str(url.stream)
-        self.search_cache.set(key, stream)
-        return stream
+        matches, _, _ = await self.search(**query, limit=1)
+        if matches:
+            return matches[0]
 
     async def _get_referenced_rows(self, txo_rows: List[dict]):
         txo_rows = [row for row in txo_rows if isinstance(row, dict)]
         repost_hashes = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))
         channel_hashes = set(filter(None, (row['channel_id'] for row in txo_rows)))
-        channel_hashes |= set(map(claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
+        channel_hashes |= set(map(parse_claim_id, filter(None, (row['censoring_channel_hash'] for row in txo_rows))))
 
         reposted_txos = []
         if repost_hashes:
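
Note on the caching contract above, for review: the two caches age out differently. claim_cache holds claim documents, so claim_consumer() and apply_filters() evict every claim id they touch or censor; short_id_cache only memoizes short id -> full claim id lookups, which the patch treats as immutable ("short ids are forever", since the ^creation_height ordering pins a prefix to the earliest matching claim). A minimal standalone sketch of that contract follows; the LRU class here is a stand-in for the LRUCache used by SearchIndex, not the real implementation:

    from collections import OrderedDict

    class LRU:
        # Stand-in for the LRUCache used by SearchIndex (illustrative only).
        def __init__(self, capacity):
            self.capacity = capacity
            self.data = OrderedDict()

        def __contains__(self, key):
            return key in self.data

        def get(self, key, default=None):
            if key in self.data:
                self.data.move_to_end(key)  # refresh recency on read
            return self.data.get(key, default)

        def set(self, key, value):
            self.data[key] = value
            self.data.move_to_end(key)
            while len(self.data) > self.capacity:
                self.data.popitem(last=False)  # evict least recently used

        def pop(self, key):
            return self.data.pop(key)

    claim_cache = LRU(2 ** 15)     # claim bodies: evicted when touched or censored
    short_id_cache = LRU(2 ** 17)  # short id -> full id: written once, never evicted

    def invalidate_touched(touched):
        # Same shape as the eviction loops in claim_consumer()/apply_filters():
        # drop stale claim bodies, leave the short id mappings alone.
        for claim_id in touched:
            if claim_id in claim_cache:
                claim_cache.pop(claim_id)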
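
And a hedged usage sketch of the new resolve path, assuming a started SearchIndex with matching claims already indexed; the url, names and ids below are invented for illustration:

    from lbry.schema.url import URL

    async def resolve_demo(index):
        url = URL.parse('lbry://@some-channel#ab/some-stream#9f')
        # The new PathSegment properties classify the '#...' suffix by length:
        assert url.channel.is_shortid       # 'ab' is a prefix shorter than 40 hex chars
        assert not url.channel.is_fullid    # only a full 40-char claim id sets this
        # Short ids resolve through the permanently cached helper ...
        channel_id = await index.full_id_from_short_id(
            url.channel.name, url.channel.claim_id)
        # ... and stream short ids pass the channel id to scope the lookup.
        claim_id = await index.full_id_from_short_id(
            url.stream.name, url.stream.claim_id, channel_id)
        # Claim bodies still come from the invalidated claim_cache via get_many().
        matches = await index.get_many(claim_id)
        return matches[0] if matches else None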