Compare commits

..

No commits in common. "master" and "v0.112.0" have entirely different histories.

37 changed files with 173 additions and 229 deletions

View file

@ -1,18 +1,18 @@
name: ci name: ci
on: ["push", "pull_request", "workflow_dispatch"] on: ["push", "pull_request"]
jobs: jobs:
lint: lint:
name: lint name: lint
runs-on: ubuntu-20.04 runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v2
- uses: actions/setup-python@v5 - uses: actions/setup-python@v1
with: with:
python-version: '3.9' python-version: '3.7'
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v4 uses: actions/cache@v2
with: with:
path: ~/.cache/pip path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
@ -26,26 +26,26 @@ jobs:
strategy: strategy:
matrix: matrix:
os: os:
- ubuntu-20.04 - ubuntu-latest
- macos-13 - macos-latest
- windows-2022 - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v2
- uses: actions/setup-python@v5 - uses: actions/setup-python@v1
with: with:
python-version: '3.9' python-version: '3.7'
- name: set pip cache dir - name: set pip cache dir
shell: bash id: pip-cache
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV run: echo "::set-output name=dir::$(pip cache dir)"
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v4 uses: actions/cache@v2
with: with:
path: ${{ env.PIP_CACHE_DIR }} path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v6 uses: ASzc/change-string-case-action@v1
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- run: python -m pip install --user --upgrade pip wheel - run: python -m pip install --user --upgrade pip wheel
@ -72,7 +72,7 @@ jobs:
tests-integration: tests-integration:
name: "tests / integration" name: "tests / integration"
runs-on: ubuntu-20.04 runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
test: test:
@ -93,16 +93,16 @@ jobs:
uses: elastic/elastic-github-actions/elasticsearch@master uses: elastic/elastic-github-actions/elasticsearch@master
with: with:
stack-version: 7.12.1 stack-version: 7.12.1
- uses: actions/checkout@v4 - uses: actions/checkout@v2
- uses: actions/setup-python@v5 - uses: actions/setup-python@v1
with: with:
python-version: '3.9' python-version: '3.7'
- if: matrix.test == 'other' - if: matrix.test == 'other'
run: | run: |
sudo apt-get update sudo apt-get update
sudo apt-get install -y --no-install-recommends ffmpeg sudo apt-get install -y --no-install-recommends ffmpeg
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v4 uses: actions/cache@v2
with: with:
path: ./.tox path: ./.tox
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }} key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
@ -123,7 +123,7 @@ jobs:
coverage: coverage:
needs: ["tests-unit", "tests-integration"] needs: ["tests-unit", "tests-integration"]
runs-on: ubuntu-20.04 runs-on: ubuntu-latest
steps: steps:
- name: finalize coverage report submission - name: finalize coverage report submission
env: env:
@ -138,29 +138,29 @@ jobs:
strategy: strategy:
matrix: matrix:
os: os:
- ubuntu-20.04 - ubuntu-18.04
- macos-13 - macos-latest
- windows-2022 - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v2
- uses: actions/setup-python@v5 - uses: actions/setup-python@v1
with: with:
python-version: '3.9' python-version: '3.7'
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v6 uses: ASzc/change-string-case-action@v1
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- name: set pip cache dir - name: set pip cache dir
shell: bash id: pip-cache
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV run: echo "::set-output name=dir::$(pip cache dir)"
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v4 uses: actions/cache@v2
with: with:
path: ${{ env.PIP_CACHE_DIR }} path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- run: pip install pyinstaller==6.0 - run: pip install pyinstaller==4.4
- run: pip install -e . - run: pip install -e .
- if: startsWith(github.ref, 'refs/tags/v') - if: startsWith(github.ref, 'refs/tags/v')
run: python docker/set_build.py run: python docker/set_build.py
@ -175,7 +175,7 @@ jobs:
pip install pywin32==301 pip install pywin32==301
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
dist/lbrynet.exe --version dist/lbrynet.exe --version
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v2
with: with:
name: lbrynet-${{ steps.os-name.outputs.lowercase }} name: lbrynet-${{ steps.os-name.outputs.lowercase }}
path: dist/ path: dist/
@ -184,10 +184,10 @@ jobs:
name: "release" name: "release"
if: startsWith(github.ref, 'refs/tags/v') if: startsWith(github.ref, 'refs/tags/v')
needs: ["build"] needs: ["build"]
runs-on: ubuntu-20.04 runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v1
- uses: actions/download-artifact@v4 - uses: actions/download-artifact@v2
- name: upload binaries - name: upload binaries
env: env:
GITHUB_TOKEN: ${{ secrets.RELEASE_API_TOKEN }} GITHUB_TOKEN: ${{ secrets.RELEASE_API_TOKEN }}

View file

@ -7,7 +7,7 @@ on:
jobs: jobs:
release: release:
name: "slack notification" name: "slack notification"
runs-on: ubuntu-20.04 runs-on: ubuntu-latest
steps: steps:
- uses: LoveToKnow/slackify-markdown-action@v1.0.0 - uses: LoveToKnow/slackify-markdown-action@v1.0.0
id: markdown id: markdown

