From f43e370e896efd554f7e81169cd802c3d9006798 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 15 Jul 2019 01:12:14 -0400 Subject: [PATCH] refactor --- dart/packages/lbry/bin/main.dart | 6 +- dart/packages/lbry/lib/lbry.dart | 4 +- .../packages/lbry/lib/src/load_generator.dart | 58 ++++++++++------- .../lbry/lib/src/server_connection.dart | 64 ++++++++++++------- 4 files changed, 82 insertions(+), 50 deletions(-) diff --git a/dart/packages/lbry/bin/main.dart b/dart/packages/lbry/bin/main.dart index 00aeec9e7..e9b7129ec 100644 --- a/dart/packages/lbry/bin/main.dart +++ b/dart/packages/lbry/bin/main.dart @@ -1,6 +1,6 @@ -//import 'package:lbry/src/load_generator.dart' as load; -import 'package:lbry/src/client.dart' as load; +import 'package:lbry/src/load_generator.dart' as load; +//import 'package:lbry/src/server_connection.dart' as load; main(List arguments) { - load.cli(); + load.generate_load(); } diff --git a/dart/packages/lbry/lib/lbry.dart b/dart/packages/lbry/lib/lbry.dart index 19e8b3ba3..87fe36dec 100644 --- a/dart/packages/lbry/lib/lbry.dart +++ b/dart/packages/lbry/lib/lbry.dart @@ -1,2 +1,2 @@ -export 'src/load_generator.dart' show LoadGenerator, LoadDataPoint; -export 'src/client.dart' show Client, MetricDataPoint; +export 'src/load_generator.dart'; +export 'src/server_connection.dart'; diff --git a/dart/packages/lbry/lib/src/load_generator.dart b/dart/packages/lbry/lib/src/load_generator.dart index a48a8f3af..751b347f3 100644 --- a/dart/packages/lbry/lib/src/load_generator.dart +++ b/dart/packages/lbry/lib/src/load_generator.dart @@ -44,24 +44,32 @@ class LoadRequest { } } -typedef bool LoadTestCallback(LoadGenerator load_generator, LoadDataPoint stats); +typedef bool LoadTickCallback(ClientLoadGenerator load_generator, ClientLoadDataPoint stats); -class LoadGenerator { - int load = 1; +class ClientLoadGenerator { Timer _timer; - String host; - int port; + + final String host; + final int port; + final LoadTickCallback tickCallback; + int load = 1; Map query; - LoadTestCallback cb; - - LoadGenerator(this.host, this.port, this.query, this.cb); + ClientLoadGenerator(this.host, this.port, {this.tickCallback, this.query}) { + if (query == null) { + query = { + 'id': 1, + 'method': 'blockchain.claimtrie.resolve', + 'params': ['one', 'two', 'three'] + }; + } + } start() { var previous = spawn_requests(); var backlog = []; _timer = Timer.periodic(Duration(seconds: 1), (t) { - var stat = LoadDataPoint(); + var stat = ClientLoadDataPoint(t.tick); backlog.removeWhere((r) { if (r.isDone) stat.addCatchup(r); return r.isDone; @@ -75,10 +83,11 @@ class LoadGenerator { } stat.backlog = backlog.length; stat.load = load; - if (cb(this, stat)) { + stat.close(); + if (tickCallback(this, stat)) { previous = spawn_requests(); } else { - t.cancel(); + stop(); } }); } @@ -97,18 +106,20 @@ class LoadGenerator { } -class LoadDataPoint { - final DateTime time = DateTime.now(); +class ClientLoadDataPoint { + final int tick; int success = 0; int errored = 0; int backlog = 0; int catchup = 0; int _success_total = 0; + int avg_success = 0; int _catchup_total = 0; + int avg_catchup = 0; int load = 0; - int get avg_success => _success_total > 0 ? (_success_total/success).round() : 0; - int get avg_catchup => _catchup_total > 0 ? (_catchup_total/catchup).round() : 0; + ClientLoadDataPoint(this.tick); + ClientLoadDataPoint.empty(): this(0); addSuccess(LoadRequest r) { success++; _success_total += r.elapsed; @@ -117,16 +128,19 @@ class LoadDataPoint { addCatchup(LoadRequest r) { catchup++; _catchup_total += r.elapsed; } + + close() { + avg_success = _success_total > 0 ? (_success_total/success).round() : 0; + avg_catchup = _catchup_total > 0 ? (_catchup_total/catchup).round() : 0; + } + } -cli() { + +generate_load() { var runs = 1; - LoadGenerator('localhost', 50001, { - 'id': 1, - 'method': 'blockchain.claimtrie.resolve', - 'params': ['one', 'two', 'three'] - }, (t, stats) { - print("run ${runs}: ${stats}"); + ClientLoadGenerator('localhost', 50001, tickCallback: (t, stats) { + print("run ${runs}: ${stats.load} load, ${stats.success} success, ${stats.backlog} backlog"); t.load = (runs < 4 ? t.load*2 : t.load/2).round(); return runs++ < 10; }).start(); diff --git a/dart/packages/lbry/lib/src/server_connection.dart b/dart/packages/lbry/lib/src/server_connection.dart index 8a7cbf2d4..4c1bbe796 100644 --- a/dart/packages/lbry/lib/src/server_connection.dart +++ b/dart/packages/lbry/lib/src/server_connection.dart @@ -5,36 +5,54 @@ import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/status.dart' as status; -class Client { - String url; +class ServerConnection { + String url = ""; IOWebSocketChannel channel; - StreamController _metricsController = StreamController.broadcast(); - Stream get metrics => _metricsController.stream; + bool get isConnected => channel != null && channel.closeCode == null; - Client(this.url); + final StreamController _loadDataController = StreamController.broadcast(); + Stream get load_data => _loadDataController.stream; + + ServerConnection({this.url}); open() { + if (isConnected) return Future.value('already open'); channel = IOWebSocketChannel.connect(this.url); int tick = 1; channel.stream.listen((message) { Map data = json.decode(message); - Map commands = data['commands']; - _metricsController.add( - MetricDataPoint( + print(data); + Map commands = data['commands'] ?? {}; + _loadDataController.add( + ServerLoadDataPoint( tick, - CommandMetrics.from_map(commands['search'] ?? {}), - CommandMetrics.from_map(commands['resolve'] ?? {}) + APICallMetrics.from_map(commands['search'] ?? {}), + APICallMetrics.from_map(commands['resolve'] ?? {}) ) ); tick++; }); } - Future close() => channel.sink.close(status.goingAway); + close() { + if (isConnected) { + return channel.sink.close(status.goingAway); + } + return Future.value('already closed'); + } + + subscribe_to_server_load_data() { + if (isConnected) channel.sink.add('subscribe'); + } + + unsubscribe_from_server_load_data() { + if (isConnected) channel.sink.add('unsubscribe'); + } + } -class CommandMetrics { +class APICallMetrics { final int started; final int finished; final int total_time; @@ -46,7 +64,7 @@ class CommandMetrics { final int avg_execution_time; final int avg_query_time_per_search; final int avg_query_time_per_query; - CommandMetrics( + APICallMetrics( this.started, this.finished, this.total_time, this.execution_time, this.query_time, this.query_count): avg_wait_time=finished > 0 ? ((total_time - (execution_time + query_time))/finished).round() : 0, @@ -54,7 +72,7 @@ class CommandMetrics { avg_execution_time=finished > 0 ? (execution_time/finished).round() : 0, avg_query_time_per_search=finished > 0 ? (query_time/finished).round() : 0, avg_query_time_per_query=query_count > 0 ? (query_time/query_count).round() : 0; - CommandMetrics.from_map(Map data): this( + APICallMetrics.from_map(Map data): this( data['started'] ?? 0, data['finished'] ?? 0, data['total_time'] ?? 0, @@ -65,20 +83,20 @@ class CommandMetrics { } -class MetricDataPoint { +class ServerLoadDataPoint { final int tick; - final CommandMetrics search; - final CommandMetrics resolve; - MetricDataPoint(this.tick, this.search, this.resolve); - MetricDataPoint.empty(): + final APICallMetrics search; + final APICallMetrics resolve; + ServerLoadDataPoint(this.tick, this.search, this.resolve); + ServerLoadDataPoint.empty(): tick = 0, - search=CommandMetrics.from_map({}), - resolve=CommandMetrics.from_map({}); + search=APICallMetrics.from_map({}), + resolve=APICallMetrics.from_map({}); } -cli() { - Client('ws://localhost:8181/')..open()..metrics.listen((m) { +connect_and_listen_for_load_data() { + ServerConnection(url: 'ws://localhost:8181/')..open()..load_data.listen((m) { print(m.search); }); }