mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-04 12:55:10 +00:00
rename all TaskGroup() fields to "taskgroup"
for consistency
This commit is contained in:
parent
c8260249b0
commit
ed234d3444
9 changed files with 33 additions and 33 deletions
|
@ -254,8 +254,8 @@ class Interface(Logger):
|
||||||
self.debug = False
|
self.debug = False
|
||||||
|
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
self.network.main_taskgroup.spawn(self.run()), self.network.asyncio_loop)
|
self.network.taskgroup.spawn(self.run()), self.network.asyncio_loop)
|
||||||
self.group = SilentTaskGroup()
|
self.taskgroup = SilentTaskGroup()
|
||||||
|
|
||||||
def diagnostic_name(self):
|
def diagnostic_name(self):
|
||||||
return str(NetAddress(self.host, self.port))
|
return str(NetAddress(self.host, self.port))
|
||||||
|
@ -370,7 +370,7 @@ class Interface(Logger):
|
||||||
self.ready.cancel()
|
self.ready.cancel()
|
||||||
return wrapper_func
|
return wrapper_func
|
||||||
|
|
||||||
@ignore_exceptions # do not kill main_taskgroup
|
@ignore_exceptions # do not kill network.taskgroup
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
@handle_disconnect
|
@handle_disconnect
|
||||||
async def run(self):
|
async def run(self):
|
||||||
|
@ -489,7 +489,7 @@ class Interface(Logger):
|
||||||
self.logger.info(f"connection established. version: {ver}")
|
self.logger.info(f"connection established. version: {ver}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with self.group as group:
|
async with self.taskgroup as group:
|
||||||
await group.spawn(self.ping)
|
await group.spawn(self.ping)
|
||||||
await group.spawn(self.run_fetch_blocks)
|
await group.spawn(self.run_fetch_blocks)
|
||||||
await group.spawn(self.monitor_connection)
|
await group.spawn(self.monitor_connection)
|
||||||
|
|
|
@ -94,7 +94,7 @@ class Peer(Logger):
|
||||||
self._local_changed_events = defaultdict(asyncio.Event)
|
self._local_changed_events = defaultdict(asyncio.Event)
|
||||||
self._remote_changed_events = defaultdict(asyncio.Event)
|
self._remote_changed_events = defaultdict(asyncio.Event)
|
||||||
Logger.__init__(self)
|
Logger.__init__(self)
|
||||||
self.group = SilentTaskGroup()
|
self.taskgroup = SilentTaskGroup()
|
||||||
|
|
||||||
def send_message(self, message_name: str, **kwargs):
|
def send_message(self, message_name: str, **kwargs):
|
||||||
assert type(message_name) is str
|
assert type(message_name) is str
|
||||||
|
@ -242,7 +242,7 @@ class Peer(Logger):
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
@handle_disconnect
|
@handle_disconnect
|
||||||
async def main_loop(self):
|
async def main_loop(self):
|
||||||
async with self.group as group:
|
async with self.taskgroup as group:
|
||||||
await group.spawn(self._message_loop())
|
await group.spawn(self._message_loop())
|
||||||
await group.spawn(self.query_gossip())
|
await group.spawn(self.query_gossip())
|
||||||
await group.spawn(self.process_gossip())
|
await group.spawn(self.process_gossip())
|
||||||
|
|
|
@ -75,7 +75,7 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def _start_tasks(self):
|
async def _start_tasks(self):
|
||||||
async with self.group as group:
|
async with self.taskgroup as group:
|
||||||
await group.spawn(self.main)
|
await group.spawn(self.main)
|
||||||
|
|
||||||
async def main(self):
|
async def main(self):
|
||||||
|
@ -100,10 +100,10 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||||
header = blockchain.read_header(block_height)
|
header = blockchain.read_header(block_height)
|
||||||
if header is None:
|
if header is None:
|
||||||
if block_height < constants.net.max_checkpoint():
|
if block_height < constants.net.max_checkpoint():
|
||||||
await self.group.spawn(self.network.request_chunk(block_height, None, can_return_early=True))
|
await self.taskgroup.spawn(self.network.request_chunk(block_height, None, can_return_early=True))
|
||||||
continue
|
continue
|
||||||
self.started_verifying_channel.add(short_channel_id)
|
self.started_verifying_channel.add(short_channel_id)
|
||||||
await self.group.spawn(self.verify_channel(block_height, short_channel_id))
|
await self.taskgroup.spawn(self.verify_channel(block_height, short_channel_id))
|
||||||
#self.logger.info(f'requested short_channel_id {bh2u(short_channel_id)}')
|
#self.logger.info(f'requested short_channel_id {bh2u(short_channel_id)}')
|
||||||
|
|
||||||
async def verify_channel(self, block_height: int, short_channel_id: ShortChannelID):
|
async def verify_channel(self, block_height: int, short_channel_id: ShortChannelID):
|
||||||
|
|
|
@ -1294,7 +1294,7 @@ class LNWallet(LNWorker):
|
||||||
continue
|
continue
|
||||||
peer = self.peers.get(chan.node_id, None)
|
peer = self.peers.get(chan.node_id, None)
|
||||||
if peer:
|
if peer:
|
||||||
await peer.group.spawn(peer.reestablish_channel(chan))
|
await peer.taskgroup.spawn(peer.reestablish_channel(chan))
|
||||||
else:
|
else:
|
||||||
await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
|
await self.taskgroup.spawn(self.reestablish_peer_for_given_channel(chan))
|
||||||
|
|
||||||
|
|
|
@ -268,7 +268,7 @@ class Network(Logger):
|
||||||
if not self.default_server:
|
if not self.default_server:
|
||||||
self.default_server = pick_random_server()
|
self.default_server = pick_random_server()
|
||||||
|
|
||||||
self.main_taskgroup = None # type: TaskGroup
|
self.taskgroup = None # type: TaskGroup
|
||||||
|
|
||||||
# locks
|
# locks
|
||||||
self.restart_lock = asyncio.Lock()
|
self.restart_lock = asyncio.Lock()
|
||||||
|
@ -661,7 +661,7 @@ class Network(Logger):
|
||||||
old_server = old_interface.server if old_interface else None
|
old_server = old_interface.server if old_interface else None
|
||||||
|
|
||||||
# Stop any current interface in order to terminate subscriptions,
|
# Stop any current interface in order to terminate subscriptions,
|
||||||
# and to cancel tasks in interface.group.
|
# and to cancel tasks in interface.taskgroup.
|
||||||
# However, for headers sub, give preference to this interface
|
# However, for headers sub, give preference to this interface
|
||||||
# over unknown ones, i.e. start it again right away.
|
# over unknown ones, i.e. start it again right away.
|
||||||
if old_server and old_server != server:
|
if old_server and old_server != server:
|
||||||
|
@ -680,7 +680,7 @@ class Network(Logger):
|
||||||
assert i.ready.done(), "interface we are switching to is not ready yet"
|
assert i.ready.done(), "interface we are switching to is not ready yet"
|
||||||
blockchain_updated = i.blockchain != self.blockchain()
|
blockchain_updated = i.blockchain != self.blockchain()
|
||||||
self.interface = i
|
self.interface = i
|
||||||
await i.group.spawn(self._request_server_info(i))
|
await i.taskgroup.spawn(self._request_server_info(i))
|
||||||
self.trigger_callback('default_server_changed')
|
self.trigger_callback('default_server_changed')
|
||||||
self.default_server_changed_event.set()
|
self.default_server_changed_event.set()
|
||||||
self.default_server_changed_event.clear()
|
self.default_server_changed_event.clear()
|
||||||
|
@ -1118,8 +1118,8 @@ class Network(Logger):
|
||||||
f.write(json.dumps(cp, indent=4))
|
f.write(json.dumps(cp, indent=4))
|
||||||
|
|
||||||
async def _start(self):
|
async def _start(self):
|
||||||
assert not self.main_taskgroup
|
assert not self.taskgroup
|
||||||
self.main_taskgroup = main_taskgroup = SilentTaskGroup()
|
self.taskgroup = taskgroup = SilentTaskGroup()
|
||||||
assert not self.interface and not self.interfaces
|
assert not self.interface and not self.interfaces
|
||||||
assert not self.connecting and not self.server_queue
|
assert not self.connecting and not self.server_queue
|
||||||
self.logger.info('starting network')
|
self.logger.info('starting network')
|
||||||
|
@ -1135,11 +1135,11 @@ class Network(Logger):
|
||||||
await self._init_headers_file()
|
await self._init_headers_file()
|
||||||
# note: if a task finishes with CancelledError, that
|
# note: if a task finishes with CancelledError, that
|
||||||
# will NOT raise, and the group will keep the other tasks running
|
# will NOT raise, and the group will keep the other tasks running
|
||||||
async with main_taskgroup as group:
|
async with taskgroup as group:
|
||||||
await group.spawn(self._maintain_sessions())
|
await group.spawn(self._maintain_sessions())
|
||||||
[await group.spawn(job) for job in self._jobs]
|
[await group.spawn(job) for job in self._jobs]
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
self.logger.exception('main_taskgroup died.')
|
self.logger.exception('taskgroup died.')
|
||||||
raise e
|
raise e
|
||||||
asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop)
|
||||||
|
|
||||||
|
@ -1158,10 +1158,10 @@ class Network(Logger):
|
||||||
async def _stop(self, full_shutdown=False):
|
async def _stop(self, full_shutdown=False):
|
||||||
self.logger.info("stopping network")
|
self.logger.info("stopping network")
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self.main_taskgroup.cancel_remaining(), timeout=2)
|
await asyncio.wait_for(self.taskgroup.cancel_remaining(), timeout=2)
|
||||||
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
||||||
self.logger.info(f"exc during main_taskgroup cancellation: {repr(e)}")
|
self.logger.info(f"exc during main_taskgroup cancellation: {repr(e)}")
|
||||||
self.main_taskgroup = None # type: TaskGroup
|
self.taskgroup = None # type: TaskGroup
|
||||||
self.interface = None # type: Interface
|
self.interface = None # type: Interface
|
||||||
self.interfaces = {} # type: Dict[str, Interface]
|
self.interfaces = {} # type: Dict[str, Interface]
|
||||||
self.connecting.clear()
|
self.connecting.clear()
|
||||||
|
@ -1196,7 +1196,7 @@ class Network(Logger):
|
||||||
async def launch_already_queued_up_new_interfaces():
|
async def launch_already_queued_up_new_interfaces():
|
||||||
while self.server_queue.qsize() > 0:
|
while self.server_queue.qsize() > 0:
|
||||||
server = self.server_queue.get()
|
server = self.server_queue.get()
|
||||||
await self.main_taskgroup.spawn(self._run_new_interface(server))
|
await self.taskgroup.spawn(self._run_new_interface(server))
|
||||||
async def maybe_queue_new_interfaces_to_be_launched_later():
|
async def maybe_queue_new_interfaces_to_be_launched_later():
|
||||||
now = time.time()
|
now = time.time()
|
||||||
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
||||||
|
@ -1218,7 +1218,7 @@ class Network(Logger):
|
||||||
await self._ensure_there_is_a_main_interface()
|
await self._ensure_there_is_a_main_interface()
|
||||||
if self.is_connected():
|
if self.is_connected():
|
||||||
if self.config.is_fee_estimates_update_required():
|
if self.config.is_fee_estimates_update_required():
|
||||||
await self.interface.group.spawn(self._request_fee_estimates, self.interface)
|
await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -1228,7 +1228,7 @@ class Network(Logger):
|
||||||
await maintain_main_interface()
|
await maintain_main_interface()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# suppress spurious cancellations
|
# suppress spurious cancellations
|
||||||
group = self.main_taskgroup
|
group = self.taskgroup
|
||||||
if not group or group.closed():
|
if not group or group.closed():
|
||||||
raise
|
raise
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
|
@ -75,7 +75,7 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
|
|
||||||
async def _start_tasks(self):
|
async def _start_tasks(self):
|
||||||
try:
|
try:
|
||||||
async with self.group as group:
|
async with self.taskgroup as group:
|
||||||
await group.spawn(self.send_subscriptions())
|
await group.spawn(self.send_subscriptions())
|
||||||
await group.spawn(self.handle_status())
|
await group.spawn(self.handle_status())
|
||||||
await group.spawn(self.main())
|
await group.spawn(self.main())
|
||||||
|
@ -116,13 +116,13 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
addr = await self.add_queue.get()
|
addr = await self.add_queue.get()
|
||||||
await self.group.spawn(subscribe_to_address, addr)
|
await self.taskgroup.spawn(subscribe_to_address, addr)
|
||||||
|
|
||||||
async def handle_status(self):
|
async def handle_status(self):
|
||||||
while True:
|
while True:
|
||||||
h, status = await self.status_queue.get()
|
h, status = await self.status_queue.get()
|
||||||
addr = self.scripthash_to_address[h]
|
addr = self.scripthash_to_address[h]
|
||||||
await self.group.spawn(self._on_address_status, addr, status)
|
await self.taskgroup.spawn(self._on_address_status, addr, status)
|
||||||
self._processed_some_notifications = True
|
self._processed_some_notifications = True
|
||||||
|
|
||||||
def num_requests_sent_and_answered(self) -> Tuple[int, int]:
|
def num_requests_sent_and_answered(self) -> Tuple[int, int]:
|
||||||
|
|
|
@ -16,7 +16,7 @@ class MockTaskGroup:
|
||||||
async def spawn(self, x): return
|
async def spawn(self, x): return
|
||||||
|
|
||||||
class MockNetwork:
|
class MockNetwork:
|
||||||
main_taskgroup = MockTaskGroup()
|
taskgroup = MockTaskGroup()
|
||||||
asyncio_loop = asyncio.get_event_loop()
|
asyncio_loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
class MockInterface(Interface):
|
class MockInterface(Interface):
|
||||||
|
|
|
@ -1124,14 +1124,14 @@ class NetworkJobOnDefaultServer(Logger):
|
||||||
"""Initialise fields. Called every time the underlying
|
"""Initialise fields. Called every time the underlying
|
||||||
server connection changes.
|
server connection changes.
|
||||||
"""
|
"""
|
||||||
self.group = SilentTaskGroup()
|
self.taskgroup = SilentTaskGroup()
|
||||||
|
|
||||||
async def _start(self, interface: 'Interface'):
|
async def _start(self, interface: 'Interface'):
|
||||||
self.interface = interface
|
self.interface = interface
|
||||||
await interface.group.spawn(self._start_tasks)
|
await interface.taskgroup.spawn(self._start_tasks)
|
||||||
|
|
||||||
async def _start_tasks(self):
|
async def _start_tasks(self):
|
||||||
"""Start tasks in self.group. Called every time the underlying
|
"""Start tasks in self.taskgroup. Called every time the underlying
|
||||||
server connection changes.
|
server connection changes.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError() # implemented by subclasses
|
raise NotImplementedError() # implemented by subclasses
|
||||||
|
@ -1141,7 +1141,7 @@ class NetworkJobOnDefaultServer(Logger):
|
||||||
await self._stop()
|
await self._stop()
|
||||||
|
|
||||||
async def _stop(self):
|
async def _stop(self):
|
||||||
await self.group.cancel_remaining()
|
await self.taskgroup.cancel_remaining()
|
||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _restart(self, *args):
|
async def _restart(self, *args):
|
||||||
|
|
|
@ -59,7 +59,7 @@ class SPV(NetworkJobOnDefaultServer):
|
||||||
self.requested_merkle = set() # txid set of pending requests
|
self.requested_merkle = set() # txid set of pending requests
|
||||||
|
|
||||||
async def _start_tasks(self):
|
async def _start_tasks(self):
|
||||||
async with self.group as group:
|
async with self.taskgroup as group:
|
||||||
await group.spawn(self.main)
|
await group.spawn(self.main)
|
||||||
|
|
||||||
def diagnostic_name(self):
|
def diagnostic_name(self):
|
||||||
|
@ -87,12 +87,12 @@ class SPV(NetworkJobOnDefaultServer):
|
||||||
header = self.blockchain.read_header(tx_height)
|
header = self.blockchain.read_header(tx_height)
|
||||||
if header is None:
|
if header is None:
|
||||||
if tx_height < constants.net.max_checkpoint():
|
if tx_height < constants.net.max_checkpoint():
|
||||||
await self.group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True))
|
await self.taskgroup.spawn(self.network.request_chunk(tx_height, None, can_return_early=True))
|
||||||
continue
|
continue
|
||||||
# request now
|
# request now
|
||||||
self.logger.info(f'requested merkle {tx_hash}')
|
self.logger.info(f'requested merkle {tx_hash}')
|
||||||
self.requested_merkle.add(tx_hash)
|
self.requested_merkle.add(tx_hash)
|
||||||
await self.group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height)
|
await self.taskgroup.spawn(self._request_and_verify_single_proof, tx_hash, tx_height)
|
||||||
|
|
||||||
async def _request_and_verify_single_proof(self, tx_hash, tx_height):
|
async def _request_and_verify_single_proof(self, tx_hash, tx_height):
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Add table
Reference in a new issue