From 832537a5cfa92dc8600bd742fbf73d3aad6c7e5c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 23 Apr 2019 09:53:17 -0400 Subject: [PATCH] set deleted downloads as streaming mode on startup --- lbrynet/extras/daemon/storage.py | 24 +++++++++++++++++++----- lbrynet/stream/stream_manager.py | 20 +++++++++++++------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 02612991a..52eef6eae 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -425,7 +425,7 @@ class SQLiteStorage(SQLiteMixin): return self.db.run(_sync_blobs) def sync_files_to_blobs(self): - def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]: + def _sync_blobs(transaction: sqlite3.Connection): transaction.executemany( "update file set status='stopped' where stream_hash=?", transaction.execute( @@ -435,6 +435,15 @@ class SQLiteStorage(SQLiteMixin): ) return self.db.run(_sync_blobs) + def set_files_as_streaming(self, stream_hashes: typing.List[str]): + def _set_streaming(transaction: sqlite3.Connection): + transaction.executemany( + "update file set file_name='{stream}', download_directory='{stream}' where stream_hash=?", + [(stream_hash, ) for stream_hash in stream_hashes] + ) + + return self.db.run(_set_streaming) + # # # # # # # # # stream functions # # # # # # # # # async def stream_exists(self, sd_hash: str) -> bool: @@ -526,10 +535,15 @@ class SQLiteStorage(SQLiteMixin): log.debug("update file status %s -> %s", stream_hash, new_status) return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash)) - def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str): - return self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", ( - binascii.hexlify(download_dir.encode()).decode(), binascii.hexlify(file_name.encode()).decode(), - stream_hash + async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str], + file_name: typing.Optional[str]): + if not file_name or not download_dir: + encoded_file_name, encoded_download_dir = "{stream}", "{stream}" + else: + encoded_file_name = binascii.hexlify(file_name.encode()).decode() + encoded_download_dir = binascii.hexlify(download_dir.encode()).decode() + return await self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", ( + encoded_download_dir, encoded_file_name, stream_hash, )) async def recover_streams(self, descriptors_and_sds: typing.List[typing.Tuple['StreamDescriptor', 'BlobFile']], diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 2b896d16e..14a310e22 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -54,13 +54,7 @@ comparison_operators = { def path_or_none(p) -> typing.Optional[str]: - try: - return binascii.unhexlify(p).decode() - except binascii.Error as err: - if p == '{stream}': - return None - raise err - + return None if p == '{stream}' else binascii.unhexlify(p).decode() class StreamManager: def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', @@ -155,6 +149,18 @@ class StreamManager: # log.info("Attempting to recover %i streams", len(to_recover)) await self.recover_streams(to_recover) + if self.config.streaming_only: + to_set_as_streaming = [] + for index in range(len(to_start)): + file_name = path_or_none(to_start[index]['file_name']) + download_dir = path_or_none(to_start[index]['download_directory']) + if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)): + to_start[index]['file_name'], to_start[index]['download_directory'] = '{stream}', '{stream}' + to_set_as_streaming.append(to_start[index]['stream_hash']) + + if to_set_as_streaming: + await self.storage.set_files_as_streaming(to_set_as_streaming) + log.info("Initializing %i files", len(to_start)) if to_start: await asyncio.gather(*[