mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
network: put NetworkTimeout constants together (#4945)
* network: put NetworkTimeout constants together * fix prev
This commit is contained in:
parent
43461a1866
commit
7773443c17
2 changed files with 29 additions and 6 deletions
|
@ -52,6 +52,17 @@ if TYPE_CHECKING:
|
||||||
ca_path = certifi.where()
|
ca_path = certifi.where()
|
||||||
|
|
||||||
|
|
||||||
|
class NetworkTimeout:
|
||||||
|
# seconds
|
||||||
|
class Generic:
|
||||||
|
NORMAL = 30
|
||||||
|
RELAXED = 45
|
||||||
|
MOST_RELAXED = 180
|
||||||
|
class Urgent(Generic):
|
||||||
|
NORMAL = 10
|
||||||
|
RELAXED = 20
|
||||||
|
MOST_RELAXED = 60
|
||||||
|
|
||||||
class NotificationSession(RPCSession):
|
class NotificationSession(RPCSession):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -59,6 +70,7 @@ class NotificationSession(RPCSession):
|
||||||
self.subscriptions = defaultdict(list)
|
self.subscriptions = defaultdict(list)
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.in_flight_requests_semaphore = asyncio.Semaphore(100)
|
self.in_flight_requests_semaphore = asyncio.Semaphore(100)
|
||||||
|
self.default_timeout = NetworkTimeout.Generic.NORMAL
|
||||||
|
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
# note: if server sends malformed request and we raise, the superclass
|
# note: if server sends malformed request and we raise, the superclass
|
||||||
|
@ -76,7 +88,7 @@ class NotificationSession(RPCSession):
|
||||||
async def send_request(self, *args, timeout=None, **kwargs):
|
async def send_request(self, *args, timeout=None, **kwargs):
|
||||||
# note: the timeout starts after the request touches the wire!
|
# note: the timeout starts after the request touches the wire!
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = 30
|
timeout = self.default_timeout
|
||||||
# note: the semaphore implementation guarantees no starvation
|
# note: the semaphore implementation guarantees no starvation
|
||||||
async with self.in_flight_requests_semaphore:
|
async with self.in_flight_requests_semaphore:
|
||||||
try:
|
try:
|
||||||
|
@ -327,9 +339,9 @@ class Interface(PrintError):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def get_block_header(self, height, assert_mode):
|
async def get_block_header(self, height, assert_mode):
|
||||||
# use lower timeout as we usually have network.bhi_lock here
|
|
||||||
self.print_error('requesting block header {} in mode {}'.format(height, assert_mode))
|
self.print_error('requesting block header {} in mode {}'.format(height, assert_mode))
|
||||||
timeout = 5 if not self.proxy else 10
|
# use lower timeout as we usually have network.bhi_lock here
|
||||||
|
timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
||||||
res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
|
res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout)
|
||||||
return blockchain.deserialize_header(bytes.fromhex(res), height)
|
return blockchain.deserialize_header(bytes.fromhex(res), height)
|
||||||
|
|
||||||
|
@ -358,6 +370,7 @@ class Interface(PrintError):
|
||||||
host=self.host, port=self.port,
|
host=self.host, port=self.port,
|
||||||
ssl=sslc, proxy=self.proxy) as session:
|
ssl=sslc, proxy=self.proxy) as session:
|
||||||
self.session = session # type: NotificationSession
|
self.session = session # type: NotificationSession
|
||||||
|
self.session.default_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
|
||||||
try:
|
try:
|
||||||
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
|
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
|
||||||
except aiorpcx.jsonrpc.RPCError as e:
|
except aiorpcx.jsonrpc.RPCError as e:
|
||||||
|
|
|
@ -49,7 +49,8 @@ from . import constants
|
||||||
from . import blockchain
|
from . import blockchain
|
||||||
from . import bitcoin
|
from . import bitcoin
|
||||||
from .blockchain import Blockchain, HEADER_SIZE
|
from .blockchain import Blockchain, HEADER_SIZE
|
||||||
from .interface import Interface, serialize_server, deserialize_server, RequestTimedOut
|
from .interface import (Interface, serialize_server, deserialize_server,
|
||||||
|
RequestTimedOut, NetworkTimeout)
|
||||||
from .version import PROTOCOL_VERSION
|
from .version import PROTOCOL_VERSION
|
||||||
from .simple_config import SimpleConfig
|
from .simple_config import SimpleConfig
|
||||||
|
|
||||||
|
@ -649,11 +650,18 @@ class Network(PrintError):
|
||||||
await self._close_interface(interface)
|
await self._close_interface(interface)
|
||||||
self.trigger_callback('network_updated')
|
self.trigger_callback('network_updated')
|
||||||
|
|
||||||
|
def get_network_timeout_seconds(self, request_type=NetworkTimeout.Generic) -> int:
|
||||||
|
if self.oneserver and not self.auto_connect:
|
||||||
|
return request_type.MOST_RELAXED
|
||||||
|
if self.proxy:
|
||||||
|
return request_type.RELAXED
|
||||||
|
return request_type.NORMAL
|
||||||
|
|
||||||
@ignore_exceptions # do not kill main_taskgroup
|
@ignore_exceptions # do not kill main_taskgroup
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _run_new_interface(self, server):
|
async def _run_new_interface(self, server):
|
||||||
interface = Interface(self, server, self.proxy)
|
interface = Interface(self, server, self.proxy)
|
||||||
timeout = 10 if not self.proxy else 20
|
timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(interface.ready, timeout)
|
await asyncio.wait_for(interface.ready, timeout)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
|
@ -724,7 +732,9 @@ class Network(PrintError):
|
||||||
return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
|
return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height])
|
||||||
|
|
||||||
@best_effort_reliable
|
@best_effort_reliable
|
||||||
async def broadcast_transaction(self, tx, *, timeout=10):
|
async def broadcast_transaction(self, tx, *, timeout=None):
|
||||||
|
if timeout is None:
|
||||||
|
timeout = self.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
||||||
out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout)
|
out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout)
|
||||||
if out != tx.txid():
|
if out != tx.txid():
|
||||||
raise Exception(out)
|
raise Exception(out)
|
||||||
|
|
Loading…
Add table
Reference in a new issue