Skip to content

Commit a0aeaf9

Browse files
committed
Use Micrometer for console metrics in performance tool
To have the same source of truth as Prometheus.
1 parent cec4537 commit a0aeaf9

File tree

3 files changed

+131
-101
lines changed

3 files changed

+131
-101
lines changed

src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java

+84-91
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
1414
package com.rabbitmq.stream.perf;
1515

16-
import com.codahale.metrics.*;
16+
import com.codahale.metrics.ConsoleReporter;
17+
import io.micrometer.core.instrument.Counter;
1718
import io.micrometer.core.instrument.Timer;
1819
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
19-
import io.micrometer.core.instrument.dropwizard.DropwizardConfig;
20+
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
21+
import io.micrometer.core.instrument.distribution.HistogramSupport;
22+
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
2023
import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry;
21-
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
2224
import java.io.*;
2325
import java.nio.file.Files;
2426
import java.nio.file.Path;
@@ -28,6 +30,7 @@
2830
import java.text.StringCharacterIterator;
2931
import java.time.Duration;
3032
import java.util.*;
33+
import java.util.Map.Entry;
3134
import java.util.concurrent.ConcurrentHashMap;
3235
import java.util.concurrent.Executors;
3336
import java.util.concurrent.ScheduledExecutorService;
@@ -45,7 +48,8 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
4548

4649
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPerformanceMetrics.class);
4750

48-
private final MetricRegistry metricRegistry;
51+
private final String metricsPrefix;
52+
private final CompositeMeterRegistry meterRegistry;
4953
private final Timer latency, confirmLatency;
5054
private final boolean summaryFile;
5155
private final PrintWriter out;
@@ -55,7 +59,6 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
5559
private volatile long lastPublishedCount = 0;
5660
private volatile long lastConsumedCount = 0;
5761
private volatile long offset;
58-
private final String metricsSuffix;
5962

6063
DefaultPerformanceMetrics(
6164
CompositeMeterRegistry meterRegistry,
@@ -69,31 +72,9 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
6972
this.includeByteRates = includeByteRates;
7073
this.memoryReportSupplier = memoryReportSupplier;
7174
this.out = out;
72-
DropwizardConfig dropwizardConfig =
73-
new DropwizardConfig() {
74-
@Override
75-
public String prefix() {
76-
return "";
77-
}
75+
this.metricsPrefix = metricsPrefix;
76+
this.meterRegistry = meterRegistry;
7877

79-
@Override
80-
public String get(String key) {
81-
return null;
82-
}
83-
};
84-
this.metricRegistry = new MetricRegistry();
85-
DropwizardMeterRegistry dropwizardMeterRegistry =
86-
new DropwizardMeterRegistry(
87-
dropwizardConfig,
88-
this.metricRegistry,
89-
HierarchicalNameMapper.DEFAULT,
90-
io.micrometer.core.instrument.Clock.SYSTEM) {
91-
@Override
92-
protected Double nullGaugeValue() {
93-
return null;
94-
}
95-
};
96-
meterRegistry.add(dropwizardMeterRegistry);
9778
this.latency =
9879
Timer.builder(metricsPrefix + ".latency")
9980
.description("message latency")
@@ -112,36 +93,28 @@ protected Double nullGaugeValue() {
11293
} else {
11394
this.confirmLatency = null;
11495
}
115-
// the metrics name contains the tags, if any,
116-
// so we extract the suffix to use it later when looking up the metrics
117-
String key = metricRegistry.getMeters().keySet().iterator().next();
118-
int index = key.indexOf(".");
119-
this.metricsSuffix = index == -1 ? "" : key.substring(index);
12096
}
12197

12298
private long getPublishedCount() {
123-
return this.metricRegistry
124-
.getMeters()
125-
.get("rabbitmqStreamPublished" + metricsSuffix)
126-
.getCount();
99+
return (long) this.meterRegistry.get(metricsName("published")).counter().count();
127100
}
128101

129102
private long getConsumedCount() {
130-
return this.metricRegistry.getMeters().get("rabbitmqStreamConsumed" + metricsSuffix).getCount();
103+
return (long) this.meterRegistry.get(metricsName("consumed")).counter().count();
131104
}
132105

