From 868110a6f261ae2aa98faaa35d1eba62bbd9ad2e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 8 Feb 2019 15:52:13 -0500 Subject: [PATCH] populate stream manager with a single batch query --- lbrynet/extras/daemon/storage.py | 68 +++++++++++++++++--------------- lbrynet/stream/stream_manager.py | 2 + 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index abdacfe6f..63f663eaa 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -113,6 +113,41 @@ def _batched_select(transaction, query, parameters): yield result +def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: + return [ + { + "row_id": rowid, + "stream_hash": stream_hash, + "file_name": file_name, # hex + "download_directory": download_dir, # hex + "blob_data_rate": data_rate, + "status": status, + "sd_hash": sd_hash, + "key": stream_key, + "stream_name": stream_name, # hex + "suggested_file_name": suggested_file_name, # hex + "claim": StoredStreamClaim(stream_hash, *claim_args) + } for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key, + stream_name, suggested_file_name, *claim_args) in _batched_select( + transaction, "select file.rowid, file.*, stream.*, c.*, case when c.channel_claim_id is not null " + " then (select claim_name from claim where claim_id==c.channel_claim_id) " + " else null end as channel_name " + "from file inner join stream on file.stream_hash=stream.stream_hash " + "inner join content_claim cc on file.stream_hash=cc.stream_hash " + "inner join claim c on cc.claim_outpoint=c.claim_outpoint " + "where file.stream_hash in {} " + "order by c.rowid desc", + [ + stream_hash + for (stream_hash,) in transaction.execute("select stream_hash from file") + ] + ) + ] + + + + + class SQLiteStorage(SQLiteMixin): CREATE_TABLES_QUERY = """ pragma foreign_keys=on; @@ -407,37 +442,8 @@ class SQLiteStorage(SQLiteMixin): binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status) ) - async def get_all_lbry_files(self) -> typing.List[typing.Dict]: - def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key, - stream_name, suggested_file_name) -> typing.Dict: - return { - "row_id": rowid, - "stream_hash": stream_hash, - "file_name": file_name, - "download_directory": download_dir, - "blob_data_rate": data_rate, - "status": status, - "sd_hash": sd_hash, - "key": stream_key, - "stream_name": stream_name, - "suggested_file_name": suggested_file_name - } - - def _get_all_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: - file_infos = list(map(lambda a: _lbry_file_dict(*a), transaction.execute( - "select file.rowid, file.*, stream.* " - "from file inner join stream on file.stream_hash=stream.stream_hash" - ).fetchall())) - stream_hashes = [file_info['stream_hash'] for file_info in file_infos] - claim_infos = get_claims_from_stream_hashes(transaction, stream_hashes) - for index in range(len(file_infos)): # pylint: disable=consider-using-enumerate - file_infos[index]['claim'] = claim_infos.get(file_infos[index]['stream_hash']) - return file_infos - - results = await self.db.run(_get_all_files) - if results: - return results - return [] + def get_all_lbry_files(self) -> typing.List[typing.Dict]: + return self.db.run(get_all_lbry_files) def change_file_status(self, stream_hash: str, new_status: str): log.info("update file status %s -> %s", stream_hash, new_status) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index e587b6e07..2b140d669 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -143,7 +143,9 @@ class StreamManager: self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) async def load_streams_from_database(self): + log.info("Initializing stream manager from %s", self.storage._db_path) file_infos = await self.storage.get_all_lbry_files() + log.info("Initializing %i files", len(file_infos)) await asyncio.gather(*[ self.add_stream( file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),