diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index f99dbe768..db4fd5f88 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -144,14 +144,13 @@ class BlobFile: while self.writers: self.writers.pop().finished.cancel() - async def delete(self): + def delete(self): self.close() - async with self.blob_write_lock: - self.saved_verified_blob = False - if os.path.isfile(self.file_path): - os.remove(self.file_path) - self.verified.clear() - self.finished_writing.clear() + self.saved_verified_blob = False + if os.path.isfile(self.file_path): + os.remove(self.file_path) + self.verified.clear() + self.finished_writing.clear() def decrypt(self, key: bytes, iv: bytes) -> bytes: """ diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 8e0a5d2e6..84d0daf59 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -2,7 +2,6 @@ import os import typing import asyncio import logging -from sqlite3 import IntegrityError from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash from lbrynet.stream.descriptor import StreamDescriptor @@ -63,29 +62,21 @@ class BlobFileManager: blobs = [self.get_blob(b) for b in blob_hashes] return [blob.blob_hash for blob in blobs if blob.get_is_verified()] - async def delete_blob(self, blob_hash: str): + def delete_blob(self, blob_hash: str): if not is_valid_blobhash(blob_hash): raise Exception("invalid blob hash to delete") + if blob_hash not in self.blobs: if os.path.isfile(os.path.join(self.blob_dir, blob_hash)): os.remove(os.path.join(self.blob_dir, blob_hash)) else: - try: - blob = self.get_blob(blob_hash) - await blob.delete() - except Exception as e: - log.warning("Failed to delete blob file. Reason: %s", e) - if blob_hash in self.blobs: - del self.blobs[blob_hash] - if blob_hash in self.completed_blob_hashes: - self.completed_blob_hashes.remove(blob_hash) + self.blobs.pop(blob_hash).delete() + if blob_hash in self.completed_blob_hashes: + self.completed_blob_hashes.remove(blob_hash) async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True): - bh_to_delete_from_db = [] - await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop) + for blob_hash in blob_hashes: + self.delete_blob(blob_hash) + if delete_from_db: - try: - await self.storage.delete_blobs_from_db(bh_to_delete_from_db) - except IntegrityError as err: - if str(err) != "FOREIGN KEY constraint failed": - raise err + await self.storage.delete_blobs_from_db(blob_hashes) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 6843b677b..1b4f9e167 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -353,7 +353,7 @@ class SQLiteStorage(SQLiteMixin): transaction.executemany( "delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes] ) - return self.db.run(delete_blobs) + return self.db.run_with_foreign_keys_disabled(delete_blobs) def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index b704da285..e911381e6 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -65,6 +65,7 @@ class StreamManager: self.resume_downloading_task: asyncio.Task = None self.re_reflect_task: asyncio.Task = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] + self.running_reflector_uploads: typing.List[asyncio.Task] = [] async def _update_content_claim(self, stream: ManagedStream): claim_info = await self.storage.get_content_claim(stream.stream_hash) @@ -200,6 +201,8 @@ class StreamManager: stream.stop_download() while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() + while self.running_reflector_uploads: + self.running_reflector_uploads.pop().cancel() async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: @@ -208,7 +211,12 @@ class StreamManager: self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: host, port = random.choice(self.config.reflector_servers) - self.loop.create_task(stream.upload_to_reflector(host, port)) + task = self.loop.create_task(stream.upload_to_reflector(host, port)) + self.running_reflector_uploads.append(task) + task.add_done_callback( + lambda _: None + if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task) + ) return stream async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):