Compare commits

..

No commits in common. "master" and "v0.113.0" have entirely different histories.

30 changed files with 150 additions and 168 deletions

View file

@ -1,5 +1,5 @@
name: ci
on: ["push", "pull_request", "workflow_dispatch"]
on: ["push", "pull_request"]
jobs:
@ -7,12 +7,12 @@ jobs:
name: lint
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.9'
python-version: '3.7'
- name: extract pip cache
uses: actions/cache@v4
uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
@ -27,25 +27,25 @@ jobs:
matrix:
os:
- ubuntu-20.04
- macos-13
- windows-2022
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.9'
python-version: '3.7'
- name: set pip cache dir
shell: bash
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
id: pip-cache
run: echo "::set-output name=dir::$(pip cache dir)"
- name: extract pip cache
uses: actions/cache@v4
uses: actions/cache@v2
with:
path: ${{ env.PIP_CACHE_DIR }}
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip-
- id: os-name
uses: ASzc/change-string-case-action@v6
uses: ASzc/change-string-case-action@v1
with:
string: ${{ runner.os }}
- run: python -m pip install --user --upgrade pip wheel
@ -93,16 +93,16 @@ jobs:
uses: elastic/elastic-github-actions/elasticsearch@master
with:
stack-version: 7.12.1
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.9'
python-version: '3.7'
- if: matrix.test == 'other'
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends ffmpeg
- name: extract pip cache
uses: actions/cache@v4
uses: actions/cache@v2
with:
path: ./.tox
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
@ -138,29 +138,29 @@ jobs:
strategy:
matrix:
os:
- ubuntu-20.04
- macos-13
- windows-2022
- ubuntu-18.04
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.9'
python-version: '3.7'
- id: os-name
uses: ASzc/change-string-case-action@v6
uses: ASzc/change-string-case-action@v1
with:
string: ${{ runner.os }}
- name: set pip cache dir
shell: bash
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
id: pip-cache
run: echo "::set-output name=dir::$(pip cache dir)"
- name: extract pip cache
uses: actions/cache@v4
uses: actions/cache@v2
with:
path: ${{ env.PIP_CACHE_DIR }}
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip-
- run: pip install pyinstaller==6.0
- run: pip install pyinstaller==4.6
- run: pip install -e .
- if: startsWith(github.ref, 'refs/tags/v')
run: python docker/set_build.py
@ -175,7 +175,7 @@ jobs:
pip install pywin32==301
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
dist/lbrynet.exe --version
- uses: actions/upload-artifact@v4
- uses: actions/upload-artifact@v2
with:
name: lbrynet-${{ steps.os-name.outputs.lowercase }}
path: dist/
@ -186,8 +186,8 @@ jobs:
needs: ["build"]
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- uses: actions/download-artifact@v4
- uses: actions/checkout@v1
- uses: actions/download-artifact@v2
- name: upload binaries
env:
GITHUB_TOKEN: ${{ secrets.RELEASE_API_TOKEN }}

View file

