Skip to content

Commit ed1d30d

Browse files
committed
Add publish confirm latency metrics to performance tool
Fixes #129
1 parent 8c78667 commit ed1d30d

File tree

6 files changed

+148
-26
lines changed

6 files changed

+148
-26
lines changed

src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java

+49-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.function.BiFunction;
3839
import java.util.function.Function;
3940
import java.util.function.Supplier;
4041
import org.slf4j.Logger;
@@ -45,7 +46,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
4546
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPerformanceMetrics.class);
4647

4748
private final MetricRegistry metricRegistry;
48-
private final Timer latency;
49+
private final Timer latency, confirmLatency;
4950
private final boolean summaryFile;
5051
private final PrintWriter out;
5152
private final boolean includeByteRates;
@@ -60,6 +61,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
6061
String metricsPrefix,
6162
boolean summaryFile,
6263
boolean includeByteRates,
64+
boolean confirmLatency,
6365
Supplier<String> memoryReportSupplier,
6466
PrintWriter out) {
6567
this.summaryFile = summaryFile;
@@ -98,6 +100,17 @@ protected Double nullGaugeValue() {
98100
.distributionStatisticExpiry(Duration.ofSeconds(1))
99101
.serviceLevelObjectives()
100102
.register(meterRegistry);
103+
if (confirmLatency) {
104+
this.confirmLatency =
105+
Timer.builder(metricsPrefix + ".confirm_latency")
106+
.description("publish confirm latency")
107+
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
108+
.distributionStatisticExpiry(Duration.ofSeconds(1))
109+
.serviceLevelObjectives()
110+
.register(meterRegistry);
111+
} else {
112+
this.confirmLatency = null;
113+
}
101114
}
102115

103116
private long getPublishedCount() {
@@ -117,6 +130,7 @@ public void start(String description) throws Exception {
117130
String metricConsumed = "rabbitmqStreamConsumed";
118131
String metricChunkSize = "rabbitmqStreamChunk_size";
119132
String metricLatency = "rabbitmqStreamLatency";
133+
String metricConfirmLatency = "rabbitmqStreamConfirm_latency";
120134
String metricWrittenBytes = "rabbitmqStreamWritten_bytes";
121135
String metricReadBytes = "rabbitmqStreamRead_bytes";
122136

@@ -129,6 +143,10 @@ public void start(String description) throws Exception {
129143
metricChunkSize,
130144
metricLatency));
131145

146+
if (confirmLatency()) {
147+
allMetrics.add(metricConfirmLatency);
148+
}
149+
132150
Map<String, String> metersNamesAndLabels = new LinkedHashMap<>();
133151
metersNamesAndLabels.put(metricPublished, "published");
134152
metersNamesAndLabels.put(metricProducerConfirmed, "confirmed");
@@ -184,14 +202,16 @@ public void start(String description) throws Exception {
184202
histogram -> String.format("chunk size %.0f", histogram.getSnapshot().getMean());
185203

186204
com.codahale.metrics.Timer latency = metricRegistry.getTimers().get(metricLatency);
205+
com.codahale.metrics.Timer confirmLatency =
206+
confirmLatency() ? metricRegistry.getTimers().get(metricConfirmLatency) : null;
187207

188208
Function<Number, Number> convertDuration =
189209
in -> in instanceof Long ? in.longValue() / 1_000_000 : in.doubleValue() / 1_000_000;
190-
Function<com.codahale.metrics.Timer, String> formatLatency =
191-
timer -> {
210+
BiFunction<String, com.codahale.metrics.Timer, String> formatLatency =
211+
(name, timer) -> {
192212
Snapshot snapshot = timer.getSnapshot();
193213
return String.format(
194-
"latency min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms",
214+
name + " min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms",
195215
convertDuration.apply(snapshot.getMin()),
196216
convertDuration.apply(snapshot.getMedian()),
197217
convertDuration.apply(snapshot.get75thPercentile()),
@@ -229,7 +249,12 @@ public void start(String description) throws Exception {
229249
.compute(lastValue, currentValue, duration));
230250
lastMetersValues.put(meterName, currentValue);
231251
});
232-
builder.append(formatLatency.apply(latency)).append(", ");
252+
if (confirmLatency()) {
253+
builder
254+
.append(formatLatency.apply("confirm latency", confirmLatency))
255+
.append(", ");
256+
}
257+
builder.append(formatLatency.apply("latency", latency)).append(", ");
233258
builder.append(formatChunkSize.apply(chunkSize));
234259
this.out.println(builder);
235260
String memoryReport = this.memoryReportSupplier.get();
@@ -270,15 +295,20 @@ public void start(String description) throws Exception {
270295
}
271296
};
272297

273-
Function<com.codahale.metrics.Timer, String> formatLatencySummary =
274-
histogram ->
298+
BiFunction<String, com.codahale.metrics.Timer, String> formatLatencySummary =
299+
(name, histogram) ->
275300
String.format(
276-
"latency 95th %.0f ms",
301+
name + " 95th %.0f ms",
277302
convertDuration.apply(latency.getSnapshot().get95thPercentile()));
278303

279304
StringBuilder builder = new StringBuilder("Summary: ");
280305
meters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry)));
281-
builder.append(formatLatencySummary.apply(latency)).append(", ");
306+
if (confirmLatency()) {
307+
builder
308+
.append(formatLatencySummary.apply("confirm latency", confirmLatency))
309+
.append(", ");
310+
}
311+
builder.append(formatLatencySummary.apply("latency", latency)).append(", ");
282312
builder.append(formatChunkSize.apply(chunkSize));
283313
this.out.println();
284314
this.out.println(builder);
@@ -369,6 +399,11 @@ public void latency(long latency, TimeUnit unit) {
369399
this.latency.record(latency, unit);
370400
}
371401

