diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index f99dbe768..db4fd5f88 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -144,14 +144,13 @@ class BlobFile: while self.writers: self.writers.pop().finished.cancel() - async def delete(self): + def delete(self): self.close() - async with self.blob_write_lock: - self.saved_verified_blob = False - if os.path.isfile(self.file_path): - os.remove(self.file_path) - self.verified.clear() - self.finished_writing.clear() + self.saved_verified_blob = False + if os.path.isfile(self.file_path): + os.remove(self.file_path) + self.verified.clear() + self.finished_writing.clear() def decrypt(self, key: bytes, iv: bytes) -> bytes: """ diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 73c2b0573..84d0daf59 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -2,7 +2,6 @@ import os import typing import asyncio import logging -from sqlite3 import IntegrityError from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash from lbrynet.stream.descriptor import StreamDescriptor @@ -63,23 +62,21 @@ class BlobFileManager: blobs = [self.get_blob(b) for b in blob_hashes] return [blob.blob_hash for blob in blobs if blob.get_is_verified()] - async def delete_blob(self, blob_hash: str): - try: - blob = self.get_blob(blob_hash) - await blob.delete() - except Exception as e: - log.warning("Failed to delete blob file. Reason: %s", e) - if blob_hash in self.completed_blob_hashes: - self.completed_blob_hashes.remove(blob_hash) - if blob_hash in self.blobs: - del self.blobs[blob_hash] + def delete_blob(self, blob_hash: str): + if not is_valid_blobhash(blob_hash): + raise Exception("invalid blob hash to delete") + + if blob_hash not in self.blobs: + if os.path.isfile(os.path.join(self.blob_dir, blob_hash)): + os.remove(os.path.join(self.blob_dir, blob_hash)) + else: + self.blobs.pop(blob_hash).delete() + if blob_hash in self.completed_blob_hashes: + self.completed_blob_hashes.remove(blob_hash) async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True): - bh_to_delete_from_db = [] - await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop) + for blob_hash in blob_hashes: + self.delete_blob(blob_hash) + if delete_from_db: - try: - await self.storage.delete_blobs_from_db(bh_to_delete_from_db) - except IntegrityError as err: - if str(err) != "FOREIGN KEY constraint failed": - raise err + await self.storage.delete_blobs_from_db(blob_hashes) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index b37909240..68e93145c 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -154,6 +154,16 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di return files +def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'): + blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]] + blob_hashes.append((descriptor.sd_hash, )) + transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,)) + transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,)) + transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,)) + transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,)) + transaction.executemany("delete from blob where blob_hash=?", blob_hashes) + + class SQLiteStorage(SQLiteMixin): CREATE_TABLES_QUERY = """ pragma foreign_keys=on; @@ -336,7 +346,7 @@ class SQLiteStorage(SQLiteMixin): transaction.executemany( "delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes] ) - return self.db.run(delete_blobs) + return self.db.run_with_foreign_keys_disabled(delete_blobs) def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") @@ -425,15 +435,7 @@ class SQLiteStorage(SQLiteMixin): ) def delete_stream(self, descriptor: 'StreamDescriptor'): - def _delete_stream(transaction: sqlite3.Connection): - transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,)) - transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash, )) - transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash, )) - transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash, )) - transaction.execute("delete from blob where blob_hash=?", (descriptor.sd_hash, )) - transaction.executemany("delete from blob where blob_hash=?", - [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]]) - return self.db.run(_delete_stream) + return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor) # # # # # # # # # file stuff # # # # # # # # # diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index b704da285..e911381e6 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -65,6 +65,7 @@ class StreamManager: self.resume_downloading_task: asyncio.Task = None self.re_reflect_task: asyncio.Task = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] + self.running_reflector_uploads: typing.List[asyncio.Task] = [] async def _update_content_claim(self, stream: ManagedStream): claim_info = await self.storage.get_content_claim(stream.stream_hash) @@ -200,6 +201,8 @@ class StreamManager: stream.stop_download() while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() + while self.running_reflector_uploads: + self.running_reflector_uploads.pop().cancel() async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: @@ -208,7 +211,12 @@ class StreamManager: self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: host, port = random.choice(self.config.reflector_servers) - self.loop.create_task(stream.upload_to_reflector(host, port)) + task = self.loop.create_task(stream.upload_to_reflector(host, port)) + self.running_reflector_uploads.append(task) + task.add_done_callback( + lambda _: None + if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task) + ) return stream async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False): diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py index 9f660d051..b9f501f5f 100644 --- a/tests/integration/test_file_commands.py +++ b/tests/integration/test_file_commands.py @@ -37,7 +37,7 @@ class FileCommands(CommandTestCase): self.assertIn('error', resp) self.assertEquals('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error']) await self.daemon.jsonrpc_file_delete(claim_name='foo') - await self.server.blob_manager.delete_blob(sd_hash) + await self.server.blob_manager.delete_blobs([sd_hash]) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) self.assertIn('error', resp) self.assertEquals('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])