diff --git a/lbry/wallet/server/db/elasticsearch/__init__.py b/lbry/wallet/server/db/elasticsearch/__init__.py new file mode 100644 index 000000000..385e96219 --- /dev/null +++ b/lbry/wallet/server/db/elasticsearch/__init__.py @@ -0,0 +1 @@ +from .search import SearchIndex \ No newline at end of file diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py new file mode 100644 index 000000000..12483ed10 --- /dev/null +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -0,0 +1,61 @@ +INDEX_DEFAULT_SETTINGS = { + "settings": + {"analysis": + {"analyzer": { + "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, + "index": + {"refresh_interval": -1, + "number_of_shards": 1, + "number_of_replicas": 0, + "sort": { + "field": ["trending_mixed", "release_time"], + "order": ["desc", "desc"] + }} + }, + "mappings": { + "properties": { + "claim_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 10 + } + }, + "height": {"type": "integer"}, + "claim_type": {"type": "byte"}, + "censor_type": {"type": "byte"}, + "trending_mixed": {"type": "float"}, + "release_time": {"type": "long"}, + } + } +} +FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount', + 'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height', + 'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted', + 'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type', + 'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount', + 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout', + 'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags', + 'reposted_claim_id', 'has_source'} +TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', + 'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature', + 'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'} +RANGE_FIELDS = { + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', + 'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel', + 'amount', 'effective_amount', 'support_amount', + 'trending_group', 'trending_mixed', 'censor_type', + 'trending_local', 'trending_global', +} +REPLACEMENTS = { + 'name': 'normalized', + 'txid': 'tx_id', + 'claim_hash': '_id' +} diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elasticsearch/search.py similarity index 86% rename from lbry/wallet/server/db/elastic_search.py rename to lbry/wallet/server/db/elasticsearch/search.py index 78e5633e1..64a320518 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -15,6 +15,8 @@ from lbry.schema.tags import clean_tags from lbry.schema.url import URL, normalize_name from lbry.utils import LRUCache from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES +from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \ + RANGE_FIELDS from lbry.wallet.server.util import class_logger @@ -51,46 +53,7 @@ class SearchIndex: except ConnectionError: self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!") await asyncio.sleep(1) - res = await self.client.indices.create( - self.index, - { - "settings": - {"analysis": - {"analyzer": { - "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, - "index": - {"refresh_interval": -1, - "number_of_shards": 1, - "number_of_replicas": 0, - "sort": { - "field": ["trending_mixed", "release_time"], - "order": ["desc", "desc"] - }} - }, - "mappings": { - "properties": { - "claim_id": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - }, - "type": "text", - "index_prefixes": { - "min_chars": 1, - "max_chars": 10 - } - }, - "height": {"type": "integer"}, - "claim_type": {"type": "byte"}, - "censor_type": {"type": "byte"}, - "trending_mixed": {"type": "float"}, - "release_time": {"type": "long"}, - } - } - }, ignore=400 - ) + res = await self.client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) return res.get('acknowledged', False) def stop(self): @@ -230,7 +193,6 @@ class SearchIndex: if channel_id: query['channel_id'] = channel_id query['order_by'] = ['^channel_join'] - query['channel_id'] = channel_id query['signature_valid'] = True else: query['order_by'] = '^creation_height' @@ -242,10 +204,9 @@ class SearchIndex: async def search(self, **kwargs): if 'channel' in kwargs: - channel_id = await self.resolve_url(kwargs.pop('channel')) - if not channel_id or not isinstance(channel_id, str): + kwargs['channel_id'] = await self.resolve_url(kwargs.pop('channel')) + if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str): return [], 0, 0 - kwargs['channel_id'] = channel_id try: result = (await self.search_client.search( expand_query(**kwargs), index=self.index, track_total_hits=False if kwargs.get('no_totals') else 10_000 @@ -365,34 +326,7 @@ def extract_doc(doc, index): doc['claim_type'] = doc.get('claim_type', 0) or 0 doc['stream_type'] = int(doc.get('stream_type', 0) or 0) doc['has_source'] = bool(doc['has_source']) - return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', - 'doc_as_upsert': True} - - -FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount', - 'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height', - 'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted', - 'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type', - 'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount', - 'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout', - 'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags', - 'reposted_claim_id', 'has_source'} -TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id', - 'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature', - 'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'} -RANGE_FIELDS = { - 'height', 'creation_height', 'activation_height', 'expiration_height', - 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', - 'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel', - 'amount', 'effective_amount', 'support_amount', - 'trending_group', 'trending_mixed', 'censor_type', - 'trending_local', 'trending_global', -} -REPLACEMENTS = { - 'name': 'normalized', - 'txid': 'tx_id', - 'claim_hash': '_id' -} + return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True} def expand_query(**kwargs): diff --git a/lbry/wallet/server/db/elastic_sync.py b/lbry/wallet/server/db/elasticsearch/sync.py similarity index 97% rename from lbry/wallet/server/db/elastic_sync.py rename to lbry/wallet/server/db/elasticsearch/sync.py index b5ccce1bd..c3cf53181 100644 --- a/lbry/wallet/server/db/elastic_sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -9,7 +9,7 @@ import apsw from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk -from lbry.wallet.server.db.elastic_search import extract_doc, SearchIndex +from .search import extract_doc, SearchIndex INDEX = 'claims' diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 3c9abcf63..80fc4b556 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -1,5 +1,4 @@ import os -from asyncio import Queue import apsw from typing import Union, Tuple, Set, List @@ -20,7 +19,7 @@ from lbry.wallet.server.db.canonical import register_canonical_functions from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES -from .elastic_search import SearchIndex +from lbry.wallet.server.db.elasticsearch import SearchIndex ATTRIBUTE_ARRAY_MAX_LENGTH = 100 diff --git a/setup.py b/setup.py index 56a42c7e4..40b2d896c 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ setup( 'lbrynet=lbry.extras.cli:main', 'torba-server=lbry.wallet.server.cli:main', 'orchstr8=lbry.wallet.orchstr8.cli:main', - 'torba-elastic-sync=lbry.wallet.server.db.elastic_sync:run_elastic_sync' + 'torba-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync' ], }, install_requires=[