diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c432495f..67fc087d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,25 +12,39 @@ at anytime. * * -### Fixed - * +### Added + * Added ability for reflector to store stream information for head blob announce * +### Fixed + * Fixed handling cancelled blob and availability requests + * Fixed redundant blob requests to a peer + * Fixed blob download history + ### Deprecated - * + * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * ### Changed - * - * + * Announcing by head blob is turned on by default + * Updated reflector server dns + * Improved download analytics + * Improved download errors by distinguishing a data timeout from a sd timeout ### Added - * - * + * Added WAL pragma to sqlite3 + * Added unit tests for `BlobFile` + * Updated exchange rate tests for the lbry.io api + * Use `hashlib` for sha384 instead of `pycrypto` + * Use `cryptography` instead of `pycrypto` for blob encryption and decryption + * Use `cryptography` for PKCS7 instead of doing it manually + * Use `BytesIO` buffers instead of temp files when processing blobs + * Refactored and pruned blob related classes into `lbrynet.blobs` + * Changed several `assert`s to raise more useful errors ### Removed - * - * + * Removed `TempBlobFile` + * Removed unused `EncryptedFileOpener` ## [0.16.2] - 2017-09-26 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 000000000..e7d0dce20 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,3 @@ +## Contributing to LBRY + +https://lbry.io/faq/contributing diff --git a/docs/cli.md b/docs/cli.md index 963c515e3..017865ff1 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -22,18 +22,6 @@ Returns: (bool) true if successful ``` -## blob_announce_all - -```text -Announce all blobs to the DHT - -Usage: - blob_announce_all - -Returns: - (str) Success/fail message -``` - ## blob_delete ```text @@ -239,9 +227,9 @@ Returns: 'depth': (int) claim depth, 'has_signature': (bool) included if decoded_claim 'name': (str) claim name, - 'supports: (list) list of supports [{'txid': txid, - 'nout': nout, - 'amount': amount}], + 'supports: (list) list of supports [{'txid': (str) txid, + 'nout': (int) nout, + 'amount': (float) amount}], 'txid': (str) claim txid, 'nout': (str) claim nout, 'signature_is_valid': (bool), included if has_signature, @@ -719,15 +707,18 @@ Returns: 'depth': (int) claim depth, 'has_signature': (bool) included if decoded_claim 'name': (str) claim name, - 'supports: (list) list of supports [{'txid': txid, - 'nout': nout, - 'amount': amount}], + 'supports: (list) list of supports [{'txid': (str) txid, + 'nout': (int) nout, + 'amount': (float) amount}], 'txid': (str) claim txid, 'nout': (str) claim nout, 'signature_is_valid': (bool), included if has_signature, 'value': ClaimDict if decoded, otherwise hex string } + If the uri resolves to a channel: + 'claims_in_channel': (int) number of claims in the channel, + If the uri resolves to a claim: 'claim': { 'address': (str) claim address, @@ -741,9 +732,9 @@ Returns: 'has_signature': (bool) included if decoded_claim 'name': (str) claim name, 'channel_name': (str) channel name if claim is in a channel - 'supports: (list) list of supports [{'txid': txid, - 'nout': nout, - 'amount': amount}] + 'supports: (list) list of supports [{'txid': (str) txid, + 'nout': (int) nout, + 'amount': (float) amount}] 'txid': (str) claim txid, 'nout': (str) claim nout, 'signature_is_valid': (bool), included if has_signature, @@ -905,10 +896,37 @@ Returns: List transactions 
belonging to wallet Usage: - transaction_list + transaction_list [-t] + +Options: + -t : Include claim tip information Returns: - (list) List of transactions + (list) List of transactions, where is_tip is null by default, + and set to a boolean if include_tip_info is true + + { + "claim_info": (list) claim info if in txn [{"amount": (float) claim amount, + "claim_id": (str) claim id, + "claim_name": (str) claim name, + "nout": (int) nout}], + "confirmations": (int) number of confirmations for the txn, + "date": (str) date and time of txn, + "fee": (float) txn fee, + "support_info": (list) support info if in txn [{"amount": (float) support amount, + "claim_id": (str) claim id, + "claim_name": (str) claim name, + "is_tip": (null) default, + (bool) if include_tip_info is true, + "nout": (int) nout}], + "timestamp": (int) timestamp, + "txid": (str) txn id, + "update_info": (list) update info if in txn [{"amount": (float) updated amount, + "claim_id": (str) claim id, + "claim_name": (str) claim name, + "nout": (int) nout}], + "value": (float) value of txn + } ``` ## transaction_show diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 02885fb6c..4b41e0804 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -57,15 +57,14 @@ class Manager(object): self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) ) - def send_download_errored(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_errored(self, err, id_, name, claim_dict, report): + download_error_properties = self._download_error_properties(err, id_, name, claim_dict, + report) + self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) - def send_download_finished(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_finished(self, id_, name, report, claim_dict=None): + download_properties = self._download_properties(id_, name, claim_dict, report) + self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) def send_claim_action(self, action): self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) @@ -159,18 +158,31 @@ class Manager(object): return properties @staticmethod - def _download_properties(id_, name, claim_dict=None): - sd_hash = None - if claim_dict: - try: - sd_hash = claim_dict.source_hash - except (KeyError, TypeError, ValueError): - log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True) - return { + def _download_properties(id_, name, claim_dict=None, report=None): + sd_hash = None if not claim_dict else claim_dict.source_hash + p = { 'download_id': id_, 'name': name, 'stream_info': sd_hash } + if report: + p['report'] = report + return p + + @staticmethod + def _download_error_properties(error, id_, name, claim_dict, report): + def error_name(err): + if not hasattr(type(err), "__name__"): + return str(type(err)) + return type(err).__name__ + return { + 'download_id': id_, + 'name': name, + 'stream_info': claim_dict.source_hash, + 'error': error_name(error), + 'reason': error.message, + 'report': report + } @staticmethod def _make_context(platform, wallet): diff --git a/lbrynet/blob/__init__.py b/lbrynet/blob/__init__.py new file mode 100644 index 000000000..e605ea317 --- /dev/null +++ b/lbrynet/blob/__init__.py @@ -0,0 +1,4 @@ +from blob_file import BlobFile +from creator import 
BlobFileCreator
+from writer import HashBlobWriter
+from reader import HashBlobReader
diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
new file mode 100644
index 000000000..f1a2010ee
--- /dev/null
+++ b/lbrynet/blob/blob_file.py
@@ -0,0 +1,227 @@
+import logging
+import os
+import threading
+from twisted.internet import defer, threads
+from twisted.protocols.basic import FileSender
+from twisted.web.client import FileBodyProducer
+from twisted.python.failure import Failure
+from lbrynet import conf
+from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
+from lbrynet.core.utils import is_valid_blobhash
+from lbrynet.blob.writer import HashBlobWriter
+from lbrynet.blob.reader import HashBlobReader
+
+
+log = logging.getLogger(__name__)
+
+
+class BlobFile(object):
+    """
+    A chunk of data available on the network which is specified by a hashsum.
+
+    This class is used to create blobs on the local filesystem
+    when we already know the blob hash beforehand (i.e., when downloading blobs).
+    It can also be used for reading from blobs on the local filesystem.
+    """
+
+    def __str__(self):
+        return self.blob_hash[:16]
+
+    def __repr__(self):
+        return '<{}({})>'.format(self.__class__.__name__, str(self))
+
+    def __init__(self, blob_dir, blob_hash, length=None):
+        if not is_valid_blobhash(blob_hash):
+            raise InvalidBlobHashError(blob_hash)
+        self.blob_hash = blob_hash
+        self.length = length
+        self.writers = {}  # {Peer: (writer, finished_deferred)}
+        self._verified = False
+        self.readers = 0
+        self.blob_dir = blob_dir
+        self.file_path = os.path.join(blob_dir, self.blob_hash)
+        self.setting_verified_blob_lock = threading.Lock()
+        self.moved_verified_blob = False
+        if os.path.isfile(self.file_path):
+            self.set_length(os.path.getsize(self.file_path))
+            # This assumes that the hash of the blob has already been
+            # checked as part of the blob creation process. It might
+            # be worth having a function that checks the actual hash;
+            # it's probably too expensive to have that check be part of
+            # this call.
+            self._verified = True
+
+    def open_for_writing(self, peer):
+        """
+        Open a blob file to be written by peer. Supports concurrent
+        writers, as long as they are from different peers.
+
+        Returns a tuple of (writer, finished_deferred).
+
+        writer - a file-like object with a write() function; call close() when finished
+        finished_deferred - a deferred that is fired when the write is finished and
+        callbacks with an instance of this blob
+        """
+        if peer not in self.writers:
+            log.debug("Opening %s to be written by %s", str(self), str(peer))
+            finished_deferred = defer.Deferred()
+            writer = HashBlobWriter(self.get_length, self.writer_finished)
+            self.writers[peer] = (writer, finished_deferred)
+            return (writer, finished_deferred)
+        log.warning("Tried to download the same file twice simultaneously from the same peer")
+        return None, None
+
+    def open_for_reading(self):
+        """
+        Open the blob for reading.
+
+        Returns a file handle that can be read() from.
+        Once finished with the file handle, the caller must call close_read_handle(),
+        otherwise the blob cannot be deleted.
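+
+        Usage sketch (hypothetical; assumes `blob` is an already-verified BlobFile):
+
+            handle = blob.open_for_reading()
+            if handle is not None:
+                data = handle.read()
+                blob.close_read_handle(handle)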
+ """ + if self._verified is True: + file_handle = None + try: + file_handle = open(self.file_path, 'rb') + self.readers += 1 + return file_handle + except IOError: + log.exception('Failed to open %s', self.file_path) + self.close_read_handle(file_handle) + return None + + def delete(self): + """ + delete blob file from file system, prevent deletion + if a blob is being read from or written to + + returns a deferred that firesback when delete is completed + """ + if not self.writers and not self.readers: + self._verified = False + self.moved_verified_blob = False + + def delete_from_file_system(): + if os.path.isfile(self.file_path): + os.remove(self.file_path) + + d = threads.deferToThread(delete_from_file_system) + + def log_error(err): + log.warning("An error occurred deleting %s: %s", + str(self.file_path), err.getErrorMessage()) + return err + + d.addErrback(log_error) + return d + else: + return defer.fail(Failure( + ValueError("File is currently being read or written and cannot be deleted"))) + + @property + def verified(self): + """ + Protect verified from being modified by other classes. + verified is True if a write to a blob has completed succesfully, + or a blob has been read to have the same length as specified + in init + """ + return self._verified + + def set_length(self, length): + if self.length is not None and length == self.length: + return True + if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']: + self.length = length + return True + log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", + self.length, length) + return False + + def get_length(self): + return self.length + + def get_is_verified(self): + return self.verified + + def is_downloading(self): + if self.writers: + return True + return False + + def read(self, write_func): + def close_self(*args): + self.close_read_handle(file_handle) + return args[0] + + file_sender = FileSender() + reader = HashBlobReader(write_func) + file_handle = self.open_for_reading() + if file_handle is not None: + d = file_sender.beginFileTransfer(file_handle, reader) + d.addCallback(close_self) + else: + d = defer.fail(IOError("Could not read the blob")) + return d + + def writer_finished(self, writer, err=None): + def fire_finished_deferred(): + self._verified = True + for p, (w, finished_deferred) in self.writers.items(): + if w == writer: + del self.writers[p] + finished_deferred.callback(self) + return True + log.warning( + "Somehow, the writer that was accepted as being valid was already removed: %s", + writer) + return False + + def errback_finished_deferred(err): + for p, (w, finished_deferred) in self.writers.items(): + if w == writer: + del self.writers[p] + finished_deferred.errback(err) + + def cancel_other_downloads(): + for p, (w, finished_deferred) in self.writers.items(): + w.close() + + if err is None: + if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: + if self._verified is False: + d = self._save_verified_blob(writer) + d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) + d.addCallback(lambda _: cancel_other_downloads()) + else: + errback_finished_deferred(Failure(DownloadCanceledError())) + d = defer.succeed(True) + else: + err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}" + err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash, + writer.blob_hash) + errback_finished_deferred(Failure(InvalidDataError(err_string))) + d = defer.succeed(True) + else: + 
errback_finished_deferred(err) + d = defer.succeed(True) + d.addBoth(lambda _: writer.close_handle()) + return d + + def close_read_handle(self, file_handle): + if file_handle is not None: + file_handle.close() + self.readers -= 1 + + @defer.inlineCallbacks + def _save_verified_blob(self, writer): + with self.setting_verified_blob_lock: + if self.moved_verified_blob is False: + writer.write_handle.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(writer.write_handle) + yield producer.startProducing(open(out_path, 'wb')) + self.moved_verified_blob = True + defer.returnValue(True) + else: + raise DownloadCanceledError() diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py new file mode 100644 index 000000000..963986d5c --- /dev/null +++ b/lbrynet/blob/creator.py @@ -0,0 +1,51 @@ +import os +import logging +from io import BytesIO +from twisted.internet import defer +from twisted.web.client import FileBodyProducer +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class BlobFileCreator(object): + """ + This class is used to create blobs on the local filesystem + when we do not know the blob hash beforehand (i.e, when creating + a new stream) + """ + def __init__(self, blob_dir): + self.blob_dir = blob_dir + self.buffer = BytesIO() + self._is_open = True + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + self.blob_hash = None + self.length = None + + @defer.inlineCallbacks + def close(self): + self.length = self.len_so_far + self.blob_hash = self._hashsum.hexdigest() + if self.blob_hash and self._is_open and self.length > 0: + # do not save 0 length files (empty tail blob in streams) + # or if its been closed already + self.buffer.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(self.buffer) + yield producer.startProducing(open(out_path, 'wb')) + self._is_open = False + if self.length > 0: + defer.returnValue(self.blob_hash) + else: + # 0 length files (empty tail blob in streams ) + # must return None as their blob_hash for + # it to be saved properly by EncryptedFileMetadataManagers + defer.returnValue(None) + + def write(self, data): + if not self._is_open: + raise IOError + self._hashsum.update(data) + self.len_so_far += len(data) + self.buffer.write(data) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py new file mode 100644 index 000000000..c85cc38f3 --- /dev/null +++ b/lbrynet/blob/reader.py @@ -0,0 +1,30 @@ +import logging +from twisted.internet import interfaces +from zope.interface import implements + +log = logging.getLogger(__name__) + + +class HashBlobReader(object): + implements(interfaces.IConsumer) + + def __init__(self, write_func): + self.write_func = write_func + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + pass + + def write(self, data): + from twisted.internet import reactor + + self.write_func(data) + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py new file mode 100644 index 000000000..a95430386 --- /dev/null +++ b/lbrynet/blob/writer.py @@ -0,0 +1,53 @@ +import logging +from io import BytesIO +from twisted.python.failure import Failure +from lbrynet.core.Error import DownloadCanceledError, 
InvalidDataError +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class HashBlobWriter(object): + def __init__(self, length_getter, finished_cb): + self.write_handle = BytesIO() + self.length_getter = length_getter + self.finished_cb = finished_cb + self.finished_cb_d = None + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + + @property + def blob_hash(self): + return self._hashsum.hexdigest() + + def write(self, data): + if self.write_handle is None: + log.exception("writer has already been closed") + raise IOError('I/O operation on closed file') + + self._hashsum.update(data) + self.len_so_far += len(data) + if self.len_so_far > self.length_getter(): + self.finished_cb_d = self.finished_cb( + self, + Failure(InvalidDataError("Length so far is greater than the expected length." + " %s to %s" % (self.len_so_far, + self.length_getter())))) + else: + self.write_handle.write(data) + if self.len_so_far == self.length_getter(): + self.finished_cb_d = self.finished_cb(self) + + def close_handle(self): + if self.write_handle is not None: + self.write_handle.close() + self.write_handle = None + + def close(self, reason=None): + # if we've already called finished_cb because we either finished writing + # or closed already, do nothing + if self.finished_cb_d is not None: + return + if reason is None: + reason = Failure(DownloadCanceledError()) + self.finished_cb_d = self.finished_cb(self, reason) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 8cacc41af..29f730f9b 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -251,7 +251,7 @@ ADJUSTABLE_SETTINGS = { 'download_directory': (str, default_download_dir), 'download_timeout': (int, 180), 'is_generous_host': (bool, True), - 'announce_head_blobs_only': (bool, False), + 'announce_head_blobs_only': (bool, True), 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'lbryum_wallet_dir': (str, default_lbryum_dir), 'max_connections_per_stream': (int, 5), @@ -272,7 +272,7 @@ ADJUSTABLE_SETTINGS = { # at every auto_re_reflect_interval seconds, useful if initial reflect is unreliable 'auto_re_reflect': (bool, True), 'auto_re_reflect_interval': (int, 3600), - 'reflector_servers': (list, [('reflector.lbry.io', 5566)], server_port), + 'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_port), 'run_reflector_server': (bool, False), 'sd_download_timeout': (int, 3), 'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index b5407604c..6293db4ff 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -6,7 +6,8 @@ import sqlite3 from twisted.internet import threads, defer, reactor from twisted.enterprise import adbapi from lbrynet import conf -from lbrynet.core.HashBlob import BlobFile, BlobFileCreator +from lbrynet.blob.blob_file import BlobFile +from lbrynet.blob.creator import BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -29,7 +30,6 @@ class DiskBlobManager(DHTHashSupplier): self.blob_dir = blob_dir self.db_file = os.path.join(db_dir, "blobs.db") self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) - self.blob_type = BlobFile self.blob_creator_type = BlobFileCreator # TODO: consider using an LRU for blobs as there could potentially # be thousands of blobs loaded up, many stale @@ -51,7 +51,8 @@ class 
DiskBlobManager(DHTHashSupplier): """Return a blob identified by blob_hash, which may be a new blob or a blob that is already on the hard disk """ - assert length is None or isinstance(length, int) + if length is not None and not isinstance(length, int): + raise Exception("invalid length type: %s (%s)", length, str(type(length))) if blob_hash in self.blobs: return defer.succeed(self.blobs[blob_hash]) return self._make_new_blob(blob_hash, length) @@ -61,7 +62,7 @@ class DiskBlobManager(DHTHashSupplier): def _make_new_blob(self, blob_hash, length=None): log.debug('Making a new blob for %s', blob_hash) - blob = self.blob_type(self.blob_dir, blob_hash, length) + blob = BlobFile(self.blob_dir, blob_hash, length) self.blobs[blob_hash] = blob return defer.succeed(blob) @@ -87,12 +88,27 @@ class DiskBlobManager(DHTHashSupplier): def hashes_to_announce(self): return self._get_blobs_to_announce() + def set_should_announce(self, blob_hash, should_announce): + if blob_hash in self.blobs: + blob = self.blobs[blob_hash] + if blob.get_is_verified(): + return self._set_should_announce(blob_hash, + self.get_next_announce_time(), + should_announce) + return defer.succeed(False) + + def get_should_announce(self, blob_hash): + return self._should_announce(blob_hash) + def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) - assert blob_creator.blob_hash is not None - assert blob_creator.blob_hash not in self.blobs - assert blob_creator.length is not None - new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length) + if blob_creator.blob_hash is None: + raise Exception("Blob hash is None") + if blob_creator.blob_hash in self.blobs: + raise Exception("Creator finished for blob that is already marked as completed") + if blob_creator.length is None: + raise Exception("Blob has a length of 0") + new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob next_announce_time = self.get_next_announce_time() d = self.blob_completed(new_blob, next_announce_time, should_announce) @@ -112,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier): d = self._add_blob_to_download_history(blob_hash, host, rate) return d + @defer.inlineCallbacks + def get_host_downloaded_from(self, blob_hash): + query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1" + host = yield self.db_conn.runQuery(query_str, (blob_hash,)) + if host: + result = host[0][0] + else: + result = None + defer.returnValue(result) + def add_blob_to_upload_history(self, blob_hash, host, rate): d = self._add_blob_to_upload_history(blob_hash, host, rate) return d @@ -137,6 +163,7 @@ class DiskBlobManager(DHTHashSupplier): # threads. def create_tables(transaction): + transaction.execute('PRAGMA journal_mode=WAL') transaction.execute("create table if not exists blobs (" + " blob_hash text primary key, " + " blob_length integer, " + @@ -165,14 +192,29 @@ class DiskBlobManager(DHTHashSupplier): def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce): log.debug("Adding a completed blob. 
blob_hash=%s, length=%s", blob_hash, str(length)) should_announce = 1 if should_announce else 0 - d = self.db_conn.runQuery( - "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+ - "values (?, ?, ?, ?)", - (blob_hash, length, next_announce_time, should_announce) - ) + d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, " + "should_announce) values (?, ?, ?, ?)", (blob_hash, length, + next_announce_time, + should_announce)) + # TODO: why is this here? d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) return d + @rerun_if_locked + @defer.inlineCallbacks + def _set_should_announce(self, blob_hash, next_announce_time, should_announce): + yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? " + "where blob_hash=?", (next_announce_time, should_announce, + blob_hash)) + defer.returnValue(True) + + @rerun_if_locked + @defer.inlineCallbacks + def _should_announce(self, blob_hash): + result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?", + (blob_hash,)) + defer.returnValue(result[0][0]) + @defer.inlineCallbacks def _completed_blobs(self, blobhashes_to_check): """Returns of the blobhashes_to_check, which are valid""" @@ -187,7 +229,6 @@ class DiskBlobManager(DHTHashSupplier): @rerun_if_locked def _get_blobs_to_announce(self): - def get_and_update(transaction): timestamp = time.time() if self.announce_head_blobs_only is True: @@ -225,6 +266,14 @@ class DiskBlobManager(DHTHashSupplier): d = self.db_conn.runQuery("select blob_hash from blobs") return d + @rerun_if_locked + @defer.inlineCallbacks + def _get_all_should_announce_blob_hashes(self): + # return a list of blob hashes where should_announce is True + blob_hashes = yield self.db_conn.runQuery( + "select blob_hash from blobs where should_announce = 1") + defer.returnValue([d[0] for d in blob_hashes]) + @rerun_if_locked def _get_all_verified_blob_hashes(self): d = self._get_all_blob_hashes() @@ -255,5 +304,3 @@ class DiskBlobManager(DHTHashSupplier): "insert into upload values (null, ?, ?, ?, ?) 
", (blob_hash, str(host), float(rate), ts)) return d - - diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index d1e8bb785..4822b7342 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception): class DownloadCanceledError(Exception): pass + +class DownloadSDTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download)) + self.download = download + + class DownloadTimeoutError(Exception): def __init__(self, download): Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) self.download = download + +class DownloadDataTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download data blobs for sd hash ' + '{} within timeout'.format(download)) + self.download = download + + class RequestCanceledError(Exception): pass diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py deleted file mode 100644 index d80cede96..000000000 --- a/lbrynet/core/HashBlob.py +++ /dev/null @@ -1,413 +0,0 @@ -from StringIO import StringIO -import logging -import os -import tempfile -import threading -import shutil -from twisted.internet import interfaces, defer, threads -from twisted.protocols.basic import FileSender -from twisted.python.failure import Failure -from zope.interface import implements -from lbrynet import conf -from lbrynet.core.Error import DownloadCanceledError, InvalidDataError -from lbrynet.core.cryptoutils import get_lbry_hash_obj -from lbrynet.core.utils import is_valid_blobhash - - -log = logging.getLogger(__name__) - - -class HashBlobReader(object): - implements(interfaces.IConsumer) - - def __init__(self, write_func): - self.write_func = write_func - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - pass - - def write(self, data): - - from twisted.internet import reactor - - self.write_func(data) - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - -class HashBlobWriter(object): - def __init__(self, write_handle, length_getter, finished_cb): - self.write_handle = write_handle - self.length_getter = length_getter - self.finished_cb = finished_cb - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - - @property - def blob_hash(self): - return self._hashsum.hexdigest() - - def write(self, data): - self._hashsum.update(data) - self.len_so_far += len(data) - if self.len_so_far > self.length_getter(): - self.finished_cb( - self, - Failure(InvalidDataError("Length so far is greater than the expected length." 
- " %s to %s" % (self.len_so_far, - self.length_getter())))) - else: - if self.write_handle is None: - log.debug("Tried to write to a write_handle that was None.") - return - self.write_handle.write(data) - if self.len_so_far == self.length_getter(): - self.finished_cb(self) - - def cancel(self, reason=None): - if reason is None: - reason = Failure(DownloadCanceledError()) - self.finished_cb(self, reason) - - -class HashBlob(object): - """A chunk of data available on the network which is specified by a hashsum""" - - def __init__(self, blob_hash, length=None): - assert is_valid_blobhash(blob_hash) - self.blob_hash = blob_hash - self.length = length - self.writers = {} # {Peer: writer, finished_deferred} - self.finished_deferred = None - self._verified = False - self.readers = 0 - - @property - def verified(self): - # protect verified from being modified by other classes - return self._verified - - def set_length(self, length): - if self.length is not None and length == self.length: - return True - if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']: - self.length = length - return True - log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", - self.length, length) - return False - - def get_length(self): - return self.length - - def is_validated(self): - return bool(self._verified) - - def is_downloading(self): - if self.writers: - return True - return False - - def read(self, write_func): - - def close_self(*args): - self.close_read_handle(file_handle) - return args[0] - - file_sender = FileSender() - reader = HashBlobReader(write_func) - file_handle = self.open_for_reading() - if file_handle is not None: - d = file_sender.beginFileTransfer(file_handle, reader) - d.addCallback(close_self) - else: - d = defer.fail(ValueError("Could not read the blob")) - return d - - def writer_finished(self, writer, err=None): - - def fire_finished_deferred(): - self._verified = True - for p, (w, finished_deferred) in self.writers.items(): - if w == writer: - finished_deferred.callback(self) - del self.writers[p] - return True - log.warning( - "Somehow, the writer that was accepted as being valid was already removed: %s", - writer) - return False - - def errback_finished_deferred(err): - for p, (w, finished_deferred) in self.writers.items(): - if w == writer: - finished_deferred.errback(err) - del self.writers[p] - - def cancel_other_downloads(): - for p, (w, finished_deferred) in self.writers.items(): - w.cancel() - - if err is None: - if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: - if self._verified is False: - d = self._save_verified_blob(writer) - d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) - d.addCallback(lambda _: cancel_other_downloads()) - else: - errback_finished_deferred(Failure(DownloadCanceledError())) - d = defer.succeed(True) - else: - err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}" - err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash, - writer.blob_hash) - errback_finished_deferred(Failure(InvalidDataError(err_string))) - d = defer.succeed(True) - else: - errback_finished_deferred(err) - d = defer.succeed(True) - - d.addBoth(lambda _: self._close_writer(writer)) - return d - - def open_for_writing(self, peer): - pass - - def open_for_reading(self): - pass - - def delete(self): - pass - - def close_read_handle(self, file_handle): - pass - - def _close_writer(self, writer): - pass - - def _save_verified_blob(self, writer): - pass - - def 
__str__(self): - return self.blob_hash[:16] - - def __repr__(self): - return '<{}({})>'.format(self.__class__.__name__, str(self)) - - -class BlobFile(HashBlob): - """A HashBlob which will be saved to the hard disk of the downloader""" - - def __init__(self, blob_dir, *args): - HashBlob.__init__(self, *args) - self.blob_dir = blob_dir - self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() - self.moved_verified_blob = False - if os.path.isfile(self.file_path): - self.set_length(os.path.getsize(self.file_path)) - # This assumes that the hash of the blob has already been - # checked as part of the blob creation process. It might - # be worth having a function that checks the actual hash; - # its probably too expensive to have that check be part of - # this call. - self._verified = True - - def open_for_writing(self, peer): - if not peer in self.writers: - log.debug("Opening %s to be written by %s", str(self), str(peer)) - write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) - finished_deferred = defer.Deferred() - writer = HashBlobWriter(write_file, self.get_length, self.writer_finished) - - self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel - log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None, None - - def open_for_reading(self): - if self._verified is True: - file_handle = None - try: - file_handle = open(self.file_path, 'rb') - self.readers += 1 - return file_handle - except IOError: - log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) - return None - - def delete(self): - if not self.writers and not self.readers: - self._verified = False - self.moved_verified_blob = False - - def delete_from_file_system(): - if os.path.isfile(self.file_path): - os.remove(self.file_path) - - d = threads.deferToThread(delete_from_file_system) - - def log_error(err): - log.warning("An error occurred deleting %s: %s", - str(self.file_path), err.getErrorMessage()) - return err - - d.addErrback(log_error) - return d - else: - return defer.fail(Failure( - ValueError("File is currently being read or written and cannot be deleted"))) - - def close_read_handle(self, file_handle): - if file_handle is not None: - file_handle.close() - self.readers -= 1 - - def _close_writer(self, writer): - if writer.write_handle is not None: - log.debug("Closing %s", str(self)) - name = writer.write_handle.name - writer.write_handle.close() - threads.deferToThread(os.remove, name) - writer.write_handle = None - - def _save_verified_blob(self, writer): - - def move_file(): - with self.setting_verified_blob_lock: - if self.moved_verified_blob is False: - temp_file_name = writer.write_handle.name - writer.write_handle.close() - shutil.move(temp_file_name, self.file_path) - writer.write_handle = None - self.moved_verified_blob = True - return True - else: - raise DownloadCanceledError() - - return threads.deferToThread(move_file) - - -class TempBlob(HashBlob): - """A HashBlob which will only exist in memory""" - def __init__(self, *args): - HashBlob.__init__(self, *args) - self.data_buffer = "" - - def open_for_writing(self, peer): - if not peer in self.writers: - temp_buffer = StringIO() - finished_deferred = defer.Deferred() - writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished) - - self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel - 
return None, None, None - - def open_for_reading(self): - if self._verified is True: - return StringIO(self.data_buffer) - return None - - def delete(self): - if not self.writers and not self.readers: - self._verified = False - self.data_buffer = '' - return defer.succeed(True) - else: - return defer.fail(Failure( - ValueError("Blob is currently being read or written and cannot be deleted"))) - - def close_read_handle(self, file_handle): - file_handle.close() - - def _close_writer(self, writer): - if writer.write_handle is not None: - writer.write_handle.close() - writer.write_handle = None - - def _save_verified_blob(self, writer): - if not self.data_buffer: - self.data_buffer = writer.write_handle.getvalue() - writer.write_handle.close() - writer.write_handle = None - return defer.succeed(True) - else: - return defer.fail(Failure(DownloadCanceledError())) - - -class HashBlobCreator(object): - def __init__(self): - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - self.blob_hash = None - self.length = None - - def open(self): - pass - - def close(self): - self.length = self.len_so_far - if self.length == 0: - self.blob_hash = None - else: - self.blob_hash = self._hashsum.hexdigest() - d = self._close() - if self.blob_hash is not None: - d.addCallback(lambda _: self.blob_hash) - else: - d.addCallback(lambda _: None) - return d - - def write(self, data): - self._hashsum.update(data) - self.len_so_far += len(data) - self._write(data) - - def _close(self): - pass - - def _write(self, data): - pass - - -class BlobFileCreator(HashBlobCreator): - def __init__(self, blob_dir): - HashBlobCreator.__init__(self) - self.blob_dir = blob_dir - self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) - - def _close(self): - temp_file_name = self.out_file.name - self.out_file.close() - if self.blob_hash is not None: - shutil.move(temp_file_name, os.path.join(self.blob_dir, self.blob_hash)) - else: - os.remove(temp_file_name) - return defer.succeed(True) - - def _write(self, data): - self.out_file.write(data) - - -class TempBlobCreator(HashBlobCreator): - def __init__(self): - HashBlobCreator.__init__(self) - # TODO: use StringIO - self.data_buffer = '' - - def _close(self): - return defer.succeed(True) - - def _write(self, data): - self.data_buffer += data diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py deleted file mode 100644 index 4aa0ae542..000000000 --- a/lbrynet/core/StreamCreator.py +++ /dev/null @@ -1,76 +0,0 @@ -import logging -from twisted.internet import interfaces, defer -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -class StreamCreator(object): - """Classes which derive from this class create a 'stream', which can be any - collection of associated blobs and associated metadata. 
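This removed class, the HashBlobReader above, and the CryptStreamCreator that absorbs its logic later in this diff all share Twisted's consumer/producer handshake: a pull producer (streaming=False) delivers a chunk only when the consumer calls resumeProducing(). A minimal sketch of that handshake, using hypothetical names rather than code from this diff:

```python
from twisted.internet import interfaces, reactor
from zope.interface import implements


class ChunkConsumer(object):
    """Hypothetical IConsumer that collects chunks from a pull producer."""
    implements(interfaces.IConsumer)

    def __init__(self):
        self.chunks = []
        self.producer = None
        self.streaming = None

    def registerProducer(self, producer, streaming):
        self.producer = producer
        self.streaming = streaming
        if streaming is False:
            # pull producer: the consumer must request each chunk
            reactor.callLater(0, self.producer.resumeProducing)

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.chunks.append(data)
        if self.streaming is False:
            # keep pulling until the producer unregisters itself
            reactor.callLater(0, self.producer.resumeProducing)
```

HashBlobReader and the registerProducer()/write() methods that CryptStreamCreator now carries itself follow exactly this shape.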
These classes - use the IConsumer interface to get data from an IProducer and transform - the data into a 'stream'""" - - implements(interfaces.IConsumer) - - def __init__(self, name): - """ - @param name: the name of the stream - """ - self.name = name - self.stopped = True - self.producer = None - self.streaming = None - self.blob_count = -1 - self.current_blob = None - self.finished_deferreds = [] - - def _blob_finished(self, blob_info): - pass - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - self.stopped = False - if streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.stopped = True - self.producer = None - - def stop(self): - """Stop creating the stream. Create the terminating zero-length blob.""" - log.debug("stop has been called for StreamCreator") - self.stopped = True - if self.current_blob is not None: - current_blob = self.current_blob - d = current_blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) - self.current_blob = None - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: self._finished()) - return dl - - def _finalize(self): - pass - - def _finished(self): - pass - - # TODO: move the stream creation process to its own thread and - # remove the reactor from this process. - def write(self, data): - from twisted.internet import reactor - self._write(data) - if self.stopped is False and self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def _write(self, data): - pass diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 95ffaa327..37185bbf0 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -4,6 +4,7 @@ from decimal import Decimal from twisted.internet import defer from twisted.python.failure import Failure +from twisted.internet.error import ConnectionAborted from zope.interface import implements from lbrynet.core.Error import ConnectionClosedBeforeResponseError @@ -225,7 +226,8 @@ class RequestHelper(object): self.requestor._update_local_score(self.peer, score) def _request_failed(self, reason, request_type): - if reason.check(RequestCanceledError): + if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted, + ConnectionClosedBeforeResponseError): return if reason.check(NoResponseError): self.requestor._incompatible_peers.append(self.peer) @@ -463,13 +465,13 @@ class DownloadRequest(RequestHelper): def find_blob(self, to_download): """Return the first blob in `to_download` that is successfully opened for write.""" for blob in to_download: - if blob.is_validated(): + if blob.get_is_verified(): log.debug('Skipping blob %s as its already validated', blob) continue - d, write_func, cancel_func = blob.open_for_writing(self.peer) + writer, d = blob.open_for_writing(self.peer) if d is not None: - return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer) - log.debug('Skipping blob %s as there was an issue opening it for writing', blob) + return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer) + log.warning('Skipping blob %s as there was an issue opening it for writing', blob) return None def _make_request(self, blob_details): @@ -514,8 +516,6 @@ class DownloadRequest(RequestHelper): def _pay_or_cancel_payment(self, arg, reserved_points, blob): if 
self._can_pay_peer(blob, arg): self._pay_peer(blob.length, reserved_points) - d = self.requestor.blob_manager.add_blob_to_download_history( - str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol])) else: self._cancel_points(reserved_points) return arg @@ -565,8 +565,11 @@ class DownloadRequest(RequestHelper): self.peer.update_stats('blobs_downloaded', 1) self.peer.update_score(5.0) should_announce = blob.blob_hash == self.head_blob_hash - self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) - return arg + d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) + d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history( + blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol])) + d.addCallback(lambda _: arg) + return d def _download_failed(self, reason): if not reason.check(DownloadCanceledError, PriceDisagreementError): diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index a9dca8307..b8861a6ab 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -50,6 +50,7 @@ class ClientProtocol(Protocol, TimeoutMixin): log.debug("Data receieved from %s", self.peer) self.setTimeout(None) self._rate_limiter.report_dl_bytes(len(data)) + if self._downloading_blob is True: self._blob_download_request.write(data) else: @@ -101,8 +102,7 @@ class ClientProtocol(Protocol, TimeoutMixin): d = self.add_request(blob_request) self._blob_download_request = blob_request blob_request.finished_deferred.addCallbacks(self._downloading_finished, - self._downloading_failed) - blob_request.finished_deferred.addErrback(self._handle_response_error) + self._handle_response_error) return d else: raise ValueError("There is already a blob download request active") @@ -110,7 +110,7 @@ class ClientProtocol(Protocol, TimeoutMixin): def cancel_requests(self): self.connection_closing = True ds = [] - err = failure.Failure(RequestCanceledError()) + err = RequestCanceledError() for key, d in self._response_deferreds.items(): del self._response_deferreds[key] d.errback(err) @@ -119,6 +119,7 @@ class ClientProtocol(Protocol, TimeoutMixin): self._blob_download_request.cancel(err) ds.append(self._blob_download_request.finished_deferred) self._blob_download_request = None + self._downloading_blob = False return defer.DeferredList(ds) ######### Internal request handling ######### @@ -176,15 +177,24 @@ class ClientProtocol(Protocol, TimeoutMixin): def _handle_response_error(self, err): # If an error gets to this point, log it and kill the connection. - expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError, - DownloadCanceledError, RequestCanceledError) - if not err.check(expected_errors): + if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted): + # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't + # TODO: always be this way. it's done this way now because the client has no other way + # TODO: of telling the server it wants the download to stop. It would be great if the + # TODO: protocol had such a mechanism. 
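+            # For reference: err is a twisted.python.failure.Failure, and
+            # Failure.check() returns the first matching exception class (or
+            # None), which is what lets this method route expected
+            # cancellations away from the warning/error branches below.
+            # Illustrative only, not part of this change:
+            #
+            #     if err.check(DownloadCanceledError, RequestCanceledError):
+            #         pass  # local side chose to stop; close the connection quietly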
+ log.info("Closing the connection to %s because the download of blob %s was canceled", + self.peer, self._blob_download_request.blob) + result = None + elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError): + log.warning("The connection to %s is closing due to: %s", self.peer, err) + result = err + else: log.error("The connection to %s is closing due to an unexpected error: %s", - self.peer, err.getErrorMessage()) - if not err.check(RequestCanceledError): - # The connection manager is closing the connection, so - # there's no need to do it here. - return err + self.peer, err) + result = err + + self.transport.loseConnection() + return result def _handle_response(self, response): ds = [] @@ -225,7 +235,7 @@ class ClientProtocol(Protocol, TimeoutMixin): log.debug("Asking for another request from %s", self.peer) self._ask_for_request() else: - log.debug("Not asking for another request from %s", self.peer) + log.warning("Not asking for another request from %s", self.peer) self.transport.loseConnection() dl.addCallback(get_next_request) @@ -236,16 +246,6 @@ class ClientProtocol(Protocol, TimeoutMixin): self._downloading_blob = False return arg - def _downloading_failed(self, err): - if err.check(DownloadCanceledError): - # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't - # TODO: always be this way. it's done this way now because the client has no other way - # TODO: of telling the server it wants the download to stop. It would be great if the - # TODO: protocol had such a mechanism. - log.debug("Closing the connection to %s because the download of blob %s was canceled", - self.peer, self._blob_download_request.blob) - return err - ######### IRateLimited ######### def throttle_upload(self): diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index 97dc4727f..10509fd27 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.DownloadManager import DownloadManager -from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError +from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from twisted.python.failure import Failure from twisted.internet import defer @@ -64,14 +64,14 @@ class SingleProgressManager(object): def stream_position(self): blobs = self.download_manager.blobs - if blobs and blobs[0].is_validated(): + if blobs and blobs[0].get_is_verified(): return 1 return 0 def needed_blobs(self): blobs = self.download_manager.blobs assert len(blobs) == 1 - return [b for b in blobs.itervalues() if not b.is_validated()] + return [b for b in blobs.itervalues() if not b.get_is_verified()] class DummyBlobHandler(object): @@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object): def _download_timedout(self): self.stop() if not self.finished_deferred.called: - self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) + self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash)) def insufficient_funds(self, err): self.stop() diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index 29aea9d1a..bc16fe560 100644 --- 
a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -93,7 +93,7 @@ class FullStreamProgressManager(StreamProgressManager): return ( i not in blobs or ( - not blobs[i].is_validated() and + not blobs[i].get_is_verified() and i not in self.provided_blob_nums ) ) @@ -112,7 +112,7 @@ class FullStreamProgressManager(StreamProgressManager): blobs = self.download_manager.blobs return [ b for n, b in blobs.iteritems() - if not b.is_validated() and not n in self.provided_blob_nums + if not b.get_is_verified() and not n in self.provided_blob_nums ] ######### internal ######### @@ -145,7 +145,7 @@ class FullStreamProgressManager(StreamProgressManager): current_blob_num = self.last_blob_outputted + 1 - if current_blob_num in blobs and blobs[current_blob_num].is_validated(): + if current_blob_num in blobs and blobs[current_blob_num].get_is_verified(): log.debug("Outputting blob %s", str(self.last_blob_outputted + 1)) self.provided_blob_nums.append(self.last_blob_outputted + 1) d = self.download_manager.handle_blob(self.last_blob_outputted + 1) @@ -154,10 +154,11 @@ class FullStreamProgressManager(StreamProgressManager): d.addCallback(lambda _: check_if_finished()) def log_error(err): - log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage()) + log.warning("Error occurred in the output loop. Error: %s", err) if self.outputting_d is not None and not self.outputting_d.called: self.outputting_d.callback(True) self.outputting_d = None + self.stop() d.addErrback(log_error) else: diff --git a/lbrynet/core/cryptoutils.py b/lbrynet/core/cryptoutils.py index 7c0c5c40c..2528c7e69 100644 --- a/lbrynet/core/cryptoutils.py +++ b/lbrynet/core/cryptoutils.py @@ -1,9 +1,9 @@ -from Crypto.Hash import SHA384 import seccure +import hashlib def get_lbry_hash_obj(): - return SHA384.new() + return hashlib.sha384() def get_pub_key(pass_phrase): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index b95b3ca84..de98cf898 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -143,7 +143,7 @@ class BlobRequestHandler(object): def open_blob_for_reading(self, blob, response): response_fields = {} d = defer.succeed(None) - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: self.currently_uploading = blob @@ -162,7 +162,7 @@ class BlobRequestHandler(object): def record_transaction(self, blob): d = self.blob_manager.add_blob_to_upload_history( - str(blob), self.peer.host, self.blob_data_payment_rate) + blob.blob_hash, self.peer.host, self.blob_data_payment_rate) return d def _reply_to_send_request(self, response, incoming): diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 0b26d6f89..a7303a588 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -1,11 +1,16 @@ import binascii import logging -from Crypto.Cipher import AES +from twisted.internet import defer +from cryptography.hazmat.primitives.ciphers import Cipher, modes +from cryptography.hazmat.primitives.ciphers.algorithms import AES +from cryptography.hazmat.primitives.padding import PKCS7 +from cryptography.hazmat.backends import default_backend from lbrynet import conf from lbrynet.core.BlobInfo import BlobInfo log = logging.getLogger(__name__) +backend = default_backend() class CryptBlobInfo(BlobInfo): @@ -31,7 +36,9 @@ class StreamBlobDecryptor(object): self.length = 
length self.buff = b'' self.len_read = 0 - self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.unpadder = PKCS7(AES.block_size).unpadder() + self.cipher = cipher.decryptor() def decrypt(self, write_func): """ @@ -42,22 +49,23 @@ class StreamBlobDecryptor(object): """ def remove_padding(data): - pad_len = ord(data[-1]) - data, padding = data[:-1 * pad_len], data[-1 * pad_len:] - for c in padding: - assert ord(c) == pad_len - return data + return self.unpadder.update(data) + self.unpadder.finalize() def write_bytes(): if self.len_read < self.length: - num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size) + num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size / 8)) data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt) - write_func(self.cipher.decrypt(data_to_decrypt)) + write_func(self.cipher.update(data_to_decrypt)) def finish_decrypt(): - assert len(self.buff) % self.cipher.block_size == 0 + bytes_left = len(self.buff) % (AES.block_size / 8) + if bytes_left != 0: + log.warning(self.buff[-1 * (AES.block_size / 8):].encode('hex')) + raise Exception("blob %s has incorrect padding: %i bytes left" % + (self.blob.blob_hash, bytes_left)) data_to_decrypt, self.buff = self.buff, b'' - write_func(remove_padding(self.cipher.decrypt(data_to_decrypt))) + last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() + write_func(remove_padding(last_chunk)) def decrypt_bytes(data): self.buff += data @@ -84,8 +92,9 @@ class CryptStreamBlobMaker(object): self.iv = iv self.blob_num = blob_num self.blob = blob - self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - self.buff = b'' + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.padder = PKCS7(AES.block_size).padder() + self.cipher = cipher.encryptor() self.length = 0 def write(self, data): @@ -104,39 +113,26 @@ class CryptStreamBlobMaker(object): done = True else: num_bytes_to_write = len(data) - self.length += num_bytes_to_write data_to_write = data[:num_bytes_to_write] - self.buff += data_to_write - self._write_buffer() + self.length += len(data_to_write) + padded_data = self.padder.update(data_to_write) + encrypted_data = self.cipher.update(padded_data) + self.blob.write(encrypted_data) return done, num_bytes_to_write + @defer.inlineCallbacks def close(self): log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length)) if self.length != 0: - self._close_buffer() - d = self.blob.close() - d.addCallback(self._return_info) + self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8)) + padded_data = self.padder.finalize() + encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() + self.blob.write(encrypted_data) + + blob_hash = yield self.blob.close() log.debug("called the finished_callback from CryptStreamBlobMaker.close") - return d - - def _write_buffer(self): - num_bytes_to_encrypt = (len(self.buff) // AES.block_size) * AES.block_size - data_to_encrypt, self.buff = split(self.buff, num_bytes_to_encrypt) - encrypted_data = self.cipher.encrypt(data_to_encrypt) - self.blob.write(encrypted_data) - - def _close_buffer(self): - data_to_encrypt, self.buff = self.buff, b'' - assert len(data_to_encrypt) < AES.block_size - pad_len = AES.block_size - len(data_to_encrypt) - padded_data = data_to_encrypt + chr(pad_len) * pad_len - self.length += pad_len - assert len(padded_data) == AES.block_size - encrypted_data = 
self.cipher.encrypt(padded_data) - self.blob.write(encrypted_data) - - def _return_info(self, blob_hash): - return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + defer.returnValue(blob) def greatest_multiple(a, b): diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index e5b3c8bf3..9c94ad476 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met """ import logging - +from twisted.internet import interfaces, defer +from zope.interface import implements from Crypto import Random from Crypto.Cipher import AES - -from twisted.internet import defer -from lbrynet.core.StreamCreator import StreamCreator from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker log = logging.getLogger(__name__) -class CryptStreamCreator(StreamCreator): - """Create a new stream with blobs encrypted by a symmetric cipher. +class CryptStreamCreator(object): + """ + Create a new stream with blobs encrypted by a symmetric cipher. Each blob is encrypted with the same key, but each blob has its own initialization vector which is associated with the blob when the blob is associated with the stream. """ + + implements(interfaces.IConsumer) + def __init__(self, blob_manager, name=None, key=None, iv_generator=None): """@param blob_manager: Object that stores and provides access to blobs. @type blob_manager: BlobManager @@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator): @return: None """ - StreamCreator.__init__(self, name) self.blob_manager = blob_manager + self.name = name self.key = key if iv_generator is None: self.iv_generator = self.random_iv_generator() else: self.iv_generator = iv_generator + self.stopped = True + self.producer = None + self.streaming = None + self.blob_count = -1 + self.current_blob = None + self.finished_deferreds = [] + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + self.stopped = False + if streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.stopped = True + self.producer = None + + def stop(self): + """Stop creating the stream. Create the terminating zero-length blob.""" + log.debug("stop has been called for StreamCreator") + self.stopped = True + if self.current_blob is not None: + current_blob = self.current_blob + d = current_blob.close() + d.addCallback(self._blob_finished) + d.addErrback(self._error) + self.finished_deferreds.append(d) + self.current_blob = None + self._finalize() + dl = defer.DeferredList(self.finished_deferreds) + dl.addCallback(lambda _: self._finished()) + dl.addErrback(self._error) + return dl + + # TODO: move the stream creation process to its own thread and + # remove the reactor from this process. 
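The CryptBlob.py changes above replace pycrypto's one-shot AES cipher and the hand-rolled PKCS7 padding with the `cryptography` library's incremental Cipher and PKCS7 objects, and cryptoutils.py swaps `Crypto.Hash.SHA384` for `hashlib`. A self-contained roundtrip sketch of those APIs (illustrative, with a throwaway key and iv; not code from this diff):

```python
import hashlib
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7

backend = default_backend()
key = os.urandom(32)  # throwaway AES-256 key
iv = os.urandom(16)   # throwaway CBC initialization vector

# Encrypt incrementally: pad, then feed the encryptor, mirroring
# CryptStreamBlobMaker.write() and close()
padder = PKCS7(AES.block_size).padder()  # AES.block_size is in bits (128)
encryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).encryptor()
ciphertext = encryptor.update(padder.update(b"some plaintext"))
ciphertext += encryptor.update(padder.finalize()) + encryptor.finalize()

# Decrypt and unpad, mirroring StreamBlobDecryptor.decrypt()
unpadder = PKCS7(AES.block_size).unpadder()
decryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).decryptor()
plaintext = unpadder.update(decryptor.update(ciphertext) + decryptor.finalize())
plaintext += unpadder.finalize()
assert plaintext == b"some plaintext"

# hashlib stands in for Crypto.Hash.SHA384 when hashing blob data
print(hashlib.sha384(ciphertext).hexdigest())
```

Note that PKCS7 always emits at least one byte of padding, and a whole extra block when the plaintext is block-aligned, which is why CryptStreamBlobMaker.close() rounds self.length up to the next block boundary when finalizing.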
+ def write(self, data): + from twisted.internet import reactor + self._write(data) + if self.stopped is False and self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + @staticmethod def random_iv_generator(): while 1: @@ -77,11 +124,6 @@ class CryptStreamCreator(StreamCreator): self.finished_deferreds.append(d) def _write(self, data): - def close_blob(blob): - d = blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) - while len(data) > 0: if self.current_blob is None: self.next_blob_creator = self.blob_manager.get_blob_creator() @@ -94,10 +136,19 @@ class CryptStreamCreator(StreamCreator): should_announce = self.blob_count == 0 d = self.current_blob.close() d.addCallback(self._blob_finished) - d.addCallback(lambda _: self.blob_manager.creator_finished( - self.next_blob_creator, should_announce)) + d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, + should_announce)) self.finished_deferreds.append(d) self.current_blob = None def _get_blob_maker(self, iv, blob_creator): return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) + + def _error(self, error): + log.error(error) + + def _finished(self): + raise NotImplementedError() + + def _blob_finished(self, blob_info): + raise NotImplementedError() diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index fd05afcbd..63d7d1461 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -29,7 +29,6 @@ from lbrynet.reflector import reupload from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType @@ -47,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash -from lbrynet.core.Error import NoSuchStreamHash +from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError log = logging.getLogger(__name__) @@ -317,7 +316,8 @@ class Daemon(AuthJSONRPCServer): if self.reflector_port is not None: reflector_factory = reflector_server_factory( self.session.peer_manager, - self.session.blob_manager + self.session.blob_manager, + self.stream_info_manager ) try: self.reflector_server_port = reactor.listenTCP(self.reflector_port, @@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer): def _already_shutting_down(sig_num, frame): log.info("Already shutting down") + def _stop_streams(self): + """stop pending GetStream downloads""" + for claim_id, stream in self.streams.iteritems(): + stream.cancel(reason="daemon shutdown") + def _shutdown(self): # ignore INT/TERM signals once shutdown has started signal.signal(signal.SIGINT, self._already_shutting_down) @@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer): log.info("Closing lbrynet session") log.info("Status at time of shutdown: " + self.startup_status[0]) + 
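
Stepping back from the diff for a moment: dropping the `StreamCreator` base class and implementing `IConsumer` directly means `CryptStreamCreator` can now be driven by any Twisted body producer; the rewritten `scripts/encrypt_blob.py` later in this diff does exactly that with `FileBodyProducer`. Here is a rough, self-contained sketch of that handshake, with a toy consumer standing in for the creator (hypothetical names throughout):

```python
from io import BytesIO

from twisted.internet import defer, task
from twisted.internet.interfaces import IConsumer
from twisted.web.client import FileBodyProducer
from zope.interface import implementer


@implementer(IConsumer)
class CountingConsumer(object):
    """Toy stand-in for CryptStreamCreator: just counts the bytes it is fed."""

    def __init__(self):
        self.producer = None
        self.streaming = None
        self.received = 0

    def registerProducer(self, producer, streaming):
        # FileBodyProducer.startProducing() calls write() directly, so this is
        # only exercised by producers that register themselves (the
        # registerProducer/resumeProducing dance the code above handles).
        self.producer = producer
        self.streaming = streaming

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.received += len(data)


@defer.inlineCallbacks
def main(reactor):
    consumer = CountingConsumer()
    # readSize=2**22 mirrors the value encrypt_blob.py uses in this diff
    producer = FileBodyProducer(BytesIO(b'x' * 1000000), readSize=2 ** 22)
    yield producer.startProducing(consumer)
    print('consumed %d bytes' % consumer.received)


if __name__ == '__main__':
    task.react(main)
```
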
+ self._stop_streams() + self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() @@ -578,17 +586,8 @@ class Daemon(AuthJSONRPCServer): self.session.wallet, self.download_directory ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_saver_factory) - file_opener_factory = EncryptedFileOpenerFactory( - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, - self.session.wallet - ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_opener_factory) + self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, + file_saver_factory) return defer.succeed(None) def _download_blob(self, blob_hash, rate_manager=None, timeout=None): @@ -608,6 +607,39 @@ class Daemon(AuthJSONRPCServer): timeout = timeout or 30 return download_sd_blob(self.session, blob_hash, rate_manager, timeout) + @defer.inlineCallbacks + def _get_stream_analytics_report(self, claim_dict): + sd_hash = claim_dict.source_hash + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except Exception: + stream_hash = None + report = { + "sd_hash": sd_hash, + "stream_hash": stream_hash, + } + blobs = {} + try: + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + except Exception: + sd_host = None + report["sd_blob"] = sd_host + if stream_hash: + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + report["known_blobs"] = len(blob_infos) + else: + blob_infos = [] + report["known_blobs"] = 0 + for blob_hash, blob_num, iv, length in blob_infos: + try: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + except Exception: + host = None + if host: + blobs[blob_num] = host + report["blobs"] = json.dumps(blobs) + defer.returnValue(report) + @defer.inlineCallbacks def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): """ @@ -615,6 +647,17 @@ class Daemon(AuthJSONRPCServer): If it already exists in the file manager, return the existing lbry file """ + @defer.inlineCallbacks + def _download_finished(download_id, name, claim_dict): + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_failed(error, download_id, name, claim_dict): + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, + report) + if claim_id in self.streams: downloader = self.streams[claim_id] result = yield downloader.finished_deferred @@ -630,17 +673,23 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished(download_id, - name, - claim_dict)) + finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, + claim_dict), + lambda e: _download_failed(e, download_id, name, + claim_dict)) + result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - del self.streams[claim_id] except Exception as err: - log.warning('Failed to get %s: %s', name, err) - self.analytics_manager.send_download_errored(download_id, name, claim_dict) - del self.streams[claim_id] + yield _download_failed(err, download_id, name, claim_dict) + if isinstance(err, 
(DownloadDataTimeout, DownloadSDTimeout)): + log.warning('Failed to get %s (%s)', name, err) + else: + log.error('Failed to get %s (%s)', name, err) + if self.streams[claim_id].downloader: + yield self.streams[claim_id].downloader.stop(err) result = {'error': err.message} + finally: + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks @@ -2462,27 +2511,24 @@ class Daemon(AuthJSONRPCServer): """ if announce_all: yield self.session.blob_manager.immediate_announce_all_blobs() - elif blob_hash: - blob_hashes = [blob_hash] - yield self.session.blob_manager._immediate_announce(blob_hashes) - elif stream_hash: - blobs = yield self.get_blobs_for_stream_hash(stream_hash) - blobs = [blob for blob in blobs if blob.is_validated()] - blob_hashes = [blob.blob_hash for blob in blobs] - yield self.session.blob_manager._immediate_announce(blob_hashes) - elif sd_hash: - blobs = yield self.get_blobs_for_sd_hash(sd_hash) - blobs = [blob for blob in blobs if blob.is_validated()] - blob_hashes = [blob.blob_hash for blob in blobs] - blob_hashes.append(sd_hash) - yield self.session.blob_manager._immediate_announce(blob_hashes) else: - raise Exception('single argument must be specified') + if blob_hash: + blob_hashes = [blob_hash] + elif stream_hash: + blobs = yield self.get_blobs_for_stream_hash(stream_hash) + blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()] + elif sd_hash: + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if + blob.get_is_verified()] + else: + raise Exception('single argument must be specified') + yield self.session.blob_manager._immediate_announce(blob_hashes) response = yield self._render_response(True) defer.returnValue(response) - # TODO: This command should be deprecated in favor of blob_announce + @AuthJSONRPCServer.deprecated("blob_announce") def jsonrpc_blob_announce_all(self): """ Announce all blobs to the DHT @@ -2493,10 +2539,7 @@ class Daemon(AuthJSONRPCServer): Returns: (str) Success/fail message """ - - d = self.session.blob_manager.immediate_announce_all_blobs() - d.addCallback(lambda _: self._render_response("Announced")) - return d + return self.jsonrpc_blob_announce(announce_all=True) @defer.inlineCallbacks def jsonrpc_file_reflect(self, **kwargs): @@ -2580,9 +2623,9 @@ class Daemon(AuthJSONRPCServer): blobs = self.session.blob_manager.blobs.itervalues() if needed: - blobs = [blob for blob in blobs if not blob.is_validated()] + blobs = [blob for blob in blobs if not blob.get_is_verified()] if finished: - blobs = [blob for blob in blobs if blob.is_validated()] + blobs = [blob for blob in blobs if blob.get_is_verified()] blob_hashes = [blob.blob_hash for blob in blobs] page_size = page_size or len(blob_hashes) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 488f02886..53a164d58 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,11 +5,11 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from 
lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet import conf INITIALIZING_CODE = 'initializing' @@ -61,23 +61,22 @@ class GetStream(object): return os.path.join(self.download_directory, self.downloader.file_name) def _check_status(self, status): - stop_condition = (status.num_completed > 0 or - status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED) - if stop_condition and not self.data_downloading_deferred.called: + if status.num_completed > 0 and not self.data_downloading_deferred.called: self.data_downloading_deferred.callback(True) if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) else: - log.info("Downloading stream data (%i seconds)", self.timeout_counter) + log.info("Waiting for stream data (%i seconds)", self.timeout_counter) def check_status(self): """ Check if we've got the first data blob in the stream yet """ self.timeout_counter += 1 - if self.timeout_counter >= self.timeout: + if self.timeout_counter > self.timeout: if not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name)) + self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) + safe_stop_looping_call(self.checker) else: d = self.downloader.status() @@ -150,6 +149,10 @@ class GetStream(object): self._check_status(status) defer.returnValue(self.download_path) + def fail(self, err): + safe_stop_looping_call(self.checker) + raise err + @defer.inlineCallbacks def _initialize(self, stream_info): # Set sd_hash and return key_fee from stream_info @@ -179,7 +182,7 @@ class GetStream(object): log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() - self.finished_deferred.addCallback(self.finish, name) + self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @defer.inlineCallbacks def start(self, stream_info, name): @@ -204,9 +207,18 @@ class GetStream(object): try: yield self.data_downloading_deferred - except Exception as err: - self.downloader.stop() + except DownloadDataTimeout as err: safe_stop_looping_call(self.checker) - raise + raise err defer.returnValue((self.downloader, self.finished_deferred)) + + def cancel(self, reason=None): + if reason: + msg = "download stream cancelled: %s" % reason + else: + msg = "download stream cancelled" + if self.finished_deferred and not self.finished_deferred.called: + self.finished_deferred.errback(DownloadCanceledError(msg)) + if self.data_downloading_deferred and not self.data_downloading_deferred.called: + self.data_downloading_deferred.errback(DownloadCanceledError(msg)) diff --git a/lbrynet/daemon/ExchangeRateManager.py b/lbrynet/daemon/ExchangeRateManager.py index 998d06d0d..805df2db1 100644 --- a/lbrynet/daemon/ExchangeRateManager.py +++ b/lbrynet/daemon/ExchangeRateManager.py @@ -64,9 +64,8 @@ class MarketFeed(object): self.rate = ExchangeRate(self.market, price, int(time.time())) def _log_error(self, err): - log.warning( - "There was a problem updating %s exchange rate information from %s", - self.market, self.name) + log.warning("There was a problem updating %s exchange rate information from %s\n%s", + self.market, self.name, err) def _update_price(self): d = threads.deferToThread(self._make_request) @@ -141,7 +140,10 @@ class LBRYioBTCFeed(MarketFeed): ) def _handle_response(self, response): - json_response = json.loads(response) + try: + json_response = json.loads(response) + except 
ValueError: + raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response) if 'data' not in json_response: raise InvalidExchangeRateResponse(self.name, 'result not found') return defer.succeed(1.0 / json_response['data']['btc_usd']) diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 99d15e5e4..bf6d3bea7 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -29,8 +29,9 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos = [] def _blob_finished(self, blob_info): - log.debug("length: %s", str(blob_info.length)) + log.debug("length: %s", blob_info.length) self.blob_infos.append(blob_info) + return blob_info def _save_stream_info(self): stream_info_manager = self.lbry_file_manager.stream_info_manager @@ -40,10 +41,6 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos) return d - def setup(self): - d = CryptStreamCreator.setup(self) - return d - def _get_blobs_hashsum(self): blobs_hashsum = get_lbry_hash_obj() for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num): diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index b623cf88e..9d38548dd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -250,7 +250,7 @@ class EncryptedFileManager(object): if self.sql_db: yield self.sql_db.close() self.sql_db = None - log.info("Stopped %s", self) + log.info("Stopped encrypted file manager") defer.returnValue(True) def get_count_for_stream_hash(self, stream_hash): @@ -303,8 +303,10 @@ class EncryptedFileManager(object): @rerun_if_locked def _change_file_status(self, rowid, new_status): - return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", + d = self.sql_db.runQuery("update lbry_file_options set status = ? 
where rowid = ?", (new_status, rowid)) + d.addCallback(lambda _: new_status) + return d @rerun_if_locked def _get_lbry_file_status(self, rowid): diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index e18c6f2cf..735c8027e 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -1,4 +1,3 @@ -import subprocess import binascii from zope.interface import implements @@ -10,8 +9,7 @@ from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os -from twisted.internet import defer, threads, reactor -from twisted.python.procutils import which +from twisted.internet import defer, threads import logging import traceback @@ -282,90 +280,3 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): @staticmethod def get_description(): return "Save" - - -class EncryptedFileOpener(EncryptedFileDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - EncryptedFileDownloader.__init__(self, stream_hash, - peer_finder, rate_limiter, - blob_manager, stream_info_manager, - payment_rate_manager, wallet, - ) - self.process = None - self.process_log = None - - def stop(self, err=None): - d = EncryptedFileDownloader.stop(self, err=err) - d.addCallback(lambda _: self._delete_from_info_manager()) - return d - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, - download_manager) - - def _setup_output(self): - def start_process(): - if os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - vlc_path = p - break - else: - raise ValueError("You must install VLC media player to stream files") - else: - vlc_path = 'vlc' - self.process_log = open("vlc.out", 'a') - try: - self.process = subprocess.Popen([vlc_path, '-'], stdin=subprocess.PIPE, - stdout=self.process_log, stderr=self.process_log) - except OSError: - raise ValueError("VLC media player could not be opened") - - d = threads.deferToThread(start_process) - return d - - def _close_output(self): - if self.process is not None: - self.process.stdin.close() - self.process = None - return defer.succeed(True) - - def _get_write_func(self): - def write_func(data): - if self.stopped is False and self.process is not None: - try: - self.process.stdin.write(data) - except IOError: - reactor.callLater(0, self.stop) - return write_func - - def _delete_from_info_manager(self): - return self.stream_info_manager.delete_stream(self.stream_hash) - - -class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory): - def can_download(self, sd_validator): - if which('vlc'): - return True - elif os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - return True - return False - - def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - return EncryptedFileOpener(stream_hash, self.peer_finder, - self.rate_limiter, self.blob_manager, - self.stream_info_manager, - payment_rate_manager, self.wallet, - ) - - @staticmethod - def get_description(): - return "Stream" diff --git 
a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py index bd09dfdfc..116ac7080 100644 --- a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py +++ b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py @@ -1,5 +1,6 @@ import logging from zope.interface import implements +from twisted.internet import defer from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.interfaces import IMetadataHandler @@ -18,10 +19,11 @@ class EncryptedFileMetadataHandler(object): ######### IMetadataHandler ######### + @defer.inlineCallbacks def get_initial_blobs(self): - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(self._format_initial_blobs_for_download_manager) - return d + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos) + defer.returnValue(formatted_infos) def final_blob_num(self): return self._final_blob_num @@ -30,10 +32,12 @@ class EncryptedFileMetadataHandler(object): def _format_initial_blobs_for_download_manager(self, blob_infos): infos = [] - for blob_hash, blob_num, iv, length in blob_infos: - if blob_hash is not None: + for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos): + if blob_hash is not None and length: infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv)) else: + if i != len(blob_infos) - 1: + raise Exception("Invalid stream terminator") log.debug("Setting _final_blob_num to %s", str(blob_num - 1)) self._final_blob_num = blob_num - 1 return infos diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 854dc6489..1f1c540a2 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -132,7 +132,7 @@ class BlobReflectorClient(Protocol): return self.set_not_uploading() def open_blob_for_reading(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: log.debug('Getting ready to send %s', blob.blob_hash) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 26882d186..ebf605b02 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -112,11 +112,11 @@ class EncryptedFileReflectorClient(Protocol): def get_validated_blobs(self, blobs_in_stream): def get_blobs(blobs): for (blob, _, _, blob_len) in blobs: - if blob: + if blob and blob_len: yield self.blob_manager.get_blob(blob, blob_len) dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) - dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()]) + dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()]) return dl def set_blobs_to_send(self, blobs_to_send): @@ -253,7 +253,7 @@ class EncryptedFileReflectorClient(Protocol): return self.set_not_uploading() def open_blob_for_reading(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: log.debug('Getting ready to send %s', blob.blob_hash) diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 8467e5321..a22b93add 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -4,7 +4,9 @@ from twisted.python import failure from twisted.internet import error, defer from twisted.internet.protocol import Protocol, 
ServerFactory from lbrynet.core.utils import is_valid_blobhash -from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError +from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash +from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader +from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError @@ -30,16 +32,17 @@ class ReflectorServer(Protocol): log.debug('Connection made to %s', peer_info) self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) self.blob_manager = self.factory.blob_manager + self.stream_info_manager = self.factory.stream_info_manager self.protocol_version = self.factory.protocol_version self.received_handshake = False self.peer_version = None self.receiving_blob = False self.incoming_blob = None - self.blob_write = None self.blob_finished_d = None - self.cancel_write = None self.request_buff = "" + self.blob_writer = None + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): log.info("Reflector upload from %s finished" % self.peer.host) @@ -61,10 +64,74 @@ class ReflectorServer(Protocol): else: log.exception(err) + @defer.inlineCallbacks + def check_head_blob_announce(self, stream_hash): + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + blob_hash, blob_num, blob_iv, blob_length = blob_infos[0] + if blob_hash in self.blob_manager.blobs: + head_blob = self.blob_manager.blobs[blob_hash] + if head_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(blob_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(blob_hash, 1) + log.info("Discovered previously completed head blob (%s), " + "setting it to be announced", blob_hash[:8]) + defer.returnValue(None) + + @defer.inlineCallbacks + def check_sd_blob_announce(self, sd_hash): + if sd_hash in self.blob_manager.blobs: + sd_blob = self.blob_manager.blobs[sd_hash] + if sd_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(sd_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(sd_hash, 1) + log.info("Discovered previously completed sd blob (%s), " + "setting it to be announced", sd_hash[:8]) + try: + yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + log.info("Adding blobs to stream") + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream( + sd_info['stream_hash'], + sd_hash) + defer.returnValue(None) + @defer.inlineCallbacks def _on_completed_blob(self, blob, response_key): - yield self.blob_manager.blob_completed(blob) + should_announce = False + if response_key == RECEIVED_SD_BLOB: + sd_info = yield BlobStreamDescriptorReader(blob).get_info() + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'], + blob.blob_hash) + should_announce = True + + # if we already have the head blob, set it to be announced now that we know it's + # a head blob + d = self.check_head_blob_announce(sd_info['stream_hash']) + + else: + d = defer.succeed(None) + stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash) + if stream_hash is not None: + blob_num = yield 
self.stream_info_manager._get_blob_num_by_hash(stream_hash, + blob.blob_hash) + if blob_num == 0: + should_announce = True + sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream( + stream_hash) + + # if we already have the sd blob, set it to be announced now that we know it's + # a sd blob + for sd_hash in sd_hashes: + d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash)) + + yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.close_blob() + yield d log.info("Received %s", blob) yield self.send_response({response_key: True}) @@ -82,14 +149,14 @@ class ReflectorServer(Protocol): """ blob = self.incoming_blob - self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer) self.blob_finished_d.addCallback(self._on_completed_blob, response_key) self.blob_finished_d.addErrback(self._on_failed_blob, response_key) def close_blob(self): + self.blob_writer.close() + self.blob_writer = None self.blob_finished_d = None - self.blob_write = None - self.cancel_write = None self.incoming_blob = None self.receiving_blob = False @@ -99,7 +166,7 @@ class ReflectorServer(Protocol): def dataReceived(self, data): if self.receiving_blob: - self.blob_write(data) + self.blob_writer.write(data) else: log.debug('Not yet recieving blob, data needs further processing') self.request_buff += data @@ -110,7 +177,7 @@ class ReflectorServer(Protocol): d.addErrback(self.handle_error) if self.receiving_blob and extra_data: log.debug('Writing extra data to blob') - self.blob_write(extra_data) + self.blob_writer.write(extra_data) def _get_valid_response(self, response_msg): extra_data = None @@ -221,7 +288,7 @@ class ReflectorServer(Protocol): sd_blob_hash = request_dict[SD_BLOB_HASH] sd_blob_size = request_dict[SD_BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size) d.addCallback(self.get_descriptor_response) d.addCallback(self.send_response) @@ -230,16 +297,29 @@ class ReflectorServer(Protocol): d = self.blob_finished_d return d + @defer.inlineCallbacks def get_descriptor_response(self, sd_blob): - if sd_blob.is_validated(): - d = defer.succeed({SEND_SD_BLOB: False}) - d.addCallback(self.request_needed_blobs, sd_blob) + if sd_blob.get_is_verified(): + # if we already have the sd blob being offered, make sure we have it and the head blob + # marked as such for announcement now that we know it's an sd blob that we have. 
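
(An aside before the `get_descriptor_response` logic continues below: the net effect of `check_head_blob_announce` and `check_sd_blob_announce` is a single announce policy — of all the blobs in a stream, only the sd blob and the head blob (`blob_num == 0`) are marked `should_announce`, regardless of the order in which they arrive. A small illustrative helper, hypothetical rather than part of the codebase, expressed over the `(blob_hash, blob_num, iv, length)` rows that `get_blobs_for_stream` returns:)

```python
def blobs_to_announce(sd_hash, blob_infos):
    """Return the blob hashes that should be announced to the DHT.

    blob_infos: (blob_hash, blob_num, iv, length) rows; the zero-length
    stream terminator row has blob_hash=None and is never announced.
    """
    announce = {sd_hash}  # the sd blob is always announced
    for blob_hash, blob_num, _iv, _length in blob_infos:
        if blob_hash is not None and blob_num == 0:
            announce.add(blob_hash)  # the head blob
    return announce


# usage with made-up hashes
rows = [('head_hash', 0, 'iv0', 2097152),
        ('mid_hash', 1, 'iv1', 2097152),
        (None, 2, 'iv2', 0)]
assert blobs_to_announce('sd_hash', rows) == {'sd_hash', 'head_hash'}
```
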
+ yield self.check_sd_blob_announce(sd_blob.blob_hash) + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash( + sd_blob.blob_hash) + except NoSuchSDHash: + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + stream_hash = sd_info['stream_hash'] + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, + sd_blob.blob_hash) + yield self.check_head_blob_announce(stream_hash) + response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob) else: self.incoming_blob = sd_blob self.receiving_blob = True self.handle_incoming_blob(RECEIVED_SD_BLOB) - d = defer.succeed({SEND_SD_BLOB: True}) - return d + response = {SEND_SD_BLOB: True} + defer.returnValue(response) def request_needed_blobs(self, response, sd_blob): def _add_needed_blobs_to_response(needed_blobs): @@ -267,7 +347,7 @@ class ReflectorServer(Protocol): if 'blob_hash' in blob and 'length' in blob: blob_hash, blob_len = blob['blob_hash'], blob['length'] d = self.blob_manager.get_blob(blob_hash, blob_len) - d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None) + d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None) yield d def handle_blob_request(self, request_dict): @@ -293,7 +373,7 @@ class ReflectorServer(Protocol): blob_hash = request_dict[BLOB_HASH] blob_size = request_dict[BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: log.debug('Received info for blob: %s', blob_hash[:16]) d = self.blob_manager.get_blob(blob_hash, blob_size) d.addCallback(self.get_blob_response) @@ -305,7 +385,7 @@ class ReflectorServer(Protocol): return d def get_blob_response(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): return defer.succeed({SEND_BLOB: False}) else: self.incoming_blob = blob @@ -318,9 +398,10 @@ class ReflectorServer(Protocol): class ReflectorServerFactory(ServerFactory): protocol = ReflectorServer - def __init__(self, peer_manager, blob_manager): + def __init__(self, peer_manager, blob_manager, stream_info_manager): self.peer_manager = peer_manager self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager self.protocol_version = REFLECTOR_V2 def buildProtocol(self, addr): diff --git a/requirements.txt b/requirements.txt index a3a53274c..2d5a0dcf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ Twisted==16.6.0 +cryptography==2.0.3 appdirs==1.4.3 argparse==1.2.1 docopt==0.6.2 diff --git a/scripts/decrypt_blob.py b/scripts/decrypt_blob.py index bc905bf2e..4f5c8b8e9 100644 --- a/scripts/decrypt_blob.py +++ b/scripts/decrypt_blob.py @@ -10,7 +10,7 @@ from twisted.internet import reactor from lbrynet import conf from lbrynet.cryptstream import CryptBlob -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import log_support @@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output): filename = os.path.abspath(blob_file) length = os.path.getsize(filename) directory, blob_hash = os.path.split(filename) - blob = HashBlob.BlobFile(directory, blob_hash, True, length) + blob = BlobFile(directory, blob_hash, length) decryptor = CryptBlob.StreamBlobDecryptor( blob, binascii.unhexlify(key), binascii.unhexlify(iv), length) with open(output, 'w') as f: diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index fffc44c9a..43a510328 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -2,7 +2,7 @@ 
import argparse import logging import sys -import tempfile +import os from twisted.internet import defer from twisted.internet import reactor @@ -13,7 +13,7 @@ from lbrynet import conf from lbrynet.core import log_support from lbrynet.core import BlobManager from lbrynet.core import HashAnnouncer -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import RateLimiter from lbrynet.core import Peer from lbrynet.core import Wallet @@ -31,13 +31,14 @@ def main(args=None): parser.add_argument('--timeout', type=int, default=30) parser.add_argument('peer') parser.add_argument('blob_hash') + parser.add_argument('directory', type=str, default=os.getcwd()) args = parser.parse_args(args) log_support.configure_console(level='DEBUG') announcer = HashAnnouncer.DummyHashAnnouncer() blob_manager = MyBlobManager(announcer) - blob = HashBlob.TempBlob(args.blob_hash, False) + blob = BlobFile(args.directory, args.blob_hash) download_manager = SingleBlobDownloadManager(blob) peer = Peer.Peer(*conf.server_port(args.peer)) payment_rate_manager = DumbPaymentRateManager() diff --git a/scripts/encrypt_blob.py b/scripts/encrypt_blob.py index 440ea7a8d..3d3552f48 100644 --- a/scripts/encrypt_blob.py +++ b/scripts/encrypt_blob.py @@ -1,17 +1,18 @@ """Encrypt a single file using the given key and iv""" import argparse -import binascii import logging -import StringIO import sys from twisted.internet import defer from twisted.internet import reactor +from twisted.protocols import basic +from twisted.web.client import FileBodyProducer from lbrynet import conf -from lbrynet.cryptstream import CryptBlob from lbrynet.core import log_support -from lbrynet.core import cryptoutils +from lbrynet.core.HashAnnouncer import DummyHashAnnouncer +from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator log = logging.getLogger('decrypt_blob') @@ -26,7 +27,7 @@ def main(): args = parser.parse_args() log_support.configure_console(level='DEBUG') - d = run(args) + run(args) reactor.run() @@ -40,29 +41,23 @@ def run(args): reactor.callLater(0, reactor.stop) +@defer.inlineCallbacks def encrypt_blob(filename, key, iv): - blob = Blob() - blob_maker = CryptBlob.CryptStreamBlobMaker( - binascii.unhexlify(key), binascii.unhexlify(iv), 0, blob) - with open(filename) as fin: - blob_maker.write(fin.read()) - blob_maker.close() + dummy_announcer = DummyHashAnnouncer() + manager = DiskBlobManager(dummy_announcer, '.', '.') + yield manager.setup() + creator = CryptStreamCreator(manager, filename, key, iv_generator(iv)) + with open(filename, 'r') as infile: + producer = FileBodyProducer(infile, readSize=2**22) + yield producer.startProducing(creator) + yield creator.stop() -class Blob(object): - def __init__(self): - self.data = StringIO.StringIO() - - def write(self, data): - self.data.write(data) - - def close(self): - hashsum = cryptoutils.get_lbry_hash_obj() - buffer = self.data.getvalue() - hashsum.update(buffer) - with open(hashsum.hexdigest(), 'w') as fout: - fout.write(buffer) - return defer.succeed(True) +def iv_generator(iv): + iv = int(iv, 16) + while 1: + iv += 1 + yield ("%016d" % iv)[-16:] if __name__ == '__main__': diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 727c019c8..39e1f406f 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -59,7 +59,7 @@ def main(args=None): use_upnp=False, wallet=wallet ) - api = analytics.Api.new_instance() + api = 
analytics.Api.new_instance(conf.settings['share_usage_data']) run(args, session, api) reactor.run() finally: diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 9eb638fa0..f0f8a76eb 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -81,17 +81,21 @@ class TestReflector(unittest.TestCase): self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir() self.server_blob_manager = BlobManager.DiskBlobManager( hash_announcer, self.server_blob_dir, self.server_db_dir) + self.server_stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir) + d = self.session.setup() d.addCallback(lambda _: self.stream_info_manager.setup()) d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.server_blob_manager.setup()) + d.addCallback(lambda _: self.server_stream_info_manager.setup()) def verify_equal(sd_info): self.assertEqual(mocks.create_stream_sd_file, sd_info) def save_sd_blob_hash(sd_hash): + self.sd_hash = sd_hash self.expected_blobs.append((sd_hash, 923)) def verify_stream_descriptor_file(stream_hash): @@ -120,7 +124,7 @@ class TestReflector(unittest.TestCase): return d def start_server(): - server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager) + server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager, self.server_stream_info_manager) from twisted.internet import reactor port = 8943 while self.reflector_port is None: @@ -160,12 +164,34 @@ class TestReflector(unittest.TestCase): return d def test_stream_reflector(self): - def verify_data_on_reflector(): + def verify_blob_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) return defer.DeferredList(check_blob_ds) + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # check stream_info_manager has all the right information + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(1, len(streams)) + self.assertEqual(self.stream_hash, streams[0]) + + blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] + self.assertEqual(expected_blob_hashes, blob_hashes) + sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + self.assertEqual(1, len(sd_hashes)) + expected_sd_hash = self.expected_blobs[-1][0] + self.assertEqual(self.sd_hash, sd_hashes[0]) + + # check should_announce blobs on blob_manager + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(2, len(blob_hashes)) + self.assertTrue(self.sd_hash in blob_hashes) + self.assertTrue(expected_blob_hashes[0] in blob_hashes) + def verify_have_blob(blob_hash, blob_size): d = self.server_blob_manager.get_blob(blob_hash) d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) @@ -182,12 +208,13 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) return d = send_to_server() - d.addCallback(lambda _: verify_data_on_reflector()) + 
d.addCallback(lambda _: verify_blob_on_reflector()) + d.addCallback(lambda _: verify_stream_on_reflector()) return d def test_blob_reflector(self): @@ -213,7 +240,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) @@ -221,6 +248,15 @@ class TestReflector(unittest.TestCase): return d def test_blob_reflector_v1(self): + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # this protocol should not have any impact on stream info manager + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(0, len(streams)) + # there should be no should announce blobs here + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(0, len(blob_hashes)) + def verify_data_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: @@ -244,13 +280,85 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) d.addCallback(lambda _: verify_data_on_reflector()) return d + # test case when we reflect blob, and than that same blob + # is reflected as stream + def test_blob_reflect_and_stream(self): + + def verify_blob_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # check stream_info_manager has all the right information + + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(1, len(streams)) + self.assertEqual(self.stream_hash, streams[0]) + + blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] + self.assertEqual(expected_blob_hashes, blob_hashes) + sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + self.assertEqual(1, len(sd_hashes)) + expected_sd_hash = self.expected_blobs[-1][0] + self.assertEqual(self.sd_hash, sd_hashes[0]) + + # check should_announce blobs on blob_manager + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(2, len(blob_hashes)) + self.assertTrue(self.sd_hash in blob_hashes) + self.assertTrue(expected_blob_hashes[0] in blob_hashes) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def send_to_server_as_blobs(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + factory.protocol_version = 0 + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def send_to_server_as_stream(result): + fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, + self.stream_info_manager, + self.stream_hash) + factory = 
reflector.ClientFactory(fake_lbry_file) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.get_is_verified()) + self.assertEqual(blob_size, blob.length) + + # Modify this to change which blobs to send + blobs_to_send = self.expected_blobs + + d = send_to_server_as_blobs([x[0] for x in self.expected_blobs]) + d.addCallback(send_to_server_as_stream) + d.addCallback(lambda _: verify_blob_on_reflector()) + d.addCallback(lambda _: verify_stream_on_reflector()) + return d def iv_generator(): iv = 0 diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index af2197d0b..32103e374 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -53,7 +53,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_unavailable_when_blob_not_validated(self): blob = mock.Mock() - blob.is_validated.return_value = False + blob.get_is_verified.return_value = False self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { 'blob_data_payment_rate': 1.0, @@ -68,7 +68,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_unavailable_when_blob_cannot_be_opened(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True blob.open_for_reading.return_value = None self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { @@ -84,7 +84,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_details_are_set_when_all_conditions_are_met(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True blob.open_for_reading.return_value = True blob.blob_hash = 'DEADBEEF' blob.length = 42 diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index 1b7271dc2..ddeee7c0c 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -47,13 +47,13 @@ class BlobManagerTest(unittest.TestCase): yield self.bm.setup() blob = yield self.bm.get_blob(blob_hash,len(data)) - finished_d, write, cancel =yield blob.open_for_writing(self.peer) - yield write(data) + writer, finished_d = yield blob.open_for_writing(self.peer) + yield writer.write(data) yield self.bm.blob_completed(blob) - yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) + yield self.bm.add_blob_to_upload_history(blob_hash, 'test', len(data)) # check to see if blob is there - self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash))) + self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash))) blobs = yield self.bm.get_all_verified_blobs() self.assertTrue(blob_hash in blobs) defer.returnValue(blob_hash) @@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase): # open the last blob blob = yield self.bm.get_blob(blob_hashes[-1]) - finished_d, write, cancel = yield blob.open_for_writing(self.peer) + writer, finished_d = yield blob.open_for_writing(self.peer) # delete the last blob and check if it still exists out = yield self.bm.delete_blobs([blob_hash]) @@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase): self.assertTrue(blob_hashes[-1] in blobs) self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1]))) - blob._close_writer(blob.writers[self.peer][0]) diff --git a/tests/unit/core/test_HashBlob.py 
b/tests/unit/core/test_HashBlob.py new file mode 100644 index 000000000..d1c282478 --- /dev/null +++ b/tests/unit/core/test_HashBlob.py @@ -0,0 +1,127 @@ +from lbrynet.blob import BlobFile +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError + + +from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash +from twisted.trial import unittest +from twisted.internet import defer +import os +import time + + +class BlobFileTest(unittest.TestCase): + def setUp(self): + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.fake_content_len = 64 + self.fake_content = bytearray('0'*self.fake_content_len) + self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' + + def tearDown(self): + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + + @defer.inlineCallbacks + def test_good_write_and_read(self): + # test a write that should succeed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertFalse(blob_file.verified) + + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) + writer.close() + out = yield finished_d + self.assertTrue(isinstance(out, BlobFile)) + self.assertTrue(out.verified) + self.assertEqual(self.fake_content_len, out.get_length()) + + # read from the instance used to write to, and verify content + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + self.assertFalse(out.is_downloading()) + + # read from newly declared instance, and verify content + del blob_file + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + + @defer.inlineCallbacks + def test_delete(self): + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) + out = yield finished_d + out = yield blob_file.delete() + + blob_file = BlobFile(self.blob_dir, self.fake_content_hash) + self.assertFalse(blob_file.verified) + + @defer.inlineCallbacks + def test_too_much_write(self): + # writing too much data should result in failure + expected_length= 16 + content = bytearray('0'*32) + blob_hash = random_lbry_hash() + blob_file = BlobFile(self.blob_dir, blob_hash, expected_length) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) + out = yield self.assertFailure(finished_d, InvalidDataError) + + @defer.inlineCallbacks + def test_bad_hash(self): + # test a write that should fail because its content's hash + # does not equal the blob_hash + length= 64 + content = bytearray('0'*length) + blob_hash = random_lbry_hash() + blob_file = BlobFile(self.blob_dir, blob_hash, length) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) + yield self.assertFailure(finished_d, InvalidDataError) + + @defer.inlineCallbacks + def test_close_on_incomplete_write(self): + # write all but 1 byte of data, + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content[:self.fake_content_len-1]) + writer.close() + yield self.assertFailure(finished_d, DownloadCanceledError) + + # writes after close will throw a IOError exception + with self.assertRaises(IOError): + 
writer.write(self.fake_content) + + # another call to close will do nothing + writer.close() + + # file should not exist, since we did not finish write + blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + out = blob_file_2.open_for_reading() + self.assertEqual(None, out) + + @defer.inlineCallbacks + def test_multiple_writers(self): + # start first writer and write half way, and then start second writer and write everything + blob_hash = self.fake_content_hash + blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) + writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) + writer_1.write(self.fake_content[:self.fake_content_len/2]) + + writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) + writer_2.write(self.fake_content) + out_2 = yield finished_d_2 + out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) + + self.assertTrue(isinstance(out_2, BlobFile)) + self.assertTrue(out_2.verified) + self.assertEqual(self.fake_content_len, out_2.get_length()) + + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(self.fake_content_len, len(c)) + self.assertEqual(bytearray(c), self.fake_content) + + diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py index cb1110d77..624f1a747 100644 --- a/tests/unit/cryptstream/test_cryptblob.py +++ b/tests/unit/cryptstream/test_cryptblob.py @@ -1,7 +1,6 @@ from twisted.trial import unittest from twisted.internet import defer from lbrynet.cryptstream import CryptBlob -from lbrynet.core.HashBlob import TempBlobCreator from lbrynet import conf from tests.mocks import mock_conf_settings diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index df3e4315c..231e277f4 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -1,26 +1,22 @@ import types import mock -import json from twisted.trial import unittest from twisted.internet import defer, task -from lbryschema.claim import ClaimDict from lbrynet.core import Session, PaymentRateManager, Wallet -from lbrynet.core.Error import DownloadTimeoutError +from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.daemon import Downloader -from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager -from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker from tests.mocks import ExchangeRateManager as DummyExchangeRateManager -from tests.mocks import BTCLBCFeed, USDBTCFeed from tests.mocks import mock_conf_settings + class MocDownloader(object): def __init__(self): self.finish_deferred = defer.Deferred(None) @@ -106,7 +102,7 @@ class GetStreamTests(unittest.TestCase): DownloadTimeoutError is raised """ def download_sd_blob(self): - raise DownloadTimeoutError(self.file_name) + raise DownloadSDTimeout(self.file_name) getstream = self.init_getstream_with_mocs() getstream._initialize = 
types.MethodType(moc_initialize, getstream)
@@ -115,15 +111,14 @@ class GetStreamTests(unittest.TestCase):
         getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
         name='test'
         stream_info = None
-        with self.assertRaises(DownloadTimeoutError):
+        with self.assertRaises(DownloadSDTimeout):
             yield getstream.start(stream_info,name)
 
         self.assertFalse(getstream.pay_key_fee_called)
-
     @defer.inlineCallbacks
     def test_timeout(self):
         """
-        test that timeout (set to 2 here) exception is raised
+        test that timeout (set to 3 here) exception is raised
         when download times out while downloading first blob, and key fee is paid
         """
         getstream = self.init_getstream_with_mocs()
@@ -136,9 +131,9 @@ class GetStreamTests(unittest.TestCase):
         start = getstream.start(stream_info,name)
         self.clock.advance(1)
         self.clock.advance(1)
-        with self.assertRaises(DownloadTimeoutError):
+        self.clock.advance(1)
+        with self.assertRaises(DownloadDataTimeout):
             yield start
-        self.assertTrue(getstream.downloader.stop_called)
         self.assertTrue(getstream.pay_key_fee_called)
 
     @defer.inlineCallbacks
@@ -163,21 +158,20 @@ class GetStreamTests(unittest.TestCase):
         downloader, f_deferred = yield start
         self.assertTrue(getstream.pay_key_fee_called)
 
-
-    @defer.inlineCallbacks
-    def test_finish_stopped_downloader(self):
-        """
-        test that if we have a stopped downloader, beforfe a blob is downloaded,
-        start() returns
-        """
-        getstream = self.init_getstream_with_mocs()
-        getstream._initialize = types.MethodType(moc_initialize, getstream)
-        getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
-        getstream._download = types.MethodType(moc_download, getstream)
-        name='test'
-        stream_info = None
-        start = getstream.start(stream_info,name)
-
-        getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
-        self.clock.advance(1)
-        downloader, f_deferred = yield start
+    # @defer.inlineCallbacks
+    # def test_finish_stopped_downloader(self):
+    #     """
+    #     test that if we have a stopped downloader, before a blob is downloaded,
+    #     start() returns
+    #     """
+    #     getstream = self.init_getstream_with_mocs()
+    #     getstream._initialize = types.MethodType(moc_initialize, getstream)
+    #     getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
+    #     getstream._download = types.MethodType(moc_download, getstream)
+    #     name='test'
+    #     stream_info = None
+    #     start = getstream.start(stream_info,name)
+    #
+    #     getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
+    #     self.clock.advance(1)
+    #     downloader, f_deferred = yield start
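
A closing note on the reworked `test_timeout` above: `GetStream.check_status` now errbacks only once the counter strictly exceeds the timeout (`>` rather than `>=`), so with `timeout` set to 3, and assuming the checker's first tick fires immediately (`LoopingCall`'s default `now=True`), the test needs three `clock.advance(1)` calls before `yield start` fails. A minimal, self-contained sketch of that clock-driven timeout pattern, using hypothetical stand-in names rather than the project's classes:

```python
from twisted.internet import defer, task
from twisted.trial import unittest


class DownloadTimeout(Exception):
    """Stand-in for lbrynet.core.Error.DownloadDataTimeout."""


class ClockDrivenTimeoutTest(unittest.TestCase):
    @defer.inlineCallbacks
    def test_errback_once_timeout_exceeded(self):
        clock = task.Clock()
        timeout = 3
        ticks = [0]
        waiting = defer.Deferred()

        def check():
            # mirrors GetStream.check_status: errback only when the
            # counter strictly exceeds the timeout
            ticks[0] += 1
            if ticks[0] > timeout and not waiting.called:
                waiting.errback(DownloadTimeout())

        checker = task.LoopingCall(check)
        checker.clock = clock       # drive the loop from a fake clock
        checker.start(1, now=True)  # first tick fires immediately

        for _ in range(timeout):    # three advances, as in test_timeout
            clock.advance(1)
        checker.stop()

        with self.assertRaises(DownloadTimeout):
            yield waiting
```
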