@ -54,16 +54,3 @@ The documentation for the API can be found [here](https://lbry.tech/api/sdk).
Daemon defaults, ports, and other settings are documented [here](https://lbry.tech/resources/daemon-settings).
Settings can be configured using a daemon-settings.yml file. An example can be found [here](https://github.com/lbryio/lbry-sdk/blob/master/example_daemon_settings.yml).
## Troubleshooting
### Error: unsupported hash type 'ripemd160'
If you see this error when running `lbrynet start`, it likely means your OpenSSL or Python installation is missing RIPEMD160 support.
To fix it:
- Ensure you are using a version of Python compiled with OpenSSL support.
- On Ubuntu/Debian, try: `sudo apt install libssl-dev` and reinstall Python.
- Alternatively, use a precompiled Python version (e.g., via `pyenv install 3.8.18` after `libssl-dev` is installed).

View file

@ -1,2 +1,2 @@
__version__ = "0.114.0"
__version__ = "0.113.0"
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name

View file

@ -64,7 +64,7 @@ class BlobDownloader:
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
def cleanup_active(self):

View file

@ -15,18 +15,11 @@ def sha512(x):
def ripemd160(x):
""" Simple wrapper of hashlib ripemd160. """
try:
h = hashlib.new('ripemd160')
except ValueError as e:
raise RuntimeError(
"Your Python/OpenSSL installation does not support RIPEMD160. "
"Try reinstalling Python with full OpenSSL support."
) from e
h = hashlib.new('ripemd160')
h.update(x)
return h.digest()
def double_sha256(x):
""" SHA-256 of SHA-256, as used extensively in bitcoin. """
return sha256(sha256(x))

View file

@ -42,6 +42,8 @@ class BlobAnnouncer:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10):

View file

@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
def running(self):
return self._running
async def get_status(self): # pylint: disable=no-self-use
async def get_status(self):
return
async def start(self):

View file

@ -118,7 +118,7 @@ class ComponentManager:
component._setup() for component in stage if not component.running
]
if needing_start:
await asyncio.wait(map(asyncio.create_task, needing_start))
await asyncio.wait(needing_start)
self.started.set()
async def stop(self):
@ -131,7 +131,7 @@ class ComponentManager:
component._stop() for component in stage if component.running
]
if needing_stop:
await asyncio.wait(map(asyncio.create_task, needing_stop))
await asyncio.wait(needing_stop)
def all_components_running(self, *component_names):
"""

View file

@ -374,7 +374,7 @@ class FileManagerComponent(Component):
log.info('Done setting up file manager')
async def stop(self):
await self.file_manager.stop()
self.file_manager.stop()
class BackgroundDownloaderComponent(Component):
@ -560,6 +560,8 @@ class UPnPComponent(Component):
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.warning("upnp discovery failed: %s", err)
self.upnp = None

View file

@ -614,8 +614,7 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json'
)
@staticmethod
async def handle_metrics_get_request(request: web.Request):
async def handle_metrics_get_request(self, request: web.Request):
try:
return web.Response(
text=prom_generate_latest().decode(),

View file

@ -80,6 +80,8 @@ class MarketFeed:
self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time()
return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e:

View file

@ -793,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):
await self.db.run(_save_claims)
if update_file_callbacks:
await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
await asyncio.wait(update_file_callbacks)
if claim_id_to_supports:
await self.save_supports(claim_id_to_supports)

View file

@ -50,10 +50,10 @@ class FileManager:
await manager.started.wait()
self.started.set()
async def stop(self):
def stop(self):
for manager in self.source_managers.values():
# fixme: pop or not?
await manager.stop()
manager.stop()
self.started.clear()
@cache_concurrent
@ -99,6 +99,8 @@ class FileManager:
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if 'error' in resolved_result:
@ -247,7 +249,7 @@ class FileManager:
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected):

View file

@ -67,7 +67,7 @@ class ManagedDownloadSource:
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError()
async def stop_tasks(self):
def stop_tasks(self):
raise NotImplementedError()
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):

View file

@ -59,11 +59,11 @@ class SourceManager:
def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source
async def remove(self, source: ManagedDownloadSource):
def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources:
return
self._sources.pop(source.identifier)
await source.stop_tasks()
source.stop_tasks()
async def initialize_from_database(self):
raise NotImplementedError()
@ -72,10 +72,10 @@ class SourceManager:
await self.initialize_from_database()
self.started.set()
async def stop(self):
def stop(self):
while self._sources:
_, source = self._sources.popitem()
await source.stop_tasks()
source.stop_tasks()
self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None,
@ -83,7 +83,7 @@ class SourceManager:
raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.remove(source)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)

View file

@ -23,7 +23,6 @@ class BackgroundDownloader:
except ValueError:
return
except asyncio.CancelledError:
log.debug("Cancelled background downloader")
raise
except Exception:
log.error("Unexpected download error on background downloader")

View file

@ -191,7 +191,7 @@ class ManagedStream(ManagedDownloadSource):
Stop any running save/stream tasks as well as the downloader and update the status in the database
"""
await self.stop_tasks()
self.stop_tasks()
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
@ -279,7 +279,7 @@ class ManagedStream(ManagedDownloadSource):
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
self.sd_hash[:6], self.full_path)
await self.blob_manager.storage.set_saved_file(self.stream_hash)
except (Exception, asyncio.CancelledError) as err:
except Exception as err:
if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path)
@ -324,13 +324,12 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
except asyncio.TimeoutError:
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
await self.stop_tasks()
self.stop_tasks()
await self.update_status(ManagedStream.STATUS_STOPPED)
async def stop_tasks(self):
def stop_tasks(self):
if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel()
await asyncio.gather(self.file_output_task, return_exceptions=True)
self.file_output_task = None
while self.streaming_responses:
req, response = self.streaming_responses.pop()
@ -367,7 +366,7 @@ class ManagedStream(ManagedDownloadSource):
return sent
except ConnectionError:
return sent
except (OSError, Exception, asyncio.CancelledError) as err:
except (OSError, Exception) as err:
if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
elif isinstance(err, OSError):

