Skip to content

Use Micrometer for console metrics in performance tool #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 84 additions & 91 deletions src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// [email protected].
package com.rabbitmq.stream.perf;

import com.codahale.metrics.*;
import com.codahale.metrics.ConsoleReporter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.dropwizard.DropwizardConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.HistogramSupport;
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry;
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -28,6 +30,7 @@
import java.text.StringCharacterIterator;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -45,7 +48,8 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPerformanceMetrics.class);

private final MetricRegistry metricRegistry;
private final String metricsPrefix;
private final CompositeMeterRegistry meterRegistry;
private final Timer latency, confirmLatency;
private final boolean summaryFile;
private final PrintWriter out;
Expand All @@ -55,7 +59,6 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
private volatile long lastPublishedCount = 0;
private volatile long lastConsumedCount = 0;
private volatile long offset;
private final String metricsSuffix;

DefaultPerformanceMetrics(
CompositeMeterRegistry meterRegistry,
Expand All @@ -69,31 +72,9 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
this.includeByteRates = includeByteRates;
this.memoryReportSupplier = memoryReportSupplier;
this.out = out;
DropwizardConfig dropwizardConfig =
new DropwizardConfig() {
@Override
public String prefix() {
return "";
}
this.metricsPrefix = metricsPrefix;
this.meterRegistry = meterRegistry;

@Override
public String get(String key) {
return null;
}
};
this.metricRegistry = new MetricRegistry();
DropwizardMeterRegistry dropwizardMeterRegistry =
new DropwizardMeterRegistry(
dropwizardConfig,
this.metricRegistry,
HierarchicalNameMapper.DEFAULT,
io.micrometer.core.instrument.Clock.SYSTEM) {
@Override
protected Double nullGaugeValue() {
return null;
}
};
meterRegistry.add(dropwizardMeterRegistry);
this.latency =
Timer.builder(metricsPrefix + ".latency")
.description("message latency")
Expand All @@ -112,36 +93,28 @@ protected Double nullGaugeValue() {
} else {
this.confirmLatency = null;
}
// the metrics name contains the tags, if any,
// so we extract the suffix to use it later when looking up the metrics
String key = metricRegistry.getMeters().keySet().iterator().next();
int index = key.indexOf(".");
this.metricsSuffix = index == -1 ? "" : key.substring(index);
}

private long getPublishedCount() {
return this.metricRegistry
.getMeters()
.get("rabbitmqStreamPublished" + metricsSuffix)
.getCount();
return (long) this.meterRegistry.get(metricsName("published")).counter().count();
}

private long getConsumedCount() {
return this.metricRegistry.getMeters().get("rabbitmqStreamConsumed" + metricsSuffix).getCount();
return (long) this.meterRegistry.get(metricsName("consumed")).counter().count();
}

