diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index ceb5ce85f..99c49b16f 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -6,7 +6,7 @@ from operator import itemgetter from typing import Optional, List, Iterable from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError -from elasticsearch.helpers import async_bulk +from elasticsearch.helpers import async_streaming_bulk from lbry.crypto.base58 import Base58 from lbry.error import ResolveCensoredError @@ -91,7 +91,9 @@ class SearchIndex: async def sync_queue(self, claim_queue): self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) await self.client.indices.refresh(self.index) - await async_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)) + async for ok, item in async_streaming_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)): + if not ok: + self.logger.warning("indexing failed for an item: %s", item) await self.client.indices.refresh(self.index) await self.client.indices.flush(self.index) self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())