View file

@ -164,6 +164,8 @@ class StreamManager(SourceManager):
async def reflect_streams(self):
try:
return await self._reflect_streams()
except asyncio.CancelledError:
raise
except Exception:
log.exception("reflector task encountered an unexpected error!")
@ -196,8 +198,8 @@ class StreamManager(SourceManager):
await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
async def stop(self):
await super().stop()
def stop(self):
super().stop()
if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done():
@ -224,8 +226,7 @@ class StreamManager(SourceManager):
)
return task
@staticmethod
async def _retriable_reflect_stream(stream, host, port):
async def _retriable_reflect_stream(self, stream, host, port):
sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0
@ -260,7 +261,7 @@ class StreamManager(SourceManager):
return
if source.identifier in self.running_reflector_uploads:
self.running_reflector_uploads[source.identifier].cancel()
await source.stop_tasks()
source.stop_tasks()
if source.identifier in self.streams:
del self.streams[source.identifier]
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]

View file

@ -74,7 +74,7 @@ class TorrentSource(ManagedDownloadSource):
def bt_infohash(self):
return self.identifier
async def stop_tasks(self):
def stop_tasks(self):
pass
@property
@ -118,8 +118,8 @@ class TorrentManager(SourceManager):
async def start(self):
await super().start()
async def stop(self):
await super().stop()
def stop(self):
super().stop()
log.info("finished stopping the torrent manager")
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):

View file

@ -141,7 +141,7 @@ class CoinSelector:
_) -> List[OutputEffectiveAmountEstimator]:
""" Accumulate UTXOs at random until there is enough to cover the target. """
target = self.target + self.cost_of_change
self.random.shuffle(txos, random=self.random.random) # pylint: disable=deprecated-argument
self.random.shuffle(txos, self.random.random)
selection = []
amount = 0
for coin in txos:

View file

@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
async def start(self):
if not os.path.exists(self.path):
os.mkdir(self.path)
await asyncio.wait(map(asyncio.create_task, [
await asyncio.wait([
self.db.open(),
self.headers.open()
]))
])
fully_synced = self.on_ready.first
asyncio.create_task(self.network.start())
await self.network.on_connected.first
@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
async def subscribe_accounts(self):
if self.network.is_connected and self.accounts:
log.info("Subscribe to %i accounts", len(self.accounts))
await asyncio.wait(map(asyncio.create_task, [
await asyncio.wait([
self.subscribe_account(a) for a in self.accounts
]))
])
async def subscribe_account(self, account: Account):
for address_manager in account.address_managers.values():
@ -938,7 +938,9 @@ class Ledger(metaclass=LedgerRegistry):
"%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
account.id, balance, total_receiving, account.receiving.gap, total_change,
account.change.gap, channel_count, len(account.channel_keys), claim_count)
except Exception:
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception(
'Failed to display wallet state, please file issue '
'for this bug along with the traceback you see below:')
@ -961,7 +963,9 @@ class Ledger(metaclass=LedgerRegistry):
claim_ids = [p.purchased_claim_id for p in purchases]
try:
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception:
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Resolve failed while looking up purchased claim ids:")
resolved = []
lookup = {claim.claim_id: claim for claim in resolved}
@ -1041,7 +1045,9 @@ class Ledger(metaclass=LedgerRegistry):
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
try:
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception:
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Resolve failed while looking up collection claim ids:")
return []
claims = []

View file