133106
@Override
134107
public void start(String description) throws Exception {
135108
long startTime = System.nanoTime();
136109

137-
String metricPublished = "rabbitmqStreamPublished" + metricsSuffix;
138-
String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" + metricsSuffix;
139-
String metricConsumed = "rabbitmqStreamConsumed" + metricsSuffix;
140-
String metricChunkSize = "rabbitmqStreamChunk_size" + metricsSuffix;
141-
String metricLatency = "rabbitmqStreamLatency" + metricsSuffix;
142-
String metricConfirmLatency = "rabbitmqStreamConfirm_latency" + metricsSuffix;
143-
String metricWrittenBytes = "rabbitmqStreamWritten_bytes" + metricsSuffix;
144-
String metricReadBytes = "rabbitmqStreamRead_bytes" + metricsSuffix;
110+
String metricPublished = metricsName("published");
111+
String metricProducerConfirmed = metricsName("confirmed");
112+
String metricConsumed = metricsName("consumed");
113+
String metricChunkSize = metricsName("chunk_size");
114+
String metricLatency = metricsName("latency");
115+
String metricConfirmLatency = metricsName("confirm_latency");
116+
String metricWrittenBytes = metricsName("written_bytes");
117+
String metricReadBytes = metricsName("read_bytes");
145118

146119
Set<String> allMetrics =
147120
new HashSet<>(
@@ -156,16 +129,16 @@ public void start(String description) throws Exception {
156129
allMetrics.add(metricConfirmLatency);
157130
}
158131

159-
Map<String, String> metersNamesAndLabels = new LinkedHashMap<>();
160-
metersNamesAndLabels.put(metricPublished, "published");
161-
metersNamesAndLabels.put(metricProducerConfirmed, "confirmed");
162-
metersNamesAndLabels.put(metricConsumed, "consumed");
132+
Map<String, String> countersNamesAndLabels = new LinkedHashMap<>();
133+
countersNamesAndLabels.put(metricPublished, "published");
134+
countersNamesAndLabels.put(metricProducerConfirmed, "confirmed");
135+
countersNamesAndLabels.put(metricConsumed, "consumed");
163136

164137
if (this.includeByteRates) {
165138
allMetrics.add(metricWrittenBytes);
166139
allMetrics.add(metricReadBytes);
167-
metersNamesAndLabels.put(metricWrittenBytes, "written bytes");
168-
metersNamesAndLabels.put(metricReadBytes, "read bytes");
140+
countersNamesAndLabels.put(metricWrittenBytes, "written bytes");
141+
countersNamesAndLabels.put(metricReadBytes, "read bytes");
169142
}
170143

171144
ScheduledExecutorService scheduledExecutorService =
@@ -174,65 +147,60 @@ public void start(String description) throws Exception {
174147
Closeable summaryFileClosingSequence =
175148
maybeSetSummaryFile(description, allMetrics, scheduledExecutorService);
176149

177-
SortedMap<String, Meter> registryMeters = metricRegistry.getMeters();
178-
179-
Map<String, Meter> meters = new LinkedHashMap<>(metersNamesAndLabels.size());
180-
metersNamesAndLabels
150+
Map<String, Counter> counters = new LinkedHashMap<>(countersNamesAndLabels.size());
151+
countersNamesAndLabels
181152
.entrySet()
182-
.forEach(entry -> meters.put(entry.getValue(), registryMeters.get(entry.getKey())));
153+
.forEach(
154+
entry -> counters.put(entry.getValue(), meterRegistry.get(entry.getKey()).counter()));
183155

184-
Map<String, FormatCallback> formatMeter = new HashMap<>();
185-
metersNamesAndLabels.entrySet().stream()
156+
Map<String, FormatCallback> formatCounter = new HashMap<>();
157+
countersNamesAndLabels.entrySet().stream()
186158
.filter(entry -> !entry.getKey().contains("bytes"))
187159
.forEach(
188160
entry -> {
189-
formatMeter.put(
161+
formatCounter.put(
190162
entry.getValue(),
191163
(lastValue, currentValue, duration) -> {
192164
long rate = 1000 * (currentValue - lastValue) / duration.toMillis();
193165
return String.format("%s %d msg/s, ", entry.getValue(), rate);
194166
});
195167
});
196168

197-
metersNamesAndLabels.entrySet().stream()
169+
countersNamesAndLabels.entrySet().stream()
198170
.filter(entry -> entry.getKey().contains("bytes"))
199171
.forEach(
200172
entry -> {
201-
formatMeter.put(
173+
formatCounter.put(
202174
entry.getValue(),
203175
(lastValue, currentValue, duration) -> {
204176
long rate = 1000 * (currentValue - lastValue) / duration.toMillis();
205177
return formatByteRate(entry.getValue(), rate) + ", ";
206178
});
207179
});
208180

209-
Histogram chunkSize = metricRegistry.getHistograms().get(metricChunkSize);
210-
Function<Histogram, String> formatChunkSize =
211-
histogram -> String.format("chunk size %.0f", histogram.getSnapshot().getMean());
212-
213-
com.codahale.metrics.Timer latency = metricRegistry.getTimers().get(metricLatency);
214-
com.codahale.metrics.Timer confirmLatency =
215-
confirmLatency() ? metricRegistry.getTimers().get(metricConfirmLatency) : null;
181+
HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary();
182+
Function<HistogramSupport, String> formatChunkSize =
183+
histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean());
216184

217185
Function<Number, Number> convertDuration =
218186
in -> in instanceof Long ? in.longValue() / 1_000_000 : in.doubleValue() / 1_000_000;
219-
BiFunction<String, com.codahale.metrics.Timer, String> formatLatency =
187+
BiFunction<String, Timer, String> formatLatency =
220188
(name, timer) -> {
221-
Snapshot snapshot = timer.getSnapshot();
189+
HistogramSnapshot snapshot = timer.takeSnapshot();
190+
222191
return String.format(
223-
name + " min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms",
224-
convertDuration.apply(snapshot.getMin()),
225-
convertDuration.apply(snapshot.getMedian()),
226-
convertDuration.apply(snapshot.get75thPercentile()),
227-
convertDuration.apply(snapshot.get95thPercentile()),
228-
convertDuration.apply(snapshot.get99thPercentile()));
192+
name + " median/75th/95th/99th %.0f/%.0f/%.0f/%.0f ms",
193+
convertDuration.apply(percentile(snapshot, 0.5).value()),
194+
convertDuration.apply(percentile(snapshot, 0.75).value()),
195+
convertDuration.apply(percentile(snapshot, 0.95).value()),
196+
convertDuration.apply(percentile(snapshot, 0.99).value()));
229197
};
230198

231199
AtomicInteger reportCount = new AtomicInteger(1);
232200

233201
AtomicLong lastTick = new AtomicLong(startTime);
234-
Map<String, Long> lastMetersValues = new ConcurrentHashMap<>(meters.size());
235-
meters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), e.getValue().getCount()));
202+
Map<String, Long> lastMetersValues = new ConcurrentHashMap<>(counters.size());
203+
counters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), (long) e.getValue().count()));
236204

