This commit is contained in:
Lex Berezhny 2019-07-15 01:12:14 -04:00
parent 45bdb9023a
commit f43e370e89
4 changed files with 82 additions and 50 deletions

View file

@ -1,6 +1,6 @@
//import 'package:lbry/src/load_generator.dart' as load; import 'package:lbry/src/load_generator.dart' as load;
import 'package:lbry/src/client.dart' as load; //import 'package:lbry/src/server_connection.dart' as load;
main(List<String> arguments) { main(List<String> arguments) {
load.cli(); load.generate_load();
} }

View file

@ -1,2 +1,2 @@
export 'src/load_generator.dart' show LoadGenerator, LoadDataPoint; export 'src/load_generator.dart';
export 'src/client.dart' show Client, MetricDataPoint; export 'src/server_connection.dart';

View file

@ -44,24 +44,32 @@ class LoadRequest {
} }
} }
typedef bool LoadTestCallback(LoadGenerator load_generator, LoadDataPoint stats); typedef bool LoadTickCallback(ClientLoadGenerator load_generator, ClientLoadDataPoint stats);
class LoadGenerator { class ClientLoadGenerator {
int load = 1;
Timer _timer; Timer _timer;
String host;
int port; final String host;
final int port;
final LoadTickCallback tickCallback;
int load = 1;
Map query; Map query;
LoadTestCallback cb; ClientLoadGenerator(this.host, this.port, {this.tickCallback, this.query}) {
if (query == null) {
LoadGenerator(this.host, this.port, this.query, this.cb); query = {
'id': 1,
'method': 'blockchain.claimtrie.resolve',
'params': ['one', 'two', 'three']
};
}
}
start() { start() {
var previous = spawn_requests(); var previous = spawn_requests();
var backlog = <LoadRequest>[]; var backlog = <LoadRequest>[];
_timer = Timer.periodic(Duration(seconds: 1), (t) { _timer = Timer.periodic(Duration(seconds: 1), (t) {
var stat = LoadDataPoint(); var stat = ClientLoadDataPoint(t.tick);
backlog.removeWhere((r) { backlog.removeWhere((r) {
if (r.isDone) stat.addCatchup(r); if (r.isDone) stat.addCatchup(r);
return r.isDone; return r.isDone;
@ -75,10 +83,11 @@ class LoadGenerator {
} }
stat.backlog = backlog.length; stat.backlog = backlog.length;
stat.load = load; stat.load = load;
if (cb(this, stat)) { stat.close();
if (tickCallback(this, stat)) {
previous = spawn_requests(); previous = spawn_requests();
} else { } else {
t.cancel(); stop();
} }
}); });
} }
@ -97,18 +106,20 @@ class LoadGenerator {
} }
class LoadDataPoint { class ClientLoadDataPoint {
final DateTime time = DateTime.now(); final int tick;
int success = 0; int success = 0;
int errored = 0; int errored = 0;
int backlog = 0; int backlog = 0;
int catchup = 0; int catchup = 0;
int _success_total = 0; int _success_total = 0;
int avg_success = 0;
int _catchup_total = 0; int _catchup_total = 0;
int avg_catchup = 0;
int load = 0; int load = 0;
int get avg_success => _success_total > 0 ? (_success_total/success).round() : 0; ClientLoadDataPoint(this.tick);
int get avg_catchup => _catchup_total > 0 ? (_catchup_total/catchup).round() : 0; ClientLoadDataPoint.empty(): this(0);
addSuccess(LoadRequest r) { addSuccess(LoadRequest r) {
success++; _success_total += r.elapsed; success++; _success_total += r.elapsed;
@ -117,16 +128,19 @@ class LoadDataPoint {
addCatchup(LoadRequest r) { addCatchup(LoadRequest r) {
catchup++; _catchup_total += r.elapsed; catchup++; _catchup_total += r.elapsed;
} }
close() {
avg_success = _success_total > 0 ? (_success_total/success).round() : 0;
avg_catchup = _catchup_total > 0 ? (_catchup_total/catchup).round() : 0;
}
} }
cli() {
generate_load() {
var runs = 1; var runs = 1;
LoadGenerator('localhost', 50001, { ClientLoadGenerator('localhost', 50001, tickCallback: (t, stats) {
'id': 1, print("run ${runs}: ${stats.load} load, ${stats.success} success, ${stats.backlog} backlog");
'method': 'blockchain.claimtrie.resolve',
'params': ['one', 'two', 'three']
}, (t, stats) {
print("run ${runs}: ${stats}");
t.load = (runs < 4 ? t.load*2 : t.load/2).round(); t.load = (runs < 4 ? t.load*2 : t.load/2).round();
return runs++ < 10; return runs++ < 10;
}).start(); }).start();

View file

@ -5,36 +5,54 @@ import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/status.dart' as status; import 'package:web_socket_channel/status.dart' as status;
class Client { class ServerConnection {
String url; String url = "";
IOWebSocketChannel channel; IOWebSocketChannel channel;
StreamController<MetricDataPoint> _metricsController = StreamController.broadcast(); bool get isConnected => channel != null && channel.closeCode == null;
Stream<MetricDataPoint> get metrics => _metricsController.stream;
Client(this.url); final StreamController<ServerLoadDataPoint> _loadDataController = StreamController.broadcast();
Stream<ServerLoadDataPoint> get load_data => _loadDataController.stream;
ServerConnection({this.url});
open() { open() {
if (isConnected) return Future.value('already open');
channel = IOWebSocketChannel.connect(this.url); channel = IOWebSocketChannel.connect(this.url);
int tick = 1; int tick = 1;
channel.stream.listen((message) { channel.stream.listen((message) {
Map data = json.decode(message); Map data = json.decode(message);
Map commands = data['commands']; print(data);
_metricsController.add( Map commands = data['commands'] ?? {};
MetricDataPoint( _loadDataController.add(
ServerLoadDataPoint(
tick, tick,
CommandMetrics.from_map(commands['search'] ?? {}), APICallMetrics.from_map(commands['search'] ?? {}),
CommandMetrics.from_map(commands['resolve'] ?? {}) APICallMetrics.from_map(commands['resolve'] ?? {})
) )
); );
tick++; tick++;
}); });
} }
Future close() => channel.sink.close(status.goingAway); close() {
if (isConnected) {
return channel.sink.close(status.goingAway);
}
return Future.value('already closed');
}
subscribe_to_server_load_data() {
if (isConnected) channel.sink.add('subscribe');
}
unsubscribe_from_server_load_data() {
if (isConnected) channel.sink.add('unsubscribe');
}
} }
class CommandMetrics { class APICallMetrics {
final int started; final int started;
final int finished; final int finished;
final int total_time; final int total_time;
@ -46,7 +64,7 @@ class CommandMetrics {
final int avg_execution_time; final int avg_execution_time;
final int avg_query_time_per_search; final int avg_query_time_per_search;
final int avg_query_time_per_query; final int avg_query_time_per_query;
CommandMetrics( APICallMetrics(
this.started, this.finished, this.total_time, this.started, this.finished, this.total_time,
this.execution_time, this.query_time, this.query_count): this.execution_time, this.query_time, this.query_count):
avg_wait_time=finished > 0 ? ((total_time - (execution_time + query_time))/finished).round() : 0, avg_wait_time=finished > 0 ? ((total_time - (execution_time + query_time))/finished).round() : 0,
@ -54,7 +72,7 @@ class CommandMetrics {
avg_execution_time=finished > 0 ? (execution_time/finished).round() : 0, avg_execution_time=finished > 0 ? (execution_time/finished).round() : 0,
avg_query_time_per_search=finished > 0 ? (query_time/finished).round() : 0, avg_query_time_per_search=finished > 0 ? (query_time/finished).round() : 0,
avg_query_time_per_query=query_count > 0 ? (query_time/query_count).round() : 0; avg_query_time_per_query=query_count > 0 ? (query_time/query_count).round() : 0;
CommandMetrics.from_map(Map data): this( APICallMetrics.from_map(Map data): this(
data['started'] ?? 0, data['started'] ?? 0,
data['finished'] ?? 0, data['finished'] ?? 0,
data['total_time'] ?? 0, data['total_time'] ?? 0,
@ -65,20 +83,20 @@ class CommandMetrics {
} }
class MetricDataPoint { class ServerLoadDataPoint {
final int tick; final int tick;
final CommandMetrics search; final APICallMetrics search;
final CommandMetrics resolve; final APICallMetrics resolve;
MetricDataPoint(this.tick, this.search, this.resolve); ServerLoadDataPoint(this.tick, this.search, this.resolve);
MetricDataPoint.empty(): ServerLoadDataPoint.empty():
tick = 0, tick = 0,
search=CommandMetrics.from_map({}), search=APICallMetrics.from_map({}),
resolve=CommandMetrics.from_map({}); resolve=APICallMetrics.from_map({});
} }
cli() { connect_and_listen_for_load_data() {
Client('ws://localhost:8181/')..open()..metrics.listen((m) { ServerConnection(url: 'ws://localhost:8181/')..open()..load_data.listen((m) {
print(m.search); print(m.search);
}); });
} }