View file

@ -54,16 +54,3 @@ The documentation for the API can be found [here](https://lbry.tech/api/sdk).
Daemon defaults, ports, and other settings are documented [here](https://lbry.tech/resources/daemon-settings). Daemon defaults, ports, and other settings are documented [here](https://lbry.tech/resources/daemon-settings).
Settings can be configured using a daemon-settings.yml file. An example can be found [here](https://github.com/lbryio/lbry-sdk/blob/master/example_daemon_settings.yml). Settings can be configured using a daemon-settings.yml file. An example can be found [here](https://github.com/lbryio/lbry-sdk/blob/master/example_daemon_settings.yml).
## Troubleshooting
### Error: unsupported hash type 'ripemd160'
If you see this error when running `lbrynet start`, it likely means your OpenSSL or Python installation is missing RIPEMD160 support.
To fix it:
- Ensure you are using a version of Python compiled with OpenSSL support.
- On Ubuntu/Debian, try: `sudo apt install libssl-dev` and reinstall Python.
- Alternatively, use a precompiled Python version (e.g., via `pyenv install 3.8.18` after `libssl-dev` is installed).

View file

@ -1,2 +1,2 @@
__version__ = "0.114.0" __version__ = "0.112.0"
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name

View file

@ -64,7 +64,7 @@ class BlobDownloader:
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
async def new_peer_or_finished(self): async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))] active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED') await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
def cleanup_active(self): def cleanup_active(self):

View file

@ -688,9 +688,6 @@ class Config(CLIConfig):
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [ tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
('tracker.lbry.com', 9252), ('tracker.lbry.com', 9252),
('tracker.lbry.grin.io', 9252), ('tracker.lbry.grin.io', 9252),
('tracker.lbry.pigg.es', 9252),
('tracker.lizard.technology', 9252),
('s1.lbry.network', 9252),
]) ])
lbryum_servers = Servers("SPV wallet servers", [ lbryum_servers = Servers("SPV wallet servers", [
@ -703,20 +700,14 @@ class Config(CLIConfig):
('spv17.lbry.com', 50001), ('spv17.lbry.com', 50001),
('spv18.lbry.com', 50001), ('spv18.lbry.com', 50001),
('spv19.lbry.com', 50001), ('spv19.lbry.com', 50001),
('hub.lbry.grin.io', 50001),
('hub.lizard.technology', 50001),
('s1.lbry.network', 50001),
]) ])
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
('dht.lbry.grin.io', 4444), # Grin ('dht.lbry.grin.io', 4444), # Grin
('dht.lbry.madiator.com', 4444), # Madiator ('dht.lbry.madiator.com', 4444), # Madiator
('dht.lbry.pigg.es', 4444), # Pigges
('lbrynet1.lbry.com', 4444), # US EAST ('lbrynet1.lbry.com', 4444), # US EAST
('lbrynet2.lbry.com', 4444), # US WEST ('lbrynet2.lbry.com', 4444), # US WEST
('lbrynet3.lbry.com', 4444), # EU ('lbrynet3.lbry.com', 4444), # EU
('lbrynet4.lbry.com', 4444), # ASIA ('lbrynet4.lbry.com', 4444) # ASIA
('dht.lizard.technology', 4444), # Jack
('s2.lbry.network', 4444),
]) ])
# blockchain # blockchain

View file

@ -15,18 +15,11 @@ def sha512(x):
def ripemd160(x): def ripemd160(x):
""" Simple wrapper of hashlib ripemd160. """ """ Simple wrapper of hashlib ripemd160. """
try: h = hashlib.new('ripemd160')
h = hashlib.new('ripemd160')
except ValueError as e:
raise RuntimeError(
"Your Python/OpenSSL installation does not support RIPEMD160. "
"Try reinstalling Python with full OpenSSL support."
) from e
h.update(x) h.update(x)
return h.digest() return h.digest()
def double_sha256(x): def double_sha256(x):
""" SHA-256 of SHA-256, as used extensively in bitcoin. """ """ SHA-256 of SHA-256, as used extensively in bitcoin. """
return sha256(sha256(x)) return sha256(sha256(x))

View file

@ -42,6 +42,8 @@ class BlobAnnouncer:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err: except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc() self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err)) log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):

View file

@ -8,7 +8,6 @@ from prometheus_client import Gauge
from lbry import utils from lbry import utils
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.error import RemoteException
from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.dht.peer import KademliaPeer, PeerManager from lbry.dht.peer import KademliaPeer, PeerManager
@ -396,7 +395,7 @@ class TreeRoutingTable:
try: try:
await probe(to_replace) await probe(to_replace)
return False return False
except (asyncio.TimeoutError, RemoteException): except asyncio.TimeoutError:
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index, log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port) to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
if to_replace in self.buckets[bucket_index]: if to_replace in self.buckets[bucket_index]:

View file

@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
def running(self): def running(self):
return self._running return self._running
async def get_status(self): # pylint: disable=no-self-use async def get_status(self):
return return
async def start(self): async def start(self):