@Override
public void start(String description) throws Exception {
long startTime = System.nanoTime();

String metricPublished = "rabbitmqStreamPublished" + metricsSuffix;
String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" + metricsSuffix;
String metricConsumed = "rabbitmqStreamConsumed" + metricsSuffix;
String metricChunkSize = "rabbitmqStreamChunk_size" + metricsSuffix;
String metricLatency = "rabbitmqStreamLatency" + metricsSuffix;
String metricConfirmLatency = "rabbitmqStreamConfirm_latency" + metricsSuffix;
String metricWrittenBytes = "rabbitmqStreamWritten_bytes" + metricsSuffix;
String metricReadBytes = "rabbitmqStreamRead_bytes" + metricsSuffix;
String metricPublished = metricsName("published");
String metricProducerConfirmed = metricsName("confirmed");
String metricConsumed = metricsName("consumed");
String metricChunkSize = metricsName("chunk_size");
String metricLatency = metricsName("latency");
String metricConfirmLatency = metricsName("confirm_latency");
String metricWrittenBytes = metricsName("written_bytes");
String metricReadBytes = metricsName("read_bytes");

Set<String> allMetrics =
new HashSet<>(
Expand All @@ -156,16 +129,16 @@ public void start(String description) throws Exception {
allMetrics.add(metricConfirmLatency);
}

Map<String, String> metersNamesAndLabels = new LinkedHashMap<>();
metersNamesAndLabels.put(metricPublished, "published");
metersNamesAndLabels.put(metricProducerConfirmed, "confirmed");
metersNamesAndLabels.put(metricConsumed, "consumed");
Map<String, String> countersNamesAndLabels = new LinkedHashMap<>();
countersNamesAndLabels.put(metricPublished, "published");
countersNamesAndLabels.put(metricProducerConfirmed, "confirmed");
countersNamesAndLabels.put(metricConsumed, "consumed");

if (this.includeByteRates) {
allMetrics.add(metricWrittenBytes);
allMetrics.add(metricReadBytes);
metersNamesAndLabels.put(metricWrittenBytes, "written bytes");
metersNamesAndLabels.put(metricReadBytes, "read bytes");
countersNamesAndLabels.put(metricWrittenBytes, "written bytes");
countersNamesAndLabels.put(metricReadBytes, "read bytes");
}

ScheduledExecutorService scheduledExecutorService =
Expand All @@ -174,65 +147,60 @@ public void start(String description) throws Exception {
Closeable summaryFileClosingSequence =
maybeSetSummaryFile(description, allMetrics, scheduledExecutorService);

SortedMap<String, Meter> registryMeters = metricRegistry.getMeters();

Map<String, Meter> meters = new LinkedHashMap<>(metersNamesAndLabels.size());
metersNamesAndLabels
Map<String, Counter> counters = new LinkedHashMap<>(countersNamesAndLabels.size());
countersNamesAndLabels
.entrySet()
.forEach(entry -> meters.put(entry.getValue(), registryMeters.get(entry.getKey())));
.forEach(
entry -> counters.put(entry.getValue(), meterRegistry.get(entry.getKey()).counter()));

Map<String, FormatCallback> formatMeter = new HashMap<>();
metersNamesAndLabels.entrySet().stream()
Map<String, FormatCallback> formatCounter = new HashMap<>();
countersNamesAndLabels.entrySet().stream()
.filter(entry -> !entry.getKey().contains("bytes"))
.forEach(
entry -> {
formatMeter.put(
formatCounter.put(
entry.getValue(),
(lastValue, currentValue, duration) -> {
long rate = 1000 * (currentValue - lastValue) / duration.toMillis();
return String.format("%s %d msg/s, ", entry.getValue(), rate);
});
});

metersNamesAndLabels.entrySet().stream()
countersNamesAndLabels.entrySet().stream()
.filter(entry -> entry.getKey().contains("bytes"))
.forEach(
entry -> {
formatMeter.put(
formatCounter.put(
entry.getValue(),
(lastValue, currentValue, duration) -> {
long rate = 1000 * (currentValue - lastValue) / duration.toMillis();
return formatByteRate(entry.getValue(), rate) + ", ";
});
});

Histogram chunkSize = metricRegistry.getHistograms().get(metricChunkSize);
Function<Histogram, String> formatChunkSize =
histogram -> String.format("chunk size %.0f", histogram.getSnapshot().getMean());

com.codahale.metrics.Timer latency = metricRegistry.getTimers().get(metricLatency);
com.codahale.metrics.Timer confirmLatency =
confirmLatency() ? metricRegistry.getTimers().get(metricConfirmLatency) : null;
HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary();
Function<HistogramSupport, String> formatChunkSize =
histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean());

Function<Number, Number> convertDuration =
in -> in instanceof Long ? in.longValue() / 1_000_000 : in.doubleValue() / 1_000_000;
BiFunction<String, com.codahale.metrics.Timer, String> formatLatency =
BiFunction<String, Timer, String> formatLatency =
(name, timer) -> {
Snapshot snapshot = timer.getSnapshot();
HistogramSnapshot snapshot = timer.takeSnapshot();

return String.format(
name + " min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms",
convertDuration.apply(snapshot.getMin()),
convertDuration.apply(snapshot.getMedian()),
convertDuration.apply(snapshot.get75thPercentile()),
convertDuration.apply(snapshot.get95thPercentile()),
convertDuration.apply(snapshot.get99thPercentile()));
name + " median/75th/95th/99th %.0f/%.0f/%.0f/%.0f ms",
convertDuration.apply(percentile(snapshot, 0.5).value()),
convertDuration.apply(percentile(snapshot, 0.75).value()),
convertDuration.apply(percentile(snapshot, 0.95).value()),
convertDuration.apply(percentile(snapshot, 0.99).value()));
};

AtomicInteger reportCount = new AtomicInteger(1);

AtomicLong lastTick = new AtomicLong(startTime);
Map<String, Long> lastMetersValues = new ConcurrentHashMap<>(meters.size());
meters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), e.getValue().getCount()));
Map<String, Long> lastMetersValues = new ConcurrentHashMap<>(counters.size());
counters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), (long) e.getValue().count()));

ScheduledFuture<?> consoleReportingTask =
scheduledExecutorService.scheduleAtFixedRate(
Expand All @@ -244,16 +212,16 @@ public void start(String description) throws Exception {
lastTick.set(currentTime);
StringBuilder builder = new StringBuilder();
builder.append(reportCount.get()).append(", ");
meters
counters
.entrySet()
.forEach(
entry -> {
String meterName = entry.getKey();
Meter meter = entry.getValue();
Counter counter = entry.getValue();
long lastValue = lastMetersValues.get(meterName);
long currentValue = meter.getCount();
long currentValue = (long) counter.count();
builder.append(
formatMeter
formatCounter
.get(meterName)
.compute(lastValue, currentValue, duration));
lastMetersValues.put(meterName, currentValue);
Expand Down Expand Up @@ -291,27 +259,28 @@ public void start(String description) throws Exception {
Duration d = Duration.ofNanos(System.nanoTime() - startTime);
Duration duration = d.getSeconds() <= 0 ? Duration.ofSeconds(1) : d;

Function<Map.Entry<String, Meter>, String> formatMeterSummary =
Function<Entry<String, Counter>, String> formatMeterSummary =
entry -> {
if (entry.getKey().contains("bytes")) {
return formatByteRate(
entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis())
entry.getKey(),
1000 * (long) entry.getValue().count() / duration.toMillis())
+ ", ";
} else {
return String.format(
"%s %d msg/s, ",
entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis());
entry.getKey(), 1000 * (long) entry.getValue().count() / duration.toMillis());
}
};

BiFunction<String, com.codahale.metrics.Timer, String> formatLatencySummary =
BiFunction<String, HistogramSupport, String> formatLatencySummary =
(name, histogram) ->
String.format(
name + " 95th %.0f ms",
convertDuration.apply(histogram.getSnapshot().get95thPercentile()));
convertDuration.apply(percentile(histogram.takeSnapshot(), 0.95).value()));

StringBuilder builder = new StringBuilder("Summary: ");
meters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry)));
counters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry)));
if (confirmLatency()) {
builder
.append(formatLatencySummary.apply("confirm latency", confirmLatency))
Expand Down Expand Up @@ -360,8 +329,19 @@ private Closeable maybeSetSummaryFile(
printStream.println(description);
}

DropwizardMeterRegistry dropwizardMeterRegistry =
this.meterRegistry.getRegistries().stream()
.filter(r -> r instanceof DropwizardMeterRegistry)
.map(r -> (DropwizardMeterRegistry) r)
.findAny()
.orElseGet(() -> Utils.dropwizardMeterRegistry());

if (!this.meterRegistry.getRegistries().contains(dropwizardMeterRegistry)) {
this.meterRegistry.add(dropwizardMeterRegistry);
}

ConsoleReporter fileReporter =
ConsoleReporter.forRegistry(metricRegistry)
ConsoleReporter.forRegistry(dropwizardMeterRegistry.getDropwizardRegistry())
.filter((name, metric) -> allMetrics.contains(name))
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -431,4 +411,17 @@ private interface FormatCallback {

String compute(long lastValue, long currentValue, Duration duration);
}

private String metricsName(String name) {
return this.metricsPrefix + "." + name;
}

private static ValueAtPercentile percentile(HistogramSnapshot snapshot, double expected) {
for (ValueAtPercentile percentile : snapshot.percentileValues()) {
if (percentile.percentile() == expected) {
return percentile;
}
}
return null;
}
}
25 changes: 15 additions & 10 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,16 +575,6 @@ public Integer call() throws Exception {
memoryReportSupplier = () -> "";
}

this.performanceMetrics =
new DefaultPerformanceMetrics(
meterRegistry,
metricsPrefix,
this.summaryFile,
this.includeByteRates,
this.confirmLatency,
memoryReportSupplier,
this.out);

this.messageSize = this.messageSize < 8 ? 8 : this.messageSize; // we need to store a long in it

ShutdownService shutdownService = new ShutdownService();
Expand All @@ -598,6 +588,21 @@ public Integer call() throws Exception {
this.monitorings.forEach(m -> m.configure(monitoringContext));
monitoringContext.start();

if (meterRegistry.getRegistries().isEmpty()) {
// we need at least one to do the calculations
meterRegistry.add(Utils.dropwizardMeterRegistry());
}

this.performanceMetrics =
new DefaultPerformanceMetrics(
meterRegistry,
metricsPrefix,
this.summaryFile,
this.includeByteRates,
this.confirmLatency,
memoryReportSupplier,
this.out);

shutdownService.wrap(closeStep("Closing monitoring context", monitoringContext::close));

// FIXME add confirm latency
Expand Down
Loading