@ -99,13 +99,13 @@ class ClientSession(BaseClientSession):
self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_MAX_VERSION
required = required or self.network.PROTOCOL_VERSION
response = await asyncio.wait_for(
self.send_request('server.version', [self.network.CLIENT_NAME, required]), timeout=timeout
self.send_request('server.version', [__version__, required]), timeout=timeout
)
if tuple(int(piece) for piece in response[1].split(".")) >= self.network.PROTOCOL_MIN_VERSION:
return response
raise IncompatibleWalletServerError(*self.server)
if tuple(int(piece) for piece in response[0].split(".")) < self.network.MINIMUM_REQUIRED:
raise IncompatibleWalletServerError(*self.server)
return response
async def keepalive_loop(self, timeout=3, max_idle=60):
try:
@ -117,7 +117,7 @@ class ClientSession(BaseClientSession):
)
else:
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
except (Exception, asyncio.CancelledError) as err:
except Exception as err:
if isinstance(err, asyncio.CancelledError):
log.info("closing connection to %s:%i", *self.server)
else:
@ -149,11 +149,8 @@ class ClientSession(BaseClientSession):
class Network:
CLIENT_VERSION = __version__
CLIENT_NAME = "LBRY SDK " + CLIENT_VERSION
PROTOCOL_MIN_VERSION = (0, 65, 0)
PROTOCOL_MAX_VERSION = __version__
PROTOCOL_VERSION = __version__
MINIMUM_REQUIRED = (0, 65, 0)
def __init__(self, ledger):
self.ledger = ledger
@ -217,7 +214,7 @@ class Network:
def loop_task_done_callback(f):
try:
f.result()
except (Exception, asyncio.CancelledError):
except Exception:
if self.running:
log.exception("wallet server connection loop crashed")
@ -315,8 +312,7 @@ class Network:
sleep_delay = 30
while self.running:
await asyncio.wait(
map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
return_when=asyncio.FIRST_COMPLETED
[asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
)
if self._urgent_need_reconnect.is_set():
sleep_delay = 30
@ -342,7 +338,7 @@ class Network:
try:
if not self._urgent_need_reconnect.is_set():
await asyncio.wait(
[self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
[self._keepalive_task, self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED
)
else:

View file

@ -264,7 +264,8 @@ class SPVNode:
await self.server.start()
except Exception as e:
self.stopped = True
log.exception("failed to start spv node")
if not isinstance(e, asyncio.CancelledError):
log.exception("failed to start spv node")
raise e
async def stop(self, cleanup=True):

View file

@ -28,7 +28,6 @@ disable=
no-else-return,
cyclic-import,
missing-docstring,
consider-using-f-string,
duplicate-code,
expression-not-assigned,
inconsistent-return-statements,

View file

@ -18,7 +18,7 @@ setup(
long_description_content_type="text/markdown",
keywords="lbry protocol media",
license='MIT',
python_requires='>=3.8',
python_requires='>=3.7',
packages=find_packages(exclude=('tests',)),
zip_safe=False,
entry_points={
@ -36,7 +36,7 @@ setup(
'distro==1.4.0',
'base58==1.0.0',
'cffi==1.13.2',
'cryptography==3.4.7',
'cryptography==2.5',
'protobuf==3.17.2',
'prometheus_client==0.7.1',
'ecdsa==0.13.3',
@ -50,7 +50,7 @@ setup(
],
extras_require={
'lint': [
'pylint==2.13.9'
'pylint==2.10.0'
],
'test': [
'coverage',

View file

@ -61,14 +61,16 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
dht_network[from_addr] = protocol
return transport, protocol
mock_sock = mock.Mock(spec=socket.socket)
mock_sock.setsockopt = lambda *_: None
mock_sock.bind = lambda *_: None
mock_sock.setblocking = lambda *_: None
mock_sock.getsockname = lambda: "0.0.0.0"
mock_sock.getpeername = lambda: ""
mock_sock.close = lambda: None
mock_sock.type = socket.SOCK_DGRAM
mock_sock.fileno = lambda: 7
loop.create_datagram_endpoint = create_datagram_endpoint
yield
with mock.patch('socket.socket') as mock_socket:
mock_sock = mock.Mock(spec=socket.socket)
mock_sock.setsockopt = lambda *_: None
mock_sock.bind = lambda *_: None
mock_sock.setblocking = lambda *_: None
mock_sock.getsockname = lambda: "0.0.0.0"
mock_sock.getpeername = lambda: ""
mock_sock.close = lambda: None
mock_sock.type = socket.SOCK_DGRAM
mock_sock.fileno = lambda: 7
mock_socket.return_value = mock_sock
loop.create_datagram_endpoint = create_datagram_endpoint
yield

View file

@ -354,7 +354,7 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_get('lbry://foo')
with open(original_path, 'wb') as handle:
handle.write(b'some other stuff was there instead')
await self.daemon.file_manager.stop()
self.daemon.file_manager.stop()
await self.daemon.file_manager.start()
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
# check that internal state got through up to the file list API
@ -382,7 +382,8 @@ class FileCommands(CommandTestCase):
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
self.assertNotIn('error', resp)
self.assertTrue(os.path.isfile(path))
await self.daemon.file_manager.stop()
self.daemon.file_manager.stop()
await asyncio.sleep(0.01) # FIXME: this sleep should not be needed
self.assertFalse(os.path.isfile(path))
async def test_incomplete_downloads_retry(self):
@ -477,7 +478,7 @@ class FileCommands(CommandTestCase):
# restart the daemon and make sure the fee is still there
await self.daemon.file_manager.stop()
self.daemon.file_manager.stop()
await self.daemon.file_manager.start()
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)

View file

@ -3,9 +3,7 @@ import hashlib
import aiohttp
import aiohttp.web
import asyncio
import contextlib
from lbry.file.source import ManagedDownloadSource
from lbry.utils import aiohttp_request
from lbry.blob.blob_file import MAX_BLOB_SIZE
from lbry.testcase import CommandTestCase
@ -23,7 +21,7 @@ def get_random_bytes(n: int) -> bytes:
class RangeRequests(CommandTestCase):
async def _restart_stream_manager(self):
await self.daemon.file_manager.stop()
self.daemon.file_manager.stop()
await self.daemon.file_manager.start()
return
@ -354,21 +352,14 @@ class RangeRequests(CommandTestCase):
path = stream.full_path
self.assertIsNotNone(path)
if wait_for_start_writing:
with contextlib.suppress(asyncio.CancelledError):
await stream.started_writing.wait()
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path))
await self.daemon.file_manager.stop()
# while stopped, we get no response to query and no file is present
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
await self.daemon.file_manager.start()
# after restart, we get a response to query and same file path
await self._restart_stream_manager()
stream = (await self.daemon.jsonrpc_file_list())['items'][0]
self.assertIsNotNone(stream.full_path)
self.assertEqual(stream.full_path, path)
self.assertFalse(os.path.isfile(path))
if wait_for_start_writing:
with contextlib.suppress(asyncio.CancelledError):
await stream.started_writing.wait()
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path))
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):

View file

@ -7,28 +7,28 @@ from lbry.extras.daemon.exchange_rate_manager import (
class TestExchangeRateManager(AsyncioTestCase):
# async def test_exchange_rate_manager(self):
# manager = ExchangeRateManager(FEEDS)
# manager.start()
# self.addCleanup(manager.stop)
# for feed in manager.market_feeds:
# self.assertFalse(feed.is_online)
# self.assertIsNone(feed.rate)
# await manager.wait()
# failures = set()
# for feed in manager.market_feeds:
# if feed.is_online:
# self.assertIsInstance(feed.rate, ExchangeRate)
# else:
# failures.add(feed.name)
# self.assertFalse(feed.has_rate)
# self.assertLessEqual(len(failures), 1, f"feed failures: {failures}. Please check exchange rate feeds!")
# lbc = manager.convert_currency('USD', 'LBC', Decimal('1.0'))
# self.assertGreaterEqual(lbc, 2.0)
# self.assertLessEqual(lbc, 120.0)
# lbc = manager.convert_currency('BTC', 'LBC', Decimal('0.01'))
# self.assertGreaterEqual(lbc, 1_000)
# self.assertLessEqual(lbc, 30_000)
async def test_exchange_rate_manager(self):
manager = ExchangeRateManager(FEEDS)
manager.start()
self.addCleanup(manager.stop)
for feed in manager.market_feeds:
self.assertFalse(feed.is_online)
self.assertIsNone(feed.rate)
await manager.wait()
failures = set()
for feed in manager.market_feeds:
if feed.is_online:
self.assertIsInstance(feed.rate, ExchangeRate)
else:
failures.add(feed.name)
self.assertFalse(feed.has_rate)
self.assertLessEqual(len(failures), 1, f"feed failures: {failures}. Please check exchange rate feeds!")
lbc = manager.convert_currency('USD', 'LBC', Decimal('1.0'))
self.assertGreaterEqual(lbc, 2.0)
self.assertLessEqual(lbc, 120.0)
lbc = manager.convert_currency('BTC', 'LBC', Decimal('0.01'))
self.assertGreaterEqual(lbc, 1_000)
self.assertLessEqual(lbc, 30_000)
async def test_it_handles_feed_being_offline(self):
class FakeFeed(MarketFeed):

View file

@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertIsNone(stream.full_path)
self.assertEqual(0, stream.written_bytes)
await self.stream_manager.stop()
self.stream_manager.stop()
await self.stream_manager.start()
self.assertEqual(1, len(self.stream_manager.streams))
stream = list(self.stream_manager.streams.values())[0]
@ -449,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
await stream.finished_writing.wait()
await asyncio.sleep(0)
await self.stream_manager.stop()
self.stream_manager.stop()
self.client_blob_manager.stop()
# partial removal, only sd blob is missing.
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'