View file

@ -118,7 +118,7 @@ class ComponentManager:
component._setup() for component in stage if not component.running component._setup() for component in stage if not component.running
] ]
if needing_start: if needing_start:
await asyncio.wait(map(asyncio.create_task, needing_start)) await asyncio.wait(needing_start)
self.started.set() self.started.set()
async def stop(self): async def stop(self):
@ -131,7 +131,7 @@ class ComponentManager:
component._stop() for component in stage if component.running component._stop() for component in stage if component.running
] ]
if needing_stop: if needing_stop:
await asyncio.wait(map(asyncio.create_task, needing_stop)) await asyncio.wait(needing_stop)
def all_components_running(self, *component_names): def all_components_running(self, *component_names):
""" """

View file

@ -374,7 +374,7 @@ class FileManagerComponent(Component):
log.info('Done setting up file manager') log.info('Done setting up file manager')
async def stop(self): async def stop(self):
await self.file_manager.stop() self.file_manager.stop()
class BackgroundDownloaderComponent(Component): class BackgroundDownloaderComponent(Component):
@ -560,6 +560,8 @@ class UPnPComponent(Component):
self.upnp = await UPnP.discover(loop=self.component_manager.loop) self.upnp = await UPnP.discover(loop=self.component_manager.loop)
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string) log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.warning("upnp discovery failed: %s", err) log.warning("upnp discovery failed: %s", err)
self.upnp = None self.upnp = None

View file

@ -614,8 +614,7 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json' content_type='application/json'
) )
@staticmethod async def handle_metrics_get_request(self, request: web.Request):
async def handle_metrics_get_request(request: web.Request):
try: try:
return web.Response( return web.Response(
text=prom_generate_latest().decode(), text=prom_generate_latest().decode(),

View file

@ -80,6 +80,8 @@ class MarketFeed:
self.rate = ExchangeRate(self.market, rate, int(time.time())) self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time() self.last_check = time.time()
return self.rate return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name) log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:

View file

@ -793,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):
await self.db.run(_save_claims) await self.db.run(_save_claims)
if update_file_callbacks: if update_file_callbacks:
await asyncio.wait(map(asyncio.create_task, update_file_callbacks)) await asyncio.wait(update_file_callbacks)
if claim_id_to_supports: if claim_id_to_supports:
await self.save_supports(claim_id_to_supports) await self.save_supports(claim_id_to_supports)

View file

@ -50,10 +50,10 @@ class FileManager:
await manager.started.wait() await manager.started.wait()
self.started.set() self.started.set()
async def stop(self): def stop(self):
for manager in self.source_managers.values(): for manager in self.source_managers.values():
# fixme: pop or not? # fixme: pop or not?
await manager.stop() manager.stop()
self.started.clear() self.started.clear()
@cache_concurrent @cache_concurrent
@ -99,6 +99,8 @@ class FileManager:
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise ResolveTimeoutError(uri) raise ResolveTimeoutError(uri)
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:") log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}") raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if 'error' in resolved_result: if 'error' in resolved_result:
@ -247,7 +249,7 @@ class FileManager:
except asyncio.TimeoutError: except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash) error = DownloadDataTimeoutError(stream.sd_hash)
raise error raise error
except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError) KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected): if isinstance(err, expected):

View file

@ -67,7 +67,7 @@ class ManagedDownloadSource:
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError() raise NotImplementedError()
async def stop_tasks(self): def stop_tasks(self):
raise NotImplementedError() raise NotImplementedError()
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):

View file

