Merge pull request #3136 from lbryio/fix-reflector-loop

fix reflector loop crashing upon an unexpected error
This commit is contained in:
Jack Robison 2021-01-08 12:40:18 -05:00 committed by GitHub
commit a4db0820bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 17 deletions

View file

@ -354,20 +354,19 @@ class ManagedStream(ManagedDownloadSource):
self.reflector_progress = int((i + 1) / len(we_have) * 100) self.reflector_progress = int((i + 1) / len(we_have) * 100)
except (asyncio.TimeoutError, ValueError): except (asyncio.TimeoutError, ValueError):
return sent return sent
except ConnectionRefusedError: except ConnectionError:
return sent return sent
except (OSError, Exception) as err: except (OSError, Exception) as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id) log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
raise err elif isinstance(err, OSError):
if isinstance(err, OSError):
log.warning( log.warning(
"stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name, "stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name,
self.claim_id self.claim_id
) )
else: else:
log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id) log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id)
raise asyncio.CancelledError() return sent
finally: finally:
if protocol.transport: if protocol.transport:
protocol.transport.close() protocol.transport.close()

View file

@ -160,13 +160,13 @@ class StreamManager(SourceManager):
async def reflect_streams(self): async def reflect_streams(self):
try: try:
return await self._reflact_streams() return await self._reflect_streams()
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except Exception: except Exception:
log.exception("reflector task encountered an unexpected error!") log.exception("reflector task encountered an unexpected error!")
async def _reflact_streams(self): async def _reflect_streams(self):
# todo: those debug statements are temporary for #2987 - remove them if its closed # todo: those debug statements are temporary for #2987 - remove them if its closed
while True: while True:
if self.config.reflect_streams and self.config.reflector_servers: if self.config.reflect_streams and self.config.reflector_servers:

View file

@ -93,8 +93,7 @@ class TestReflector(AsyncioTestCase):
await incoming.wait() await incoming.wait()
stop.set() stop.set()
# this used to raise (and then propagate) a CancelledError # this used to raise (and then propagate) a CancelledError
with self.assertRaises(asyncio.CancelledError): self.assertListEqual(await reflect_task, [])
await reflect_task
self.assertFalse(self.stream.is_fully_reflected) self.assertFalse(self.stream.is_fully_reflected)
self.assertFalse(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertFalse(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
@ -114,8 +113,8 @@ class TestReflector(AsyncioTestCase):
await incoming.wait() await incoming.wait()
await not_incoming.wait() await not_incoming.wait()
stop.set() stop.set()
with self.assertRaises(asyncio.CancelledError): sent = await reflect_task
await reflect_task self.assertListEqual([self.stream.sd_hash], sent)
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
self.assertFalse(self.stream.is_fully_reflected) self.assertFalse(self.stream.is_fully_reflected)
@ -137,8 +136,8 @@ class TestReflector(AsyncioTestCase):
await incoming.wait() await incoming.wait()
await not_incoming.wait() await not_incoming.wait()
stop.set() stop.set()
with self.assertRaises(asyncio.CancelledError): sent = await reflect_task
await reflect_task self.assertListEqual([self.stream.sd_hash, self.stream.descriptor.blobs[0].blob_hash], sent)
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified())
self.assertFalse(self.stream.is_fully_reflected) self.assertFalse(self.stream.is_fully_reflected)
@ -160,9 +159,7 @@ class TestReflector(AsyncioTestCase):
await not_incoming.wait() await not_incoming.wait()
await incoming.wait() await incoming.wait()
stop.set() stop.set()
with self.assertRaises(asyncio.CancelledError): self.assertListEqual(await reflect_task, [self.stream.sd_hash])
await reflect_task
# self.assertListEqual(await reflect_task, [self.stream.sd_hash])
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
self.assertFalse( self.assertFalse(
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified() self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
@ -187,8 +184,8 @@ class TestReflector(AsyncioTestCase):
await incoming.wait() await incoming.wait()
await self.stream_manager.delete(self.stream, delete_file=True) await self.stream_manager.delete(self.stream, delete_file=True)
# this used to raise OSError when it can't read the deleted blob for the upload # this used to raise OSError when it can't read the deleted blob for the upload
with self.assertRaises(asyncio.CancelledError): sent = await reflect_task
await reflect_task self.assertListEqual([self.stream.sd_hash], sent)
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
self.assertFalse( self.assertFalse(
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified() self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()