mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
interface: follow-up 6cc70bc7a2
This commit is contained in:
parent
cd52350f5d
commit
e8bc5bbec4
2 changed files with 29 additions and 12 deletions
|
@ -32,10 +32,12 @@ from typing import Tuple, Union, List, TYPE_CHECKING, Optional
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from ipaddress import IPv4Network, IPv6Network, ip_address
|
from ipaddress import IPv4Network, IPv6Network, ip_address
|
||||||
import itertools
|
import itertools
|
||||||
|
import logging
|
||||||
|
|
||||||
import aiorpcx
|
import aiorpcx
|
||||||
from aiorpcx import RPCSession, Notification, NetAddress
|
from aiorpcx import RPCSession, Notification, NetAddress
|
||||||
from aiorpcx.curio import timeout_after, TaskTimeout
|
from aiorpcx.curio import timeout_after, TaskTimeout
|
||||||
|
from aiorpcx.jsonrpc import JSONRPC
|
||||||
import certifi
|
import certifi
|
||||||
|
|
||||||
from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup
|
from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup
|
||||||
|
@ -148,7 +150,13 @@ class NotificationSession(RPCSession):
|
||||||
self.interface.logger.debug(msg)
|
self.interface.logger.debug(msg)
|
||||||
|
|
||||||
|
|
||||||
class GracefulDisconnect(Exception): pass
|
class GracefulDisconnect(Exception):
|
||||||
|
log_level = logging.INFO
|
||||||
|
|
||||||
|
def __init__(self, *args, log_level=None, **kwargs):
|
||||||
|
Exception.__init__(self, *args, **kwargs)
|
||||||
|
if log_level is not None:
|
||||||
|
self.log_level = log_level
|
||||||
|
|
||||||
|
|
||||||
class RequestTimedOut(GracefulDisconnect):
|
class RequestTimedOut(GracefulDisconnect):
|
||||||
|
@ -305,9 +313,7 @@ class Interface(Logger):
|
||||||
try:
|
try:
|
||||||
return await func(self, *args, **kwargs)
|
return await func(self, *args, **kwargs)
|
||||||
except GracefulDisconnect as e:
|
except GracefulDisconnect as e:
|
||||||
self.logger.info(f"disconnecting gracefully. {repr(e)}")
|
self.logger.log(e.log_level, f"disconnecting due to {repr(e)}")
|
||||||
except aiorpcx.jsonrpc.RPCError as e:
|
|
||||||
self.logger.error(f"disconnecting due to {repr(e)}")
|
|
||||||
finally:
|
finally:
|
||||||
await self.network.connection_down(self)
|
await self.network.connection_down(self)
|
||||||
self.got_disconnected.set_result(1)
|
self.got_disconnected.set_result(1)
|
||||||
|
@ -428,17 +434,21 @@ class Interface(Logger):
|
||||||
f'in bucket {self.bucket_based_on_ipaddress()}')
|
f'in bucket {self.bucket_based_on_ipaddress()}')
|
||||||
self.logger.info(f"connection established. version: {ver}")
|
self.logger.info(f"connection established. version: {ver}")
|
||||||
|
|
||||||
async with self.group as group:
|
try:
|
||||||
await group.spawn(self.ping)
|
async with self.group as group:
|
||||||
await group.spawn(self.run_fetch_blocks)
|
await group.spawn(self.ping)
|
||||||
await group.spawn(self.monitor_connection)
|
await group.spawn(self.run_fetch_blocks)
|
||||||
# NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group!
|
await group.spawn(self.monitor_connection)
|
||||||
|
except aiorpcx.jsonrpc.RPCError as e:
|
||||||
|
if e.code in (JSONRPC.EXCESSIVE_RESOURCE_USAGE, JSONRPC.SERVER_BUSY):
|
||||||
|
raise GracefulDisconnect(e, log_level=logging.ERROR) from e
|
||||||
|
raise
|
||||||
|
|
||||||
async def monitor_connection(self):
|
async def monitor_connection(self):
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
if not self.session or self.session.is_closing():
|
if not self.session or self.session.is_closing():
|
||||||
raise GracefulDisconnect('server closed session')
|
raise GracefulDisconnect('session was closed')
|
||||||
|
|
||||||
async def ping(self):
|
async def ping(self):
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -26,14 +26,16 @@ import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
from typing import Dict, List, TYPE_CHECKING, Tuple
|
from typing import Dict, List, TYPE_CHECKING, Tuple
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import logging
|
||||||
|
|
||||||
from aiorpcx import TaskGroup, run_in_thread
|
from aiorpcx import TaskGroup, run_in_thread, RPCError
|
||||||
|
|
||||||
from .transaction import Transaction
|
from .transaction import Transaction
|
||||||
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
|
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer
|
||||||
from .bitcoin import address_to_scripthash, is_address
|
from .bitcoin import address_to_scripthash, is_address
|
||||||
from .network import UntrustedServerReturnedError
|
from .network import UntrustedServerReturnedError
|
||||||
from .logging import Logger
|
from .logging import Logger
|
||||||
|
from .interface import GracefulDisconnect
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .network import Network
|
from .network import Network
|
||||||
|
@ -103,7 +105,12 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||||
h = address_to_scripthash(addr)
|
h = address_to_scripthash(addr)
|
||||||
self.scripthash_to_address[h] = addr
|
self.scripthash_to_address[h] = addr
|
||||||
self._requests_sent += 1
|
self._requests_sent += 1
|
||||||
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
try:
|
||||||
|
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
||||||
|
except RPCError as e:
|
||||||
|
if e.message == 'history too large': # no unique error code
|
||||||
|
raise GracefulDisconnect(e, log_level=logging.ERROR) from e
|
||||||
|
raise
|
||||||
self._requests_answered += 1
|
self._requests_answered += 1
|
||||||
self.requested_addrs.remove(addr)
|
self.requested_addrs.remove(addr)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue