mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-27 07:23:25 +00:00
Merge pull request #5319 from SomberNight/sync_progress_3_20190507
synchronizer: show progress in GUI (take 3) (req_answered/req_sent)
This commit is contained in:
commit
92260a798a
4 changed files with 43 additions and 3 deletions
|
@ -582,6 +582,12 @@ class AddressSynchronizer(Logger):
|
||||||
def is_up_to_date(self):
|
def is_up_to_date(self):
|
||||||
with self.lock: return self.up_to_date
|
with self.lock: return self.up_to_date
|
||||||
|
|
||||||
|
def get_history_sync_state_details(self) -> Tuple[int, int]:
|
||||||
|
if self.synchronizer:
|
||||||
|
return self.synchronizer.num_requests_sent_and_answered()
|
||||||
|
else:
|
||||||
|
return 0, 0
|
||||||
|
|
||||||
@with_transaction_lock
|
@with_transaction_lock
|
||||||
def get_tx_delta(self, tx_hash, address):
|
def get_tx_delta(self, tx_hash, address):
|
||||||
"""effect of tx on address"""
|
"""effect of tx on address"""
|
||||||
|
|
|
@ -312,6 +312,9 @@ class ElectrumWindow(App):
|
||||||
self._trigger_update_status = Clock.create_trigger(self.update_status, .5)
|
self._trigger_update_status = Clock.create_trigger(self.update_status, .5)
|
||||||
self._trigger_update_history = Clock.create_trigger(self.update_history, .5)
|
self._trigger_update_history = Clock.create_trigger(self.update_history, .5)
|
||||||
self._trigger_update_interfaces = Clock.create_trigger(self.update_interfaces, .5)
|
self._trigger_update_interfaces = Clock.create_trigger(self.update_interfaces, .5)
|
||||||
|
|
||||||
|
self._periodic_update_status_during_sync = Clock.schedule_interval(self.update_wallet_synchronizing_progress, .5)
|
||||||
|
|
||||||
# cached dialogs
|
# cached dialogs
|
||||||
self._settings_dialog = None
|
self._settings_dialog = None
|
||||||
self._password_dialog = None
|
self._password_dialog = None
|
||||||
|
@ -745,7 +748,9 @@ class ElectrumWindow(App):
|
||||||
server_height = self.network.get_server_height()
|
server_height = self.network.get_server_height()
|
||||||
server_lag = self.num_blocks - server_height
|
server_lag = self.num_blocks - server_height
|
||||||
if not self.wallet.up_to_date or server_height == 0:
|
if not self.wallet.up_to_date or server_height == 0:
|
||||||
status = _("Synchronizing...")
|
num_sent, num_answered = self.wallet.get_history_sync_state_details()
|
||||||
|
status = ("{} [size=18dp]({}/{})[/size]"
|
||||||
|
.format(_("Synchronizing..."), num_answered, num_sent))
|
||||||
elif server_lag > 1:
|
elif server_lag > 1:
|
||||||
status = _("Server is lagging ({} blocks)").format(server_lag)
|
status = _("Server is lagging ({} blocks)").format(server_lag)
|
||||||
else:
|
else:
|
||||||
|
@ -761,6 +766,12 @@ class ElectrumWindow(App):
|
||||||
self.balance = str(text.strip()) + ' [size=22dp]%s[/size]'% self.base_unit
|
self.balance = str(text.strip()) + ' [size=22dp]%s[/size]'% self.base_unit
|
||||||
self.fiat_balance = self.fx.format_amount(c+u+x) + ' [size=22dp]%s[/size]'% self.fx.ccy
|
self.fiat_balance = self.fx.format_amount(c+u+x) + ' [size=22dp]%s[/size]'% self.fx.ccy
|
||||||
|
|
||||||
|
def update_wallet_synchronizing_progress(self, *dt):
|
||||||
|
if not self.wallet:
|
||||||
|
return
|
||||||
|
if not self.wallet.up_to_date:
|
||||||
|
self._trigger_update_status()
|
||||||
|
|
||||||
def get_max_amount(self):
|
def get_max_amount(self):
|
||||||
from electrum.transaction import TxOutput
|
from electrum.transaction import TxOutput
|
||||||
if run_hook('abort_send', self):
|
if run_hook('abort_send', self):
|
||||||
|
|
|
@ -740,6 +740,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
|
||||||
if self.need_update.is_set():
|
if self.need_update.is_set():
|
||||||
self.need_update.clear()
|
self.need_update.clear()
|
||||||
self.update_wallet()
|
self.update_wallet()
|
||||||
|
elif not self.wallet.up_to_date:
|
||||||
|
# this updates "synchronizing" progress
|
||||||
|
self.update_status()
|
||||||
# resolve aliases
|
# resolve aliases
|
||||||
# FIXME this is a blocking network call that has a timeout of 5 sec
|
# FIXME this is a blocking network call that has a timeout of 5 sec
|
||||||
self.payto_e.resolve()
|
self.payto_e.resolve()
|
||||||
|
@ -822,7 +825,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
|
||||||
# until we get a headers subscription request response.
|
# until we get a headers subscription request response.
|
||||||
# Display the synchronizing message in that case.
|
# Display the synchronizing message in that case.
|
||||||
if not self.wallet.up_to_date or server_height == 0:
|
if not self.wallet.up_to_date or server_height == 0:
|
||||||
text = _("Synchronizing...")
|
num_sent, num_answered = self.wallet.get_history_sync_state_details()
|
||||||
|
text = ("{} ({}/{})"
|
||||||
|
.format(_("Synchronizing..."), num_answered, num_sent))
|
||||||
icon = read_QIcon("status_waiting.png")
|
icon = read_QIcon("status_waiting.png")
|
||||||
elif server_lag > 1:
|
elif server_lag > 1:
|
||||||
text = _("Server is lagging ({} blocks)").format(server_lag)
|
text = _("Server is lagging ({} blocks)").format(server_lag)
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
# SOFTWARE.
|
# SOFTWARE.
|
||||||
import asyncio
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
from typing import Dict, List, TYPE_CHECKING
|
from typing import Dict, List, TYPE_CHECKING, Tuple
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from aiorpcx import TaskGroup, run_in_thread
|
from aiorpcx import TaskGroup, run_in_thread
|
||||||
|
@ -59,12 +59,14 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
def __init__(self, network: 'Network'):
|
def __init__(self, network: 'Network'):
|
||||||
self.asyncio_loop = network.asyncio_loop
|
self.asyncio_loop = network.asyncio_loop
|
||||||
NetworkJobOnDefaultServer.__init__(self, network)
|
NetworkJobOnDefaultServer.__init__(self, network)
|
||||||
|
self._reset_request_counters()
|
||||||
|
|
||||||
def _reset(self):
|
def _reset(self):
|
||||||
super()._reset()
|
super()._reset()
|
||||||
self.requested_addrs = set()
|
self.requested_addrs = set()
|
||||||
self.scripthash_to_address = {}
|
self.scripthash_to_address = {}
|
||||||
self._processed_some_notifications = False # so that we don't miss them
|
self._processed_some_notifications = False # so that we don't miss them
|
||||||
|
self._reset_request_counters()
|
||||||
# Queues
|
# Queues
|
||||||
self.add_queue = asyncio.Queue()
|
self.add_queue = asyncio.Queue()
|
||||||
self.status_queue = asyncio.Queue()
|
self.status_queue = asyncio.Queue()
|
||||||
|
@ -79,6 +81,10 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
# we are being cancelled now
|
# we are being cancelled now
|
||||||
self.session.unsubscribe(self.status_queue)
|
self.session.unsubscribe(self.status_queue)
|
||||||
|
|
||||||
|
def _reset_request_counters(self):
|
||||||
|
self._requests_sent = 0
|
||||||
|
self._requests_answered = 0
|
||||||
|
|
||||||
def add(self, addr):
|
def add(self, addr):
|
||||||
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
|
||||||
|
|
||||||
|
@ -96,7 +102,9 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
async def subscribe_to_address(addr):
|
async def subscribe_to_address(addr):
|
||||||
h = address_to_scripthash(addr)
|
h = address_to_scripthash(addr)
|
||||||
self.scripthash_to_address[h] = addr
|
self.scripthash_to_address[h] = addr
|
||||||
|
self._requests_sent += 1
|
||||||
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
||||||
|
self._requests_answered += 1
|
||||||
self.requested_addrs.remove(addr)
|
self.requested_addrs.remove(addr)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -110,6 +118,9 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
await self.group.spawn(self._on_address_status, addr, status)
|
await self.group.spawn(self._on_address_status, addr, status)
|
||||||
self._processed_some_notifications = True
|
self._processed_some_notifications = True
|
||||||
|
|
||||||
|
def num_requests_sent_and_answered(self) -> Tuple[int, int]:
|
||||||
|
return self._requests_sent, self._requests_answered
|
||||||
|
|
||||||
async def main(self):
|
async def main(self):
|
||||||
raise NotImplementedError() # implemented by subclasses
|
raise NotImplementedError() # implemented by subclasses
|
||||||
|
|
||||||
|
@ -148,7 +159,9 @@ class Synchronizer(SynchronizerBase):
|
||||||
# request address history
|
# request address history
|
||||||
self.requested_histories[addr] = status
|
self.requested_histories[addr] = status
|
||||||
h = address_to_scripthash(addr)
|
h = address_to_scripthash(addr)
|
||||||
|
self._requests_sent += 1
|
||||||
result = await self.network.get_history_for_scripthash(h)
|
result = await self.network.get_history_for_scripthash(h)
|
||||||
|
self._requests_answered += 1
|
||||||
self.logger.info(f"receiving history {addr} {len(result)}")
|
self.logger.info(f"receiving history {addr} {len(result)}")
|
||||||
hashes = set(map(lambda item: item['tx_hash'], result))
|
hashes = set(map(lambda item: item['tx_hash'], result))
|
||||||
hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
|
hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
|
||||||
|
@ -187,6 +200,7 @@ class Synchronizer(SynchronizerBase):
|
||||||
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))
|
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))
|
||||||
|
|
||||||
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
|
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
|
||||||
|
self._requests_sent += 1
|
||||||
try:
|
try:
|
||||||
result = await self.network.get_transaction(tx_hash)
|
result = await self.network.get_transaction(tx_hash)
|
||||||
except UntrustedServerReturnedError as e:
|
except UntrustedServerReturnedError as e:
|
||||||
|
@ -196,6 +210,8 @@ class Synchronizer(SynchronizerBase):
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
self._requests_answered += 1
|
||||||
tx = Transaction(result)
|
tx = Transaction(result)
|
||||||
try:
|
try:
|
||||||
tx.deserialize() # see if raises
|
tx.deserialize() # see if raises
|
||||||
|
@ -234,6 +250,8 @@ class Synchronizer(SynchronizerBase):
|
||||||
if (up_to_date != self.wallet.is_up_to_date()
|
if (up_to_date != self.wallet.is_up_to_date()
|
||||||
or up_to_date and self._processed_some_notifications):
|
or up_to_date and self._processed_some_notifications):
|
||||||
self._processed_some_notifications = False
|
self._processed_some_notifications = False
|
||||||
|
if up_to_date:
|
||||||
|
self._reset_request_counters()
|
||||||
self.wallet.set_up_to_date(up_to_date)
|
self.wallet.set_up_to_date(up_to_date)
|
||||||
self.wallet.network.trigger_callback('wallet_updated', self.wallet)
|
self.wallet.network.trigger_callback('wallet_updated', self.wallet)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue