From 0f02906c9bfbddd951f682c700b682a6c4c26a09 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Apr 2021 04:35:12 -0300 Subject: [PATCH 1/4] fix has_source for reposted channels --- lbry/wallet/server/db/elasticsearch/search.py | 1 + lbry/wallet/server/db/writer.py | 1 + tests/integration/blockchain/test_claim_commands.py | 6 +++--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 883b7877f..e03d4cbec 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -499,6 +499,7 @@ def expand_query(**kwargs): query['should'].append( {"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}}) query['should'].append({"bool": {"must_not": [is_stream_or_repost]}}) + query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}}) if kwargs.get('text'): query['must'].append( {"simple_query_string": diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index e1a117635..dc1281220 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -829,6 +829,7 @@ class SQLDB: (select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags, (select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages, (select cr.has_source from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_has_source, + (select cr.claim_type from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_claim_type, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) WHERE claim.claim_hash in (SELECT claim_hash FROM changelog) diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 8d62777bd..01d3b4151 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -196,11 +196,11 @@ class ClaimSearchCommand(ClaimTestCase): normal = await self.stream_create('normal', data=b'normal') normal_repost = await self.stream_repost(self.get_claim_id(normal), 'normal-repost') no_source_repost = await self.stream_repost(self.get_claim_id(no_source), 'no-source-repost') - await self.assertFindsClaims([no_source_repost, no_source, channel], has_no_source=True) + await self.assertFindsClaims([channel_repost, no_source_repost, no_source, channel], has_no_source=True) await self.assertListsClaims([no_source, channel], has_no_source=True) - await self.assertFindsClaims([normal_repost, normal, channel], has_source=True) + await self.assertFindsClaims([channel_repost, normal_repost, normal, channel], has_source=True) await self.assertListsClaims([no_source_repost, normal_repost, normal], has_source=True) - await self.assertFindsClaims([no_source_repost, normal_repost, normal, no_source, channel]) + await self.assertFindsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel]) await self.assertListsClaims([no_source_repost, normal_repost, normal, no_source, channel]) async def test_pagination(self): From d5f722792f81fd8dab0704794e01462cc3bcf015 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 3 May 2021 18:40:03 -0300 Subject: [PATCH 2/4] fix and test has_source for channel reposts --- lbry/wallet/server/db/writer.py | 16 ++++++++++------ .../blockchain/test_claim_commands.py | 5 +++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index dc1281220..5e7367dde 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -822,8 +822,8 @@ class SQLDB: f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" ) - def enqueue_changes(self): - for claim in self.execute(f""" + def enqueue_changes(self, shard=None, total_shards=None): + query = """ SELECT claimtrie.claim_hash as is_controlling, claimtrie.last_take_over_height, (select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags, @@ -832,8 +832,12 @@ class SQLDB: (select cr.claim_type from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_claim_type, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) - WHERE claim.claim_hash in (SELECT claim_hash FROM changelog) - """): + """ + if shard is not None and total_shards is not None: + query += f" WHERE claim.height % {total_shards} = {shard}" + else: + query += " WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)" + for claim in self.execute(query): claim = claim._asdict() id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash']))) claim['censor_type'] = 0 @@ -860,11 +864,11 @@ class SQLDB: def clear_changelog(self): self.execute("delete from changelog;") - def claim_producer(self): + def claim_producer(self, shard=None, total_shards=None): while self.pending_deletes: claim_hash = self.pending_deletes.pop() yield 'delete', hexlify(claim_hash[::-1]).decode() - for claim in self.enqueue_changes(): + for claim in self.enqueue_changes(shard, total_shards): yield claim self.clear_changelog() diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 01d3b4151..8ded93654 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -196,12 +196,13 @@ class ClaimSearchCommand(ClaimTestCase): normal = await self.stream_create('normal', data=b'normal') normal_repost = await self.stream_repost(self.get_claim_id(normal), 'normal-repost') no_source_repost = await self.stream_repost(self.get_claim_id(no_source), 'no-source-repost') + channel_repost = await self.stream_repost(self.get_claim_id(channel), 'channel-repost') await self.assertFindsClaims([channel_repost, no_source_repost, no_source, channel], has_no_source=True) await self.assertListsClaims([no_source, channel], has_no_source=True) await self.assertFindsClaims([channel_repost, normal_repost, normal, channel], has_source=True) - await self.assertListsClaims([no_source_repost, normal_repost, normal], has_source=True) + await self.assertListsClaims([channel_repost, no_source_repost, normal_repost, normal], has_source=True) await self.assertFindsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel]) - await self.assertListsClaims([no_source_repost, normal_repost, normal, no_source, channel]) + await self.assertListsClaims([channel_repost, no_source_repost, normal_repost, normal, no_source, channel]) async def test_pagination(self): await self.create_channel() From 7bf211a52ba9ed2d4a205588839fb5af106298d2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 3 May 2021 18:40:21 -0300 Subject: [PATCH 3/4] apply reposted_claim_type on es sync --- lbry/wallet/server/db/elasticsearch/sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index b551aeeab..bc716021a 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -30,6 +30,7 @@ SELECT claimtrie.claim_hash as is_controlling, (select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags, (select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages, (select cr.has_source from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_has_source, + (select cr.claim_type from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_claim_type, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) WHERE claim.height % {shards_total} = {shard_num} From a4058b84ce06158c3985cb6eb0c5487593a1f665 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 7 May 2021 12:00:30 -0300 Subject: [PATCH 4/4] clean out unused sharding --- lbry/wallet/server/db/writer.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 5e7367dde..409a223b9 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -822,7 +822,7 @@ class SQLDB: f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" ) - def enqueue_changes(self, shard=None, total_shards=None): + def enqueue_changes(self): query = """ SELECT claimtrie.claim_hash as is_controlling, claimtrie.last_take_over_height, @@ -832,11 +832,8 @@ class SQLDB: (select cr.claim_type from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_claim_type, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) + WHERE claim.claim_hash in (SELECT claim_hash FROM changelog) """ - if shard is not None and total_shards is not None: - query += f" WHERE claim.height % {total_shards} = {shard}" - else: - query += " WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)" for claim in self.execute(query): claim = claim._asdict() id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash']))) @@ -864,11 +861,11 @@ class SQLDB: def clear_changelog(self): self.execute("delete from changelog;") - def claim_producer(self, shard=None, total_shards=None): + def claim_producer(self): while self.pending_deletes: claim_hash = self.pending_deletes.pop() yield 'delete', hexlify(claim_hash[::-1]).decode() - for claim in self.enqueue_changes(shard, total_shards): + for claim in self.enqueue_changes(): yield claim self.clear_changelog()