From cae7792a1e837701daa9db7fc241779ea49c0a4d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 25 May 2020 10:16:18 -0400 Subject: [PATCH 1/5] add `transaction_cache_size` to config --- lbry/conf.py | 1 + lbry/wallet/ledger.py | 2 +- lbry/wallet/manager.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lbry/conf.py b/lbry/conf.py index 008742157..6945e537a 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -636,6 +636,7 @@ class Config(CLIConfig): "Strategy to use when selecting UTXOs for a transaction", STRATEGIES, "standard") + transaction_cache_size = Integer("Transaction cache size", 100_000) save_resolved_claims = Toggle( "Save content claims to the database when they are resolved to keep file_list up to date, " "only disable this if file_x commands are not needed", True diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 56656ae4b..50adf7467 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -158,7 +158,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_ready_controller = StreamController() self.on_ready = self._on_ready_controller.stream - self._tx_cache = pylru.lrucache(100000) + self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 100_000)) self._update_tasks = TaskGroup() self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 20658a2e5..22f0e5d21 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -184,6 +184,7 @@ class WalletManager: 'auto_connect': True, 'default_servers': config.lbryum_servers, 'data_path': config.wallet_dir, + 'tx_cache_size': config.transaction_cache_size } wallets_directory = os.path.join(config.wallet_dir, 'wallets') From c94cc293c28fc6959394ecc5aaf14e714bbf02ea Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 25 May 2020 10:21:36 -0400 Subject: [PATCH 2/5] fix uncaught errors in test_component_manager --- tests/unit/components/test_component_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index b4e81fed7..48d844168 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -120,6 +120,8 @@ class FakeComponent: class FakeDelayedWallet(FakeComponent): component_name = "wallet" depends_on = [] + ledger = None + default_wallet = None async def stop(self): await asyncio.sleep(1) From 6a0302fec64bba1e95bf9107142e4fbd5168bb00 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 25 May 2020 10:23:11 -0400 Subject: [PATCH 3/5] fix uncaught dht DecodeError --- lbry/dht/protocol/protocol.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 016ad50bf..7b90b5644 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -10,6 +10,7 @@ from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport from lbry.dht import constants +from lbry.dht.serialization.bencoding import DecodeError from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY from lbry.dht.error import RemoteException, TransportNotConnected @@ -554,7 +555,7 @@ class KademliaProtocol(DatagramProtocol): def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-differ try: message = decode_datagram(datagram) - except (ValueError, TypeError): + except (ValueError, TypeError, DecodeError): self.peer_manager.report_failure(address[0], address[1]) log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode()) return From 34eae6e6086a1fb413a1b372fb9ed1715f0f3385 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 25 May 2020 10:24:31 -0400 Subject: [PATCH 4/5] fix wallet server prometheus bucket sizes --- lbry/wallet/rpc/session.py | 6 +++++- lbry/wallet/server/block_processor.py | 7 ++++++- lbry/wallet/server/session.py | 10 +++++++--- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index dc353b50e..58d8a10fc 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -40,6 +40,10 @@ from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Noti from .jsonrpc import RPCError, ProtocolError from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + class Connector: @@ -379,7 +383,7 @@ class RPCSession(SessionBase): for example JSON RPC.""" RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, - labelnames=("method", "version")) + labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS) NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", namespace=NAMESPACE, labelnames=("method", "version")) REQUEST_ERRORS_COUNT = Counter( diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index cb6a32f55..69a57a2eb 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -130,6 +130,9 @@ class ChainError(Exception): NAMESPACE = "wallet_server" +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) class BlockProcessor: @@ -142,7 +145,9 @@ class BlockProcessor: block_count_metric = Gauge( "block_count", "Number of processed blocks", namespace=NAMESPACE ) - block_update_time_metric = Histogram("block_time", "Block update times", namespace=NAMESPACE) + block_update_time_metric = Histogram( + "block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) reorg_count_metric = Gauge( "reorg_count", "Number of reorgs", namespace=NAMESPACE ) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 1cce96ff9..c72278992 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -119,7 +119,9 @@ class SessionGroup: NAMESPACE = "wallet_server" - +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) class SessionManager: """Holds global state about all sessions.""" @@ -147,7 +149,9 @@ class SessionManager: db_error_metric = Counter( "internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE ) - executor_time_metric = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE) + executor_time_metric = Histogram( + "executor_time", "SQLite executor times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) pending_query_metric = Gauge( "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE ) @@ -990,7 +994,7 @@ class LBRYElectrumX(SessionBase): except reader.SQLiteInterruptedError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_interrupt(start, error.metrics) - self.session_mgr.self.session_mgr.SQLITE_INTERRUPT_COUNT.inc() + self.session_mgr.interrupt_count_metric.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') except reader.SQLiteOperationalError as error: metrics = self.get_metrics_or_placeholder_for_api(query_name) From 4bbd850898e42319db751feead7f71f8c1a05ea8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 25 May 2020 10:25:04 -0400 Subject: [PATCH 5/5] fix uncaught ValueError in hashX_unsubscribe --- lbry/wallet/server/session.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index c72278992..186755b73 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1220,7 +1220,10 @@ class LBRYElectrumX(SessionBase): return await self.address_status(hashX) async def hashX_unsubscribe(self, hashX, alias): - del self.hashX_subs[hashX] + try: + del self.hashX_subs[hashX] + except ValueError: + pass def address_to_hashX(self, address): try: