From b6cedfec56b09bf7b5e5912a51eb22b4724982b7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 9 May 2018 10:50:44 -0300 Subject: [PATCH] batch-start the file manager --- lbrynet/database/storage.py | 38 +++++++++++++++++++ .../file_manager/EncryptedFileDownloader.py | 19 ++++++---- lbrynet/file_manager/EncryptedFileManager.py | 9 +++-- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 8f7a4b2cf..aae8f180a 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -714,6 +714,44 @@ class SQLiteStorage(object): ) defer.returnValue(result) + @defer.inlineCallbacks + def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True): + def _batch_get_claim(transaction): + results = {} + bind = "({})".format(','.join('?' for _ in range(len(stream_hashes)))) + claim_infos = transaction.execute( + "select content_claim.stream_hash, c.*, " + "case when c.channel_claim_id is not null then " + "(select claim_name from claim where claim_id==c.channel_claim_id) " + "else null end as channel_name from content_claim " + "inner join claim c on c.claim_outpoint=content_claim.claim_outpoint " + "and content_claim.stream_hash in {} order by c.rowid desc".format(bind), + tuple(stream_hashes) + ).fetchall() + for claim_info in claim_infos: + channel_name = claim_info[-1] + stream_hash = claim_info[0] + result = _format_claim_response(*claim_info[1:-1]) + if channel_name: + result['channel_name'] = channel_name + results[stream_hash] = result + return results + + claims = yield self.db.runInteraction(_batch_get_claim) + if include_supports: + all_supports = {} + for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])): + all_supports.setdefault(support['claim_id'], []).append(support) + for stream_hash in claims.keys(): + claim = claims[stream_hash] + supports = all_supports.get(claim['claim_id'], []) + claim['supports'] = supports + claim['effective_amount'] = float( + sum([support['amount'] for support in supports]) + claim['amount'] + ) + claims[stream_hash] = claim + defer.returnValue(claims) + @defer.inlineCallbacks def get_claim(self, claim_outpoint, include_supports=True): def _get_claim(transaction): diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 2e2a054c1..25abd3e18 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -56,18 +56,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.channel_name = None self.metadata = None + def set_claim_info(self, claim_info): + self.claim_id = claim_info['claim_id'] + self.txid = claim_info['txid'] + self.nout = claim_info['nout'] + self.channel_claim_id = claim_info['channel_claim_id'] + self.outpoint = "%s:%i" % (self.txid, self.nout) + self.claim_name = claim_info['name'] + self.channel_name = claim_info['channel_name'] + self.metadata = claim_info['value']['stream']['metadata'] + @defer.inlineCallbacks def get_claim_info(self, include_supports=True): claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports) if claim_info: - self.claim_id = claim_info['claim_id'] - self.txid = claim_info['txid'] - self.nout = claim_info['nout'] - self.channel_claim_id = claim_info['channel_claim_id'] - self.outpoint = "%s:%i" % (self.txid, self.nout) - self.claim_name = claim_info['name'] - self.channel_name = claim_info['channel_name'] - self.metadata = claim_info['value']['stream']['metadata'] + self.set_claim_info(claim_info) defer.returnValue(claim_info) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 73cc3fb12..d28006dbd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -97,13 +97,14 @@ class EncryptedFileManager(object): ) @defer.inlineCallbacks - def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream): + def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info): lbry_file = self._get_lbry_file( file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], file_info['suggested_file_name'] ) - yield lbry_file.get_claim_info() + if claim_info: + lbry_file.set_claim_info(claim_info) try: # verify if the stream is valid (we might have downloaded an invalid stream # in the past when the validation check didn't work. This runs after every @@ -130,13 +131,15 @@ class EncryptedFileManager(object): @defer.inlineCallbacks def _start_lbry_files(self, verify_streams): files = yield self.session.storage.get_all_lbry_files() + claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) log.info("Starting %i files", len(files)) dl = [] for file_info in files: - dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams)) + claim_info = claim_infos.get(file_info['stream_hash']) + dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info)) yield defer.DeferredList(dl)