@ -59,11 +59,11 @@ class SourceManager:
def add(self, source: ManagedDownloadSource): def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source self._sources[source.identifier] = source
async def remove(self, source: ManagedDownloadSource): def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources: if source.identifier not in self._sources:
return return
self._sources.pop(source.identifier) self._sources.pop(source.identifier)
await source.stop_tasks() source.stop_tasks()
async def initialize_from_database(self): async def initialize_from_database(self):
raise NotImplementedError() raise NotImplementedError()
@ -72,10 +72,10 @@ class SourceManager:
await self.initialize_from_database() await self.initialize_from_database()
self.started.set() self.started.set()
async def stop(self): def stop(self):
while self._sources: while self._sources:
_, source = self._sources.popitem() _, source = self._sources.popitem()
await source.stop_tasks() source.stop_tasks()
self.started.clear() self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None, async def create(self, file_path: str, key: Optional[bytes] = None,
@ -83,7 +83,7 @@ class SourceManager:
raise NotImplementedError() raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.remove(source) self.remove(source)
if delete_file and source.output_file_exists: if delete_file and source.output_file_exists:
os.remove(source.full_path) os.remove(source.full_path)

View file

@ -23,7 +23,6 @@ class BackgroundDownloader:
except ValueError: except ValueError:
return return
except asyncio.CancelledError: except asyncio.CancelledError:
log.debug("Cancelled background downloader")
raise raise
except Exception: except Exception:
log.error("Unexpected download error on background downloader") log.error("Unexpected download error on background downloader")

View file

@ -191,7 +191,7 @@ class ManagedStream(ManagedDownloadSource):
Stop any running save/stream tasks as well as the downloader and update the status in the database Stop any running save/stream tasks as well as the downloader and update the status in the database
""" """
await self.stop_tasks() self.stop_tasks()
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING: if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED) await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
@ -279,7 +279,7 @@ class ManagedStream(ManagedDownloadSource):
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
self.sd_hash[:6], self.full_path) self.sd_hash[:6], self.full_path)
await self.blob_manager.storage.set_saved_file(self.stream_hash) await self.blob_manager.storage.set_saved_file(self.stream_hash)
except (Exception, asyncio.CancelledError) as err: except Exception as err:
if os.path.isfile(output_path): if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path) os.remove(output_path)
@ -324,13 +324,12 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout) await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id) log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
await self.stop_tasks() self.stop_tasks()
await self.update_status(ManagedStream.STATUS_STOPPED) await self.update_status(ManagedStream.STATUS_STOPPED)
async def stop_tasks(self): def stop_tasks(self):
if self.file_output_task and not self.file_output_task.done(): if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel() self.file_output_task.cancel()
await asyncio.gather(self.file_output_task, return_exceptions=True)
self.file_output_task = None self.file_output_task = None
while self.streaming_responses: while self.streaming_responses:
req, response = self.streaming_responses.pop() req, response = self.streaming_responses.pop()
@ -367,7 +366,7 @@ class ManagedStream(ManagedDownloadSource):
return sent return sent
except ConnectionError: except ConnectionError:
return sent return sent
except (OSError, Exception, asyncio.CancelledError) as err: except (OSError, Exception) as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id) log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
elif isinstance(err, OSError): elif isinstance(err, OSError):

View file

@ -164,6 +164,8 @@ class StreamManager(SourceManager):
async def reflect_streams(self): async def reflect_streams(self):
try: try:
return await self._reflect_streams() return await self._reflect_streams()
except asyncio.CancelledError:
raise
except Exception: except Exception:
log.exception("reflector task encountered an unexpected error!") log.exception("reflector task encountered an unexpected error!")
@ -196,8 +198,8 @@ class StreamManager(SourceManager):
await super().start() await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams()) self.re_reflect_task = self.loop.create_task(self.reflect_streams())
async def stop(self): def stop(self):
await super().stop() super().stop()
if self.resume_saving_task and not self.resume_saving_task.done(): if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel() self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done(): if self.re_reflect_task and not self.re_reflect_task.done():
@ -224,8 +226,7 @@ class StreamManager(SourceManager):
) )
return task return task
@staticmethod async def _retriable_reflect_stream(self, stream, host, port):
async def _retriable_reflect_stream(stream, host, port):
sent = await stream.upload_to_reflector(host, port) sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0 stream.reflector_progress = 0
@ -260,7 +261,7 @@ class StreamManager(SourceManager):
return return
if source.identifier in self.running_reflector_uploads: if source.identifier in self.running_reflector_uploads:
self.running_reflector_uploads[source.identifier].cancel() self.running_reflector_uploads[source.identifier].cancel()
await source.stop_tasks() source.stop_tasks()
if source.identifier in self.streams: if source.identifier in self.streams:
del self.streams[source.identifier] del self.streams[source.identifier]
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]] blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]

View file

@ -74,7 +74,7 @@ class TorrentSource(ManagedDownloadSource):
def bt_infohash(self): def bt_infohash(self):
return self.identifier return self.identifier
async def stop_tasks(self): def stop_tasks(self):
pass pass
@property @property
@ -118,8 +118,8 @@ class TorrentManager(SourceManager):
async def start(self): async def start(self):
await super().start() await super().start()
async def stop(self): def stop(self):
await super().stop() super().stop()
log.info("finished stopping the torrent manager") log.info("finished stopping the torrent manager")
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):

View file

@ -141,7 +141,7 @@ class CoinSelector:
_) -> List[OutputEffectiveAmountEstimator]: _) -> List[OutputEffectiveAmountEstimator]:
""" Accumulate UTXOs at random until there is enough to cover the target. """ """ Accumulate UTXOs at random until there is enough to cover the target. """
target = self.target + self.cost_of_change target = self.target + self.cost_of_change
self.random.shuffle(txos, random=self.random.random) # pylint: disable=deprecated-argument self.random.shuffle(txos, self.random.random)
selection = [] selection = []
amount = 0 amount = 0
for coin in txos: for coin in txos:

View file

@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
async def start(self): async def start(self):
if not os.path.exists(self.path): if not os.path.exists(self.path):
os.mkdir(self.path) os.mkdir(self.path)
await asyncio.wait(map(asyncio.create_task, [ await asyncio.wait([
self.db.open(), self.db.open(),
self.headers.open() self.headers.open()
])) ])
fully_synced = self.on_ready.first fully_synced = self.on_ready.first
asyncio.create_task(self.network.start()) asyncio.create_task(self.network.start())
await self.network.on_connected.first await self.network.on_connected.first
@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
async def subscribe_accounts(self): async def subscribe_accounts(self):
if self.network.is_connected and self.accounts: if self.network.is_connected and self.accounts:
log.info("Subscribe to %i accounts", len(self.accounts)) log.info("Subscribe to %i accounts", len(self.accounts))
await asyncio.wait(map(asyncio.create_task, [ await asyncio.wait([
self.subscribe_account(a) for a in self.accounts self.subscribe_account(a) for a in self.accounts
])) ])
async def subscribe_account(self, account: Account): async def subscribe_account(self, account: Account):
for address_manager in account.address_managers.values(): for address_manager in account.address_managers.values():
@ -938,7 +938,9 @@ class Ledger(metaclass=LedgerRegistry):
"%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ", "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
account.id, balance, total_receiving, account.receiving.gap, total_change, account.id, balance, total_receiving, account.receiving.gap, total_change,
account.change.gap, channel_count, len(account.channel_keys), claim_count) account.change.gap, channel_count, len(account.channel_keys), claim_count)
except Exception: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception( log.exception(
'Failed to display wallet state, please file issue ' 'Failed to display wallet state, please file issue '
'for this bug along with the traceback you see below:') 'for this bug along with the traceback you see below:')
@ -961,7 +963,9 @@ class Ledger(metaclass=LedgerRegistry):
claim_ids = [p.purchased_claim_id for p in purchases] claim_ids = [p.purchased_claim_id for p in purchases]
try: try:
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids) resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Resolve failed while looking up purchased claim ids:") log.exception("Resolve failed while looking up purchased claim ids:")
resolved = [] resolved = []
lookup = {claim.claim_id: claim for claim in resolved} lookup = {claim.claim_id: claim for claim in resolved}
@ -1041,7 +1045,9 @@ class Ledger(metaclass=LedgerRegistry):
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset] claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
try: try:
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids) resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Resolve failed while looking up collection claim ids:") log.exception("Resolve failed while looking up collection claim ids:")
return [] return []
claims = [] claims = []

View file

@ -99,13 +99,13 @@ class ClientSession(BaseClientSession):
self._concurrency.release() self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3): async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_MAX_VERSION required = required or self.network.PROTOCOL_VERSION
response = await asyncio.wait_for( response = await asyncio.wait_for(
self.send_request('server.version', [self.network.CLIENT_NAME, required]), timeout=timeout self.send_request('server.version', [__version__, required]), timeout=timeout
) )
if tuple(int(piece) for piece in response[1].split(".")) >= self.network.PROTOCOL_MIN_VERSION: if tuple(int(piece) for piece in response[0].split(".")) < self.network.MINIMUM_REQUIRED:
return response raise IncompatibleWalletServerError(*self.server)
raise IncompatibleWalletServerError(*self.server) return response
async def keepalive_loop(self, timeout=3, max_idle=60): async def keepalive_loop(self, timeout=3, max_idle=60):
try: try:
@ -117,7 +117,7 @@ class ClientSession(BaseClientSession):
) )
else: else:
await asyncio.sleep(max(0, max_idle - (now - self.last_send))) await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
except (Exception, asyncio.CancelledError) as err: except Exception as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
log.info("closing connection to %s:%i", *self.server) log.info("closing connection to %s:%i", *self.server)
else: else:
@ -149,11 +149,8 @@ class ClientSession(BaseClientSession):
class Network: class Network:
CLIENT_VERSION = __version__ PROTOCOL_VERSION = __version__
CLIENT_NAME = "LBRY SDK " + CLIENT_VERSION MINIMUM_REQUIRED = (0, 65, 0)
PROTOCOL_MIN_VERSION = (0, 65, 0)
PROTOCOL_MAX_VERSION = __version__
def __init__(self, ledger): def __init__(self, ledger):
self.ledger = ledger self.ledger = ledger
@ -217,7 +214,7 @@ class Network:
def loop_task_done_callback(f): def loop_task_done_callback(f):
try: try:
f.result() f.result()
except (Exception, asyncio.CancelledError): except Exception:
if self.running: if self.running:
log.exception("wallet server connection loop crashed") log.exception("wallet server connection loop crashed")
@ -315,8 +312,7 @@ class Network:
sleep_delay = 30 sleep_delay = 30
while self.running: while self.running:
await asyncio.wait( await asyncio.wait(
map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]), [asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
return_when=asyncio.FIRST_COMPLETED
) )
if self._urgent_need_reconnect.is_set(): if self._urgent_need_reconnect.is_set():
sleep_delay = 30 sleep_delay = 30
@ -342,7 +338,7 @@ class Network:
try: try:
if not self._urgent_need_reconnect.is_set(): if not self._urgent_need_reconnect.is_set():
await asyncio.wait( await asyncio.wait(
[self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())], [self._keepalive_task, self._urgent_need_reconnect.wait()],
return_when=asyncio.FIRST_COMPLETED return_when=asyncio.FIRST_COMPLETED
) )
else: else:

View file

