From d388527ffad5a1afd2782df0233f233a833a28ea Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 22 Feb 2021 20:47:56 -0300 Subject: [PATCH] log indexing errors --- lbry/wallet/server/db/elastic_search.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index ceb5ce85f..99c49b16f 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -6,7 +6,7 @@ from operator import itemgetter from typing import Optional, List, Iterable from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError -from elasticsearch.helpers import async_bulk +from elasticsearch.helpers import async_streaming_bulk from lbry.crypto.base58 import Base58 from lbry.error import ResolveCensoredError @@ -91,7 +91,9 @@ class SearchIndex: async def sync_queue(self, claim_queue): self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) await self.client.indices.refresh(self.index) - await async_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)) + async for ok, item in async_streaming_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)): + if not ok: + self.logger.warning("indexing failed for an item: %s", item) await self.client.indices.refresh(self.index) await self.client.indices.flush(self.index) self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())