Skip to content

Commit 0192196

Browse files
committed
Add --dynamic-batch-size flag
References rabbitmq/rabbitmq-stream-java-client#649
1 parent 7b18622 commit 0192196

File tree

3 files changed

+62
-5
lines changed

3 files changed

+62
-5
lines changed

src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java

+22
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.atomic.AtomicInteger;
4242
import java.util.concurrent.atomic.AtomicLong;
4343
import java.util.function.BiFunction;
44+
import java.util.function.Consumer;
4445
import java.util.function.Function;
4546
import java.util.function.Supplier;
4647
import org.slf4j.Logger;
@@ -56,6 +57,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
5657
private final boolean summaryFile;
5758
private final PrintWriter out;
5859
private final boolean includeByteRates;
60+
private final boolean includeBatchSize;
5961
private final Supplier<String> memoryReportSupplier;
6062
private volatile Closeable closingSequence = () -> {};
6163
private volatile long lastPublishedCount = 0;
@@ -73,11 +75,13 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
7375
String metricsPrefix,
7476
boolean summaryFile,
7577
boolean includeByteRates,
78+
boolean batchSize,
7679
boolean confirmLatency,
7780
Supplier<String> memoryReportSupplier,
7881
PrintWriter out) {
7982
this.summaryFile = summaryFile;
8083
this.includeByteRates = includeByteRates;
84+
this.includeBatchSize = batchSize;
8185
this.memoryReportSupplier = memoryReportSupplier;
8286
this.out = out;
8387
this.metricsPrefix = metricsPrefix;
@@ -116,6 +120,7 @@ public void start(String description) throws Exception {
116120
long startTime = System.nanoTime();
117121

118122
String metricPublished = metricsName("published");
123+
String metricPublishBatchSize = metricsName("publish_batch_size");
119124
String metricProducerConfirmed = metricsName("producer_confirmed");
120125
String metricConsumed = metricsName("consumed");
121126
String metricChunkSize = metricsName("chunk_size");
@@ -133,6 +138,10 @@ public void start(String description) throws Exception {
133138
metricChunkSize,
134139
metricLatency));
135140

141+
if (this.includeBatchSize) {
142+
allMetrics.add(metricPublishBatchSize);
143+
}
144+
136145
if (confirmLatency()) {
137146
allMetrics.add(metricConfirmLatency);
138147
}
@@ -191,6 +200,17 @@ public void start(String description) throws Exception {
191200
});
192201
});
193202

203+
Consumer<StringBuilder> publishBatchSizeCallback;
204+
if (this.includeBatchSize) {
205+
HistogramSupport publishBatchSize = meterRegistry.get(metricPublishBatchSize).summary();
206+
Function<HistogramSupport, String> formatPublishBatchSize =
207+
histogram -> String.format("publish batch size %.0f", histogram.takeSnapshot().mean());
208+
publishBatchSizeCallback =
209+
sb -> sb.append(formatPublishBatchSize.apply(publishBatchSize)).append(", ");
210+
} else {
211+
publishBatchSizeCallback = ignored -> {};
212+
}
213+
194214
HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary();
195215
Function<HistogramSupport, String> formatChunkSize =
196216
histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean());
@@ -244,6 +264,7 @@ public void start(String description) throws Exception {
244264
.append(", ");
245265
}
246266
builder.append(formatLatency.apply("latency", latency)).append(", ");
267+
publishBatchSizeCallback.accept(builder);
247268
builder.append(formatChunkSize.apply(chunkSize));
248269
this.out.println(builder);
249270
String memoryReport = this.memoryReportSupplier.get();
@@ -299,6 +320,7 @@ public void start(String description) throws Exception {
299320
.append(", ");
300321
}
301322
builder.append(formatLatencySummary.apply("latency", latency)).append(", ");
323+
publishBatchSizeCallback.accept(builder);
302324
builder.append(formatChunkSize.apply(chunkSize));
303325
this.out.println();
304326
this.out.println(builder);

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,18 @@ public class StreamPerfTest implements Callable<Integer> {
458458
defaultValue = "false")
459459
private boolean noDevMode;
460460

