Merge branch 'download-errors' into patch-0.16.2

# Conflicts:
#	lbrynet/__init__.py
Commit fbe650c7cf (Jack Robison, 2017-09-28 16:16:29 -04:00)
43 changed files with 1227 additions and 915 deletions

CHANGELOG.md

@ -12,25 +12,39 @@ at anytime.
*
*
### Fixed
*
### Added
* Added ability for reflector to store stream information for head blob announce
*
### Fixed
* Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer
* Fixed blob download history
### Deprecated
*
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
*
### Changed
*
*
* Announcing by head blob is turned on by default
* Updated reflector server dns
* Improved download analytics
* Improved download errors by distinguishing a data timeout from an sd timeout
### Added
*
*
* Added WAL pragma to sqlite3
* Added unit tests for `BlobFile`
* Updated exchange rate tests for the lbry.io api
* Use `hashlib` for sha384 instead of `pycrypto`
* Use `cryptography` instead of `pycrypto` for blob encryption and decryption
* Use `cryptography` for PKCS7 instead of doing it manually
* Use `BytesIO` buffers instead of temp files when processing blobs
* Refactored and pruned blob related classes into `lbrynet.blobs`
* Changed several `assert`s to raise more useful errors
### Removed
*
*
* Removed `TempBlobFile`
* Removed unused `EncryptedFileOpener`
## [0.16.2] - 2017-09-26

CONTRIBUTING.md Normal file (+3)

@ -0,0 +1,3 @@
## Contributing to LBRY
https://lbry.io/faq/contributing


@ -22,18 +22,6 @@ Returns:
(bool) true if successful
```
## blob_announce_all
```text
Announce all blobs to the DHT
Usage:
blob_announce_all
Returns:
(str) Success/fail message
```
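For reference, a minimal sketch (not part of this diff) of calling the replacement `blob_announce` command over the daemon's JSON-RPC API. The `announce_all` parameter and the `http://localhost:5279/lbryapi` endpoint are assumptions based on the deprecation note and the daemon's conventional setup.

```python
# Hedged sketch: announce all blobs via the replacement command.
# `announce_all` and the endpoint URL are assumptions, not from this diff.
import json
import urllib2

payload = json.dumps({"method": "blob_announce",
                      "params": {"announce_all": True}})
print json.load(urllib2.urlopen("http://localhost:5279/lbryapi", payload))
```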
## blob_delete
```text
@ -239,9 +227,9 @@ Returns:
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'supports': (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}],
'supports': (list) list of supports [{'txid': (str) txid,
'nout': (int) nout,
'amount': (float) amount}],
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
@ -719,15 +707,18 @@ Returns:
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'supports': (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}],
'supports': (list) list of supports [{'txid': (str) txid,
'nout': (int) nout,
'amount': (float) amount}],
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
If the uri resolves to a channel:
'claims_in_channel': (int) number of claims in the channel,
If the uri resolves to a claim:
'claim': {
'address': (str) claim address,
@ -741,9 +732,9 @@ Returns:
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'channel_name': (str) channel name if claim is in a channel
'supports': (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}]
'supports': (list) list of supports [{'txid': (str) txid,
'nout': (int) nout,
'amount': (float) amount}]
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
@ -905,10 +896,37 @@ Returns:
List transactions belonging to wallet
Usage:
transaction_list
transaction_list [-t]
Options:
-t : Include claim tip information
Returns:
(list) List of transactions
(list) List of transactions, where is_tip is null by default,
and set to a boolean if include_tip_info is true
{
"claim_info": (list) claim info if in txn [{"amount": (float) claim amount,
"claim_id": (str) claim id,
"claim_name": (str) claim name,
"nout": (int) nout}],
"confirmations": (int) number of confirmations for the txn,
"date": (str) date and time of txn,
"fee": (float) txn fee,
"support_info": (list) support info if in txn [{"amount": (float) support amount,
"claim_id": (str) claim id,
"claim_name": (str) claim name,
"is_tip": (null) default,
(bool) if include_tip_info is true,
"nout": (int) nout}],
"timestamp": (int) timestamp,
"txid": (str) txn id,
"update_info": (list) update info if in txn [{"amount": (float) updated amount,
"claim_id": (str) claim id,
"claim_name": (str) claim name,
"nout": (int) nout}],
"value": (float) value of txn
}
```
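A hedged sketch of exercising the new `-t` option over JSON-RPC; `include_tip_info` mirrors the flag documented above, while the endpoint URL is an assumption.

```python
# Hedged sketch: list transactions with tip info populated.
import json
import urllib2

payload = json.dumps({"method": "transaction_list",
                      "params": {"include_tip_info": True}})
response = json.load(urllib2.urlopen("http://localhost:5279/lbryapi", payload))
for txn in response["result"]:
    print txn["txid"], txn["value"]  # support_info entries now carry is_tip
```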
## transaction_show


@ -57,15 +57,14 @@ class Manager(object):
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
)
def send_download_errored(self, id_, name, claim_dict=None):
self.analytics_api.track(
self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict))
)
def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report)
self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, claim_dict=None):
self.analytics_api.track(
self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict))
)
def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report)
self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
@ -159,18 +158,31 @@ class Manager(object):
return properties
@staticmethod
def _download_properties(id_, name, claim_dict=None):
sd_hash = None
if claim_dict:
try:
sd_hash = claim_dict.source_hash
except (KeyError, TypeError, ValueError):
log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True)
return {
def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None if not claim_dict else claim_dict.source_hash
p = {
'download_id': id_,
'name': name,
'stream_info': sd_hash
}
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash,
'error': error_name(error),
'reason': error.message,
'report': report
}
@staticmethod
def _make_context(platform, wallet):

lbrynet/blob/__init__.py Normal file (+4)

@ -0,0 +1,4 @@
from blob_file import BlobFile
from creator import BlobFileCreator
from writer import HashBlobWriter
from reader import HashBlobReader

lbrynet/blob/blob_file.py Normal file (+227)

@ -0,0 +1,227 @@
import logging
import os
import threading
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader
log = logging.getLogger(__name__)
class BlobFile(object):
"""
A chunk of data available on the network which is specified by a hashsum
This class is used to create blobs on the local filesystem
when we already know the blob hash beforehand (i.e., when downloading blobs)
It can also be used for reading blobs from the local filesystem
"""
def __str__(self):
return self.blob_hash[:16]
def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, str(self))
def __init__(self, blob_dir, blob_hash, length=None):
if not is_valid_blobhash(blob_hash):
raise InvalidBlobHashError(blob_hash)
self.blob_hash = blob_hash
self.length = length
self.writers = {} # {Peer: writer, finished_deferred}
self._verified = False
self.readers = 0
self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock()
self.moved_verified_blob = False
if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been
# checked as part of the blob creation process. It might
# be worth having a function that checks the actual hash;
# it's probably too expensive to have that check be part of
# this call.
self._verified = True
def open_for_writing(self, peer):
"""
open a blob file to be written by peer; supports concurrent
writers, as long as they are from different peers.
returns tuple of (writer, finished_deferred)
writer - a file-like object with a write() function; call close() when finished
finished_deferred - deferred that fires when the write is finished and returns
an instance of this blob as a HashBlob
"""
if peer not in self.writers:
log.debug("Opening %s to be written by %s", str(self), str(peer))
finished_deferred = defer.Deferred()
writer = HashBlobWriter(self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return (writer, finished_deferred)
log.warning("Tried to download the same file twice simultaneously from the same peer")
return None, None
def open_for_reading(self):
"""
open blob for reading
returns a file handle that can be read() from.
once finished with the file handle, the user must call close_read_handle(),
otherwise the blob cannot be deleted.
"""
if self._verified is True:
file_handle = None
try:
file_handle = open(self.file_path, 'rb')
self.readers += 1
return file_handle
except IOError:
log.exception('Failed to open %s', self.file_path)
self.close_read_handle(file_handle)
return None
def delete(self):
"""
delete blob file from the file system; deletion is prevented
while the blob is being read from or written to
returns a deferred that fires when deletion is completed
"""
if not self.writers and not self.readers:
self._verified = False
self.moved_verified_blob = False
def delete_from_file_system():
if os.path.isfile(self.file_path):
os.remove(self.file_path)
d = threads.deferToThread(delete_from_file_system)
def log_error(err):
log.warning("An error occurred deleting %s: %s",
str(self.file_path), err.getErrorMessage())
return err
d.addErrback(log_error)
return d
else:
return defer.fail(Failure(
ValueError("File is currently being read or written and cannot be deleted")))
@property
def verified(self):
"""
Protect verified from being modified by other classes.
verified is True if a write to the blob has completed successfully,
or if an existing blob file was found whose length matches the one
specified at init
"""
return self._verified
def set_length(self, length):
if self.length is not None and length == self.length:
return True
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
self.length = length
return True
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
self.length, length)
return False
def get_length(self):
return self.length
def get_is_verified(self):
return self.verified
def is_downloading(self):
if self.writers:
return True
return False
def read(self, write_func):
def close_self(*args):
self.close_read_handle(file_handle)
return args[0]
file_sender = FileSender()
reader = HashBlobReader(write_func)
file_handle = self.open_for_reading()
if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader)
d.addCallback(close_self)
else:
d = defer.fail(IOError("Could not read the blob"))
return d
def writer_finished(self, writer, err=None):
def fire_finished_deferred():
self._verified = True
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
del self.writers[p]
finished_deferred.callback(self)
return True
log.warning(
"Somehow, the writer that was accepted as being valid was already removed: %s",
writer)
return False
def errback_finished_deferred(err):
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
del self.writers[p]
finished_deferred.errback(err)
def cancel_other_downloads():
for p, (w, finished_deferred) in self.writers.items():
w.close()
if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False:
d = self._save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads())
else:
errback_finished_deferred(Failure(DownloadCanceledError()))
d = defer.succeed(True)
else:
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
writer.blob_hash)
errback_finished_deferred(Failure(InvalidDataError(err_string)))
d = defer.succeed(True)
else:
errback_finished_deferred(err)
d = defer.succeed(True)
d.addBoth(lambda _: writer.close_handle())
return d
def close_read_handle(self, file_handle):
if file_handle is not None:
file_handle.close()
self.readers -= 1
@defer.inlineCallbacks
def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock:
if self.moved_verified_blob is False:
writer.write_handle.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(writer.write_handle)
yield producer.startProducing(open(out_path, 'wb'))
self.moved_verified_blob = True
defer.returnValue(True)
else:
raise DownloadCanceledError()
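A minimal usage sketch of the new writer API (not part of this commit): `open_for_writing` now returns a `(writer, finished_deferred)` pair instead of the old `(finished_deferred, write_func, cancel_func)` triple. `blob_dir`, `peer`, and `data` are hypothetical stand-ins, and `blob_hash` is assumed to be the sha384 hex digest of `data`; otherwise `finished_deferred` errbacks with `InvalidDataError`.

```python
from twisted.internet import defer
from lbrynet.blob.blob_file import BlobFile

@defer.inlineCallbacks
def write_blob(blob_dir, blob_hash, peer, data):
    # the length is known up front, so the writer can detect completion
    blob = BlobFile(blob_dir, blob_hash, length=len(data))
    writer, finished_d = blob.open_for_writing(peer)
    if writer is None:
        defer.returnValue(None)  # this peer already has an open writer
    writer.write(data)  # fires finished_d once the expected length is reached
    verified_blob = yield finished_d
    defer.returnValue(verified_blob)
```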

lbrynet/blob/creator.py Normal file (+51)

@ -0,0 +1,51 @@
import os
import logging
from io import BytesIO
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from lbrynet.core.cryptoutils import get_lbry_hash_obj
log = logging.getLogger(__name__)
class BlobFileCreator(object):
"""
This class is used to create blobs on the local filesystem
when we do not know the blob hash beforehand (i.e., when creating
a new stream)
"""
def __init__(self, blob_dir):
self.blob_dir = blob_dir
self.buffer = BytesIO()
self._is_open = True
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
self.blob_hash = None
self.length = None
@defer.inlineCallbacks
def close(self):
self.length = self.len_so_far
self.blob_hash = self._hashsum.hexdigest()
if self.blob_hash and self._is_open and self.length > 0:
# do not save 0 length files (empty tail blob in streams)
# or if it has already been closed
self.buffer.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(self.buffer)
yield producer.startProducing(open(out_path, 'wb'))
self._is_open = False
if self.length > 0:
defer.returnValue(self.blob_hash)
else:
# 0 length files (empty tail blob in streams)
# must return None as their blob_hash for
# it to be saved properly by EncryptedFileMetadataManagers
defer.returnValue(None)
def write(self, data):
if not self._is_open:
raise IOError
self._hashsum.update(data)
self.len_so_far += len(data)
self.buffer.write(data)
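A usage sketch under the same caveats (`blob_dir` and `data` are stand-ins): `close()` resolves to the new blob's hash, or `None` for zero-length data.

```python
from twisted.internet import defer
from lbrynet.blob.creator import BlobFileCreator

@defer.inlineCallbacks
def create_blob(blob_dir, data):
    creator = BlobFileCreator(blob_dir)
    creator.write(data)
    blob_hash = yield creator.close()  # None if data was empty
    defer.returnValue(blob_hash)
```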

lbrynet/blob/reader.py Normal file (+30)

@ -0,0 +1,30 @@
import logging
from twisted.internet import interfaces
from zope.interface import implements
log = logging.getLogger(__name__)
class HashBlobReader(object):
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)

lbrynet/blob/writer.py Normal file (+53)

@ -0,0 +1,53 @@
import logging
from io import BytesIO
from twisted.python.failure import Failure
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
log = logging.getLogger(__name__)
class HashBlobWriter(object):
def __init__(self, length_getter, finished_cb):
self.write_handle = BytesIO()
self.length_getter = length_getter
self.finished_cb = finished_cb
self.finished_cb_d = None
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
def write(self, data):
if self.write_handle is None:
log.exception("writer has already been closed")
raise IOError('I/O operation on closed file')
self._hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb_d = self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
else:
self.write_handle.write(data)
if self.len_so_far == self.length_getter():
self.finished_cb_d = self.finished_cb(self)
def close_handle(self):
if self.write_handle is not None:
self.write_handle.close()
self.write_handle = None
def close(self, reason=None):
# if we've already called finished_cb because we either finished writing
# or closed already, do nothing
if self.finished_cb_d is not None:
return
if reason is None:
reason = Failure(DownloadCanceledError())
self.finished_cb_d = self.finished_cb(self, reason)


@ -251,7 +251,7 @@ ADJUSTABLE_SETTINGS = {
'download_directory': (str, default_download_dir),
'download_timeout': (int, 180),
'is_generous_host': (bool, True),
'announce_head_blobs_only': (bool, False),
'announce_head_blobs_only': (bool, True),
'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
'lbryum_wallet_dir': (str, default_lbryum_dir),
'max_connections_per_stream': (int, 5),
@ -272,7 +272,7 @@ ADJUSTABLE_SETTINGS = {
# at every auto_re_reflect_interval seconds, useful if initial reflect is unreliable
'auto_re_reflect': (bool, True),
'auto_re_reflect_interval': (int, 3600),
'reflector_servers': (list, [('reflector.lbry.io', 5566)], server_port),
'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_port),
'run_reflector_server': (bool, False),
'sd_download_timeout': (int, 3),
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
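A hedged sketch of reading the new defaults at runtime; `initialize_settings()` is assumed to be the usual entry point, and the dict-style access follows the `conf.settings[...]` usage seen elsewhere in this diff.

```python
from lbrynet import conf

conf.initialize_settings()  # assumed initializer; adjust to your setup
# both values reflect the changes in this hunk
print conf.settings['announce_head_blobs_only']  # True
print conf.settings['reflector_servers']         # [('reflector2.lbry.io', 5566)]
```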


@ -6,7 +6,8 @@ import sqlite3
from twisted.internet import threads, defer, reactor
from twisted.enterprise import adbapi
from lbrynet import conf
from lbrynet.core.HashBlob import BlobFile, BlobFileCreator
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -29,7 +30,6 @@ class DiskBlobManager(DHTHashSupplier):
self.blob_dir = blob_dir
self.db_file = os.path.join(db_dir, "blobs.db")
self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
self.blob_type = BlobFile
self.blob_creator_type = BlobFileCreator
# TODO: consider using an LRU for blobs as there could potentially
# be thousands of blobs loaded up, many stale
@ -51,7 +51,8 @@ class DiskBlobManager(DHTHashSupplier):
"""Return a blob identified by blob_hash, which may be a new blob or a
blob that is already on the hard disk
"""
assert length is None or isinstance(length, int)
if length is not None and not isinstance(length, int):
raise Exception("invalid length type: %s (%s)" % (length, str(type(length))))
if blob_hash in self.blobs:
return defer.succeed(self.blobs[blob_hash])
return self._make_new_blob(blob_hash, length)
@ -61,7 +62,7 @@ class DiskBlobManager(DHTHashSupplier):
def _make_new_blob(self, blob_hash, length=None):
log.debug('Making a new blob for %s', blob_hash)
blob = self.blob_type(self.blob_dir, blob_hash, length)
blob = BlobFile(self.blob_dir, blob_hash, length)
self.blobs[blob_hash] = blob
return defer.succeed(blob)
@ -87,12 +88,27 @@ class DiskBlobManager(DHTHashSupplier):
def hashes_to_announce(self):
return self._get_blobs_to_announce()
def set_should_announce(self, blob_hash, should_announce):
if blob_hash in self.blobs:
blob = self.blobs[blob_hash]
if blob.get_is_verified():
return self._set_should_announce(blob_hash,
self.get_next_announce_time(),
should_announce)
return defer.succeed(False)
def get_should_announce(self, blob_hash):
return self._should_announce(blob_hash)
def creator_finished(self, blob_creator, should_announce):
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
assert blob_creator.blob_hash is not None
assert blob_creator.blob_hash not in self.blobs
assert blob_creator.length is not None
new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
if blob_creator.blob_hash is None:
raise Exception("Blob hash is None")
if blob_creator.blob_hash in self.blobs:
raise Exception("Creator finished for blob that is already marked as completed")
if blob_creator.length is None:
raise Exception("Blob has a length of 0")
new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
self.blobs[blob_creator.blob_hash] = new_blob
next_announce_time = self.get_next_announce_time()
d = self.blob_completed(new_blob, next_announce_time, should_announce)
@ -112,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier):
d = self._add_blob_to_download_history(blob_hash, host, rate)
return d
@defer.inlineCallbacks
def get_host_downloaded_from(self, blob_hash):
query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1"
host = yield self.db_conn.runQuery(query_str, (blob_hash,))
if host:
result = host[0][0]
else:
result = None
defer.returnValue(result)
def add_blob_to_upload_history(self, blob_hash, host, rate):
d = self._add_blob_to_upload_history(blob_hash, host, rate)
return d
@ -137,6 +163,7 @@ class DiskBlobManager(DHTHashSupplier):
# threads.
def create_tables(transaction):
transaction.execute('PRAGMA journal_mode=WAL')
transaction.execute("create table if not exists blobs (" +
" blob_hash text primary key, " +
" blob_length integer, " +
@ -165,14 +192,29 @@ class DiskBlobManager(DHTHashSupplier):
def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
should_announce = 1 if should_announce else 0
d = self.db_conn.runQuery(
"insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+
"values (?, ?, ?, ?)",
(blob_hash, length, next_announce_time, should_announce)
)
d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, "
"should_announce) values (?, ?, ?, ?)", (blob_hash, length,
next_announce_time,
should_announce))
# TODO: why is this here?
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
return d
@rerun_if_locked
@defer.inlineCallbacks
def _set_should_announce(self, blob_hash, next_announce_time, should_announce):
yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? "
"where blob_hash=?", (next_announce_time, should_announce,
blob_hash))
defer.returnValue(True)
@rerun_if_locked
@defer.inlineCallbacks
def _should_announce(self, blob_hash):
result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?",
(blob_hash,))
defer.returnValue(result[0][0])
@defer.inlineCallbacks
def _completed_blobs(self, blobhashes_to_check):
"""Returns of the blobhashes_to_check, which are valid"""
@ -187,7 +229,6 @@ class DiskBlobManager(DHTHashSupplier):
@rerun_if_locked
def _get_blobs_to_announce(self):
def get_and_update(transaction):
timestamp = time.time()
if self.announce_head_blobs_only is True:
@ -225,6 +266,14 @@ class DiskBlobManager(DHTHashSupplier):
d = self.db_conn.runQuery("select blob_hash from blobs")
return d
@rerun_if_locked
@defer.inlineCallbacks
def _get_all_should_announce_blob_hashes(self):
# return a list of blob hashes where should_announce is True
blob_hashes = yield self.db_conn.runQuery(
"select blob_hash from blobs where should_announce = 1")
defer.returnValue([d[0] for d in blob_hashes])
@rerun_if_locked
def _get_all_verified_blob_hashes(self):
d = self._get_all_blob_hashes()
@ -255,5 +304,3 @@ class DiskBlobManager(DHTHashSupplier):
"insert into upload values (null, ?, ?, ?, ?) ",
(blob_hash, str(host), float(rate), ts))
return d


@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception):
class DownloadCanceledError(Exception):
pass
class DownloadSDTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download))
self.download = download
class DownloadTimeoutError(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download {} within timeout'.format(download))
self.download = download
class DownloadDataTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download data blobs for sd hash '
'{} within timeout'.format(download))
self.download = download
class RequestCanceledError(Exception):
pass
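A short sketch (not from this commit) of how a download errback might distinguish the two new timeout types; `err` is a `twisted.python.failure.Failure` and the handler name is hypothetical.

```python
import logging

from lbrynet.core.Error import DownloadSDTimeout, DownloadDataTimeout

log = logging.getLogger(__name__)

def handle_download_failure(err):
    if err.check(DownloadSDTimeout):
        log.warning("sd blob timed out: %s", err.value.download)
    elif err.check(DownloadDataTimeout):
        log.warning("data blobs timed out for sd hash: %s", err.value.download)
    return err  # pass anything else through
```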


@ -1,413 +0,0 @@
from StringIO import StringIO
import logging
import os
import tempfile
import threading
import shutil
from twisted.internet import interfaces, defer, threads
from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.utils import is_valid_blobhash
log = logging.getLogger(__name__)
class HashBlobReader(object):
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
class HashBlobWriter(object):
def __init__(self, write_handle, length_getter, finished_cb):
self.write_handle = write_handle
self.length_getter = length_getter
self.finished_cb = finished_cb
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
def write(self, data):
self._hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
else:
if self.write_handle is None:
log.debug("Tried to write to a write_handle that was None.")
return
self.write_handle.write(data)
if self.len_so_far == self.length_getter():
self.finished_cb(self)
def cancel(self, reason=None):
if reason is None:
reason = Failure(DownloadCanceledError())
self.finished_cb(self, reason)
class HashBlob(object):
"""A chunk of data available on the network which is specified by a hashsum"""
def __init__(self, blob_hash, length=None):
assert is_valid_blobhash(blob_hash)
self.blob_hash = blob_hash
self.length = length
self.writers = {} # {Peer: writer, finished_deferred}
self.finished_deferred = None
self._verified = False
self.readers = 0
@property
def verified(self):
# protect verified from being modified by other classes
return self._verified
def set_length(self, length):
if self.length is not None and length == self.length:
return True
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
self.length = length
return True
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
self.length, length)
return False
def get_length(self):
return self.length
def is_validated(self):
return bool(self._verified)
def is_downloading(self):
if self.writers:
return True
return False
def read(self, write_func):
def close_self(*args):
self.close_read_handle(file_handle)
return args[0]
file_sender = FileSender()
reader = HashBlobReader(write_func)
file_handle = self.open_for_reading()
if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader)
d.addCallback(close_self)
else:
d = defer.fail(ValueError("Could not read the blob"))
return d
def writer_finished(self, writer, err=None):
def fire_finished_deferred():
self._verified = True
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
finished_deferred.callback(self)
del self.writers[p]
return True
log.warning(
"Somehow, the writer that was accepted as being valid was already removed: %s",
writer)
return False
def errback_finished_deferred(err):
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
finished_deferred.errback(err)
del self.writers[p]
def cancel_other_downloads():
for p, (w, finished_deferred) in self.writers.items():
w.cancel()
if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False:
d = self._save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads())
else:
errback_finished_deferred(Failure(DownloadCanceledError()))
d = defer.succeed(True)
else:
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
writer.blob_hash)
errback_finished_deferred(Failure(InvalidDataError(err_string)))
d = defer.succeed(True)
else:
errback_finished_deferred(err)
d = defer.succeed(True)
d.addBoth(lambda _: self._close_writer(writer))
return d
def open_for_writing(self, peer):
pass
def open_for_reading(self):
pass
def delete(self):
pass
def close_read_handle(self, file_handle):
pass
def _close_writer(self, writer):
pass
def _save_verified_blob(self, writer):
pass
def __str__(self):
return self.blob_hash[:16]
def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, str(self))
class BlobFile(HashBlob):
"""A HashBlob which will be saved to the hard disk of the downloader"""
def __init__(self, blob_dir, *args):
HashBlob.__init__(self, *args)
self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock()
self.moved_verified_blob = False
if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been
# checked as part of the blob creation process. It might
# be worth having a function that checks the actual hash;
# its probably too expensive to have that check be part of
# this call.
self._verified = True
def open_for_writing(self, peer):
if not peer in self.writers:
log.debug("Opening %s to be written by %s", str(self), str(peer))
write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
finished_deferred = defer.Deferred()
writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return finished_deferred, writer.write, writer.cancel
log.warning("Tried to download the same file twice simultaneously from the same peer")
return None, None, None
def open_for_reading(self):
if self._verified is True:
file_handle = None
try:
file_handle = open(self.file_path, 'rb')
self.readers += 1
return file_handle
except IOError:
log.exception('Failed to open %s', self.file_path)
self.close_read_handle(file_handle)
return None
def delete(self):
if not self.writers and not self.readers:
self._verified = False
self.moved_verified_blob = False
def delete_from_file_system():
if os.path.isfile(self.file_path):
os.remove(self.file_path)
d = threads.deferToThread(delete_from_file_system)
def log_error(err):
log.warning("An error occurred deleting %s: %s",
str(self.file_path), err.getErrorMessage())
return err
d.addErrback(log_error)
return d
else:
return defer.fail(Failure(
ValueError("File is currently being read or written and cannot be deleted")))
def close_read_handle(self, file_handle):
if file_handle is not None:
file_handle.close()
self.readers -= 1
def _close_writer(self, writer):
if writer.write_handle is not None:
log.debug("Closing %s", str(self))
name = writer.write_handle.name
writer.write_handle.close()
threads.deferToThread(os.remove, name)
writer.write_handle = None
def _save_verified_blob(self, writer):
def move_file():
with self.setting_verified_blob_lock:
if self.moved_verified_blob is False:
temp_file_name = writer.write_handle.name
writer.write_handle.close()
shutil.move(temp_file_name, self.file_path)
writer.write_handle = None
self.moved_verified_blob = True
return True
else:
raise DownloadCanceledError()
return threads.deferToThread(move_file)
class TempBlob(HashBlob):
"""A HashBlob which will only exist in memory"""
def __init__(self, *args):
HashBlob.__init__(self, *args)
self.data_buffer = ""
def open_for_writing(self, peer):
if not peer in self.writers:
temp_buffer = StringIO()
finished_deferred = defer.Deferred()
writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return finished_deferred, writer.write, writer.cancel
return None, None, None
def open_for_reading(self):
if self._verified is True:
return StringIO(self.data_buffer)
return None
def delete(self):
if not self.writers and not self.readers:
self._verified = False
self.data_buffer = ''
return defer.succeed(True)
else:
return defer.fail(Failure(
ValueError("Blob is currently being read or written and cannot be deleted")))
def close_read_handle(self, file_handle):
file_handle.close()
def _close_writer(self, writer):
if writer.write_handle is not None:
writer.write_handle.close()
writer.write_handle = None
def _save_verified_blob(self, writer):
if not self.data_buffer:
self.data_buffer = writer.write_handle.getvalue()
writer.write_handle.close()
writer.write_handle = None
return defer.succeed(True)
else:
return defer.fail(Failure(DownloadCanceledError()))
class HashBlobCreator(object):
def __init__(self):
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
self.blob_hash = None
self.length = None
def open(self):
pass
def close(self):
self.length = self.len_so_far
if self.length == 0:
self.blob_hash = None
else:
self.blob_hash = self._hashsum.hexdigest()
d = self._close()
if self.blob_hash is not None:
d.addCallback(lambda _: self.blob_hash)
else:
d.addCallback(lambda _: None)
return d
def write(self, data):
self._hashsum.update(data)
self.len_so_far += len(data)
self._write(data)
def _close(self):
pass
def _write(self, data):
pass
class BlobFileCreator(HashBlobCreator):
def __init__(self, blob_dir):
HashBlobCreator.__init__(self)
self.blob_dir = blob_dir
self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
def _close(self):
temp_file_name = self.out_file.name
self.out_file.close()
if self.blob_hash is not None:
shutil.move(temp_file_name, os.path.join(self.blob_dir, self.blob_hash))
else:
os.remove(temp_file_name)
return defer.succeed(True)
def _write(self, data):
self.out_file.write(data)
class TempBlobCreator(HashBlobCreator):
def __init__(self):
HashBlobCreator.__init__(self)
# TODO: use StringIO
self.data_buffer = ''
def _close(self):
return defer.succeed(True)
def _write(self, data):
self.data_buffer += data


@ -1,76 +0,0 @@
import logging
from twisted.internet import interfaces, defer
from zope.interface import implements
log = logging.getLogger(__name__)
class StreamCreator(object):
"""Classes which derive from this class create a 'stream', which can be any
collection of associated blobs and associated metadata. These classes
use the IConsumer interface to get data from an IProducer and transform
the data into a 'stream'"""
implements(interfaces.IConsumer)
def __init__(self, name):
"""
@param name: the name of the stream
"""
self.name = name
self.stopped = True
self.producer = None
self.streaming = None
self.blob_count = -1
self.current_blob = None
self.finished_deferreds = []
def _blob_finished(self, blob_info):
pass
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
self.stopped = False
if streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.stopped = True
self.producer = None
def stop(self):
"""Stop creating the stream. Create the terminating zero-length blob."""
log.debug("stop has been called for StreamCreator")
self.stopped = True
if self.current_blob is not None:
current_blob = self.current_blob
d = current_blob.close()
d.addCallback(self._blob_finished)
self.finished_deferreds.append(d)
self.current_blob = None
self._finalize()
dl = defer.DeferredList(self.finished_deferreds)
dl.addCallback(lambda _: self._finished())
return dl
def _finalize(self):
pass
def _finished(self):
pass
# TODO: move the stream creation process to its own thread and
# remove the reactor from this process.
def write(self, data):
from twisted.internet import reactor
self._write(data)
if self.stopped is False and self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def _write(self, data):
pass


@ -4,6 +4,7 @@ from decimal import Decimal
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.internet.error import ConnectionAborted
from zope.interface import implements
from lbrynet.core.Error import ConnectionClosedBeforeResponseError
@ -225,7 +226,8 @@ class RequestHelper(object):
self.requestor._update_local_score(self.peer, score)
def _request_failed(self, reason, request_type):
if reason.check(RequestCanceledError):
if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted,
ConnectionClosedBeforeResponseError):
return
if reason.check(NoResponseError):
self.requestor._incompatible_peers.append(self.peer)
@ -463,13 +465,13 @@ class DownloadRequest(RequestHelper):
def find_blob(self, to_download):
"""Return the first blob in `to_download` that is successfully opened for write."""
for blob in to_download:
if blob.is_validated():
if blob.get_is_verified():
log.debug('Skipping blob %s as it is already validated', blob)
continue
d, write_func, cancel_func = blob.open_for_writing(self.peer)
writer, d = blob.open_for_writing(self.peer)
if d is not None:
return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer)
log.debug('Skipping blob %s as there was an issue opening it for writing', blob)
return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer)
log.warning('Skipping blob %s as there was an issue opening it for writing', blob)
return None
def _make_request(self, blob_details):
@ -514,8 +516,6 @@ class DownloadRequest(RequestHelper):
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg):
self._pay_peer(blob.length, reserved_points)
d = self.requestor.blob_manager.add_blob_to_download_history(
str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol]))
else:
self._cancel_points(reserved_points)
return arg
@ -565,8 +565,11 @@ class DownloadRequest(RequestHelper):
self.peer.update_stats('blobs_downloaded', 1)
self.peer.update_score(5.0)
should_announce = blob.blob_hash == self.head_blob_hash
self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
return arg
d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history(
blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol]))
d.addCallback(lambda _: arg)
return d
def _download_failed(self, reason):
if not reason.check(DownloadCanceledError, PriceDisagreementError):


@ -50,6 +50,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
log.debug("Data receieved from %s", self.peer)
self.setTimeout(None)
self._rate_limiter.report_dl_bytes(len(data))
if self._downloading_blob is True:
self._blob_download_request.write(data)
else:
@ -101,8 +102,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
d = self.add_request(blob_request)
self._blob_download_request = blob_request
blob_request.finished_deferred.addCallbacks(self._downloading_finished,
self._downloading_failed)
blob_request.finished_deferred.addErrback(self._handle_response_error)
self._handle_response_error)
return d
else:
raise ValueError("There is already a blob download request active")
@ -110,7 +110,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
def cancel_requests(self):
self.connection_closing = True
ds = []
err = failure.Failure(RequestCanceledError())
err = RequestCanceledError()
for key, d in self._response_deferreds.items():
del self._response_deferreds[key]
d.errback(err)
@ -119,6 +119,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
self._blob_download_request.cancel(err)
ds.append(self._blob_download_request.finished_deferred)
self._blob_download_request = None
self._downloading_blob = False
return defer.DeferredList(ds)
######### Internal request handling #########
@ -176,15 +177,24 @@ class ClientProtocol(Protocol, TimeoutMixin):
def _handle_response_error(self, err):
# If an error gets to this point, log it and kill the connection.
expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError,
DownloadCanceledError, RequestCanceledError)
if not err.check(expected_errors):
if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted):
# TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
# TODO: always be this way. it's done this way now because the client has no other way
# TODO: of telling the server it wants the download to stop. It would be great if the
# TODO: protocol had such a mechanism.
log.info("Closing the connection to %s because the download of blob %s was canceled",
self.peer, self._blob_download_request.blob)
result = None
elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError):
log.warning("The connection to %s is closing due to: %s", self.peer, err)
result = err
else:
log.error("The connection to %s is closing due to an unexpected error: %s",
self.peer, err.getErrorMessage())
if not err.check(RequestCanceledError):
# The connection manager is closing the connection, so
# there's no need to do it here.
return err
self.peer, err)
result = err
self.transport.loseConnection()
return result
def _handle_response(self, response):
ds = []
@ -225,7 +235,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
log.debug("Asking for another request from %s", self.peer)
self._ask_for_request()
else:
log.debug("Not asking for another request from %s", self.peer)
log.warning("Not asking for another request from %s", self.peer)
self.transport.loseConnection()
dl.addCallback(get_next_request)
@ -236,16 +246,6 @@ class ClientProtocol(Protocol, TimeoutMixin):
self._downloading_blob = False
return arg
def _downloading_failed(self, err):
if err.check(DownloadCanceledError):
# TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
# TODO: always be this way. it's done this way now because the client has no other way
# TODO: of telling the server it wants the download to stop. It would be great if the
# TODO: protocol had such a mechanism.
log.debug("Closing the connection to %s because the download of blob %s was canceled",
self.peer, self._blob_download_request.blob)
return err
######### IRateLimited #########
def throttle_upload(self):


@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.DownloadManager import DownloadManager
from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError
from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout
from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call
from twisted.python.failure import Failure
from twisted.internet import defer
@ -64,14 +64,14 @@ class SingleProgressManager(object):
def stream_position(self):
blobs = self.download_manager.blobs
if blobs and blobs[0].is_validated():
if blobs and blobs[0].get_is_verified():
return 1
return 0
def needed_blobs(self):
blobs = self.download_manager.blobs
assert len(blobs) == 1
return [b for b in blobs.itervalues() if not b.is_validated()]
return [b for b in blobs.itervalues() if not b.get_is_verified()]
class DummyBlobHandler(object):
@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object):
def _download_timedout(self):
self.stop()
if not self.finished_deferred.called:
self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash))
self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash))
def insufficient_funds(self, err):
self.stop()


@ -93,7 +93,7 @@ class FullStreamProgressManager(StreamProgressManager):
return (
i not in blobs or
(
not blobs[i].is_validated() and
not blobs[i].get_is_verified() and
i not in self.provided_blob_nums
)
)
@ -112,7 +112,7 @@ class FullStreamProgressManager(StreamProgressManager):
blobs = self.download_manager.blobs
return [
b for n, b in blobs.iteritems()
if not b.is_validated() and not n in self.provided_blob_nums
if not b.get_is_verified() and not n in self.provided_blob_nums
]
######### internal #########
@ -145,7 +145,7 @@ class FullStreamProgressManager(StreamProgressManager):
current_blob_num = self.last_blob_outputted + 1
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
if current_blob_num in blobs and blobs[current_blob_num].get_is_verified():
log.debug("Outputting blob %s", str(self.last_blob_outputted + 1))
self.provided_blob_nums.append(self.last_blob_outputted + 1)
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
@ -154,10 +154,11 @@ class FullStreamProgressManager(StreamProgressManager):
d.addCallback(lambda _: check_if_finished())
def log_error(err):
log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
log.warning("Error occurred in the output loop. Error: %s", err)
if self.outputting_d is not None and not self.outputting_d.called:
self.outputting_d.callback(True)
self.outputting_d = None
self.stop()
d.addErrback(log_error)
else:


@ -1,9 +1,9 @@
from Crypto.Hash import SHA384
import seccure
import hashlib
def get_lbry_hash_obj():
return SHA384.new()
return hashlib.sha384()
def get_pub_key(pass_phrase):


@ -143,7 +143,7 @@ class BlobRequestHandler(object):
def open_blob_for_reading(self, blob, response):
response_fields = {}
d = defer.succeed(None)
if blob.is_validated():
if blob.get_is_verified():
read_handle = blob.open_for_reading()
if read_handle is not None:
self.currently_uploading = blob
@ -162,7 +162,7 @@ class BlobRequestHandler(object):
def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history(
str(blob), self.peer.host, self.blob_data_payment_rate)
blob.blob_hash, self.peer.host, self.blob_data_payment_rate)
return d
def _reply_to_send_request(self, response, incoming):


@ -1,11 +1,16 @@
import binascii
import logging
from Crypto.Cipher import AES
from twisted.internet import defer
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend
from lbrynet import conf
from lbrynet.core.BlobInfo import BlobInfo
log = logging.getLogger(__name__)
backend = default_backend()
class CryptBlobInfo(BlobInfo):
@ -31,7 +36,9 @@ class StreamBlobDecryptor(object):
self.length = length
self.buff = b''
self.len_read = 0
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
self.unpadder = PKCS7(AES.block_size).unpadder()
self.cipher = cipher.decryptor()
def decrypt(self, write_func):
"""
@ -42,22 +49,23 @@ class StreamBlobDecryptor(object):
"""
def remove_padding(data):
pad_len = ord(data[-1])
data, padding = data[:-1 * pad_len], data[-1 * pad_len:]
for c in padding:
assert ord(c) == pad_len
return data
return self.unpadder.update(data) + self.unpadder.finalize()
def write_bytes():
if self.len_read < self.length:
num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size)
num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size / 8))
data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt)
write_func(self.cipher.decrypt(data_to_decrypt))
write_func(self.cipher.update(data_to_decrypt))
def finish_decrypt():
assert len(self.buff) % self.cipher.block_size == 0
bytes_left = len(self.buff) % (AES.block_size / 8)
if bytes_left != 0:
log.warning(self.buff[-1 * (AES.block_size / 8):].encode('hex'))
raise Exception("blob %s has incorrect padding: %i bytes left" %
(self.blob.blob_hash, bytes_left))
data_to_decrypt, self.buff = self.buff, b''
write_func(remove_padding(self.cipher.decrypt(data_to_decrypt)))
last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize()
write_func(remove_padding(last_chunk))
def decrypt_bytes(data):
self.buff += data
@ -84,8 +92,9 @@ class CryptStreamBlobMaker(object):
self.iv = iv
self.blob_num = blob_num
self.blob = blob
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
self.buff = b''
cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
self.padder = PKCS7(AES.block_size).padder()
self.cipher = cipher.encryptor()
self.length = 0
def write(self, data):
@ -104,39 +113,26 @@ class CryptStreamBlobMaker(object):
done = True
else:
num_bytes_to_write = len(data)
self.length += num_bytes_to_write
data_to_write = data[:num_bytes_to_write]
self.buff += data_to_write
self._write_buffer()
self.length += len(data_to_write)
padded_data = self.padder.update(data_to_write)
encrypted_data = self.cipher.update(padded_data)
self.blob.write(encrypted_data)
return done, num_bytes_to_write
@defer.inlineCallbacks
def close(self):
log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length))
if self.length != 0:
self._close_buffer()
d = self.blob.close()
d.addCallback(self._return_info)
self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8))
padded_data = self.padder.finalize()
encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize()
self.blob.write(encrypted_data)
blob_hash = yield self.blob.close()
log.debug("called the finished_callback from CryptStreamBlobMaker.close")
return d
def _write_buffer(self):
num_bytes_to_encrypt = (len(self.buff) // AES.block_size) * AES.block_size
data_to_encrypt, self.buff = split(self.buff, num_bytes_to_encrypt)
encrypted_data = self.cipher.encrypt(data_to_encrypt)
self.blob.write(encrypted_data)
def _close_buffer(self):
data_to_encrypt, self.buff = self.buff, b''
assert len(data_to_encrypt) < AES.block_size
pad_len = AES.block_size - len(data_to_encrypt)
padded_data = data_to_encrypt + chr(pad_len) * pad_len
self.length += pad_len
assert len(padded_data) == AES.block_size
encrypted_data = self.cipher.encrypt(padded_data)
self.blob.write(encrypted_data)
def _return_info(self, blob_hash):
return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
defer.returnValue(blob)
def greatest_multiple(a, b):
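
To make the pycrypto-to-cryptography migration concrete, a self-contained round-trip sketch of AES-CBC with PKCS7 padding using the same `cryptography` primitives as the hunk above; the key and IV are illustrative values, not from the codebase.

```python
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7

backend = default_backend()
key = b'0' * 32  # illustrative 256-bit key
iv = b'1' * 16   # illustrative 128-bit IV

# encrypt: pad with PKCS7, then AES-CBC (block_size is in bits here)
padder = PKCS7(AES.block_size).padder()
encryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).encryptor()
ciphertext = encryptor.update(padder.update(b'some plaintext') + padder.finalize())
ciphertext += encryptor.finalize()

# decrypt: AES-CBC, then strip the PKCS7 padding
unpadder = PKCS7(AES.block_size).unpadder()
decryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).decryptor()
plaintext = unpadder.update(decryptor.update(ciphertext) + decryptor.finalize())
plaintext += unpadder.finalize()
assert plaintext == b'some plaintext'
```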


@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met
"""
import logging
from twisted.internet import interfaces, defer
from zope.interface import implements
from Crypto import Random
from Crypto.Cipher import AES
from twisted.internet import defer
from lbrynet.core.StreamCreator import StreamCreator
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
log = logging.getLogger(__name__)
class CryptStreamCreator(StreamCreator):
"""Create a new stream with blobs encrypted by a symmetric cipher.
class CryptStreamCreator(object):
"""
Create a new stream with blobs encrypted by a symmetric cipher.
Each blob is encrypted with the same key, but each blob has its
own initialization vector which is associated with the blob when
the blob is associated with the stream.
"""
implements(interfaces.IConsumer)
def __init__(self, blob_manager, name=None, key=None, iv_generator=None):
"""@param blob_manager: Object that stores and provides access to blobs.
@type blob_manager: BlobManager
@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator):
@return: None
"""
StreamCreator.__init__(self, name)
self.blob_manager = blob_manager
self.name = name
self.key = key
if iv_generator is None:
self.iv_generator = self.random_iv_generator()
else:
self.iv_generator = iv_generator
self.stopped = True
self.producer = None
self.streaming = None
self.blob_count = -1
self.current_blob = None
self.finished_deferreds = []
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
self.stopped = False
if streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.stopped = True
self.producer = None
def stop(self):
"""Stop creating the stream. Create the terminating zero-length blob."""
log.debug("stop has been called for StreamCreator")
self.stopped = True
if self.current_blob is not None:
current_blob = self.current_blob
d = current_blob.close()
d.addCallback(self._blob_finished)
d.addErrback(self._error)
self.finished_deferreds.append(d)
self.current_blob = None
self._finalize()
dl = defer.DeferredList(self.finished_deferreds)
dl.addCallback(lambda _: self._finished())
dl.addErrback(self._error)
return dl
# TODO: move the stream creation process to its own thread and
# remove the reactor from this process.
def write(self, data):
from twisted.internet import reactor
self._write(data)
if self.stopped is False and self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
@staticmethod
def random_iv_generator():
while 1:
@ -77,11 +124,6 @@ class CryptStreamCreator(StreamCreator):
self.finished_deferreds.append(d)
def _write(self, data):
def close_blob(blob):
d = blob.close()
d.addCallback(self._blob_finished)
self.finished_deferreds.append(d)
while len(data) > 0:
if self.current_blob is None:
self.next_blob_creator = self.blob_manager.get_blob_creator()
@ -94,10 +136,19 @@ class CryptStreamCreator(StreamCreator):
should_announce = self.blob_count == 0
d = self.current_blob.close()
d.addCallback(self._blob_finished)
d.addCallback(lambda _: self.blob_manager.creator_finished(
self.next_blob_creator, should_announce))
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
should_announce))
self.finished_deferreds.append(d)
self.current_blob = None
def _get_blob_maker(self, iv, blob_creator):
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
def _error(self, error):
log.error(error)
def _finished(self):
raise NotImplementedError()
def _blob_finished(self, blob_info):
raise NotImplementedError()


@ -29,7 +29,6 @@ from lbrynet.reflector import reupload
from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.core.log_support import configure_loggly_handler
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType
@ -47,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash
from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError
log = logging.getLogger(__name__)
@ -317,7 +316,8 @@ class Daemon(AuthJSONRPCServer):
if self.reflector_port is not None:
reflector_factory = reflector_server_factory(
self.session.peer_manager,
self.session.blob_manager
self.session.blob_manager,
self.stream_info_manager
)
try:
self.reflector_server_port = reactor.listenTCP(self.reflector_port,
@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer):
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
def _stop_streams(self):
"""stop pending GetStream downloads"""
for claim_id, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown")
def _shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
@@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer):
log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0])
self._stop_streams()
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
@@ -578,17 +586,8 @@ class Daemon(AuthJSONRPCServer):
self.session.wallet,
self.download_directory
)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, file_saver_factory)
file_opener_factory = EncryptedFileOpenerFactory(
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self.session.wallet
)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, file_opener_factory)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType,
file_saver_factory)
return defer.succeed(None)
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
@@ -608,6 +607,39 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout or 30
return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
@defer.inlineCallbacks
def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except Exception:
stream_hash = None
report = {
"sd_hash": sd_hash,
"stream_hash": stream_hash,
}
blobs = {}
try:
sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash)
except Exception:
sd_host = None
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
report["known_blobs"] = 0
for blob_hash, blob_num, iv, length in blob_infos:
try:
host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash)
except Exception:
host = None
if host:
blobs[blob_num] = host
report["blobs"] = json.dumps(blobs)
defer.returnValue(report)
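For reference, a report assembled by this method has roughly the following shape (hedged illustration; the hashes and hosts below are made up):

```python
# Hedged illustration, not captured output: "blobs" is a JSON string keyed by
# blob number, and any lookup that fails above is reported as None.
report = {
    "sd_hash": "53871b26a08e...",        # source hash from the claim
    "stream_hash": "6dbb1a2eb57f...",    # None if no stream is known yet
    "sd_blob": "104.237.137.31",         # host the sd blob was downloaded from
    "known_blobs": 2,
    "blobs": '{"0": "104.237.137.31", "1": "104.237.137.31"}',
}
```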
@defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None):
"""
@@ -615,6 +647,17 @@ class Daemon(AuthJSONRPCServer):
If it already exists in the file manager, return the existing lbry file
"""
@defer.inlineCallbacks
def _download_finished(download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
@defer.inlineCallbacks
def _download_failed(error, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report)
if claim_id in self.streams:
downloader = self.streams[claim_id]
result = yield downloader.finished_deferred
@@ -630,17 +673,23 @@ class Daemon(AuthJSONRPCServer):
file_name)
try:
lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name)
finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(download_id,
name,
claim_dict))
finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
claim_dict),
lambda e: _download_failed(e, download_id, name,
claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id]
except Exception as err:
log.warning('Failed to get %s: %s', name, err)
self.analytics_manager.send_download_errored(download_id, name, claim_dict)
del self.streams[claim_id]
yield _download_failed(err, download_id, name, claim_dict)
if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)):
log.warning('Failed to get %s (%s)', name, err)
else:
log.error('Failed to get %s (%s)', name, err)
if self.streams[claim_id].downloader:
yield self.streams[claim_id].downloader.stop(err)
result = {'error': err.message}
finally:
del self.streams[claim_id]
defer.returnValue(result)
@defer.inlineCallbacks
@@ -2462,27 +2511,24 @@ class Daemon(AuthJSONRPCServer):
"""
if announce_all:
yield self.session.blob_manager.immediate_announce_all_blobs()
elif blob_hash:
blob_hashes = [blob_hash]
yield self.session.blob_manager._immediate_announce(blob_hashes)
elif stream_hash:
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
blobs = [blob for blob in blobs if blob.is_validated()]
blob_hashes = [blob.blob_hash for blob in blobs]
yield self.session.blob_manager._immediate_announce(blob_hashes)
elif sd_hash:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
blobs = [blob for blob in blobs if blob.is_validated()]
blob_hashes = [blob.blob_hash for blob in blobs]
blob_hashes.append(sd_hash)
yield self.session.blob_manager._immediate_announce(blob_hashes)
else:
raise Exception('single argument must be specified')
if blob_hash:
blob_hashes = [blob_hash]
elif stream_hash:
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()]
elif sd_hash:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if
blob.get_is_verified()]
else:
raise Exception('single argument must be specified')
yield self.session.blob_manager._immediate_announce(blob_hashes)
response = yield self._render_response(True)
defer.returnValue(response)
# TODO: This command should be deprecated in favor of blob_announce
@AuthJSONRPCServer.deprecated("blob_announce")
def jsonrpc_blob_announce_all(self):
"""
Announce all blobs to the DHT
@@ -2493,10 +2539,7 @@ class Daemon(AuthJSONRPCServer):
Returns:
(str) Success/fail message
"""
d = self.session.blob_manager.immediate_announce_all_blobs()
d.addCallback(lambda _: self._render_response("Announced"))
return d
return self.jsonrpc_blob_announce(announce_all=True)
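Given the delegation above, the deprecated command and its replacement are now equivalent; a hedged sketch against an in-process Daemon instance:

```python
# Hedged sketch: `daemon` is assumed to be a running Daemon instance.
d_old = daemon.jsonrpc_blob_announce_all()               # deprecated path
d_new = daemon.jsonrpc_blob_announce(announce_all=True)  # preferred path
# both deferreds fire with the rendered `True` response once the
# immediate announce completes
```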
@defer.inlineCallbacks
def jsonrpc_file_reflect(self, **kwargs):
@@ -2580,9 +2623,9 @@ class Daemon(AuthJSONRPCServer):
blobs = self.session.blob_manager.blobs.itervalues()
if needed:
blobs = [blob for blob in blobs if not blob.is_validated()]
blobs = [blob for blob in blobs if not blob.get_is_verified()]
if finished:
blobs = [blob for blob in blobs if blob.is_validated()]
blobs = [blob for blob in blobs if blob.get_is_verified()]
blob_hashes = [blob.blob_hash for blob in blobs]
page_size = page_size or len(blob_hashes)

@@ -5,11 +5,11 @@ from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import conf
INITIALIZING_CODE = 'initializing'
@@ -61,23 +61,22 @@ class GetStream(object):
return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
stop_condition = (status.num_completed > 0 or
status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED)
if stop_condition and not self.data_downloading_deferred.called:
if status.num_completed > 0 and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop_looping_call(self.checker)
else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter)
log.info("Waiting for stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
if self.timeout_counter > self.timeout:
if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name))
self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash))
safe_stop_looping_call(self.checker)
else:
d = self.downloader.status()
@@ -150,6 +149,10 @@ class GetStream(object):
self._check_status(status)
defer.returnValue(self.download_path)
def fail(self, err):
safe_stop_looping_call(self.checker)
raise err
@defer.inlineCallbacks
def _initialize(self, stream_info):
# Set sd_hash and return key_fee from stream_info
@@ -179,7 +182,7 @@ class GetStream(object):
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name)
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks
def start(self, stream_info, name):
@@ -204,9 +207,18 @@ class GetStream(object):
try:
yield self.data_downloading_deferred
except Exception as err:
self.downloader.stop()
except DownloadDataTimeout as err:
safe_stop_looping_call(self.checker)
raise
raise err
defer.returnValue((self.downloader, self.finished_deferred))
def cancel(self, reason=None):
if reason:
msg = "download stream cancelled: %s" % reason
else:
msg = "download stream cancelled"
if self.finished_deferred and not self.finished_deferred.called:
self.finished_deferred.errback(DownloadCanceledError(msg))
if self.data_downloading_deferred and not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadCanceledError(msg))
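_stop_streams() earlier in this diff calls this during daemon shutdown; a hedged sketch of what a caller waiting on the download observes:

```python
# Hedged sketch: `stream` is assumed to be a pending GetStream.
from twisted.internet import defer
from lbrynet.core.Error import DownloadCanceledError


@defer.inlineCallbacks
def wait_for_download(stream):
    try:
        yield stream.finished_deferred
    except DownloadCanceledError as err:
        # raised when stream.cancel(reason=...) errbacks the deferred
        defer.returnValue("cancelled: %s" % err)
```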

@@ -64,9 +64,8 @@ class MarketFeed(object):
self.rate = ExchangeRate(self.market, price, int(time.time()))
def _log_error(self, err):
log.warning(
"There was a problem updating %s exchange rate information from %s",
self.market, self.name)
log.warning("There was a problem updating %s exchange rate information from %s\n%s",
self.market, self.name, err)
def _update_price(self):
d = threads.deferToThread(self._make_request)
@@ -141,7 +140,10 @@ class LBRYioBTCFeed(MarketFeed):
)
def _handle_response(self, response):
json_response = json.loads(response)
try:
json_response = json.loads(response)
except ValueError:
raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response)
if 'data' not in json_response:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(1.0 / json_response['data']['btc_usd'])
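The guarded json.loads above is the behavior the updated exchange-rate tests can pin down; a hedged test sketch (test name, import locations, and the no-arg feed constructor are assumptions):

```python
# Hedged sketch: a non-JSON body should surface as InvalidExchangeRateResponse
# rather than an unhandled ValueError from json.loads.
from twisted.trial import unittest
from lbrynet.core.Error import InvalidExchangeRateResponse
from lbrynet.daemon.ExchangeRateManager import LBRYioBTCFeed


class InvalidResponseTest(unittest.TestCase):
    def test_handle_response_rejects_non_json(self):
        feed = LBRYioBTCFeed()
        self.assertRaises(InvalidExchangeRateResponse,
                          feed._handle_response, "<html>rate limited</html>")
```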

@@ -29,8 +29,9 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
self.blob_infos = []
def _blob_finished(self, blob_info):
log.debug("length: %s", str(blob_info.length))
log.debug("length: %s", blob_info.length)
self.blob_infos.append(blob_info)
return blob_info
def _save_stream_info(self):
stream_info_manager = self.lbry_file_manager.stream_info_manager
@@ -40,10 +41,6 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
self.blob_infos)
return d
def setup(self):
d = CryptStreamCreator.setup(self)
return d
def _get_blobs_hashsum(self):
blobs_hashsum = get_lbry_hash_obj()
for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num):

@@ -250,7 +250,7 @@ class EncryptedFileManager(object):
if self.sql_db:
yield self.sql_db.close()
self.sql_db = None
log.info("Stopped %s", self)
log.info("Stopped encrypted file manager")
defer.returnValue(True)
def get_count_for_stream_hash(self, stream_hash):
@@ -303,8 +303,10 @@ class EncryptedFileManager(object):
@rerun_if_locked
def _change_file_status(self, rowid, new_status):
return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
(new_status, rowid))
d.addCallback(lambda _: new_status)
return d
@rerun_if_locked
def _get_lbry_file_status(self, rowid):

@@ -1,4 +1,3 @@
import subprocess
import binascii
from zope.interface import implements
@@ -10,8 +9,7 @@ from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
import os
from twisted.internet import defer, threads, reactor
from twisted.python.procutils import which
from twisted.internet import defer, threads
import logging
import traceback
@@ -282,90 +280,3 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
@staticmethod
def get_description():
return "Save"
class EncryptedFileOpener(EncryptedFileDownloader):
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
payment_rate_manager, wallet):
EncryptedFileDownloader.__init__(self, stream_hash,
peer_finder, rate_limiter,
blob_manager, stream_info_manager,
payment_rate_manager, wallet,
)
self.process = None
self.process_log = None
def stop(self, err=None):
d = EncryptedFileDownloader.stop(self, err=err)
d.addCallback(lambda _: self._delete_from_info_manager())
return d
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager,
download_manager)
def _setup_output(self):
def start_process():
if os.name == "nt":
paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe',
r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe']
for p in paths:
if os.path.exists(p):
vlc_path = p
break
else:
raise ValueError("You must install VLC media player to stream files")
else:
vlc_path = 'vlc'
self.process_log = open("vlc.out", 'a')
try:
self.process = subprocess.Popen([vlc_path, '-'], stdin=subprocess.PIPE,
stdout=self.process_log, stderr=self.process_log)
except OSError:
raise ValueError("VLC media player could not be opened")
d = threads.deferToThread(start_process)
return d
def _close_output(self):
if self.process is not None:
self.process.stdin.close()
self.process = None
return defer.succeed(True)
def _get_write_func(self):
def write_func(data):
if self.stopped is False and self.process is not None:
try:
self.process.stdin.write(data)
except IOError:
reactor.callLater(0, self.stop)
return write_func
def _delete_from_info_manager(self):
return self.stream_info_manager.delete_stream(self.stream_hash)
class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory):
def can_download(self, sd_validator):
if which('vlc'):
return True
elif os.name == "nt":
paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe',
r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe']
for p in paths:
if os.path.exists(p):
return True
return False
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
return EncryptedFileOpener(stream_hash, self.peer_finder,
self.rate_limiter, self.blob_manager,
self.stream_info_manager,
payment_rate_manager, self.wallet,
)
@staticmethod
def get_description():
return "Stream"

@@ -1,5 +1,6 @@
import logging
from zope.interface import implements
from twisted.internet import defer
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
from lbrynet.interfaces import IMetadataHandler
@@ -18,10 +19,11 @@ class EncryptedFileMetadataHandler(object):
######### IMetadataHandler #########
@defer.inlineCallbacks
def get_initial_blobs(self):
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
d.addCallback(self._format_initial_blobs_for_download_manager)
return d
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos)
defer.returnValue(formatted_infos)
def final_blob_num(self):
return self._final_blob_num
@@ -30,10 +32,12 @@ class EncryptedFileMetadataHandler(object):
def _format_initial_blobs_for_download_manager(self, blob_infos):
infos = []
for blob_hash, blob_num, iv, length in blob_infos:
if blob_hash is not None:
for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos):
if blob_hash is not None and length:
infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv))
else:
if i != len(blob_infos) - 1:
raise Exception("Invalid stream terminator")
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
self._final_blob_num = blob_num - 1
return infos
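A hedged illustration of the rows this method now expects: a stream must end with exactly one terminator row, and an empty row anywhere else raises (the hashes below are made up):

```python
# Rows are (blob_hash, blob_num, iv, length); the terminator has no hash/length
# and is only legal as the final entry.
blob_infos = [
    ("8f4ba3d6c21e...", 0, "0000000000000001", 2097152),  # data blob
    ("b71c0e97aa4d...", 1, "0000000000000002", 1048576),  # data blob
    (None,              2, "0000000000000003", 0),        # stream terminator
]
# _format_initial_blobs_for_download_manager(blob_infos) yields two
# CryptBlobInfo entries and sets _final_blob_num to 1
```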

@@ -132,7 +132,7 @@ class BlobReflectorClient(Protocol):
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
if blob.get_is_verified():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)

@@ -112,11 +112,11 @@ class EncryptedFileReflectorClient(Protocol):
def get_validated_blobs(self, blobs_in_stream):
def get_blobs(blobs):
for (blob, _, _, blob_len) in blobs:
if blob:
if blob and blob_len:
yield self.blob_manager.get_blob(blob, blob_len)
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
return dl
def set_blobs_to_send(self, blobs_to_send):
@@ -253,7 +253,7 @@ class EncryptedFileReflectorClient(Protocol):
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
if blob.get_is_verified():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)

@@ -4,7 +4,9 @@ from twisted.python import failure
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
@@ -30,16 +32,17 @@ class ReflectorServer(Protocol):
log.debug('Connection made to %s', peer_info)
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.stream_info_manager = self.factory.stream_info_manager
self.protocol_version = self.factory.protocol_version
self.received_handshake = False
self.peer_version = None
self.receiving_blob = False
self.incoming_blob = None
self.blob_write = None
self.blob_finished_d = None
self.cancel_write = None
self.request_buff = ""
self.blob_writer = None
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
log.info("Reflector upload from %s finished" % self.peer.host)
@@ -61,10 +64,74 @@ class ReflectorServer(Protocol):
else:
log.exception(err)
@defer.inlineCallbacks
def check_head_blob_announce(self, stream_hash):
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
blob_hash, blob_num, blob_iv, blob_length = blob_infos[0]
if blob_hash in self.blob_manager.blobs:
head_blob = self.blob_manager.blobs[blob_hash]
if head_blob.get_is_verified():
should_announce = yield self.blob_manager.get_should_announce(blob_hash)
if should_announce == 0:
yield self.blob_manager.set_should_announce(blob_hash, 1)
log.info("Discovered previously completed head blob (%s), "
"setting it to be announced", blob_hash[:8])
defer.returnValue(None)
@defer.inlineCallbacks
def check_sd_blob_announce(self, sd_hash):
if sd_hash in self.blob_manager.blobs:
sd_blob = self.blob_manager.blobs[sd_hash]
if sd_blob.get_is_verified():
should_announce = yield self.blob_manager.get_should_announce(sd_hash)
if should_announce == 0:
yield self.blob_manager.set_should_announce(sd_hash, 1)
log.info("Discovered previously completed sd blob (%s), "
"setting it to be announced", sd_hash[:8])
try:
yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except NoSuchSDHash:
log.info("Adding blobs to stream")
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(
sd_info['stream_hash'],
sd_hash)
defer.returnValue(None)
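Both checks above share the same flag round-trip; a hedged sketch of the pattern in isolation, assuming an already set-up blob manager:

```python
# Hedged sketch: should_announce is persisted as a 0/1 flag per blob hash.
from twisted.internet import defer


@defer.inlineCallbacks
def mark_for_announce(blob_manager, blob_hash):
    should_announce = yield blob_manager.get_should_announce(blob_hash)
    if should_announce == 0:
        # promote the blob once we learn it is a head or sd blob
        yield blob_manager.set_should_announce(blob_hash, 1)
```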
@defer.inlineCallbacks
def _on_completed_blob(self, blob, response_key):
yield self.blob_manager.blob_completed(blob)
should_announce = False
if response_key == RECEIVED_SD_BLOB:
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
blob.blob_hash)
should_announce = True
# if we already have the head blob, set it to be announced now that we know it's
# a head blob
d = self.check_head_blob_announce(sd_info['stream_hash'])
else:
d = defer.succeed(None)
stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash)
if stream_hash is not None:
blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash,
blob.blob_hash)
if blob_num == 0:
should_announce = True
sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(
stream_hash)
# if we already have the sd blob, set it to be announced now that we know it's
# an sd blob
for sd_hash in sd_hashes:
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
yield self.close_blob()
yield d
log.info("Received %s", blob)
yield self.send_response({response_key: True})
@@ -82,14 +149,14 @@
"""
blob = self.incoming_blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer)
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
def close_blob(self):
self.blob_writer.close()
self.blob_writer = None
self.blob_finished_d = None
self.blob_write = None
self.cancel_write = None
self.incoming_blob = None
self.receiving_blob = False
@@ -99,7 +166,7 @@
def dataReceived(self, data):
if self.receiving_blob:
self.blob_write(data)
self.blob_writer.write(data)
else:
log.debug('Not yet receiving blob, data needs further processing')
self.request_buff += data
@@ -110,7 +177,7 @@
d.addErrback(self.handle_error)
if self.receiving_blob and extra_data:
log.debug('Writing extra data to blob')
self.blob_write(extra_data)
self.blob_writer.write(extra_data)
def _get_valid_response(self, response_msg):
extra_data = None
@@ -221,7 +288,7 @@
sd_blob_hash = request_dict[SD_BLOB_HASH]
sd_blob_size = request_dict[SD_BLOB_SIZE]
if self.blob_write is None:
if self.blob_writer is None:
d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size)
d.addCallback(self.get_descriptor_response)
d.addCallback(self.send_response)
@@ -230,16 +297,29 @@
d = self.blob_finished_d
return d
@defer.inlineCallbacks
def get_descriptor_response(self, sd_blob):
if sd_blob.is_validated():
d = defer.succeed({SEND_SD_BLOB: False})
d.addCallback(self.request_needed_blobs, sd_blob)
if sd_blob.get_is_verified():
# if we already have the sd blob being offered, make sure it and the head blob
# are marked for announcement now that we know it's an sd blob that we have.
yield self.check_sd_blob_announce(sd_blob.blob_hash)
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(
sd_blob.blob_hash)
except NoSuchSDHash:
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
stream_hash = sd_info['stream_hash']
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
sd_blob.blob_hash)
yield self.check_head_blob_announce(stream_hash)
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
else:
self.incoming_blob = sd_blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_SD_BLOB)
d = defer.succeed({SEND_SD_BLOB: True})
return d
response = {SEND_SD_BLOB: True}
defer.returnValue(response)
def request_needed_blobs(self, response, sd_blob):
def _add_needed_blobs_to_response(needed_blobs):
@@ -267,7 +347,7 @@
if 'blob_hash' in blob and 'length' in blob:
blob_hash, blob_len = blob['blob_hash'], blob['length']
d = self.blob_manager.get_blob(blob_hash, blob_len)
d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None)
d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None)
yield d
def handle_blob_request(self, request_dict):
@@ -293,7 +373,7 @@
blob_hash = request_dict[BLOB_HASH]
blob_size = request_dict[BLOB_SIZE]
if self.blob_write is None:
if self.blob_writer is None:
log.debug('Received info for blob: %s', blob_hash[:16])
d = self.blob_manager.get_blob(blob_hash, blob_size)
d.addCallback(self.get_blob_response)
@@ -305,7 +385,7 @@
return d
def get_blob_response(self, blob):
if blob.is_validated():
if blob.get_is_verified():
return defer.succeed({SEND_BLOB: False})
else:
self.incoming_blob = blob
@@ -318,9 +398,10 @@
class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer
def __init__(self, peer_manager, blob_manager):
def __init__(self, peer_manager, blob_manager, stream_info_manager):
self.peer_manager = peer_manager
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.protocol_version = REFLECTOR_V2
def buildProtocol(self, addr):

@@ -1,4 +1,5 @@
Twisted==16.6.0
cryptography==2.0.3
appdirs==1.4.3
argparse==1.2.1
docopt==0.6.2
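The new `cryptography` pin backs the pycrypto removal noted in the changelog; a minimal sketch of the AES-CBC plus PKCS7 pattern it provides (the key and IV below are placeholders, not LBRY's key handling):

```python
# Hedged sketch against the pinned cryptography==2.0.3 API surface.
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7

key = b'\x00' * 32   # placeholder 256-bit key
iv = b'\x00' * 16    # placeholder CBC IV

# pad to the AES block size, then encrypt
padder = PKCS7(algorithms.AES.block_size).padder()
padded = padder.update(b'example plaintext') + padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
                   backend=default_backend()).encryptor()
ciphertext = encryptor.update(padded) + encryptor.finalize()
```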

@@ -10,7 +10,7 @@ from twisted.internet import reactor
from lbrynet import conf
from lbrynet.cryptstream import CryptBlob
from lbrynet.core import HashBlob
from lbrynet.blob import BlobFile
from lbrynet.core import log_support
@@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output):
filename = os.path.abspath(blob_file)
length = os.path.getsize(filename)
directory, blob_hash = os.path.split(filename)
blob = HashBlob.BlobFile(directory, blob_hash, True, length)
blob = BlobFile(directory, blob_hash, length)
decryptor = CryptBlob.StreamBlobDecryptor(
blob, binascii.unhexlify(key), binascii.unhexlify(iv), length)
with open(output, 'w') as f:

@@ -2,7 +2,7 @@
import argparse
import logging
import sys
import tempfile
import os
from twisted.internet import defer
from twisted.internet import reactor
@@ -13,7 +13,7 @@ from lbrynet import conf
from lbrynet.core import log_support
from lbrynet.core import BlobManager
from lbrynet.core import HashAnnouncer
from lbrynet.core import HashBlob
from lbrynet.blob import BlobFile
from lbrynet.core import RateLimiter
from lbrynet.core import Peer
from lbrynet.core import Wallet
@@ -31,13 +31,14 @@ def main(args=None):
parser.add_argument('--timeout', type=int, default=30)
parser.add_argument('peer')
parser.add_argument('blob_hash')
parser.add_argument('directory', type=str, default=os.getcwd())
args = parser.parse_args(args)
log_support.configure_console(level='DEBUG')
announcer = HashAnnouncer.DummyHashAnnouncer()
blob_manager = MyBlobManager(announcer)
blob = HashBlob.TempBlob(args.blob_hash, False)
blob = BlobFile(args.directory, args.blob_hash)
download_manager = SingleBlobDownloadManager(blob)
peer = Peer.Peer(*conf.server_port(args.peer))
payment_rate_manager = DumbPaymentRateManager()

@@ -1,17 +1,18 @@
"""Encrypt a single file using the given key and iv"""
import argparse
import binascii
import logging
import StringIO
import sys
from twisted.internet import defer
from twisted.internet import reactor
from twisted.protocols import basic
from twisted.web.client import FileBodyProducer
from lbrynet import conf
from lbrynet.cryptstream import CryptBlob
from lbrynet.core import log_support
from lbrynet.core import cryptoutils
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
log = logging.getLogger('encrypt_blob')
@@ -26,7 +27,7 @@ def main():
args = parser.parse_args()
log_support.configure_console(level='DEBUG')
d = run(args)
run(args)
reactor.run()
@@ -40,29 +41,23 @@ def run(args):
reactor.callLater(0, reactor.stop)
@defer.inlineCallbacks
def encrypt_blob(filename, key, iv):
blob = Blob()
blob_maker = CryptBlob.CryptStreamBlobMaker(
binascii.unhexlify(key), binascii.unhexlify(iv), 0, blob)
with open(filename) as fin:
blob_maker.write(fin.read())
blob_maker.close()
dummy_announcer = DummyHashAnnouncer()
manager = DiskBlobManager(dummy_announcer, '.', '.')
yield manager.setup()
creator = CryptStreamCreator(manager, filename, key, iv_generator(iv))
with open(filename, 'r') as infile:
producer = FileBodyProducer(infile, readSize=2**22)
yield producer.startProducing(creator)
yield creator.stop()
class Blob(object):
def __init__(self):
self.data = StringIO.StringIO()
def write(self, data):
self.data.write(data)
def close(self):
hashsum = cryptoutils.get_lbry_hash_obj()
buffer = self.data.getvalue()
hashsum.update(buffer)
with open(hashsum.hexdigest(), 'w') as fout:
fout.write(buffer)
return defer.succeed(True)
def iv_generator(iv):
iv = int(iv, 16)
while 1:
iv += 1
yield ("%016d" % iv)[-16:]
if __name__ == '__main__':

@@ -59,7 +59,7 @@ def main(args=None):
use_upnp=False,
wallet=wallet
)
api = analytics.Api.new_instance()
api = analytics.Api.new_instance(conf.settings['share_usage_data'])
run(args, session, api)
reactor.run()
finally:

@@ -81,17 +81,21 @@ class TestReflector(unittest.TestCase):
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
self.server_blob_manager = BlobManager.DiskBlobManager(
hash_announcer, self.server_blob_dir, self.server_db_dir)
self.server_stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_blob_manager.setup())
d.addCallback(lambda _: self.server_stream_info_manager.setup())
def verify_equal(sd_info):
self.assertEqual(mocks.create_stream_sd_file, sd_info)
def save_sd_blob_hash(sd_hash):
self.sd_hash = sd_hash
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
@@ -120,7 +124,7 @@ class TestReflector(unittest.TestCase):
return d
def start_server():
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager)
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager, self.server_stream_info_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
@@ -160,12 +164,34 @@ class TestReflector(unittest.TestCase):
return d
def test_stream_reflector(self):
def verify_data_on_reflector():
def verify_blob_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
@defer.inlineCallbacks
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_stream_info_manager.get_all_streams()
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
self.assertEqual(1, len(sd_hashes))
expected_sd_hash = self.expected_blobs[-1][0]
self.assertEqual(self.sd_hash, sd_hashes[0])
# check should_announce blobs on blob_manager
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
self.assertEqual(2, len(blob_hashes))
self.assertTrue(self.sd_hash in blob_hashes)
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
@@ -182,12 +208,13 @@ class TestReflector(unittest.TestCase):
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
return
d = send_to_server()
d.addCallback(lambda _: verify_data_on_reflector())
d.addCallback(lambda _: verify_blob_on_reflector())
d.addCallback(lambda _: verify_stream_on_reflector())
return d
def test_blob_reflector(self):
@@ -213,7 +240,7 @@ class TestReflector(unittest.TestCase):
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
@@ -221,6 +248,15 @@ class TestReflector(unittest.TestCase):
return d
def test_blob_reflector_v1(self):
@defer.inlineCallbacks
def verify_stream_on_reflector():
# this protocol should not have any impact on the stream info manager
streams = yield self.server_stream_info_manager.get_all_streams()
self.assertEqual(0, len(streams))
# there should be no should_announce blobs here
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
self.assertEqual(0, len(blob_hashes))
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
@@ -244,13 +280,85 @@ class TestReflector(unittest.TestCase):
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
d.addCallback(lambda _: verify_data_on_reflector())
return d
# test case when we reflect a blob, and then that same blob
# is reflected as a stream
def test_blob_reflect_and_stream(self):
def verify_blob_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
@defer.inlineCallbacks
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_stream_info_manager.get_all_streams()
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
self.assertEqual(1, len(sd_hashes))
expected_sd_hash = self.expected_blobs[-1][0]
self.assertEqual(self.sd_hash, sd_hashes[0])
# check should_announce blobs on blob_manager
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
self.assertEqual(2, len(blob_hashes))
self.assertTrue(self.sd_hash in blob_hashes)
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server_as_blobs(blob_hashes_to_send):
factory = reflector.BlobClientFactory(
self.session.blob_manager,
blob_hashes_to_send
)
factory.protocol_version = 0
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def send_to_server_as_stream(result):
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
self.stream_info_manager,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
# Modify this to change which blobs to send
blobs_to_send = self.expected_blobs
d = send_to_server_as_blobs([x[0] for x in blobs_to_send])
d.addCallback(send_to_server_as_stream)
d.addCallback(lambda _: verify_blob_on_reflector())
d.addCallback(lambda _: verify_stream_on_reflector())
return d
def iv_generator():
iv = 0

@@ -53,7 +53,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
def test_blob_unavailable_when_blob_not_validated(self):
blob = mock.Mock()
blob.is_validated.return_value = False
blob.get_is_verified.return_value = False
self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = {
'blob_data_payment_rate': 1.0,
@@ -68,7 +68,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
def test_blob_unavailable_when_blob_cannot_be_opened(self):
blob = mock.Mock()
blob.is_validated.return_value = True
blob.get_is_verified.return_value = True
blob.open_for_reading.return_value = None
self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = {
@@ -84,7 +84,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
def test_blob_details_are_set_when_all_conditions_are_met(self):
blob = mock.Mock()
blob.is_validated.return_value = True
blob.get_is_verified.return_value = True
blob.open_for_reading.return_value = True
blob.blob_hash = 'DEADBEEF'
blob.length = 42

@@ -47,13 +47,13 @@ class BlobManagerTest(unittest.TestCase):
yield self.bm.setup()
blob = yield self.bm.get_blob(blob_hash,len(data))
finished_d, write, cancel =yield blob.open_for_writing(self.peer)
yield write(data)
writer, finished_d = yield blob.open_for_writing(self.peer)
yield writer.write(data)
yield self.bm.blob_completed(blob)
yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data))
yield self.bm.add_blob_to_upload_history(blob_hash, 'test', len(data))
# check to see if blob is there
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash)))
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertTrue(blob_hash in blobs)
defer.returnValue(blob_hash)
@@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase):
# open the last blob
blob = yield self.bm.get_blob(blob_hashes[-1])
finished_d, write, cancel = yield blob.open_for_writing(self.peer)
writer, finished_d = yield blob.open_for_writing(self.peer)
# delete the last blob and check if it still exists
out = yield self.bm.delete_blobs([blob_hash])
@@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase):
self.assertTrue(blob_hashes[-1] in blobs)
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1])))
blob._close_writer(blob.writers[self.peer][0])
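The test churn above tracks the new blob writer API; a hedged side-by-side of the calling convention (`blob`, `peer`, and `data` are assumed to exist):

```python
# Old HashBlob API (removed):
#     finished_d, write_func, cancel_func = blob.open_for_writing(peer)
#     write_func(data)
# New BlobFile API, as used above and in the reflector server:
writer, finished_d = blob.open_for_writing(peer)
writer.write(data)
writer.close()   # finished_d fires once the content hash verifies
```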

@@ -0,0 +1,127 @@
from lbrynet.blob import BlobFile
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash
from twisted.trial import unittest
from twisted.internet import defer
import os
import time
class BlobFileTest(unittest.TestCase):
def setUp(self):
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
self.fake_content_len = 64
self.fake_content = bytearray('0'*self.fake_content_len)
self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105'
def tearDown(self):
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
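The fixture hash above should be reproducible with plain hashlib, which per the changelog now backs sha384 hashing; a hedged check:

```python
# Blob hashes are sha384 hex digests, so hashing the 64 zero bytes of
# fake_content should reproduce fake_content_hash.
import hashlib

assert hashlib.sha384(b'0' * 64).hexdigest() == (
    '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49'
    '498a7492bb9364bbf90f40c1c5412105')
```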
@defer.inlineCallbacks
def test_good_write_and_read(self):
# test a write that should succeed
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertFalse(blob_file.verified)
writer, finished_d = blob_file.open_for_writing(peer=1)
writer.write(self.fake_content)
writer.close()
out = yield finished_d
self.assertTrue(isinstance(out, BlobFile))
self.assertTrue(out.verified)
self.assertEqual(self.fake_content_len, out.get_length())
# read from the instance used to write to, and verify content
f = blob_file.open_for_reading()
c = f.read()
self.assertEqual(c, self.fake_content)
self.assertFalse(out.is_downloading())
# read from newly declared instance, and verify content
del blob_file
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertTrue(blob_file.verified)
f = blob_file.open_for_reading()
c = f.read()
self.assertEqual(c, self.fake_content)
@defer.inlineCallbacks
def test_delete(self):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
writer, finished_d = blob_file.open_for_writing(peer=1)
writer.write(self.fake_content)
out = yield finished_d
out = yield blob_file.delete()
blob_file = BlobFile(self.blob_dir, self.fake_content_hash)
self.assertFalse(blob_file.verified)
@defer.inlineCallbacks
def test_too_much_write(self):
# writing too much data should result in failure
expected_length = 16
content = bytearray('0'*32)
blob_hash = random_lbry_hash()
blob_file = BlobFile(self.blob_dir, blob_hash, expected_length)
writer, finished_d = blob_file.open_for_writing(peer=1)
writer.write(content)
out = yield self.assertFailure(finished_d, InvalidDataError)
@defer.inlineCallbacks
def test_bad_hash(self):
# test a write that should fail because its content's hash
# does not equal the blob_hash
length = 64
content = bytearray('0'*length)
blob_hash = random_lbry_hash()
blob_file = BlobFile(self.blob_dir, blob_hash, length)
writer, finished_d = blob_file.open_for_writing(peer=1)
writer.write(content)
yield self.assertFailure(finished_d, InvalidDataError)
@defer.inlineCallbacks
def test_close_on_incomplete_write(self):
# write all but 1 byte of data,
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
writer, finished_d = blob_file.open_for_writing(peer=1)
writer.write(self.fake_content[:self.fake_content_len-1])
writer.close()
yield self.assertFailure(finished_d, DownloadCanceledError)
# writes after close will raise an IOError exception
with self.assertRaises(IOError):
writer.write(self.fake_content)
# another call to close will do nothing
writer.close()
# file should not exist, since we did not finish write
blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
out = blob_file_2.open_for_reading()
self.assertEqual(None, out)
@defer.inlineCallbacks
def test_multiple_writers(self):
# start the first writer and write halfway, then start the second writer and write everything
blob_hash = self.fake_content_hash
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
writer_1.write(self.fake_content[:self.fake_content_len/2])
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
writer_2.write(self.fake_content)
out_2 = yield finished_d_2
out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)
self.assertTrue(isinstance(out_2, BlobFile))
self.assertTrue(out_2.verified)
self.assertEqual(self.fake_content_len, out_2.get_length())
f = blob_file.open_for_reading()
c = f.read()
self.assertEqual(self.fake_content_len, len(c))
self.assertEqual(bytearray(c), self.fake_content)

@@ -1,7 +1,6 @@
from twisted.trial import unittest
from twisted.internet import defer
from lbrynet.cryptstream import CryptBlob
from lbrynet.core.HashBlob import TempBlobCreator
from lbrynet import conf
from tests.mocks import mock_conf_settings

@@ -1,26 +1,22 @@
import types
import mock
import json
from twisted.trial import unittest
from twisted.internet import defer, task
from lbryschema.claim import ClaimDict
from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.core.Error import DownloadTimeoutError
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.daemon import Downloader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
from tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from tests.mocks import BTCLBCFeed, USDBTCFeed
from tests.mocks import mock_conf_settings
class MocDownloader(object):
def __init__(self):
self.finish_deferred = defer.Deferred(None)
@@ -106,7 +102,7 @@ class GetStreamTests(unittest.TestCase):
DownloadSDTimeout is raised
"""
def download_sd_blob(self):
raise DownloadTimeoutError(self.file_name)
raise DownloadSDTimeout(self.file_name)
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
@@ -115,15 +111,14 @@ class GetStreamTests(unittest.TestCase):
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name='test'
stream_info = None
with self.assertRaises(DownloadTimeoutError):
with self.assertRaises(DownloadSDTimeout):
yield getstream.start(stream_info,name)
self.assertFalse(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_timeout(self):
"""
test that timeout (set to 2 here) exception is raised
test that timeout (set to 3 here) exception is raised
when download times out while downloading first blob, and key fee is paid
"""
getstream = self.init_getstream_with_mocs()
@@ -136,9 +131,9 @@ class GetStreamTests(unittest.TestCase):
start = getstream.start(stream_info,name)
self.clock.advance(1)
self.clock.advance(1)
with self.assertRaises(DownloadTimeoutError):
self.clock.advance(1)
with self.assertRaises(DownloadDataTimeout):
yield start
self.assertTrue(getstream.downloader.stop_called)
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
@@ -163,21 +158,20 @@ class GetStreamTests(unittest.TestCase):
downloader, f_deferred = yield start
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_finish_stopped_downloader(self):
"""
test that if we have a stopped downloader, before a blob is downloaded,
start() returns
"""
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
name='test'
stream_info = None
start = getstream.start(stream_info,name)
getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
self.clock.advance(1)
downloader, f_deferred = yield start
# @defer.inlineCallbacks
# def test_finish_stopped_downloader(self):
# """
# test that if we have a stopped downloader, before a blob is downloaded,
# start() returns
# """
# getstream = self.init_getstream_with_mocs()
# getstream._initialize = types.MethodType(moc_initialize, getstream)
# getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
# getstream._download = types.MethodType(moc_download, getstream)
# name='test'
# stream_info = None
# start = getstream.start(stream_info,name)
#
# getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
# self.clock.advance(1)
# downloader, f_deferred = yield start