402+
@Override
403+
public void confirmLatency(long latency, TimeUnit unit) {
404+
this.confirmLatency.record(latency, unit);
405+
}
406+
372407
@Override
373408
public void offset(long offset) {
374409
this.offset = offset;
@@ -379,6 +414,10 @@ public void close() throws Exception {
379414
this.closingSequence.close();
380415
}
381416

417+
private boolean confirmLatency() {
418+
return this.confirmLatency != null;
419+
}
420+
382421
private interface FormatCallback {
383422

384423
String compute(long lastValue, long currentValue, Duration duration);

src/main/java/com/rabbitmq/stream/perf/PerformanceMetrics.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -21,5 +21,7 @@ interface PerformanceMetrics extends AutoCloseable {
2121

2222
void latency(long latency, TimeUnit unit);
2323

24+
void confirmLatency(long latency, TimeUnit unit);
25+
2426
void offset(long offset);
2527
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+47-12
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,12 @@ public class StreamPerfTest implements Callable<Integer> {
347347
converter = Utils.PositiveIntegerTypeConverter.class)
348348
private int rpcTimeout;
349349

350+
@CommandLine.Option(
351+
names = {"--confirm-latency", "-cl"},
352+
description = "evaluate confirm latency",
353+
defaultValue = "false")
354+
private boolean confirmLatency;
355+
350356
private MetricsCollector metricsCollector;
351357
private PerformanceMetrics performanceMetrics;
352358
private List<Monitoring> monitorings;
@@ -499,6 +505,7 @@ public Integer call() throws Exception {
499505
metricsPrefix,
500506
this.summaryFile,
501507
this.includeByteRates,
508+
this.confirmLatency,
502509
memoryReportSupplier,
503510
this.out);
504511

@@ -649,11 +656,6 @@ public Integer call() throws Exception {
649656
}));
650657
}
651658

