From a5524d490c4b489a24334706e9db6f4caa7fd330 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 22 Jan 2019 12:47:46 -0500
Subject: [PATCH] async lbrynet.blob

---
 lbrynet/blob/CryptBlob.py                 | 166 ------------
 lbrynet/blob/CryptStreamCreator.py        | 149 -----------
 lbrynet/blob/EncryptedFileCreator.py      | 132 ----------
 lbrynet/blob/EncryptedFileDownloader.py   | 189 --------------
 lbrynet/blob/EncryptedFileManager.py      | 254 ------------------
 lbrynet/blob/EncryptedFileStatusReport.py |   6 -
 lbrynet/blob/__init__.py                  |   6 +
 lbrynet/blob/blob_file.py                 | 302 ++++++++++------------
 lbrynet/blob/blob_info.py                 |  19 ++
 lbrynet/blob/blob_manager.py              |  89 +++++++
 lbrynet/blob/creator.py                   |  51 ----
 lbrynet/blob/reader.py                    |  30 ---
 lbrynet/blob/writer.py                    |  83 +++---
 lbrynet/p2p/BlobInfo.py                   |  18 --
 lbrynet/p2p/BlobManager.py                | 134 ----------
 tests/unit/blob/__init__.py               |   0
 tests/unit/blob/test_blob_file.py         |  35 +++
 17 files changed, 333 insertions(+), 1330 deletions(-)
 delete mode 100644 lbrynet/blob/CryptBlob.py
 delete mode 100644 lbrynet/blob/CryptStreamCreator.py
 delete mode 100644 lbrynet/blob/EncryptedFileCreator.py
 delete mode 100644 lbrynet/blob/EncryptedFileDownloader.py
 delete mode 100644 lbrynet/blob/EncryptedFileManager.py
 delete mode 100644 lbrynet/blob/EncryptedFileStatusReport.py
 create mode 100644 lbrynet/blob/blob_info.py
 create mode 100644 lbrynet/blob/blob_manager.py
 delete mode 100644 lbrynet/blob/creator.py
 delete mode 100644 lbrynet/blob/reader.py
 delete mode 100644 lbrynet/p2p/BlobInfo.py
 delete mode 100644 lbrynet/p2p/BlobManager.py
 create mode 100644 tests/unit/blob/__init__.py
 create mode 100644 tests/unit/blob/test_blob_file.py

diff --git a/lbrynet/blob/CryptBlob.py b/lbrynet/blob/CryptBlob.py
deleted file mode 100644
index 7538b11d8..000000000
--- a/lbrynet/blob/CryptBlob.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import binascii
-import logging
-from io import BytesIO
-from twisted.internet import defer
-from twisted.web.client import FileBodyProducer
-from cryptography.hazmat.primitives.ciphers import Cipher, modes
-from cryptography.hazmat.primitives.ciphers.algorithms import AES
-from cryptography.hazmat.primitives.padding import PKCS7
-from cryptography.hazmat.backends import default_backend
-from lbrynet.p2p.BlobInfo import BlobInfo
-from lbrynet.blob.blob_file import MAX_BLOB_SIZE
-
-log = logging.getLogger(__name__)
-backend = default_backend()
-
-
-class CryptBlobInfo(BlobInfo):
-    def __init__(self, blob_hash, blob_num, length, iv):
-        super().__init__(blob_hash, blob_num, length)
-        self.iv = iv
-
-    def get_dict(self):
-        info = {
-            "blob_num": self.blob_num,
-            "length": self.length,
-            "iv": self.iv.decode()
-        }
-        if self.blob_hash:
-            info['blob_hash'] = self.blob_hash
-        return info
-
-
-class StreamBlobDecryptor:
-    def __init__(self, blob, key, iv, length):
-        """
-        This class decrypts blob
-
-        blob - object which implements read() function.
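Outside the Twisted plumbing, the decrypt path this deleted class implements is plain AES-CBC followed by PKCS7 unpadding. A minimal standalone sketch using the same cryptography-library primitives, assuming the whole encrypted blob is already in memory (the class itself streams through a buffer):

import binascii
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7

def decrypt_blob_bytes(key: bytes, iv: bytes, encrypted: bytes) -> bytes:
    # AES-CBC decrypt, then strip the PKCS7 padding from the final block.
    decryptor = Cipher(AES(key), modes.CBC(iv), backend=default_backend()).decryptor()
    unpadder = PKCS7(AES.block_size).unpadder()
    padded = decryptor.update(encrypted) + decryptor.finalize()
    return unpadder.update(padded) + unpadder.finalize()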
- key = encryption_key - iv = initialization vector - blob_num = blob number (has no effect on encryption) - length = length in bytes of blob - """ - self.blob = blob - self.key = key - self.iv = iv - self.length = length - self.buff = b'' - self.len_read = 0 - cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) - self.unpadder = PKCS7(AES.block_size).unpadder() - self.cipher = cipher.decryptor() - - def decrypt(self, write_func): - """ - Decrypt blob and write its content using write_func - - write_func - function that takes decrypted string as - argument and writes it somewhere - - Returns: - - deferred that returns after decrypting blob and writing content - """ - - def remove_padding(data): - return self.unpadder.update(data) + self.unpadder.finalize() - - def write_bytes(): - if self.len_read < self.length: - num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size // 8)) - data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt) - write_func(self.cipher.update(data_to_decrypt)) - - def finish_decrypt(): - bytes_left = len(self.buff) % (AES.block_size // 8) - if bytes_left != 0: - log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode()) - raise Exception("blob %s has incorrect padding: %i bytes left" % - (self.blob.blob_hash, bytes_left)) - data_to_decrypt, self.buff = self.buff, b'' - last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() - write_func(remove_padding(last_chunk)) - - - read_handle = self.blob.open_for_reading() - - @defer.inlineCallbacks - def decrypt_bytes(): - producer = FileBodyProducer(read_handle) - buff = BytesIO() - yield producer.startProducing(buff) - self.buff = buff.getvalue() - self.len_read += len(self.buff) - write_bytes() - finish_decrypt() - - d = decrypt_bytes() - return d - - -class CryptStreamBlobMaker: - def __init__(self, key, iv, blob_num, blob): - """ - This class encrypts data and writes it to a new blob - - key = encryption_key - iv = initialization vector - blob_num = blob number (has no effect on encryption) - blob = object which implements write(), close() function , close() function must - be a deferred. (Will generally be of HashBlobCreator type) - """ - self.key = key - self.iv = iv - self.blob_num = blob_num - self.blob = blob - cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) - self.padder = PKCS7(AES.block_size).padder() - self.cipher = cipher.encryptor() - self.length = 0 - - def write(self, data): - """ - encrypt and write string data - - Returns: - tuple (done, num_bytes_to_write) where done is True if - max bytes are written. 
num_bytes_to_write is the number - of bytes that will be written from data in this call - """ - max_bytes_to_write = MAX_BLOB_SIZE - self.length - 1 - done = False - if max_bytes_to_write <= len(data): - num_bytes_to_write = max_bytes_to_write - done = True - else: - num_bytes_to_write = len(data) - data_to_write = data[:num_bytes_to_write] - self.length += len(data_to_write) - padded_data = self.padder.update(data_to_write) - encrypted_data = self.cipher.update(padded_data) - self.blob.write(encrypted_data) - return done, num_bytes_to_write - - @defer.inlineCallbacks - def close(self): - log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length)) - if self.length != 0: - self.length += (AES.block_size // 8) - (self.length % (AES.block_size // 8)) - padded_data = self.padder.finalize() - encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() - self.blob.write(encrypted_data) - - blob_hash = yield self.blob.close() - log.debug("called the finished_callback from CryptStreamBlobMaker.close") - blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) - defer.returnValue(blob) - - -def greatest_multiple(a, b): - """return the largest value `c`, that is a multiple of `b` and is <= `a`""" - return (a // b) * b - - -def split(buff, cutoff): - return buff[:cutoff], buff[cutoff:] diff --git a/lbrynet/blob/CryptStreamCreator.py b/lbrynet/blob/CryptStreamCreator.py deleted file mode 100644 index e69eda6d9..000000000 --- a/lbrynet/blob/CryptStreamCreator.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -Utility for creating Crypt Streams, which are encrypted blobs and associated metadata. -""" -import os -import logging - -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from twisted.internet import defer -from lbrynet.blob.CryptBlob import CryptStreamBlobMaker - - -log = logging.getLogger(__name__) - - -class CryptStreamCreator: - """ - Create a new stream with blobs encrypted by a symmetric cipher. - - Each blob is encrypted with the same key, but each blob has its - own initialization vector which is associated with the blob when - the blob is associated with the stream. - """ - - #implements(interfaces.IConsumer) - - def __init__(self, blob_manager, name=None, key=None, iv_generator=None): - """@param blob_manager: Object that stores and provides access to blobs. - @type blob_manager: BlobManager - - @param name: the name of the stream, which will be presented to the user - @type name: string - - @param key: the raw AES key which will be used to encrypt the - blobs. If None, a random key will be generated. - @type key: string - - @param iv_generator: a generator which yields initialization - vectors for the blobs. Will be called once for each blob. 
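A note on the MAX_BLOB_SIZE - 1 cap in CryptStreamBlobMaker.write above: PKCS7 always appends between 1 and 16 bytes of padding, so capping plaintext one byte under the 2 MiB blob limit keeps the padded ciphertext at exactly MAX_BLOB_SIZE in the worst case. A quick check of the arithmetic:

from cryptography.hazmat.primitives.ciphers.algorithms import AES

MAX_BLOB_SIZE = 2 * 2 ** 20
block = AES.block_size // 8          # 16-byte AES blocks
plaintext_cap = MAX_BLOB_SIZE - 1    # what write() enforces per blob
padded = plaintext_cap + (block - plaintext_cap % block)  # PKCS7 adds 1..16 bytes
assert padded == MAX_BLOB_SIZE       # a full 2 MiB plaintext would pad to 2 MiB + 16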
- @type iv_generator: a generator function which yields strings - - @return: None - """ - self.blob_manager = blob_manager - self.name = name - self.key = key - if iv_generator is None: - self.iv_generator = self.random_iv_generator() - else: - self.iv_generator = iv_generator - - self.stopped = True - self.producer = None - self.streaming = None - self.blob_count = -1 - self.current_blob = None - self.finished_deferreds = [] - - def registerProducer(self, producer, streaming): - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - self.stopped = False - if streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.stopped = True - self.producer = None - - def _close_current_blob(self): - # close the blob that was being written to - # and save it to blob manager - should_announce = self.blob_count == 0 - d = self.current_blob.close() - d.addCallback(self._blob_finished) - d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, - should_announce)) - self.finished_deferreds.append(d) - self.current_blob = None - - def stop(self): - """Stop creating the stream. Create the terminating zero-length blob.""" - log.debug("stop has been called for StreamCreator") - self.stopped = True - if self.current_blob is not None: - self._close_current_blob() - d = self._finalize() - d.addCallback(lambda _: self._finished()) - return d - - # TODO: move the stream creation process to its own thread and - # remove the reactor from this process. - def write(self, data): - from twisted.internet import reactor - self._write(data) - if self.stopped is False and self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - @staticmethod - def random_iv_generator(): - while 1: - yield os.urandom(AES.block_size // 8) - - def setup(self): - """Create the symmetric key if it wasn't provided""" - - if self.key is None: - self.key = os.urandom(AES.block_size // 8) - - return defer.succeed(True) - - @defer.inlineCallbacks - def _finalize(self): - """ - Finalize a stream by adding an empty - blob at the end, this is to indicate that - the stream has ended. This empty blob is not - saved to the blob manager - """ - - yield defer.DeferredList(self.finished_deferreds) - self.blob_count += 1 - iv = next(self.iv_generator) - final_blob = self._get_blob_maker(iv, self.blob_manager.get_blob_creator()) - stream_terminator = yield final_blob.close() - terminator_info = yield self._blob_finished(stream_terminator) - defer.returnValue(terminator_info) - - def _write(self, data): - while len(data) > 0: - if self.current_blob is None: - self.next_blob_creator = self.blob_manager.get_blob_creator() - self.blob_count += 1 - iv = next(self.iv_generator) - self.current_blob = self._get_blob_maker(iv, self.next_blob_creator) - done, num_bytes_written = self.current_blob.write(data) - data = data[num_bytes_written:] - if done is True: - self._close_current_blob() - - def _get_blob_maker(self, iv, blob_creator): - return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) - - def _finished(self): - raise NotImplementedError() - - def _blob_finished(self, blob_info): - raise NotImplementedError() diff --git a/lbrynet/blob/EncryptedFileCreator.py b/lbrynet/blob/EncryptedFileCreator.py deleted file mode 100644 index d101e3575..000000000 --- a/lbrynet/blob/EncryptedFileCreator.py +++ /dev/null @@ -1,132 +0,0 @@ -""" -Utilities for turning plain files into LBRY Files. 
-""" - -import os -import logging -from binascii import hexlify - -from twisted.internet import defer -from twisted.protocols.basic import FileSender - -from lbrynet.extras.compat import f2d -from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType -from lbrynet.p2p.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor -from lbrynet.blob.CryptStreamCreator import CryptStreamCreator - -log = logging.getLogger(__name__) - - -class EncryptedFileStreamCreator(CryptStreamCreator): - """ - A CryptStreamCreator which adds itself and its additional metadata to an EncryptedFileManager - """ - - def __init__(self, blob_manager, lbry_file_manager, stream_name=None, - key=None, iv_generator=None): - super().__init__(blob_manager, stream_name, key, iv_generator) - self.lbry_file_manager = lbry_file_manager - self.stream_hash = None - self.blob_infos = [] - self.sd_info = None - - def _blob_finished(self, blob_info): - log.debug("length: %s", blob_info.length) - self.blob_infos.append(blob_info.get_dict()) - return blob_info - - def _finished(self): - # calculate the stream hash - self.stream_hash = get_stream_hash( - hexlify(self.name.encode()).decode(), hexlify(self.key).decode(), hexlify(self.name.encode()).decode(), - self.blob_infos - ) - - # generate the sd info - self.sd_info = format_sd_info( - EncryptedFileStreamType, hexlify(self.name.encode()).decode(), hexlify(self.key).decode(), - hexlify(self.name.encode()).decode(), self.stream_hash, self.blob_infos - ) - - # sanity check - validate_descriptor(self.sd_info) - return defer.succeed(self.stream_hash) - - -# TODO: this should be run its own thread. Encrypting a large file can -# be very cpu intensive and there is no need to run that on the -# main reactor thread. The FileSender mechanism that is used is -# great when sending over the network, but this is all local so -# we can simply read the file from the disk without needing to -# involve reactor. -@defer.inlineCallbacks -def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_manager, file_name, file_handle, - key=None, iv_generator=None): - """Turn a plain file into an LBRY File. - - An LBRY File is a collection of encrypted blobs of data and the metadata that binds them - together which, when decrypted and put back together according to the metadata, results - in the original file. - - The stream parameters that aren't specified are generated, the file is read and broken - into chunks and encrypted, and then a stream descriptor file with the stream parameters - and other metadata is written to disk. - - @param session: An Session object. - @type session: Session - - @param lbry_file_manager: The EncryptedFileManager object this LBRY File will be added to. - @type lbry_file_manager: EncryptedFileManager - - @param file_name: The path to the plain file. - @type file_name: string - - @param file_handle: The file-like object to read - @type file_handle: any file-like object which can be read by twisted.protocols.basic.FileSender - - @param key: the raw AES key which will be used to encrypt the blobs. If None, a random key will - be generated. - @type key: string - - @param iv_generator: a generator which yields initialization - vectors for the blobs. Will be called once for each blob. 
- @type iv_generator: a generator function which yields strings - - @return: a Deferred which fires with the stream_hash of the LBRY File - @rtype: Deferred which fires with hex-encoded string - """ - - base_file_name = os.path.basename(file_name) - file_directory = os.path.dirname(file_handle.name) - - lbry_file_creator = EncryptedFileStreamCreator( - blob_manager, lbry_file_manager, base_file_name, key, iv_generator - ) - - yield lbry_file_creator.setup() - # TODO: Using FileSender isn't necessary, we can just read - # straight from the disk. The stream creation process - # should be in its own thread anyway so we don't need to - # worry about interacting with the twisted reactor - file_sender = FileSender() - yield file_sender.beginFileTransfer(file_handle, lbry_file_creator) - - log.debug("the file sender has triggered its deferred. stopping the stream writer") - yield lbry_file_creator.stop() - - log.debug("making the sd blob") - sd_info = lbry_file_creator.sd_info - descriptor_writer = BlobStreamDescriptorWriter(blob_manager) - sd_hash = yield descriptor_writer.create_descriptor(sd_info) - - log.debug("saving the stream") - yield f2d(storage.store_stream( - sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'], - sd_info['suggested_file_name'], sd_info['blobs'] - )) - log.debug("adding to the file manager") - lbry_file = yield f2d(lbry_file_manager.add_published_file( - sd_info['stream_hash'], sd_hash, hexlify(file_directory.encode()), payment_rate_manager, - payment_rate_manager.min_blob_data_payment_rate - )) - defer.returnValue(lbry_file) diff --git a/lbrynet/blob/EncryptedFileDownloader.py b/lbrynet/blob/EncryptedFileDownloader.py deleted file mode 100644 index e66b87192..000000000 --- a/lbrynet/blob/EncryptedFileDownloader.py +++ /dev/null @@ -1,189 +0,0 @@ -""" -Download LBRY Files from LBRYnet and save them to disk. 
-""" -import logging -from binascii import hexlify, unhexlify - -from twisted.internet import defer -from lbrynet.conf import Config -from lbrynet.extras.compat import f2d -from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader -from lbrynet.utils import short_hash -from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaver -from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport -from lbrynet.p2p.StreamDescriptor import save_sd_info - -log = logging.getLogger(__name__) - - -def log_status(sd_hash, status): - if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: - status_string = "running" - elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - status_string = "stopped" - elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: - status_string = "finished" - else: - status_string = "unknown" - log.debug("stream %s is %s", short_hash(sd_hash), status_string) - - -class ManagedEncryptedFileDownloader(EncryptedFileSaver): - STATUS_RUNNING = "running" - STATUS_STOPPED = "stopped" - STATUS_FINISHED = "finished" - - def __init__(self, conf: Config, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, - lbry_file_manager, payment_rate_manager, wallet, download_directory, file_name, stream_name, - sd_hash, key, suggested_file_name, download_mirrors=None): - super().__init__( - conf, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, - wallet, download_directory, key, stream_name, file_name - ) - self.sd_hash = sd_hash - self.rowid = rowid - self.suggested_file_name = unhexlify(suggested_file_name).decode() - self.lbry_file_manager = lbry_file_manager - self._saving_status = False - self.claim_id = None - self.outpoint = None - self.claim_name = None - self.txid = None - self.nout = None - self.channel_claim_id = None - self.channel_name = None - self.metadata = None - self.mirror = None - if download_mirrors or conf.download_mirrors: - self.mirror = HTTPBlobDownloader( - self.blob_manager, servers=download_mirrors or conf.download_mirrors - ) - - def set_claim_info(self, claim_info): - self.claim_id = claim_info['claim_id'] - self.txid = claim_info['txid'] - self.nout = claim_info['nout'] - self.channel_claim_id = claim_info['channel_claim_id'] - self.outpoint = "%s:%i" % (self.txid, self.nout) - self.claim_name = claim_info['name'] - self.channel_name = claim_info['channel_name'] - self.metadata = claim_info['value']['stream']['metadata'] - - async def get_claim_info(self, include_supports=True): - claim_info = await self.storage.get_content_claim(self.stream_hash, include_supports) - if claim_info: - self.set_claim_info(claim_info) - return claim_info - - @property - def saving_status(self): - return self._saving_status - - def restore(self, status): - if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: - # start returns self.finished_deferred - # which fires when we've finished downloading the file - # and we don't want to wait for the entire download - self.start() - elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - pass - elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: - self.completed = True - else: - raise Exception(f"Unknown status for stream {self.stream_hash}: {status}") - - @defer.inlineCallbacks - def stop(self, err=None, change_status=True): - log.debug('Stopping download for stream %s', short_hash(self.stream_hash)) - if self.mirror: - self.mirror.stop() 
- # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. - yield super().stop(err) - if change_status is True: - status = yield self._save_status() - defer.returnValue(status) - - async def status(self): - blobs = await self.storage.get_blobs_for_stream(self.stream_hash) - blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] - completed_blobs = self.blob_manager.completed_blobs(blob_hashes) - num_blobs_completed = len(completed_blobs) - num_blobs_known = len(blob_hashes) - - if self.completed: - status = "completed" - elif self.stopped: - status = "stopped" - else: - status = "running" - - return EncryptedFileStatusReport( - self.file_name, num_blobs_completed, num_blobs_known, status - ) - - @defer.inlineCallbacks - def _start(self): - yield EncryptedFileSaver._start(self) - status = yield self._save_status() - log_status(self.sd_hash, status) - if self.mirror: - self.mirror.download_stream(self.stream_hash, self.sd_hash) - defer.returnValue(status) - - def _get_finished_deferred_callback_value(self): - if self.completed is True: - return "Download successful" - else: - return "Download stopped" - - @defer.inlineCallbacks - def _save_status(self): - self._saving_status = True - if self.completed is True: - status = ManagedEncryptedFileDownloader.STATUS_FINISHED - elif self.stopped is True: - status = ManagedEncryptedFileDownloader.STATUS_STOPPED - else: - status = ManagedEncryptedFileDownloader.STATUS_RUNNING - status = yield self.lbry_file_manager.change_lbry_file_status(self, status) - self._saving_status = False - return status - - def save_status(self): - return self._save_status() - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, download_manager) - - -class ManagedEncryptedFileDownloaderFactory: - #implements(IStreamDownloaderFactory) - - def __init__(self, lbry_file_manager, blob_manager): - self.lbry_file_manager = lbry_file_manager - self.blob_manager = blob_manager - - def can_download(self, sd_validator): - # TODO: add a sd_validator for non live streams, use it - return True - - @defer.inlineCallbacks - def make_downloader(self, metadata, data_rate, payment_rate_manager, download_directory, file_name=None, - download_mirrors=None): - stream_hash = yield save_sd_info(self.blob_manager, - metadata.source_blob_hash, - metadata.validator.raw_info) - if file_name: - file_name = hexlify(file_name.encode()) - hex_download_directory = hexlify(download_directory.encode()) - lbry_file = yield f2d(self.lbry_file_manager.add_downloaded_file( - stream_hash, metadata.source_blob_hash, hex_download_directory, payment_rate_manager, - data_rate, file_name=file_name, download_mirrors=download_mirrors - )) - defer.returnValue(lbry_file) - - @staticmethod - def get_description(): - return "Save the file to disk" diff --git a/lbrynet/blob/EncryptedFileManager.py b/lbrynet/blob/EncryptedFileManager.py deleted file mode 100644 index 0b01561a3..000000000 --- a/lbrynet/blob/EncryptedFileManager.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -Keep track of which LBRY Files are downloading and store their LBRY File specific metadata -""" -import os -import logging -import random -from binascii import hexlify, unhexlify - -from twisted.internet import defer, task, reactor -from twisted.python.failure import Failure -from lbrynet.conf import Config -from lbrynet.extras.compat import f2d -from lbrynet.extras.reflector.reupload import reflect_file -from 
lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader -from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType, get_sd_info -from lbrynet.blob.client.CryptStreamDownloader import AlreadyStoppedError -from lbrynet.blob.client.CryptStreamDownloader import CurrentlyStoppingError -from lbrynet.utils import safe_start_looping_call, safe_stop_looping_call - -log = logging.getLogger(__name__) - - -class EncryptedFileManager: - """ - Keeps track of currently opened LBRY Files, their options, and - their LBRY File specific metadata. - """ - # when reflecting files, reflect up to this many files at a time - CONCURRENT_REFLECTS = 5 - - def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, wallet, - payment_rate_manager, storage, sd_identifier): - self.conf = conf - self.auto_re_reflect = conf.reflect_uploads and conf.auto_re_reflect_interval > 0 - self.auto_re_reflect_interval = conf.auto_re_reflect_interval - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.blob_manager = blob_manager - self.wallet = wallet - self.payment_rate_manager = payment_rate_manager - self.storage = storage - # TODO: why is sd_identifier part of the file manager? - self.sd_identifier = sd_identifier - self.lbry_files = [] - self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) - - def setup(self): - self._add_to_sd_identifier() - return self._start_lbry_files() - - def get_lbry_file_status(self, lbry_file): - return self.storage.get_lbry_file_status(lbry_file.rowid) - - def set_lbry_file_data_payment_rate(self, lbry_file, new_rate): - return self.storage(lbry_file.rowid, new_rate) - - def change_lbry_file_status(self, lbry_file, status): - log.debug("Changing status of %s to %s", lbry_file.stream_hash, status) - return f2d(self.storage.change_file_status(lbry_file.rowid, status)) - - def get_lbry_file_status_reports(self): - ds = [] - - for lbry_file in self.lbry_files: - ds.append(lbry_file.status()) - - dl = defer.DeferredList(ds) - - def filter_failures(status_reports): - return [status_report for success, status_report in status_reports if success is True] - - dl.addCallback(filter_failures) - return dl - - def _add_to_sd_identifier(self): - downloader_factory = ManagedEncryptedFileDownloaderFactory(self, self.blob_manager) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, downloader_factory) - - def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key, - stream_name, file_name, download_directory, suggested_file_name, download_mirrors=None): - return ManagedEncryptedFileDownloader( - self.conf, - rowid, - stream_hash, - self.peer_finder, - self.rate_limiter, - self.blob_manager, - self.storage, - self, - payment_rate_manager, - self.wallet, - download_directory, - file_name, - stream_name=stream_name, - sd_hash=sd_hash, - key=key, - suggested_file_name=suggested_file_name, - download_mirrors=download_mirrors - ) - - def _start_lbry_file(self, file_info, payment_rate_manager, claim_info, download_mirrors=None): - lbry_file = self._get_lbry_file( - file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], - file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], - file_info['suggested_file_name'], download_mirrors - ) - if claim_info: - lbry_file.set_claim_info(claim_info) - try: - # restore will raise an Exception if 
status is unknown - lbry_file.restore(file_info['status']) - self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - if len(self.lbry_files) % 500 == 0: - log.info("Started %i files", len(self.lbry_files)) - except Exception: - log.warning("Failed to start %i", file_info.get('rowid')) - - async def _start_lbry_files(self): - files = await self.storage.get_all_lbry_files() - claim_infos = await self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) - prm = self.payment_rate_manager - - log.info("Starting %i files", len(files)) - for file_info in files: - claim_info = claim_infos.get(file_info['stream_hash']) - self._start_lbry_file(file_info, prm, claim_info) - - log.info("Started %i lbry files", len(self.lbry_files)) - if self.auto_re_reflect is True: - safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval / 10) - - @defer.inlineCallbacks - def _stop_lbry_file(self, lbry_file): - def wait_for_finished(lbry_file, count=2): - if count or lbry_file.saving_status is not False: - return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, - count=count - 1) - try: - yield lbry_file.stop(change_status=False) - self.lbry_files.remove(lbry_file) - except CurrentlyStoppingError: - yield wait_for_finished(lbry_file) - except AlreadyStoppedError: - pass - finally: - defer.returnValue(None) - - @defer.inlineCallbacks - def _stop_lbry_files(self): - log.info("Stopping %i lbry files", len(self.lbry_files)) - yield defer.DeferredList([self._stop_lbry_file(lbry_file) for lbry_file in list(self.lbry_files)]) - - async def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate): - status = ManagedEncryptedFileDownloader.STATUS_FINISHED - stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False) - key = stream_metadata['key'] - stream_name = stream_metadata['stream_name'] - file_name = stream_metadata['suggested_file_name'] - rowid = await self.storage.save_published_file( - stream_hash, file_name, download_directory, blob_data_rate, status - ) - lbry_file = self._get_lbry_file( - rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, - stream_metadata['suggested_file_name'], download_mirrors=None - ) - lbry_file.restore(status) - await lbry_file.get_claim_info() - self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - return lbry_file - - async def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None, - blob_data_rate=None, status=None, file_name=None, download_mirrors=None): - status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED - payment_rate_manager = payment_rate_manager or self.payment_rate_manager - blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate - stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False) - key = stream_metadata['key'] - stream_name = stream_metadata['stream_name'] - file_name = file_name or stream_metadata['suggested_file_name'] - - # when we save the file we'll atomic touch the nearest file to the suggested file name - # that doesn't yet exist in the download directory - rowid = await self.storage.save_downloaded_file( - stream_hash, hexlify(os.path.basename(unhexlify(file_name))), download_directory, blob_data_rate - ) - file_name = (await 
self.storage.get_filename_for_rowid(rowid)).decode() - lbry_file = self._get_lbry_file( - rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, - stream_metadata['suggested_file_name'], download_mirrors - ) - lbry_file.restore(status) - await lbry_file.get_claim_info(include_supports=False) - self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - return lbry_file - - @defer.inlineCallbacks - def delete_lbry_file(self, lbry_file, delete_file=False): - if lbry_file not in self.lbry_files: - raise ValueError("Could not find that LBRY file") - - def wait_for_finished(count=2): - if count <= 0 or lbry_file.saving_status is False: - return True - else: - return task.deferLater(reactor, 1, wait_for_finished, count=count - 1) - - full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name) - - try: - yield lbry_file.stop() - except (AlreadyStoppedError, CurrentlyStoppingError): - yield wait_for_finished() - - self.lbry_files.remove(lbry_file) - - if lbry_file.stream_hash in self.storage.content_claim_callbacks: - del self.storage.content_claim_callbacks[lbry_file.stream_hash] - - yield lbry_file.delete_data() - yield f2d(self.storage.delete_stream(lbry_file.stream_hash)) - - if delete_file and os.path.isfile(full_path): - os.remove(full_path) - - defer.returnValue(True) - - def toggle_lbry_file_running(self, lbry_file): - """Toggle whether a stream reader is currently running""" - for l in self.lbry_files: - if l == lbry_file: - return l.toggle_running() - return defer.fail(Failure(ValueError("Could not find that LBRY file"))) - - @defer.inlineCallbacks - def reflect_lbry_files(self): - sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) - ds = [] - sd_hashes_to_reflect = yield f2d(self.storage.get_streams_to_re_reflect()) - for lbry_file in self.lbry_files: - if lbry_file.sd_hash in sd_hashes_to_reflect: - ds.append(sem.run(reflect_file, lbry_file, random.choice(self.conf.reflector_servers))) - yield defer.DeferredList(ds) - - @defer.inlineCallbacks - def stop(self): - safe_stop_looping_call(self.lbry_file_reflector) - yield self._stop_lbry_files() - log.info("Stopped encrypted file manager") - defer.returnValue(True) diff --git a/lbrynet/blob/EncryptedFileStatusReport.py b/lbrynet/blob/EncryptedFileStatusReport.py deleted file mode 100644 index 467f965dd..000000000 --- a/lbrynet/blob/EncryptedFileStatusReport.py +++ /dev/null @@ -1,6 +0,0 @@ -class EncryptedFileStatusReport: - def __init__(self, name, num_completed, num_known, running_status): - self.name = name - self.num_completed = num_completed - self.num_known = num_known - self.running_status = running_status diff --git a/lbrynet/blob/__init__.py b/lbrynet/blob/__init__.py index e69de29bb..b1f41917d 100644 --- a/lbrynet/blob/__init__.py +++ b/lbrynet/blob/__init__.py @@ -0,0 +1,6 @@ +from lbrynet.cryptoutils import get_lbry_hash_obj + +MAX_BLOB_SIZE = 2 * 2 ** 20 + +# digest_size is in bytes, and blob hashes are hex encoded +blobhash_length = get_lbry_hash_obj().digest_size * 2 diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 9ff9d09a6..fdb5a215f 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -1,26 +1,27 @@ import os +import asyncio +import binascii import logging -from twisted.internet import defer -from twisted.web.client import FileBodyProducer -from twisted.python.failure import Failure -from lbrynet.cryptoutils import get_lbry_hash_obj -from lbrynet.p2p.Error 
import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
+import typing
+from cryptography.hazmat.primitives.ciphers import Cipher, modes
+from cryptography.hazmat.primitives.ciphers.algorithms import AES
+from cryptography.hazmat.primitives.padding import PKCS7
+
+from lbrynet.cryptoutils import backend, get_lbry_hash_obj
+from lbrynet.error import DownloadCancelledError, InvalidBlobHashError, InvalidDataError
+
+from lbrynet.blob import MAX_BLOB_SIZE, blobhash_length
+from lbrynet.blob.blob_info import BlobInfo
 from lbrynet.blob.writer import HashBlobWriter
-from lbrynet.blob.reader import HashBlobReader

 log = logging.getLogger(__name__)

-MAX_BLOB_SIZE = 2 * 2 ** 20
-# digest_size is in bytes, and blob hashes are hex encoded
-blobhash_length = get_lbry_hash_obj().digest_size * 2
-
-
-def is_valid_hashcharacter(char):
+def is_valid_hashcharacter(char: str) -> bool:
     return char in "0123456789abcdef"


-def is_valid_blobhash(blobhash):
+def is_valid_blobhash(blobhash: str) -> bool:
     """Checks whether the blobhash is the correct length and contains only
     valid characters (0-9, a-f)
@@ -31,6 +32,16 @@ def is_valid_blobhash(blobhash):
     return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)


+def encrypt_blob_bytes(key: bytes, iv: bytes, unencrypted: bytes) -> typing.Tuple[bytes, str]:
+    cipher = Cipher(AES(key), modes.CBC(iv), backend=backend)
+    padder = PKCS7(AES.block_size).padder()
+    encryptor = cipher.encryptor()
+    encrypted = encryptor.update(padder.update(unencrypted) + padder.finalize()) + encryptor.finalize()
+    digest = get_lbry_hash_obj()
+    digest.update(encrypted)
+    return encrypted, digest.hexdigest()
+
+
 class BlobFile:
     """
     A chunk of data available on the network which is specified by a hashsum
@@ -40,178 +51,137 @@ class BlobFile:

     Also can be used for reading from blobs on the local filesystem
     """

-    def __str__(self):
-        return self.blob_hash[:16]
-
-    def __repr__(self):
-        return '<{}({})>'.format(self.__class__.__name__, str(self))
-
-    def __init__(self, blob_dir, blob_hash, length=None):
+    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, blob_hash: str,
+                 length: typing.Optional[int] = None,
+                 blob_completed_callback: typing.Optional[typing.Callable[['BlobFile'], typing.Awaitable]] = None):
         if not is_valid_blobhash(blob_hash):
             raise InvalidBlobHashError(blob_hash)
+        self.loop = loop
         self.blob_hash = blob_hash
         self.length = length
-        self.writers = {}  # {Peer: writer, finished_deferred}
-        self._verified = False
-        self.readers = 0
         self.blob_dir = blob_dir
         self.file_path = os.path.join(blob_dir, self.blob_hash)
-        self.blob_write_lock = defer.DeferredLock()
+        self.writers: typing.List[HashBlobWriter] = []
+
+        self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
+        self.finished_writing = asyncio.Event(loop=loop)
+        self.blob_write_lock = asyncio.Lock(loop=loop)
+        if os.path.isfile(os.path.join(blob_dir, blob_hash)):
+            length = length or int(os.stat(os.path.join(blob_dir, blob_hash)).st_size)
+            self.length = length
+            self.verified.set()
+            self.finished_writing.set()
         self.saved_verified_blob = False
-        if os.path.isfile(self.file_path):
-            self.set_length(os.path.getsize(self.file_path))
-            # This assumes that the hash of the blob has already been
-            # checked as part of the blob creation process. It might
-            # be worth having a function that checks the actual hash;
-            # its probably too expensive to have that check be part of
-            # this call.
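The deleted comment above flags a missing helper for re-hashing a blob that is already on disk. A sketch of what such a check could look like, assuming only the get_lbry_hash_obj helper used throughout this patch (verify_blob_on_disk is a hypothetical name, not part of the patch):

from lbrynet.cryptoutils import get_lbry_hash_obj

def verify_blob_on_disk(file_path: str, expected_blob_hash: str) -> bool:
    # Re-hash the stored bytes in chunks and compare the hex digest.
    digest = get_lbry_hash_obj()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 16), b''):
            digest.update(chunk)
    return digest.hexdigest() == expected_blob_hash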
- self._verified = True + self.blob_completed_callback = blob_completed_callback - def open_for_writing(self, peer): - """ - open a blob file to be written by peer, supports concurrent - writers, as long as they are from different peers. - - returns tuple of (writer, finished_deferred) - - writer - a file like object with a write() function, close() when finished - finished_deferred - deferred that is fired when write is finished and returns - a instance of itself as HashBlob - """ - if peer not in self.writers: - log.debug("Opening %s to be written by %s", str(self), str(peer)) - finished_deferred = defer.Deferred() - writer = HashBlobWriter(self.get_length, self.writer_finished) - self.writers[peer] = (writer, finished_deferred) - return writer, finished_deferred - log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None - - def open_for_reading(self): - """ - open blob for reading - - returns a file like object that can be read() from, and closed() when - finished - """ - if self._verified is True: - f = open(self.file_path, 'rb') - reader = HashBlobReader(f, self.reader_finished) - self.readers += 1 - return reader - return None - - def delete(self): - """ - delete blob file from file system, prevent deletion - if a blob is being read from or written to - - returns a deferred that firesback when delete is completed - """ - if not self.writers and not self.readers: - self._verified = False - self.saved_verified_blob = False + def writer_finished(self, writer: HashBlobWriter): + def callback(finished: asyncio.Future): try: - if os.path.isfile(self.file_path): - os.remove(self.file_path) - except Exception as e: - log.exception("An error occurred deleting %s:", str(self.file_path), exc_info=e) - else: - raise ValueError("File is currently being read or written and cannot be deleted") + error = finished.result() + except Exception as err: + error = err + if writer in self.writers: # remove this download attempt + self.writers.remove(writer) + if not error: # the blob downloaded, cancel all the other download attempts and set the result + while self.writers: + other = self.writers.pop() + other.finished.cancel() + t = self.loop.create_task(self.save_verified_blob(writer)) + t.add_done_callback(lambda *_: self.finished_writing.set()) + return + if isinstance(error, (InvalidBlobHashError, InvalidDataError)): + log.error("writer error downloading %s: %s", self.blob_hash[:8], str(error)) + elif not isinstance(error, (DownloadCancelledError, asyncio.CancelledError, asyncio.TimeoutError)): + log.exception("something else") + raise error + return callback - @property - def verified(self): + async def save_verified_blob(self, writer): + def _save_verified(): + # log.debug(f"write blob file {self.blob_hash[:8]} from {writer.peer.address}") + if not self.saved_verified_blob and not os.path.isfile(self.file_path): + if self.get_length() == len(writer.verified_bytes): + with open(self.file_path, 'wb') as write_handle: + write_handle.write(writer.verified_bytes) + self.saved_verified_blob = True + else: + raise Exception("length mismatch") + + if self.verified.is_set(): + return + async with self.blob_write_lock: + await self.loop.run_in_executor(None, _save_verified) + if self.blob_completed_callback: + await self.blob_completed_callback(self) + self.verified.set() + + def open_for_writing(self) -> HashBlobWriter: + if os.path.exists(self.file_path): + raise OSError(f"File already exists '{self.file_path}'") + fut = asyncio.Future(loop=self.loop) + 
writer = HashBlobWriter(self.blob_hash, self.get_length, fut) + self.writers.append(writer) + fut.add_done_callback(self.writer_finished(writer)) + return writer + + async def sendfile(self, writer: asyncio.StreamWriter) -> int: """ - Protect verified from being modified by other classes. - verified is True if a write to a blob has completed successfully, - or a blob has been read to have the same length as specified - in init + Read and send the file to the writer and return the number of bytes sent """ - return self._verified + + with open(self.file_path, 'rb') as handle: + return await self.loop.sendfile(writer.transport, handle) + + async def close(self): + while self.writers: + self.writers.pop().finished.cancel() + + async def delete(self): + await self.close() + async with self.blob_write_lock: + self.saved_verified_blob = False + if os.path.isfile(self.file_path): + os.remove(self.file_path) + + def decrypt(self, key: bytes, iv: bytes) -> bytes: + """ + Decrypt a BlobFile to plaintext bytes + """ + + with open(self.file_path, "rb") as f: + buff = f.read() + if len(buff) != self.length: + raise ValueError("unexpected length") + cipher = Cipher(AES(key), modes.CBC(iv), backend=backend) + unpadder = PKCS7(AES.block_size).unpadder() + decryptor = cipher.decryptor() + return unpadder.update(decryptor.update(buff) + decryptor.finalize()) + unpadder.finalize() + + @classmethod + async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: str, key: bytes, + iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo: + """ + Create an encrypted BlobFile from plaintext bytes + """ + + blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted) + length = len(blob_bytes) + blob = cls(loop, blob_dir, blob_hash, length) + writer = blob.open_for_writing() + writer.write(blob_bytes) + await blob.verified.wait() + return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash) def set_length(self, length): if self.length is not None and length == self.length: - return True + return if self.length is None and 0 <= length <= MAX_BLOB_SIZE: self.length = length - return True - log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", - self.length, length) - return False + return + log.warning("Got an invalid length. 
Previous length: %s, Invalid length: %s", self.length, length) def get_length(self): return self.length def get_is_verified(self): - return self.verified - - def is_downloading(self): - if self.writers: - return True - return False - - def reader_finished(self, reader): - self.readers -= 1 - return defer.succeed(True) - - def writer_finished(self, writer, err=None): - def fire_finished_deferred(): - self._verified = True - for p, (w, finished_deferred) in list(self.writers.items()): - if w == writer: - del self.writers[p] - finished_deferred.callback(self) - return True - log.warning( - "Somehow, the writer that was accepted as being valid was already removed: %s", - writer) - return False - - def errback_finished_deferred(err): - for p, (w, finished_deferred) in list(self.writers.items()): - if w == writer: - del self.writers[p] - finished_deferred.errback(err) - - def cancel_other_downloads(): - for p, (w, finished_deferred) in self.writers.items(): - w.close() - - if err is None: - if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: - if self._verified is False: - d = self.save_verified_blob(writer) - d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) - d.addCallback(lambda _: cancel_other_downloads()) - else: - d = defer.succeed(None) - fire_finished_deferred() - else: - if writer.len_so_far != self.length: - err_string = "blob length is %i vs expected %i" % (writer.len_so_far, self.length) - else: - err_string = f"blob hash is {writer.blob_hash} vs expected {self.blob_hash}" - errback_finished_deferred(Failure(InvalidDataError(err_string))) - d = defer.succeed(None) - else: - errback_finished_deferred(err) - d = defer.succeed(None) - d.addBoth(lambda _: writer.close_handle()) - return d - - def save_verified_blob(self, writer): - # we cannot have multiple _save_verified_blob interrupting - # each other, can happen since startProducing is a deferred - return self.blob_write_lock.run(self._save_verified_blob, writer) - - @defer.inlineCallbacks - def _save_verified_blob(self, writer): - if self.saved_verified_blob is False: - writer.write_handle.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(writer.write_handle) - yield producer.startProducing(open(out_path, 'wb')) - self.saved_verified_blob = True - defer.returnValue(True) - else: - raise DownloadCanceledError() + return self.verified.is_set() diff --git a/lbrynet/blob/blob_info.py b/lbrynet/blob/blob_info.py new file mode 100644 index 000000000..d300fd905 --- /dev/null +++ b/lbrynet/blob/blob_info.py @@ -0,0 +1,19 @@ +import typing + + +class BlobInfo: + def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None): + self.blob_hash = blob_hash + self.blob_num = blob_num + self.length = length + self.iv = iv + + def as_dict(self) -> typing.Dict: + d = { + 'length': self.length, + 'blob_num': self.blob_num, + 'iv': self.iv, + } + if self.blob_hash: # non-terminator blobs have a blob hash + d['blob_hash'] = self.blob_hash + return d diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py new file mode 100644 index 000000000..3ae555b81 --- /dev/null +++ b/lbrynet/blob/blob_manager.py @@ -0,0 +1,89 @@ +import typing +import asyncio +import logging +from sqlite3 import IntegrityError +from lbrynet.extras.daemon.storage import SQLiteStorage +from lbrynet.blob.blob_file import BlobFile +from lbrynet.stream.descriptor import StreamDescriptor + +if typing.TYPE_CHECKING: + from 
lbrynet.dht.protocol.data_store import DictDataStore
+
+log = logging.getLogger(__name__)
+
+
+class BlobFileManager:
+    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: SQLiteStorage,
+                 node_data_store: typing.Optional['DictDataStore'] = None):
+        """
+        This class stores blobs on the hard disk
+
+        blob_dir - directory where blobs are stored
+        storage - SQLiteStorage object
+        """
+        self.loop = loop
+        self.blob_dir = blob_dir
+        self.storage = storage
+        self._node_data_store = node_data_store
+        self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
+            else self._node_data_store.completed_blobs
+        self.blobs: typing.Dict[str, BlobFile] = {}
+
+    async def setup(self) -> bool:
+        raw_blob_hashes = await self.get_all_verified_blobs()
+        self.completed_blob_hashes.update(raw_blob_hashes)
+        return True
+
+    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
+        if blob_hash in self.blobs:
+            if length and self.blobs[blob_hash].length is None:
+                self.blobs[blob_hash].set_length(length)
+        else:
+            self.blobs[blob_hash] = BlobFile(self.loop, self.blob_dir, blob_hash, length, self.blob_completed)
+        return self.blobs[blob_hash]
+
+    def get_stream_descriptor(self, sd_hash):
+        return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))
+
+    async def blob_completed(self, blob: BlobFile):
+        if blob.blob_hash is None:
+            raise Exception("Blob hash is None")
+        if not blob.length:
+            raise Exception("Blob has a length of 0")
+        if blob.blob_hash not in self.completed_blob_hashes:
+            self.completed_blob_hashes.add(blob.blob_hash)
+        await self.storage.add_completed_blob(blob.blob_hash)
+
+    def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
+        """Returns the blob hashes in blob_hashes that are verified"""
+        blobs = [self.get_blob(b) for b in blob_hashes]
+        return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
+
+    async def set_should_announce(self, blob_hash: str, should_announce: bool):
+        now = self.loop.time()
+        return await self.storage.set_should_announce(blob_hash, now, should_announce)
+
+    async def get_all_verified_blobs(self) -> typing.List[str]:
+        blob_hashes = await self.storage.get_all_blob_hashes()
+        return self.check_completed_blobs(blob_hashes)
+
+    async def delete_blobs(self, blob_hashes: typing.List[str]):
+        bh_to_delete_from_db = []
+        for blob_hash in blob_hashes:
+            if not blob_hash:
+                continue
+            try:
+                blob = self.get_blob(blob_hash)
+                await blob.delete()
+                bh_to_delete_from_db.append(blob_hash)
+            except Exception as e:
+                log.warning("Failed to delete blob file.
Reason: %s", e) + if blob_hash in self.completed_blob_hashes: + self.completed_blob_hashes.remove(blob_hash) + if blob_hash in self.blobs: + del self.blobs[blob_hash] + try: + await self.storage.delete_blobs_from_db(bh_to_delete_from_db) + except IntegrityError as err: + if str(err) != "FOREIGN KEY constraint failed": + raise err diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py deleted file mode 100644 index af9dc213f..000000000 --- a/lbrynet/blob/creator.py +++ /dev/null @@ -1,51 +0,0 @@ -import os -import logging -from io import BytesIO -from twisted.internet import defer -from twisted.web.client import FileBodyProducer -from lbrynet.cryptoutils import get_lbry_hash_obj - -log = logging.getLogger(__name__) - - -class BlobFileCreator: - """ - This class is used to create blobs on the local filesystem - when we do not know the blob hash beforehand (i.e, when creating - a new stream) - """ - def __init__(self, blob_dir): - self.blob_dir = blob_dir - self.buffer = BytesIO() - self._is_open = True - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - self.blob_hash = None - self.length = None - - @defer.inlineCallbacks - def close(self): - self.length = self.len_so_far - self.blob_hash = self._hashsum.hexdigest() - if self.blob_hash and self._is_open and self.length > 0: - # do not save 0 length files (empty tail blob in streams) - # or if its been closed already - self.buffer.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(self.buffer) - yield producer.startProducing(open(out_path, 'wb')) - self._is_open = False - if self.length > 0: - defer.returnValue(self.blob_hash) - else: - # 0 length files (empty tail blob in streams ) - # must return None as their blob_hash for - # it to be saved properly by EncryptedFileMetadataManagers - defer.returnValue(None) - - def write(self, data): - if not self._is_open: - raise IOError - self._hashsum.update(data) - self.len_so_far += len(data) - self.buffer.write(data) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py deleted file mode 100644 index 26aca0dbc..000000000 --- a/lbrynet/blob/reader.py +++ /dev/null @@ -1,30 +0,0 @@ -import logging - -log = logging.getLogger(__name__) - - -class HashBlobReader: - """ - This is a file like reader class that supports - read(size) and close() - """ - def __init__(self, read_handle, finished_cb): - self.finished_cb = finished_cb - self.finished_cb_d = None - self.read_handle = read_handle - - def __del__(self): - if self.finished_cb_d is None: - log.warning("Garbage collection was called, but reader for %s was not closed yet", - self.read_handle.name) - self.close() - - def read(self, size=-1): - return self.read_handle.read(size) - - def close(self): - # if we've already closed and called finished_cb, do nothing - if self.finished_cb_d is not None: - return - self.read_handle.close() - self.finished_cb_d = self.finished_cb(self) diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py index c66e3b037..8fc05bd37 100644 --- a/lbrynet/blob/writer.py +++ b/lbrynet/blob/writer.py @@ -1,58 +1,71 @@ +import typing import logging +import asyncio from io import BytesIO -from twisted.python.failure import Failure -from lbrynet.p2p.Error import DownloadCanceledError, InvalidDataError +from lbrynet.error import InvalidBlobHashError, InvalidDataError from lbrynet.cryptoutils import get_lbry_hash_obj log = logging.getLogger(__name__) class HashBlobWriter: - def __init__(self, length_getter, finished_cb): - self.write_handle = BytesIO() - 
self.length_getter = length_getter - self.finished_cb = finished_cb - self.finished_cb_d = None + def __init__(self, expected_blob_hash: str, get_length: typing.Callable[[], int], + finished: asyncio.Future): + self.expected_blob_hash = expected_blob_hash + self.get_length = get_length + self.buffer = BytesIO() + self.finished = finished + self.finished.add_done_callback(lambda *_: self.close_handle()) self._hashsum = get_lbry_hash_obj() self.len_so_far = 0 + self.verified_bytes = b'' def __del__(self): - if self.finished_cb_d is None: + if self.buffer is not None: log.warning("Garbage collection was called, but writer was not closed yet") - self.close() + self.close_handle() - @property - def blob_hash(self): + def calculate_blob_hash(self) -> str: return self._hashsum.hexdigest() - def write(self, data): - if self.write_handle is None: + def closed(self): + return self.buffer is None or self.buffer.closed + + def write(self, data: bytes): + expected_length = self.get_length() + if not expected_length: + raise IOError("unknown blob length") + if self.buffer is None: log.warning("writer has already been closed") + if not self.finished.done(): + self.finished.cancel() + return raise IOError('I/O operation on closed file') self._hashsum.update(data) self.len_so_far += len(data) - if self.len_so_far > self.length_getter(): - self.finished_cb_d = self.finished_cb( - self, - Failure(InvalidDataError("Length so far is greater than the expected length." - " %s to %s" % (self.len_so_far, - self.length_getter())))) - else: - self.write_handle.write(data) - if self.len_so_far == self.length_getter(): - self.finished_cb_d = self.finished_cb(self) + if self.len_so_far > expected_length: + self.close_handle() + self.finished.set_result(InvalidDataError( + f'Length so far is greater than the expected length. {self.len_so_far} to {expected_length}' + )) + return + self.buffer.write(data) + if self.len_so_far == expected_length: + blob_hash = self.calculate_blob_hash() + if blob_hash != self.expected_blob_hash: + self.close_handle() + self.finished.set_result(InvalidBlobHashError( + f"blob hash is {blob_hash} vs expected {self.expected_blob_hash}" + )) + return + self.buffer.seek(0) + self.verified_bytes = self.buffer.read() + self.close_handle() + if self.finished and not (self.finished.done() or self.finished.cancelled()): + self.finished.set_result(None) def close_handle(self): - if self.write_handle is not None: - self.write_handle.close() - self.write_handle = None - - def close(self, reason=None): - # if we've already called finished_cb because we either finished writing - # or closed already, do nothing - if self.finished_cb_d is not None: - return - if reason is None: - reason = Failure(DownloadCanceledError()) - self.finished_cb_d = self.finished_cb(self, reason) + if self.buffer is not None: + self.buffer.close() + self.buffer = None diff --git a/lbrynet/p2p/BlobInfo.py b/lbrynet/p2p/BlobInfo.py deleted file mode 100644 index 819556fd9..000000000 --- a/lbrynet/p2p/BlobInfo.py +++ /dev/null @@ -1,18 +0,0 @@ -class BlobInfo: - """ - This structure is used to represent the metadata of a blob. - - @ivar blob_hash: The sha384 hashsum of the blob's data. - @type blob_hash: string, hex-encoded - - @ivar blob_num: For streams, the position of the blob in the stream. - @type blob_num: integer - - @ivar length: The length of the blob in bytes. 
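For contrast with this deleted p2p BlobInfo, the replacement lbrynet.blob.blob_info.BlobInfo added earlier in the patch also carries the IV and serializes itself. A quick usage sketch (the values are illustrative only; a blob hash is 96 hex characters and the IV is hex-encoded):

from lbrynet.blob.blob_info import BlobInfo

info = BlobInfo(blob_num=0, length=2097152, iv='00' * 16, blob_hash='a' * 96)
assert info.as_dict() == {
    'length': 2097152, 'blob_num': 0, 'iv': '00' * 16, 'blob_hash': 'a' * 96
}
# stream terminator blobs omit blob_hash, and as_dict() drops the key accordingly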
- @type length: integer - """ - - def __init__(self, blob_hash, blob_num, length): - self.blob_hash = blob_hash - self.blob_num = blob_num - self.length = length diff --git a/lbrynet/p2p/BlobManager.py b/lbrynet/p2p/BlobManager.py deleted file mode 100644 index ecea90cb4..000000000 --- a/lbrynet/p2p/BlobManager.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import logging -import os -from binascii import unhexlify -from sqlite3 import IntegrityError -from twisted.internet import defer -from lbrynet.extras.compat import f2d -from lbrynet.blob.blob_file import BlobFile -from lbrynet.blob.creator import BlobFileCreator - -log = logging.getLogger(__name__) - - -class DiskBlobManager: - def __init__(self, blob_dir, storage, node_datastore=None): - """ - This class stores blobs on the hard disk - - blob_dir - directory where blobs are stored - storage - SQLiteStorage object - """ - self.storage = storage - self.blob_dir = blob_dir - self._node_datastore = node_datastore - self.blob_creator_type = BlobFileCreator - # TODO: consider using an LRU for blobs as there could potentially - # be thousands of blobs loaded up, many stale - self.blobs = {} - self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} - - async def setup(self): - if self._node_datastore is not None: - raw_blob_hashes = await self.storage.get_all_finished_blobs() - self._node_datastore.completed_blobs.update(raw_blob_hashes) - - async def stop(self): - pass - - def get_blob(self, blob_hash, length=None): - """Return a blob identified by blob_hash, which may be a new blob or a - blob that is already on the hard disk - """ - if length is not None and not isinstance(length, int): - raise Exception("invalid length type: {} ({})".format(length, str(type(length)))) - if blob_hash in self.blobs: - return self.blobs[blob_hash] - return self._make_new_blob(blob_hash, length) - - def get_blob_creator(self): - return self.blob_creator_type(self.blob_dir) - - def _make_new_blob(self, blob_hash, length=None): - log.debug('Making a new blob for %s', blob_hash) - blob = BlobFile(self.blob_dir, blob_hash, length) - self.blobs[blob_hash] = blob - return blob - - @defer.inlineCallbacks - def blob_completed(self, blob, should_announce=False, next_announce_time=None): - yield f2d(self.storage.add_completed_blob( - blob.blob_hash, blob.length, next_announce_time, should_announce - )) - if self._node_datastore is not None: - self._node_datastore.completed_blobs.add(unhexlify(blob.blob_hash)) - - def completed_blobs(self, blobhashes_to_check): - return self._completed_blobs(blobhashes_to_check) - - def count_should_announce_blobs(self): - return f2d(self.storage.count_should_announce_blobs()) - - def set_should_announce(self, blob_hash, should_announce): - return f2d(self.storage.set_should_announce( - blob_hash, asyncio.get_event_loop().time(), should_announce - )) - - def get_should_announce(self, blob_hash): - return f2d(self.storage.should_announce(blob_hash)) - - def creator_finished(self, blob_creator, should_announce): - log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) - if blob_creator.blob_hash is None: - raise Exception("Blob hash is None") - if blob_creator.blob_hash in self.blobs: - raise Exception("Creator finished for blob that is already marked as completed") - if blob_creator.length is None: - raise Exception("Blob has a length of 0") - new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) - self.blobs[blob_creator.blob_hash] = new_blob - return 
self.blob_completed(new_blob, should_announce)
-
-    def get_all_verified_blobs(self):
-        d = f2d(self._get_all_verified_blob_hashes())
-        d.addCallback(self.completed_blobs)
-        return d
-
-    @defer.inlineCallbacks
-    def delete_blobs(self, blob_hashes):
-        bh_to_delete_from_db = []
-        for blob_hash in blob_hashes:
-            if not blob_hash:
-                continue
-            if self._node_datastore is not None:
-                try:
-                    self._node_datastore.completed_blobs.remove(unhexlify(blob_hash))
-                except KeyError:
-                    pass
-            try:
-                blob = self.get_blob(blob_hash)
-                blob.delete()
-                bh_to_delete_from_db.append(blob_hash)
-                del self.blobs[blob_hash]
-            except Exception as e:
-                log.warning("Failed to delete blob file. Reason: %s", e)
-        try:
-            yield f2d(self.storage.delete_blobs_from_db(bh_to_delete_from_db))
-        except IntegrityError as err:
-            if str(err) != "FOREIGN KEY constraint failed":
-                raise err
-
-    def _completed_blobs(self, blobhashes_to_check):
-        """Returns of the blobhashes_to_check, which are valid"""
-        blobs = [self.get_blob(b) for b in blobhashes_to_check]
-        blob_hashes = [b.blob_hash for b in blobs if b.verified]
-        return blob_hashes
-
-    async def _get_all_verified_blob_hashes(self):
-        blobs = await self.storage.get_all_blob_hashes()
-        verified_blobs = []
-        for blob_hash in blobs:
-            file_path = os.path.join(self.blob_dir, blob_hash)
-            if os.path.isfile(file_path):
-                verified_blobs.append(blob_hash)
-        return verified_blobs
diff --git a/tests/unit/blob/__init__.py b/tests/unit/blob/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py
new file mode 100644
index 000000000..3862df63e
--- /dev/null
+++ b/tests/unit/blob/test_blob_file.py
@@ -0,0 +1,35 @@
+import asyncio
+import tempfile
+import shutil
+import os
+from torba.testcase import AsyncioTestCase
+from lbrynet.extras.daemon.storage import SQLiteStorage
+from lbrynet.blob.blob_manager import BlobFileManager
+
+
+class TestBlobfile(AsyncioTestCase):
+    async def test_create_blob(self):
+        blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
+        blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
+
+        loop = asyncio.get_event_loop()
+        tmp_dir = tempfile.mkdtemp()
+        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
+
+        storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite"))
+        blob_manager = BlobFileManager(loop, tmp_dir, storage)
+
+        await storage.open()
+        await blob_manager.setup()
+
+        # add the blob on the server
+        blob = blob_manager.get_blob(blob_hash, len(blob_bytes))
+        self.assertEqual(blob.get_is_verified(), False)
+        self.assertNotIn(blob_hash, blob_manager.completed_blob_hashes)
+
+        writer = blob.open_for_writing()
+        writer.write(blob_bytes)
+        await blob.finished_writing.wait()
+        self.assertTrue(os.path.isfile(blob.file_path), True)
+        self.assertEqual(blob.get_is_verified(), True)
+        self.assertIn(blob_hash, blob_manager.completed_blob_hashes)
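The test above exercises the download/write path. A companion sketch for the publish path, assuming the create_from_unencrypted and decrypt methods added to BlobFile in this patch, could round-trip a plaintext through encryption and back:

import os
import asyncio
import shutil
import tempfile
from torba.testcase import AsyncioTestCase
from lbrynet.blob.blob_file import BlobFile


class TestBlobRoundTrip(AsyncioTestCase):
    async def test_create_and_decrypt(self):
        key, iv = os.urandom(16), os.urandom(16)
        plaintext = b'encrypt me' * 1000
        tmp_dir = tempfile.mkdtemp()
        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
        loop = asyncio.get_event_loop()

        # encrypt the plaintext into an on-disk blob, capturing its metadata
        blob_info = await BlobFile.create_from_unencrypted(
            loop, tmp_dir, key, iv, plaintext, blob_num=0
        )

        # re-open the stored blob and decrypt it back to the original bytes
        blob = BlobFile(loop, tmp_dir, blob_info.blob_hash, blob_info.length)
        self.assertTrue(blob.get_is_verified())
        self.assertEqual(plaintext, blob.decrypt(key, iv))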