From 0e972ec2ae9914153c815c2a6b0a10707cb2c0e2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 09:20:21 -0500 Subject: [PATCH 1/6] refactor BlobFile.close to be non async --- lbrynet/blob/blob_file.py | 6 ++++-- lbrynet/blob_exchange/downloader.py | 2 +- lbrynet/extras/daemon/Components.py | 2 +- lbrynet/stream/descriptor.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 80702d0aa..5ad21f7f7 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -132,16 +132,18 @@ class BlobFile: with open(self.file_path, 'rb') as handle: return await self.loop.sendfile(writer.transport, handle, count=self.get_length()) - async def close(self): + def close(self): while self.writers: self.writers.pop().finished.cancel() async def delete(self): - await self.close() + self.close() async with self.blob_write_lock: self.saved_verified_blob = False if os.path.isfile(self.file_path): os.remove(self.file_path) + self.verified.clear() + self.finished_writing.clear() def decrypt(self, key: bytes, iv: bytes) -> bytes: """ diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index ada08c471..3bcdc97f0 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -86,7 +86,7 @@ class BlobDownloader: peer, task = self.active_connections.popitem() if task and not task.done(): task.cancel() - await blob.close() + blob.close() log.debug("downloaded %s", blob_hash[:8]) return blob except asyncio.CancelledError: diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 934177372..55b6d17c8 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -323,7 +323,7 @@ class BlobComponent(Component): async def stop(self): while self.blob_manager and self.blob_manager.blobs: _, blob = self.blob_manager.blobs.popitem() - await blob.close() + blob.close() async def get_status(self): count = 0 diff --git a/lbrynet/stream/descriptor.py b/lbrynet/stream/descriptor.py index 103f42e18..2e151be2f 100644 --- a/lbrynet/stream/descriptor.py +++ b/lbrynet/stream/descriptor.py @@ -86,7 +86,7 @@ class StreamDescriptor: writer = sd_blob.open_for_writing() writer.write(sd_data) await sd_blob.verified.wait() - await sd_blob.close() + sd_blob.close() return sd_blob @classmethod From c1c6d5bc9911913d1a6c2352a3d25b06906bf660 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 09:29:19 -0500 Subject: [PATCH 2/6] fix deleting partial downloads when stopped and previous streams when updating a publish --- lbrynet/blob/blob_manager.py | 38 ++++++++++++------------ lbrynet/extras/daemon/Daemon.py | 2 +- lbrynet/stream/assembler.py | 51 ++++++++++++++++++-------------- lbrynet/stream/downloader.py | 4 +++ lbrynet/stream/managed_stream.py | 13 ++++---- lbrynet/stream/stream_manager.py | 50 +++++++++++++++++++------------ 6 files changed, 90 insertions(+), 68 deletions(-) diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 44d8fa8b5..f079ccb8f 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -63,23 +63,23 @@ class BlobFileManager: blob_hashes = await self.storage.get_all_blob_hashes() return self.check_completed_blobs(blob_hashes) - async def delete_blobs(self, blob_hashes: typing.List[str]): - bh_to_delete_from_db = [] - for blob_hash in blob_hashes: - if not blob_hash: - continue - try: - blob = self.get_blob(blob_hash) - await blob.delete() - bh_to_delete_from_db.append(blob_hash) - except Exception as e: - log.warning("Failed to delete blob file. Reason: %s", e) - if blob_hash in self.completed_blob_hashes: - self.completed_blob_hashes.remove(blob_hash) - if blob_hash in self.blobs: - del self.blobs[blob_hash] + async def delete_blob(self, blob_hash: str): try: - await self.storage.delete_blobs_from_db(bh_to_delete_from_db) - except IntegrityError as err: - if str(err) != "FOREIGN KEY constraint failed": - raise err + blob = self.get_blob(blob_hash) + await blob.delete() + except Exception as e: + log.warning("Failed to delete blob file. Reason: %s", e) + if blob_hash in self.completed_blob_hashes: + self.completed_blob_hashes.remove(blob_hash) + if blob_hash in self.blobs: + del self.blobs[blob_hash] + + async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True): + bh_to_delete_from_db = [] + await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop) + if delete_from_db: + try: + await self.storage.delete_blobs_from_db(bh_to_delete_from_db) + except IntegrityError as err: + if str(err) != "FOREIGN KEY constraint failed": + raise err diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 7f19be3ee..f2f23625a 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1614,7 +1614,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.stream_manager.start_stream(stream) msg = "Resumed download" elif status == 'stop' and stream.running: - stream.stop_download() + await self.stream_manager.stop_stream(stream) msg = "Stopped download" else: msg = ( diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 6b325d248..ae53dcc9a 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -43,7 +43,7 @@ class StreamAssembler: self.written_bytes: int = 0 async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str): - if not blob or self.stream_handle.closed: + if not blob or not self.stream_handle or self.stream_handle.closed: return False def _decrypt_and_write(): @@ -86,28 +86,35 @@ class StreamAssembler: self.sd_blob, self.descriptor ) await self.blob_manager.blob_completed(self.sd_blob) - with open(self.output_path, 'wb') as stream_handle: - self.stream_handle = stream_handle - for i, blob_info in enumerate(self.descriptor.blobs[:-1]): - if blob_info.blob_num != i: - log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash) - return - while not stream_handle.closed: - try: - blob = await self.get_blob(blob_info.blob_hash, blob_info.length) - if await self._decrypt_blob(blob, blob_info, self.descriptor.key): - await self.blob_manager.blob_completed(blob) - break - except FileNotFoundError: - log.debug("stream assembler stopped") + written_blobs = None + try: + with open(self.output_path, 'wb') as stream_handle: + self.stream_handle = stream_handle + for i, blob_info in enumerate(self.descriptor.blobs[:-1]): + if blob_info.blob_num != i: + log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash) return - except (ValueError, IOError, OSError): - log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash, - self.descriptor.sd_hash) - continue - - self.stream_finished_event.set() - await self.after_finished() + while self.stream_handle and not self.stream_handle.closed: + try: + blob = await self.get_blob(blob_info.blob_hash, blob_info.length) + if await self._decrypt_blob(blob, blob_info, self.descriptor.key): + await self.blob_manager.blob_completed(blob) + written_blobs = i + break + except FileNotFoundError: + log.debug("stream assembler stopped") + return + except (ValueError, IOError, OSError): + log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash, + self.descriptor.sd_hash) + continue + finally: + if written_blobs == len(self.descriptor.blobs) - 1: + log.debug("finished decrypting and assembling stream") + self.stream_finished_event.set() + await self.after_finished() + else: + log.debug("stream decryption and assembly did not finish") async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return self.blob_manager.get_blob(blob_hash, length) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 3faedefd1..e9142c2c4 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -63,6 +63,10 @@ class StreamDownloader(StreamAssembler): self.fixed_peers_handle.cancel() self.fixed_peers_handle = None self.blob_downloader = None + if self.stream_handle: + if not self.stream_handle.closed: + self.stream_handle.close() + self.stream_handle = None async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return await self.blob_downloader.download_blob(blob_hash, length) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index e8965318a..ecfac63bd 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -104,8 +104,12 @@ class ManagedStream: def blobs_remaining(self) -> int: return self.blobs_in_stream - self.blobs_completed + @property + def full_path(self) -> str: + return os.path.join(self.download_directory, os.path.basename(self.file_name)) + def as_dict(self) -> typing.Dict: - full_path = os.path.join(self.download_directory, self.file_name) + full_path = self.full_path if not os.path.isfile(full_path): full_path = None mime_type = guess_media_type(os.path.basename(self.file_name)) @@ -170,12 +174,7 @@ class ManagedStream: def stop_download(self): if self.downloader: self.downloader.stop() - if not self.downloader.stream_finished_event.is_set() and self.downloader.wrote_bytes_event.is_set(): - path = os.path.join(self.download_directory, self.file_name) - if os.path.isfile(path): - os.remove(path) - if not self.finished: - self.update_status(self.STATUS_STOPPED) + self.downloader = None async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]: sent = [] diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 4adad9639..1b77d572c 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -4,7 +4,7 @@ import typing import binascii import logging import random -from lbrynet.error import ResolveError +from lbrynet.error import ResolveError, InvalidStreamDescriptorError from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.managed_stream import ManagedStream from lbrynet.schema.claim import ClaimDict @@ -97,8 +97,9 @@ class StreamManager: await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()), self.config.download_timeout) except asyncio.TimeoutError: - stream.stop_download() - stream.downloader = None + await self.stop_stream(stream) + if stream in self.streams: + self.streams.remove(stream) return False file_name = os.path.basename(stream.downloader.output_path) await self.storage.change_file_download_dir_and_file_name( @@ -108,6 +109,18 @@ class StreamManager: return True return True + async def stop_stream(self, stream: ManagedStream): + stream.stop_download() + if not stream.finished and os.path.isfile(stream.full_path): + try: + os.remove(stream.full_path) + except OSError as err: + log.warning("Failed to delete partial download %s from downloads directory: %s", stream.full_path, + str(err)) + if stream.running: + stream.update_status(ManagedStream.STATUS_STOPPED) + await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED) + def make_downloader(self, sd_hash: str, download_directory: str, file_name: str): return StreamDownloader( self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name @@ -116,13 +129,15 @@ class StreamManager: async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim): sd_blob = self.blob_manager.get_blob(sd_hash) if sd_blob.get_is_verified(): - descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) + try: + descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) + except InvalidStreamDescriptorError as err: + log.warning("Failed to start stream for sd %s - %s", sd_hash, str(err)) + return + downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name) stream = ManagedStream( - self.loop, self.blob_manager, descriptor, - download_directory, - file_name, - downloader, status, claim + self.loop, self.blob_manager, descriptor, download_directory, file_name, downloader, status, claim ) self.streams.add(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) @@ -194,18 +209,14 @@ class StreamManager: return stream async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False): - stream.stop_download() - self.streams.remove(stream) + await self.stop_stream(stream) + if stream in self.streams: + self.streams.remove(stream) + blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] + await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) await self.storage.delete_stream(stream.descriptor) - - blob_hashes = [stream.sd_hash] - for blob_info in stream.descriptor.blobs[:-1]: - blob_hashes.append(blob_info.blob_hash) - await self.blob_manager.delete_blobs(blob_hashes) - if delete_file: - path = os.path.join(stream.download_directory, stream.file_name) - if os.path.isfile(path): - os.remove(path) + if delete_file and os.path.isfile(stream.full_path): + os.remove(stream.full_path) def wait_for_stream_finished(self, stream: ManagedStream): async def _wait_for_stream_finished(): @@ -213,6 +224,7 @@ class StreamManager: try: await stream.downloader.stream_finished_event.wait() stream.update_status(ManagedStream.STATUS_FINISHED) + await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_FINISHED) except asyncio.CancelledError: pass task = self.loop.create_task(_wait_for_stream_finished()) From f22b5da170e699c25c64474eb8c8bd66d569a132 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 12:11:22 -0500 Subject: [PATCH 3/6] fix raised error for a peer address mismatch --- lbrynet/dht/protocol/protocol.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 0b31bc1b5..ec3cd8dd0 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -474,7 +474,12 @@ class KademliaProtocol(DatagramProtocol): remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})") if error_datagram.rpc_id in self.sent_messages: peer, df, request = self.sent_messages.pop(error_datagram.rpc_id) - + if (peer.address, peer.udp_port) != address: + df.set_exception(RemoteException( + f"response from {address[0]}:{address[1]}, " + f"expected {peer.address}:{peer.udp_port}") + ) + return error_msg = f"" \ f"Error sending '{request.method}' to {peer.address}:{peer.udp_port}\n" \ f"Args: {request.args}\n" \ @@ -484,11 +489,6 @@ class KademliaProtocol(DatagramProtocol): else: log.warning("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", peer.address, peer.udp_port, old_protocol_errors[error_datagram.response]) - - # reject replies coming from a different address than what we sent our request to - if (peer.address, peer.udp_port) != address: - log.error("node id mismatch in reply") - remote_exception = TimeoutError(peer.node_id) df.set_exception(remote_exception) return else: From ae11c5bb4b006027577f4bd93943164e6facc70c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 12:18:24 -0500 Subject: [PATCH 4/6] fix setting finished event --- lbrynet/stream/assembler.py | 5 +++-- tests/unit/stream/test_downloader.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index ae53dcc9a..37fc056b3 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -109,12 +109,13 @@ class StreamAssembler: self.descriptor.sd_hash) continue finally: - if written_blobs == len(self.descriptor.blobs) - 1: + if written_blobs == len(self.descriptor.blobs) - 2: log.debug("finished decrypting and assembling stream") self.stream_finished_event.set() await self.after_finished() else: - log.debug("stream decryption and assembly did not finish") + log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs, + len(self.descriptor.blobs) - 2) async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return self.blob_manager.get_blob(blob_hash, length) diff --git a/tests/unit/stream/test_downloader.py b/tests/unit/stream/test_downloader.py index bc9654e9b..33f86bf2f 100644 --- a/tests/unit/stream/test_downloader.py +++ b/tests/unit/stream/test_downloader.py @@ -37,15 +37,16 @@ class TestStreamDownloader(BlobExchangeTestBase): return q2, self.loop.create_task(_task()) mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers - self.downloader.download(mock_node) await self.downloader.stream_finished_event.wait() + self.assertTrue(self.downloader.stream_handle.closed) + self.assertTrue(os.path.isfile(self.downloader.output_path)) self.downloader.stop() + self.assertIs(self.downloader.stream_handle, None) self.assertTrue(os.path.isfile(self.downloader.output_path)) with open(self.downloader.output_path, 'rb') as f: self.assertEqual(f.read(), self.stream_bytes) await asyncio.sleep(0.01) - self.assertTrue(self.downloader.stream_handle.closed) async def test_transfer_stream(self): await self._test_transfer_stream(10) From 58f6cb71c6591dface8b5578e8267db974d06729 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 13:08:57 -0500 Subject: [PATCH 5/6] test stream manager --- lbrynet/stream/assembler.py | 6 +- lbrynet/stream/stream_manager.py | 2 - .../test_ExchangeRateManager.py | 8 ++ tests/unit/stream/test_stream_manager.py | 106 ++++++++++++++++++ 4 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 tests/unit/stream/test_stream_manager.py diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 37fc056b3..a0de756ef 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -56,7 +56,6 @@ class StreamAssembler: self.stream_handle.flush() self.written_bytes += len(_decrypted) log.debug("decrypted %s", blob.blob_hash[:8]) - self.wrote_bytes_event.set() await self.loop.run_in_executor(None, _decrypt_and_write) return True @@ -100,6 +99,9 @@ class StreamAssembler: if await self._decrypt_blob(blob, blob_info, self.descriptor.key): await self.blob_manager.blob_completed(blob) written_blobs = i + if not self.wrote_bytes_event.is_set(): + self.wrote_bytes_event.set() + log.debug("written %i/%i", written_blobs, len(self.descriptor.blobs) - 2) break except FileNotFoundError: log.debug("stream assembler stopped") @@ -114,7 +116,7 @@ class StreamAssembler: self.stream_finished_event.set() await self.after_finished() else: - log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs, + log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs or 0, len(self.descriptor.blobs) - 2) async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 1b77d572c..7edd3f8d1 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -279,7 +279,6 @@ class StreamManager: fee_amount: typing.Optional[float] = 0.0, fee_address: typing.Optional[str] = None, should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]: - log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) claim = ClaimDict.load_dict(claim_info['value']) sd_hash = claim.source_hash.decode() if sd_hash in self.starting_streams: @@ -306,7 +305,6 @@ class StreamManager: finally: if sd_hash in self.starting_streams: del self.starting_streams[sd_hash] - log.info("returned from get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]: streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams)) diff --git a/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py b/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py index 0b6701edd..3eff3a17d 100644 --- a/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py +++ b/tests/unit/lbrynet_daemon/test_ExchangeRateManager.py @@ -35,6 +35,14 @@ class DummyExchangeRateManager(exchange_rate_manager.ExchangeRateManager): feed.market, rates[feed.market]['spot'], rates[feed.market]['ts']) +def get_dummy_exchange_rate_manager(time): + rates = { + 'BTCLBC': {'spot': 3.0, 'ts': time.time() + 1}, + 'USDBTC': {'spot': 2.0, 'ts': time.time() + 2} + } + return DummyExchangeRateManager([BTCLBCFeed()], rates) + + class FeeFormatTest(unittest.TestCase): def test_fee_created_with_correct_inputs(self): fee_dict = { diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py new file mode 100644 index 000000000..6b567d70c --- /dev/null +++ b/tests/unit/stream/test_stream_manager.py @@ -0,0 +1,106 @@ +import os +import binascii +from unittest import mock +import asyncio +import time +from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase +from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager + +from lbrynet.extras.wallet.manager import LbryWalletManager +from lbrynet.stream.stream_manager import StreamManager +from lbrynet.stream.descriptor import StreamDescriptor +from lbrynet.dht.node import Node +from lbrynet.schema.claim import ClaimDict + + +def get_mock_node(peer): + def mock_accumulate_peers(q1: asyncio.Queue, q2: asyncio.Queue): + async def _task(): + pass + + q2.put_nowait([peer]) + return q2, asyncio.create_task(_task()) + + mock_node = mock.Mock(spec=Node) + mock_node.accumulate_peers = mock_accumulate_peers + return mock_node + + +def get_mock_wallet(sd_hash, storage): + claim = { + "address": "bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF", + "amount": "0.1", + "claim_id": "c49566d631226492317d06ad7fdbe1ed32925124", + "claim_sequence": 1, + "decoded_claim": True, + "depth": 1057, + "effective_amount": "0.1", + "has_signature": False, + "height": 514081, + "hex": "", + "name": "33rpm", + "nout": 0, + "permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124", + "supports": [], + "txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c", + "value": { + "claimType": "streamType", + "stream": { + "metadata": { + "author": "", + "description": "", + "language": "en", + "license": "None", + "licenseUrl": "", + "nsfw": False, + "preview": "", + "thumbnail": "", + "title": "33rpm", + "version": "_0_1_0" + }, + "source": { + "contentType": "image/png", + "source": sd_hash, + "sourceType": "lbry_sd_hash", + "version": "_0_0_1" + }, + "version": "_0_0_1" + }, + "version": "_0_0_1" + } + } + claim_dict = ClaimDict.load_dict(claim['value']) + claim['hex'] = binascii.hexlify(claim_dict.serialized).decode() + + async def mock_resolve(*args): + await storage.save_claims([claim]) + return { + claim['permanent_url']: claim + } + + mock_wallet = mock.Mock(spec=LbryWalletManager) + mock_wallet.resolve = mock_resolve + return mock_wallet, claim['permanent_url'] + + +class TestStreamManager(BlobExchangeTestBase): + async def asyncSetUp(self): + await super().asyncSetUp() + file_path = os.path.join(self.server_dir, "test_file") + with open(file_path, 'wb') as f: + f.write(os.urandom(20000000)) + descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path) + self.sd_hash = descriptor.calculate_sd_hash() + self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage) + self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, + self.client_storage, get_mock_node(self.server_from_client)) + self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) + + async def test_download_from_uri(self): + self.assertSetEqual(self.stream_manager.streams, set()) + stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + self.assertTrue(stream.running) + self.assertFalse(stream.finished) + await stream.downloader.stream_finished_event.wait() + self.assertTrue(stream.finished) + self.assertFalse(stream.running) From dfdc8eda008638f2c04d3eba14de3d73fd0ce8a7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 13:32:50 -0500 Subject: [PATCH 6/6] improve test, fix stream_finished_event --- lbrynet/stream/assembler.py | 2 +- lbrynet/stream/stream_manager.py | 1 - tests/unit/stream/test_stream_manager.py | 35 +++++++++++++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index a0de756ef..e7cf23c2b 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -113,8 +113,8 @@ class StreamAssembler: finally: if written_blobs == len(self.descriptor.blobs) - 2: log.debug("finished decrypting and assembling stream") - self.stream_finished_event.set() await self.after_finished() + self.stream_finished_event.set() else: log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs or 0, len(self.descriptor.blobs) - 2) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 7edd3f8d1..e587b6e07 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -224,7 +224,6 @@ class StreamManager: try: await stream.downloader.stream_finished_event.wait() stream.update_status(ManagedStream.STATUS_FINISHED) - await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_FINISHED) except asyncio.CancelledError: pass task = self.loop.create_task(_wait_for_stream_finished()) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 6b567d70c..e8fa67670 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -96,11 +96,44 @@ class TestStreamManager(BlobExchangeTestBase): self.client_storage, get_mock_node(self.server_from_client)) self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) - async def test_download_from_uri(self): + async def test_download_stop_resume_delete(self): self.assertSetEqual(self.stream_manager.streams, set()) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream_hash = stream.stream_hash + self.assertSetEqual(self.stream_manager.streams, {stream}) self.assertTrue(stream.running) self.assertFalse(stream.finished) + self.assertTrue(os.path.isfile(os.path.join(self.client_dir, "test_file"))) + stored_status = await self.client_storage.run_and_return_one_or_none( + "select status from file where stream_hash=?", stream_hash + ) + self.assertEqual(stored_status, "running") + + await self.stream_manager.stop_stream(stream) + + self.assertFalse(stream.finished) + self.assertFalse(stream.running) + self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file"))) + stored_status = await self.client_storage.run_and_return_one_or_none( + "select status from file where stream_hash=?", stream_hash + ) + self.assertEqual(stored_status, "stopped") + + await self.stream_manager.start_stream(stream) await stream.downloader.stream_finished_event.wait() + await asyncio.sleep(0.01) self.assertTrue(stream.finished) self.assertFalse(stream.running) + self.assertTrue(os.path.isfile(os.path.join(self.client_dir, "test_file"))) + stored_status = await self.client_storage.run_and_return_one_or_none( + "select status from file where stream_hash=?", stream_hash + ) + self.assertEqual(stored_status, "finished") + + await self.stream_manager.delete_stream(stream, True) + self.assertSetEqual(self.stream_manager.streams, set()) + self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file"))) + stored_status = await self.client_storage.run_and_return_one_or_none( + "select status from file where stream_hash=?", stream_hash + ) + self.assertEqual(stored_status, None)