461+
@CommandLine.Option(
462+
names = {"--dynamic-batch-size", "-dbs"},
463+
description = "use dynamic batch size for publishing",
464+
defaultValue = "false")
465+
private boolean dynamicBatch;
466+
467+
@CommandLine.Option(
468+
names = {"--batch-size-metric", "-bsm"},
469+
description = "display batch size",
470+
defaultValue = "false")
471+
private boolean includeBatchSizeMetric;
472+
461473
static class InstanceSyncOptions {
462474

463475
@CommandLine.Option(
@@ -689,7 +701,9 @@ public Integer call() throws Exception {
689701
.tags(tags)
690702
.register(meterRegistry);
691703
}
692-
this.metricsCollector = new PerformanceMicrometerMetricsCollector(meterRegistry, metricsPrefix);
704+
this.metricsCollector =
705+
new PerformanceMicrometerMetricsCollector(
706+
meterRegistry, metricsPrefix, this.includeBatchSizeMetric);
693707

694708
Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed");
695709

@@ -751,6 +765,7 @@ public Integer call() throws Exception {
751765
metricsPrefix,
752766
this.summaryFile,
753767
this.includeByteRates,
768+
this.includeBatchSizeMetric,
754769
this.confirmLatency,
755770
memoryReportSupplier,
756771
this.out);
@@ -972,6 +987,7 @@ public Integer call() throws Exception {
972987
ProducerBuilder producerBuilder =
973988
environment
974989
.producerBuilder()
990+
.dynamicBatch(this.dynamicBatch)
975991
.batchPublishingDelay(ofMillis(this.batchPublishingDelay));
976992

977993
String producerName = this.producerNameStrategy.apply(stream, i + 1);

src/main/java/com/rabbitmq/stream/perf/Utils.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@
6666
import java.util.concurrent.Executors;
6767
import java.util.concurrent.ThreadFactory;
6868
import java.util.concurrent.atomic.AtomicLong;
69-
import java.util.function.BiFunction;
70-
import java.util.function.Function;
71-
import java.util.function.LongSupplier;
69+
import java.util.function.*;
7270
import java.util.stream.Collectors;
7371
import java.util.stream.IntStream;
7472
import javax.net.ssl.KeyManager;
@@ -827,8 +825,23 @@ public String apply(String stream, Integer index) {
827825

828826
static class PerformanceMicrometerMetricsCollector extends MicrometerMetricsCollector {
829827

830-
public PerformanceMicrometerMetricsCollector(MeterRegistry registry, String prefix) {
828+
private final IntConsumer publisherCallback;
829+
830+
public PerformanceMicrometerMetricsCollector(
831+
MeterRegistry registry, String prefix, boolean batchSize) {
831832
super(registry, prefix);
833+
if (batchSize) {
834+
DistributionSummary publishBatchSize =
835+
DistributionSummary.builder(prefix + ".publish_batch_size")
836+
.description("publish batch size")
837+
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
838+
.distributionStatisticExpiry(ofSeconds(1))
839+
.serviceLevelObjectives()
840+
.register(registry);
841+
this.publisherCallback = publishBatchSize::record;
842+
} else {
843+
this.publisherCallback = ignored -> {};
844+
}
832845
}
833846

834847
@Override
@@ -849,6 +862,12 @@ protected DistributionSummary createChunkSizeDistributionSummary(
849862
.register(registry);
850863
}
851864

865+
@Override
866+
public void publish(int count) {
867+
super.publish(count);
868+
this.publisherCallback.accept(count);
869+
}
870+
852871
@Override
853872
public void chunk(int entriesCount) {
854873
this.chunkSize.record(entriesCount);

0 commit comments

Comments
 (0)