652-
// FIXME handle metadata update for consumers and publishers
653-
// they should at least issue a warning that their stream has been deleted and that they're
654-
// now
655-
// useless
656-
657659
List<Producer> producers = Collections.synchronizedList(new ArrayList<>(this.producers));
658660
List<Runnable> producerRunnables =
659661
IntStream.range(0, this.producers)
@@ -686,18 +688,48 @@ public Integer call() throws Exception {
686688
.stream(stream)
687689
.build();
688690

691+
AtomicLong messageCount = new AtomicLong(0);
692+
ConfirmationHandler confirmationHandler;
693+
if (this.confirmLatency) {
694+
final PerformanceMetrics metrics = this.performanceMetrics;
695+
final int divisor = Utils.downSamplingDivisor(this.rate);
696+
confirmationHandler =
697+
confirmationStatus -> {
698+
if (confirmationStatus.isConfirmed()) {
699+
producerConfirm.increment();
700+
// at very high throughput ( > 1 M / s), the histogram can
701+
// become a bottleneck,
702+
// so we downsample and calculate latency for every x message
703+
// this should not affect the metric much
704+
if (messageCount.incrementAndGet() % divisor == 0) {
705+
try {
706+
long time =
707+
Utils.readLong(
708+
confirmationStatus.getMessage().getBodyAsBinary());
709+
// see below why we use current time to measure latency
710+
metrics.confirmLatency(
711+
System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
712+
} catch (Exception e) {
713+
// not able to read the body, something wrong?
714+
}
715+
}
716+
}
717+
};
718+
} else {
719+
confirmationHandler =
720+
confirmationStatus -> {
721+
if (confirmationStatus.isConfirmed()) {
722+
producerConfirm.increment();
723+
}
724+
};
725+
}
726+
689727
producers.add(producer);
690728

691729
return (Runnable)
692730
() -> {
693731
final int msgSize = this.messageSize;
694732

695-
ConfirmationHandler confirmationHandler =
696-
confirmationStatus -> {
697-
if (confirmationStatus.isConfirmed()) {
698-
producerConfirm.increment();
699-
}
700-
};
701733
try {
702734
while (true && !Thread.currentThread().isInterrupted()) {
703735
rateLimiterCallback.run();
@@ -747,14 +779,17 @@ public Integer call() throws Exception {
747779
.builder();
748780
}
749781

782+
// we assume the publishing rate is the same order as the consuming rate
783+
// we actually don't want to downsample for low rates
784+
final int divisor = Utils.downSamplingDivisor(this.rate);
750785
consumerBuilder =
751786
consumerBuilder.messageHandler(
752787
(context, message) -> {
753788
// at very high throughput ( > 1 M / s), the histogram can
754789
// become a bottleneck,
755790
// so we downsample and calculate latency for every x message
756791
// this should not affect the metric much
757-
if (messageCount.incrementAndGet() % 100 == 0) {
792+
if (messageCount.incrementAndGet() % 100 == divisor) {
758793
try {
759794
long time = Utils.readLong(message.getBodyAsBinary());
760795
// see above why we use current time to measure latency

src/main/java/com/rabbitmq/stream/perf/Utils.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -278,6 +278,17 @@ static CommandSpec buildCommandSpec(Object... commands) {
278278
return spec;
279279
}
280280

281+
static int downSamplingDivisor(int rate) {
282+
int divisor;
283+
if (rate > 0) {
284+
divisor = rate > 100 ? 100 : 1; // no downsampling for small rates
285+
} else {
286+
// no rate limitation, downsampling
287+
divisor = 100;
288+
}
289+
return divisor;
290+
}
291+
281292
static class ByteCapacityTypeConverter implements CommandLine.ITypeConverter<ByteCapacity> {
282293

283294
@Override

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -341,6 +341,26 @@ void monitoringShouldReturnValidEndpoint() throws Exception {
341341
waitRunEnds();
342342
}
343343

344+
@Test
345+
void publishConfirmLatencyShouldBeIncludedWhenOptionIsEnabled() throws Exception {
346+
Future<?> run = run(builder().confirmLatency());
347+
waitUntilStreamExists(s);
348+
waitOneSecond();
349+
run.cancel(true);
350+
waitRunEnds();
351+
assertThat(consoleOutput()).contains("confirm latency");
352+
}
353+
354+
@Test
355+
void publishConfirmLatencyShouldNotBeIncludedWhenOptionIsDisabled() throws Exception {
356+
Future<?> run = run(builder());
357+
waitUntilStreamExists(s);
358+
waitOneSecond();
359+
run.cancel(true);
360+
waitRunEnds();
361+
assertThat(consoleOutput()).doesNotContain("confirm latency");
362+
}
363+
344364
private static HttpResponse httpRequest(String urlString) throws Exception {
345365
URL url = new URL(urlString);
346366
HttpURLConnection con = (HttpURLConnection) url.openConnection();
@@ -439,6 +459,11 @@ ArgumentsBuilder memoryReport() {
439459
return this;
440460
}
441461

462+
ArgumentsBuilder confirmLatency() {
463+
arguments.put("confirm-latency", "");
464+
return this;
465+
}
466+
442467
ArgumentsBuilder maxLengthBytes(ByteCapacity capacity) {
443468
arguments.put("max-length-bytes", capacity.toString());
444469
return this;

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -289,6 +289,16 @@ void buildCommandSpec() {
289289
assertThat(spec.optionsMap()).hasSize(4).containsKeys("AAA", "B", "C", "OFFSET");
290290
}
291291

292+
@ParameterizedTest
293+
@CsvSource({
294+
"0,100",
295+
"50,1",
296+
"1000,100",
297+
})
298+
void testDownSamplingDivisor(int rate, int expected) {
299+
assertThat(Utils.downSamplingDivisor(rate)).isEqualTo(expected);
300+
}
301+
292302
@ParameterizedTest
293303
@CsvSource({
294304
"--uris,URIS",

0 commit comments

Comments
 (0)