From 9618c9e702ad291f0fc7bce5adb71643b5ec4dbb Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 17 Mar 2022 11:49:17 -0400
Subject: [PATCH] patch channel_tx_hash and repost_tx_hash not being updated in ES

-this is a patch pending a real fix
---
 scribe/elasticsearch/search.py | 56 ++++++++++++++++++++++++----------
 scribe/hub/session.py          |  2 +-
 2 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/scribe/elasticsearch/search.py b/scribe/elasticsearch/search.py
index c038756..0e336a5 100644
--- a/scribe/elasticsearch/search.py
+++ b/scribe/elasticsearch/search.py
@@ -2,11 +2,11 @@ import logging
 import time
 import asyncio
 import struct
-from binascii import unhexlify
+from bisect import bisect_right
 from collections import Counter, deque
 from decimal import Decimal
 from operator import itemgetter
-from typing import Optional, List, Iterable
+from typing import Optional, List, Iterable, TYPE_CHECKING
 
 from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
 from elasticsearch.helpers import async_streaming_bulk
@@ -19,6 +19,8 @@ from scribe.db.common import CLAIM_TYPES, STREAM_TYPES
 from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
     RANGE_FIELDS, ALL_FIELDS
 from scribe.db.common import ResolveResult
+if TYPE_CHECKING:
+    from scribe.db import HubDB
 
 
 def expand_query(**kwargs):
@@ -185,7 +187,9 @@ class IndexVersionMismatch(Exception):
 class SearchIndex:
     VERSION = 1
 
-    def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
+    def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
+                 elastic_port=9200):
+        self.hub_db = hub_db
         self.search_timeout = search_timeout
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.search_client: Optional[AsyncElasticsearch] = None
@@ -246,6 +250,26 @@ class SearchIndex:
         self.claim_cache.clear()
 
     def _make_resolve_result(self, es_result):
+        channel_hash = es_result['channel_hash']
+        reposted_claim_hash = es_result['reposted_claim_hash']
+        channel_tx_hash = None
+        channel_tx_position = None
+        channel_height = None
+        reposted_tx_hash = None
+        reposted_tx_position = None
+        reposted_height = None
+        if channel_hash:  # FIXME: do this inside ES in a script
+            channel_txo = self.hub_db.get_cached_claim_txo(channel_hash[::-1])
+            if channel_txo:
+                channel_tx_hash = self.hub_db.get_tx_hash(channel_txo.tx_num)
+                channel_tx_position = channel_txo.position
+                channel_height = bisect_right(self.hub_db.tx_counts, channel_txo.tx_num)
+        if reposted_claim_hash:
+            repost_txo = self.hub_db.get_cached_claim_txo(reposted_claim_hash[::-1])
+            if repost_txo:
+                reposted_tx_hash = self.hub_db.get_tx_hash(repost_txo.tx_num)
+                reposted_tx_position = repost_txo.position
+                reposted_height = bisect_right(self.hub_db.tx_counts, repost_txo.tx_num)
         return ResolveResult(
             name=es_result['claim_name'],
             normalized_name=es_result['normalized_name'],
@@ -265,16 +289,16 @@
             support_amount=es_result['support_amount'],
             last_takeover_height=es_result['last_take_over_height'],
             claims_in_channel=es_result['claims_in_channel'],
-            channel_hash=es_result['channel_hash'],
-            reposted_claim_hash=es_result['reposted_claim_hash'],
+            channel_hash=channel_hash,
+            reposted_claim_hash=reposted_claim_hash,
             reposted=es_result['reposted'],
             signature_valid=es_result['signature_valid'],
-            reposted_tx_hash=bytes.fromhex(es_result['reposted_tx_id'] or '')[::-1] or None,
-            reposted_tx_position=es_result['reposted_tx_position'],
-            reposted_height=es_result['reposted_height'],
-            channel_tx_hash=bytes.fromhex(es_result['channel_tx_id'] or '')[::-1] or None,
-            channel_tx_position=es_result['channel_tx_position'],
-            channel_height=es_result['channel_height'],
+            reposted_tx_hash=reposted_tx_hash,
+            reposted_tx_position=reposted_tx_position,
+            reposted_height=reposted_height,
+            channel_tx_hash=channel_tx_hash,
+            channel_tx_position=channel_tx_position,
+            channel_height=channel_height,
         )
 
     async def cached_search(self, kwargs):
@@ -579,14 +603,14 @@ def expand_result(results):
             inner_hits.extend(inner_hit["hits"]["hits"])
             continue
         result = result['_source']
-        result['claim_hash'] = unhexlify(result['claim_id'])[::-1]
+        result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1]
         if result['reposted_claim_id']:
-            result['reposted_claim_hash'] = unhexlify(result['reposted_claim_id'])[::-1]
+            result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1]
         else:
            result['reposted_claim_hash'] = None
-        result['channel_hash'] = unhexlify(result['channel_id'])[::-1] if result['channel_id'] else None
-        result['txo_hash'] = unhexlify(result['tx_id'])[::-1] + struct.pack('