@ -214,7 +214,6 @@ class SPVNode:
self.port = 50001 + node_number # avoid conflict with default daemon self.port = 50001 + node_number # avoid conflict with default daemon
self.udp_port = self.port self.udp_port = self.port
self.elastic_notifier_port = 19080 + node_number self.elastic_notifier_port = 19080 + node_number
self.elastic_services = f'localhost:9200/localhost:{self.elastic_notifier_port}'
self.session_timeout = 600 self.session_timeout = 600
self.stopped = True self.stopped = True
self.index_name = uuid4().hex self.index_name = uuid4().hex
@ -236,7 +235,7 @@ class SPVNode:
'host': self.hostname, 'host': self.hostname,
'tcp_port': self.port, 'tcp_port': self.port,
'udp_port': self.udp_port, 'udp_port': self.udp_port,
'elastic_services': self.elastic_services, 'elastic_notifier_port': self.elastic_notifier_port,
'session_timeout': self.session_timeout, 'session_timeout': self.session_timeout,
'max_query_workers': 0, 'max_query_workers': 0,
'es_index_prefix': self.index_name, 'es_index_prefix': self.index_name,
@ -264,7 +263,8 @@ class SPVNode:
await self.server.start() await self.server.start()
except Exception as e: except Exception as e:
self.stopped = True self.stopped = True
log.exception("failed to start spv node") if not isinstance(e, asyncio.CancelledError):
log.exception("failed to start spv node")
raise e raise e
async def stop(self, cleanup=True): async def stop(self, cleanup=True):

View file

@ -182,8 +182,6 @@ class Wallet:
raise InvalidPasswordError() raise InvalidPasswordError()
if "unknown compression method" in e.args[0].lower(): if "unknown compression method" in e.args[0].lower():
raise InvalidPasswordError() raise InvalidPasswordError()
if "invalid window size" in e.args[0].lower():
raise InvalidPasswordError()
raise raise
return json.loads(decompressed) return json.loads(decompressed)

View file

@ -28,7 +28,6 @@ disable=
no-else-return, no-else-return,
cyclic-import, cyclic-import,
missing-docstring, missing-docstring,
consider-using-f-string,
duplicate-code, duplicate-code,
expression-not-assigned, expression-not-assigned,
inconsistent-return-statements, inconsistent-return-statements,

View file