237205
ScheduledFuture<?> consoleReportingTask =
238206
scheduledExecutorService.scheduleAtFixedRate(
@@ -244,16 +212,16 @@ public void start(String description) throws Exception {
244212
lastTick.set(currentTime);
245213
StringBuilder builder = new StringBuilder();
246214
builder.append(reportCount.get()).append(", ");
247-
meters
215+
counters
248216
.entrySet()
249217
.forEach(
250218
entry -> {
251219
String meterName = entry.getKey();
252-
Meter meter = entry.getValue();
220+
Counter counter = entry.getValue();
253221
long lastValue = lastMetersValues.get(meterName);
254-
long currentValue = meter.getCount();
222+
long currentValue = (long) counter.count();
255223
builder.append(
256-
formatMeter
224+
formatCounter
257225
.get(meterName)
258226
.compute(lastValue, currentValue, duration));
259227
lastMetersValues.put(meterName, currentValue);
@@ -291,27 +259,28 @@ public void start(String description) throws Exception {
291259
Duration d = Duration.ofNanos(System.nanoTime() - startTime);
292260
Duration duration = d.getSeconds() <= 0 ? Duration.ofSeconds(1) : d;
293261

294-
Function<Map.Entry<String, Meter>, String> formatMeterSummary =
262+
Function<Entry<String, Counter>, String> formatMeterSummary =
295263
entry -> {
296264
if (entry.getKey().contains("bytes")) {
297265
return formatByteRate(
298-
entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis())
266+
entry.getKey(),
267+
1000 * (long) entry.getValue().count() / duration.toMillis())
299268
+ ", ";
300269
} else {
301270
return String.format(
302271
"%s %d msg/s, ",
303-
entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis());
272+
entry.getKey(), 1000 * (long) entry.getValue().count() / duration.toMillis());
304273
}
305274
};
306275

307-
BiFunction<String, com.codahale.metrics.Timer, String> formatLatencySummary =
276+
BiFunction<String, HistogramSupport, String> formatLatencySummary =
308277
(name, histogram) ->
309278
String.format(
310279
name + " 95th %.0f ms",
311-
convertDuration.apply(histogram.getSnapshot().get95thPercentile()));
280+
convertDuration.apply(percentile(histogram.takeSnapshot(), 0.95).value()));
312281

313282
StringBuilder builder = new StringBuilder("Summary: ");
314-
meters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry)));
283+
counters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry)));
315284
if (confirmLatency()) {
316285
builder
317286
.append(formatLatencySummary.apply("confirm latency", confirmLatency))
@@ -360,8 +329,19 @@ private Closeable maybeSetSummaryFile(
360329
printStream.println(description);
361330
}
362331

332+
DropwizardMeterRegistry dropwizardMeterRegistry =
333+
this.meterRegistry.getRegistries().stream()
334+
.filter(r -> r instanceof DropwizardMeterRegistry)
335+
.map(r -> (DropwizardMeterRegistry) r)
336+
.findAny()
337+
.orElseGet(() -> Utils.dropwizardMeterRegistry());
338+
339+
if (!this.meterRegistry.getRegistries().contains(dropwizardMeterRegistry)) {
340+
this.meterRegistry.add(dropwizardMeterRegistry);
341+
}
342+
363343
ConsoleReporter fileReporter =
364-
ConsoleReporter.forRegistry(metricRegistry)
344+
ConsoleReporter.forRegistry(dropwizardMeterRegistry.getDropwizardRegistry())
365345
.filter((name, metric) -> allMetrics.contains(name))
366346
.convertRatesTo(TimeUnit.SECONDS)
367347
.convertDurationsTo(TimeUnit.MILLISECONDS)
@@ -431,4 +411,17 @@ private interface FormatCallback {
431411

432412
String compute(long lastValue, long currentValue, Duration duration);
433413
}
414+
415+
private String metricsName(String name) {
416+
return this.metricsPrefix + "." + name;
417+
}
418+
419+
private static ValueAtPercentile percentile(HistogramSnapshot snapshot, double expected) {
420+
for (ValueAtPercentile percentile : snapshot.percentileValues()) {
421+
if (percentile.percentile() == expected) {
422+
return percentile;
423+
}
424+
}
425+
return null;
426+
}
434427
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -575,16 +575,6 @@ public Integer call() throws Exception {
575575
memoryReportSupplier = () -> "";
576576
}
577577

578-
this.performanceMetrics =
579-
new DefaultPerformanceMetrics(
580-
meterRegistry,
581-
metricsPrefix,
582-
this.summaryFile,
583-
this.includeByteRates,
584-
this.confirmLatency,
585-
memoryReportSupplier,
586-
this.out);
587-
588578
this.messageSize = this.messageSize < 8 ? 8 : this.messageSize; // we need to store a long in it
589579

590580
ShutdownService shutdownService = new ShutdownService();
@@ -598,6 +588,21 @@ public Integer call() throws Exception {
598588
this.monitorings.forEach(m -> m.configure(monitoringContext));
599589
monitoringContext.start();
600590

591+
if (meterRegistry.getRegistries().isEmpty()) {
592+
// we need at least one to do the calculations
593+
meterRegistry.add(Utils.dropwizardMeterRegistry());
594+
}
595+
596+
this.performanceMetrics =
597+
new DefaultPerformanceMetrics(
598+
meterRegistry,
599+
metricsPrefix,
600+
this.summaryFile,
601+
this.includeByteRates,
602+
this.confirmLatency,
603+
memoryReportSupplier,
604+
this.out);
605+
601606
shutdownService.wrap(closeStep("Closing monitoring context", monitoringContext::close));
602607

603608
// FIXME add confirm latency

0 commit comments

Comments
 (0)