From 9fa2d0b6caac248cd4ff17272d0b2e76fcd8dd28 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 4 Nov 2022 13:41:41 -0400
Subject: [PATCH] batched update blocked/filtered in ES

---
Review notes (commentary only: text between the "---" separator and the
diffstat is not part of the commit):

* Before this change, apply_filters issued each update_by_query over the
  whole mapping for a category (filtered/blocked streams/channels). The
  new nested helper batched_update_filter splits `items` into dicts of at
  most 2000 entries and issues the queries once per non-empty batch.
* When `channel` is true, a second update_by_query is issued for the same
  batch, passing True as the extra third argument to update_filter_query.
* The removed channel code refreshed the index between its two
  update_by_query calls; the helper refreshes only after both calls.
  NOTE(review): presumably intentional - confirm the second query does
  not depend on the first being visible.
* `items` is annotated typing.Dict[bytes, bytes], but only_channels
  unpacks every value as a (chan, repost) pair, so the values look like
  2-tuples rather than bytes - TODO confirm and correct the annotation.
* NOTE(review): the line breaks and indentation of this copy were
  reconstructed from a whitespace-collapsed paste. In particular, the
  final refresh is assumed to sit inside `if batch:`; verify against the
  original commit.

 hub/elastic_sync/service.py | 39 ++++++++++++++++++++-----------------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git a/hub/elastic_sync/service.py b/hub/elastic_sync/service.py
index 48e9b35..745e1bd 100644
--- a/hub/elastic_sync/service.py
+++ b/hub/elastic_sync/service.py
@@ -174,28 +174,31 @@ class ElasticSyncService(BlockchainReaderService):
 
     async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
         only_channels = lambda x: {k: chan for k, (chan, repost) in x.items()}
+
+        async def batched_update_filter(items: typing.Dict[bytes, bytes], channel: bool, censor_type: int):
+            batches = [{}]
+            for k, v in items.items():
+                if len(batches[-1]) == 2000:
+                    batches.append({})
+                batches[-1][k] = v
+            for batch in batches:
+                if batch:
+                    await self.sync_client.update_by_query(
+                        self.index, body=self.update_filter_query(censor_type, only_channels(batch)), slices=4)
+                    if channel:
+                        await self.sync_client.update_by_query(
+                            self.index, body=self.update_filter_query(censor_type, only_channels(batch), True),
+                            slices=4)
+                    await self.sync_client.indices.refresh(self.index)
+
         if filtered_streams:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_streams)), slices=4)
-            await self.sync_client.indices.refresh(self.index)
+            await batched_update_filter(filtered_streams, False, Censor.SEARCH)
         if filtered_channels:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_channels)), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_channels), True), slices=4)
-            await self.sync_client.indices.refresh(self.index)
+            await batched_update_filter(filtered_channels, True, Censor.SEARCH)
         if blocked_streams:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_streams)), slices=4)
-            await self.sync_client.indices.refresh(self.index)
+            await batched_update_filter(blocked_streams, False, Censor.RESOLVE)
         if blocked_channels:
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_channels)), slices=4)
-            await self.sync_client.indices.refresh(self.index)
-            await self.sync_client.update_by_query(
-                self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_channels), True), slices=4)
-            await self.sync_client.indices.refresh(self.index)
+            await batched_update_filter(blocked_channels, True, Censor.RESOLVE)
 
     @staticmethod
     def _upsert_claim_query(index, claim):