@ -18,7 +18,7 @@ setup(
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
keywords="lbry protocol media", keywords="lbry protocol media",
license='MIT', license='MIT',
python_requires='>=3.8', python_requires='>=3.7',
packages=find_packages(exclude=('tests',)), packages=find_packages(exclude=('tests',)),
zip_safe=False, zip_safe=False,
entry_points={ entry_points={
@ -36,7 +36,7 @@ setup(
'distro==1.4.0', 'distro==1.4.0',
'base58==1.0.0', 'base58==1.0.0',
'cffi==1.13.2', 'cffi==1.13.2',
'cryptography==3.4.7', 'cryptography==2.5',
'protobuf==3.17.2', 'protobuf==3.17.2',
'prometheus_client==0.7.1', 'prometheus_client==0.7.1',
'ecdsa==0.13.3', 'ecdsa==0.13.3',
@ -50,14 +50,14 @@ setup(
], ],
extras_require={ extras_require={
'lint': [ 'lint': [
'pylint==2.13.9' 'pylint==2.10.0'
], ],
'test': [ 'test': [
'coverage', 'coverage',
'jsonschema==4.4.0', 'jsonschema==4.4.0',
], ],
'hub': [ 'hub': [
'hub@git+https://github.com/lbryio/hub.git@929448d64bcbe6c5e476757ec78456beaa85e56a' 'hub@git+https://github.com/lbryio/hub.git@024aceda53fe6d1ab8d519b73584437c25de6975'
] ]
}, },
classifiers=[ classifiers=[

View file

@ -61,14 +61,16 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
dht_network[from_addr] = protocol dht_network[from_addr] = protocol
return transport, protocol return transport, protocol
mock_sock = mock.Mock(spec=socket.socket) with mock.patch('socket.socket') as mock_socket:
mock_sock.setsockopt = lambda *_: None mock_sock = mock.Mock(spec=socket.socket)
mock_sock.bind = lambda *_: None mock_sock.setsockopt = lambda *_: None
mock_sock.setblocking = lambda *_: None mock_sock.bind = lambda *_: None
mock_sock.getsockname = lambda: "0.0.0.0" mock_sock.setblocking = lambda *_: None
mock_sock.getpeername = lambda: "" mock_sock.getsockname = lambda: "0.0.0.0"
mock_sock.close = lambda: None mock_sock.getpeername = lambda: ""
mock_sock.type = socket.SOCK_DGRAM mock_sock.close = lambda: None
mock_sock.fileno = lambda: 7 mock_sock.type = socket.SOCK_DGRAM
loop.create_datagram_endpoint = create_datagram_endpoint mock_sock.fileno = lambda: 7
yield mock_socket.return_value = mock_sock
loop.create_datagram_endpoint = create_datagram_endpoint
yield

View file

@ -354,7 +354,7 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_get('lbry://foo') await self.daemon.jsonrpc_get('lbry://foo')
with open(original_path, 'wb') as handle: with open(original_path, 'wb') as handle:
handle.write(b'some other stuff was there instead') handle.write(b'some other stuff was there instead')
await self.daemon.file_manager.stop() self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
# check that internal state got through up to the file list API # check that internal state got through up to the file list API
@ -382,7 +382,8 @@ class FileCommands(CommandTestCase):
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
self.assertNotIn('error', resp) self.assertNotIn('error', resp)
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
await self.daemon.file_manager.stop() self.daemon.file_manager.stop()
await asyncio.sleep(0.01) # FIXME: this sleep should not be needed
self.assertFalse(os.path.isfile(path)) self.assertFalse(os.path.isfile(path))
async def test_incomplete_downloads_retry(self): async def test_incomplete_downloads_retry(self):
@ -477,7 +478,7 @@ class FileCommands(CommandTestCase):
# restart the daemon and make sure the fee is still there # restart the daemon and make sure the fee is still there
await self.daemon.file_manager.stop() self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)

View file

@ -3,9 +3,7 @@ import hashlib
import aiohttp import aiohttp
import aiohttp.web import aiohttp.web
import asyncio import asyncio
import contextlib
from lbry.file.source import ManagedDownloadSource
from lbry.utils import aiohttp_request from lbry.utils import aiohttp_request
from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.blob.blob_file import MAX_BLOB_SIZE
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
@ -23,7 +21,7 @@ def get_random_bytes(n: int) -> bytes:
class RangeRequests(CommandTestCase): class RangeRequests(CommandTestCase):
async def _restart_stream_manager(self): async def _restart_stream_manager(self):
await self.daemon.file_manager.stop() self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
return return
@ -354,21 +352,14 @@ class RangeRequests(CommandTestCase):
path = stream.full_path path = stream.full_path
self.assertIsNotNone(path) self.assertIsNotNone(path)
if wait_for_start_writing: if wait_for_start_writing:
with contextlib.suppress(asyncio.CancelledError): await stream.started_writing.wait()
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
await self.daemon.file_manager.stop() await self._restart_stream_manager()
# while stopped, we get no response to query and no file is present
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
await self.daemon.file_manager.start()
# after restart, we get a response to query and same file path
stream = (await self.daemon.jsonrpc_file_list())['items'][0] stream = (await self.daemon.jsonrpc_file_list())['items'][0]
self.assertIsNotNone(stream.full_path) self.assertIsNotNone(stream.full_path)
self.assertEqual(stream.full_path, path) self.assertFalse(os.path.isfile(path))
if wait_for_start_writing: if wait_for_start_writing:
with contextlib.suppress(asyncio.CancelledError): await stream.started_writing.wait()
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self): async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):

View file

@ -7,28 +7,28 @@ from lbry.extras.daemon.exchange_rate_manager import (
class TestExchangeRateManager(AsyncioTestCase): class TestExchangeRateManager(AsyncioTestCase):
# async def test_exchange_rate_manager(self): async def test_exchange_rate_manager(self):
# manager = ExchangeRateManager(FEEDS) manager = ExchangeRateManager(FEEDS)
# manager.start() manager.start()
# self.addCleanup(manager.stop) self.addCleanup(manager.stop)
# for feed in manager.market_feeds: for feed in manager.market_feeds:
# self.assertFalse(feed.is_online) self.assertFalse(feed.is_online)
# self.assertIsNone(feed.rate) self.assertIsNone(feed.rate)
# await manager.wait() await manager.wait()
# failures = set() failures = set()
# for feed in manager.market_feeds: for feed in manager.market_feeds:
# if feed.is_online: if feed.is_online:
# self.assertIsInstance(feed.rate, ExchangeRate) self.assertIsInstance(feed.rate, ExchangeRate)
# else: else:
# failures.add(feed.name) failures.add(feed.name)
# self.assertFalse(feed.has_rate) self.assertFalse(feed.has_rate)
# self.assertLessEqual(len(failures), 1, f"feed failures: {failures}. Please check exchange rate feeds!") self.assertLessEqual(len(failures), 1, f"feed failures: {failures}. Please check exchange rate feeds!")
# lbc = manager.convert_currency('USD', 'LBC', Decimal('1.0')) lbc = manager.convert_currency('USD', 'LBC', Decimal('1.0'))
# self.assertGreaterEqual(lbc, 2.0) self.assertGreaterEqual(lbc, 2.0)
# self.assertLessEqual(lbc, 120.0) self.assertLessEqual(lbc, 120.0)
# lbc = manager.convert_currency('BTC', 'LBC', Decimal('0.01')) lbc = manager.convert_currency('BTC', 'LBC', Decimal('0.01'))
# self.assertGreaterEqual(lbc, 1_000) self.assertGreaterEqual(lbc, 1_000)
# self.assertLessEqual(lbc, 30_000) self.assertLessEqual(lbc, 30_000)
async def test_it_handles_feed_being_offline(self): async def test_it_handles_feed_being_offline(self):
class FakeFeed(MarketFeed): class FakeFeed(MarketFeed):

View file

@ -1508,27 +1508,27 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
COIN = int(1E8) COIN = int(1E8)
self.assertEqual(self.conductor.spv_node.writer.height, 207) self.assertEqual(self.conductor.spv_node.writer.height, 207)
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
(208, bytes.fromhex(claim_id1)), (0, 10 * COIN) (208, bytes.fromhex(claim_id1)), (0, 10 * COIN)
) )
await self.generate(1) await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 208) self.assertEqual(self.conductor.spv_node.writer.height, 208)
self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1)) self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
(209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN) (209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN)
) )
await self.generate(1) await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 209) self.assertEqual(self.conductor.spv_node.writer.height, 209)
self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1)) self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
(309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN) (309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN)
) )
await self.generate(100) await self.generate(100)
self.assertEqual(self.conductor.spv_node.writer.height, 309) self.assertEqual(self.conductor.spv_node.writer.height, 309)
self.assertEqual(5.157053472135866, await get_trending_score(claim_id1)) self.assertEqual(5.157053472135866, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
(409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN) (409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN)
) )

View file

@ -2,7 +2,7 @@ import asyncio
import unittest import unittest
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.wallet import Transaction
class TransactionCommandsTestCase(CommandTestCase): class TransactionCommandsTestCase(CommandTestCase):
@ -29,42 +29,17 @@ class TransactionCommandsTestCase(CommandTestCase):
# someone's tx # someone's tx
change_address = await self.blockchain.get_raw_change_address() change_address = await self.blockchain.get_raw_change_address()
sendtxid = await self.blockchain.send_to_address(change_address, 10) sendtxid = await self.blockchain.send_to_address(change_address, 10)
# After a few tries, Hub should have the transaction (in mempool). await asyncio.sleep(0.2)
for i in range(5):
tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
# Retry if Hub is not aware of the transaction.
if isinstance(tx, dict):
# Fields: 'success', 'code', 'message'
self.assertFalse(tx['success'], tx)
self.assertEqual(tx['code'], 404, tx)
self.assertEqual(tx['message'], "transaction not found", tx)
await asyncio.sleep(0.1)
continue
break
# verify transaction show (in mempool)
self.assertTrue(isinstance(tx, Transaction), str(tx))
# Fields: 'txid', 'raw', 'height', 'position', 'is_verified', and more.
self.assertEqual(tx.id, sendtxid, vars(tx))
self.assertEqual(tx.height, -1, vars(tx))
self.assertEqual(tx.is_verified, False, vars(tx))
# transaction is confirmed and leaves mempool
await self.generate(1)
# verify transaction show
tx = await self.daemon.jsonrpc_transaction_show(sendtxid) tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
self.assertTrue(isinstance(tx, Transaction), str(tx)) self.assertEqual(tx.id, sendtxid)
self.assertEqual(tx.id, sendtxid, vars(tx)) self.assertEqual(tx.height, -1)
self.assertEqual(tx.height, self.ledger.headers.height, vars(tx)) await self.generate(1)
self.assertEqual(tx.is_verified, True, vars(tx)) tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
self.assertEqual(tx.height, self.ledger.headers.height)
# inexistent # inexistent
result = await self.daemon.jsonrpc_transaction_show('0'*64) result = await self.daemon.jsonrpc_transaction_show('0'*64)
self.assertTrue(isinstance(result, dict), result) self.assertFalse(result['success'])
# Fields: 'success', 'code', 'message'
self.assertFalse(result['success'], result)
self.assertEqual(result['code'], 404, result)
self.assertEqual(result['message'], "transaction not found", result)
async def test_utxo_release(self): async def test_utxo_release(self):
await self.send_to_address_and_wait( await self.send_to_address_and_wait(

View file

@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertIsNone(stream.full_path) self.assertIsNone(stream.full_path)
self.assertEqual(0, stream.written_bytes) self.assertEqual(0, stream.written_bytes)
await self.stream_manager.stop() self.stream_manager.stop()
await self.stream_manager.start() await self.stream_manager.start()
self.assertEqual(1, len(self.stream_manager.streams)) self.assertEqual(1, len(self.stream_manager.streams))
stream = list(self.stream_manager.streams.values())[0] stream = list(self.stream_manager.streams.values())[0]
@ -449,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
await stream.finished_writing.wait() await stream.finished_writing.wait()
await asyncio.sleep(0) await asyncio.sleep(0)
await self.stream_manager.stop() self.stream_manager.stop()
self.client_blob_manager.stop() self.client_blob_manager.stop()
# partial removal, only sd blob is missing. # partial removal, only sd blob is missing.
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished' # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'

View file

@ -470,7 +470,7 @@ class TestUpgrade(AsyncioTestCase):
class TestSQLiteRace(AsyncioTestCase): class TestSQLiteRace(AsyncioTestCase):
max_misuse_attempts = 120000 max_misuse_attempts = 80000
def setup_db(self): def setup_db(self):
self.db = sqlite3.connect(":memory:", isolation_level=None) self.db = sqlite3.connect(":memory:", isolation_level=None)