diff --git a/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java b/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java index ed6ad5c3cd..7cdc5604a4 100644 --- a/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java +++ b/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java @@ -13,12 +13,14 @@ // info@rabbitmq.com. package com.rabbitmq.stream.perf; -import com.codahale.metrics.*; +import com.codahale.metrics.ConsoleReporter; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; -import io.micrometer.core.instrument.dropwizard.DropwizardConfig; +import io.micrometer.core.instrument.distribution.HistogramSnapshot; +import io.micrometer.core.instrument.distribution.HistogramSupport; +import io.micrometer.core.instrument.distribution.ValueAtPercentile; import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry; -import io.micrometer.core.instrument.util.HierarchicalNameMapper; import java.io.*; import java.nio.file.Files; import java.nio.file.Path; @@ -28,6 +30,7 @@ import java.text.StringCharacterIterator; import java.time.Duration; import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -45,7 +48,8 @@ class DefaultPerformanceMetrics implements PerformanceMetrics { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPerformanceMetrics.class); - private final MetricRegistry metricRegistry; + private final String metricsPrefix; + private final CompositeMeterRegistry meterRegistry; private final Timer latency, confirmLatency; private final boolean summaryFile; private final PrintWriter out; @@ -55,7 +59,6 @@ class DefaultPerformanceMetrics implements PerformanceMetrics { private volatile long lastPublishedCount = 0; private volatile long lastConsumedCount = 0; private volatile long offset; - private final String metricsSuffix; DefaultPerformanceMetrics( CompositeMeterRegistry meterRegistry, @@ -69,31 +72,9 @@ class DefaultPerformanceMetrics implements PerformanceMetrics { this.includeByteRates = includeByteRates; this.memoryReportSupplier = memoryReportSupplier; this.out = out; - DropwizardConfig dropwizardConfig = - new DropwizardConfig() { - @Override - public String prefix() { - return ""; - } + this.metricsPrefix = metricsPrefix; + this.meterRegistry = meterRegistry; - @Override - public String get(String key) { - return null; - } - }; - this.metricRegistry = new MetricRegistry(); - DropwizardMeterRegistry dropwizardMeterRegistry = - new DropwizardMeterRegistry( - dropwizardConfig, - this.metricRegistry, - HierarchicalNameMapper.DEFAULT, - io.micrometer.core.instrument.Clock.SYSTEM) { - @Override - protected Double nullGaugeValue() { - return null; - } - }; - meterRegistry.add(dropwizardMeterRegistry); this.latency = Timer.builder(metricsPrefix + ".latency") .description("message latency") @@ -112,36 +93,28 @@ protected Double nullGaugeValue() { } else { this.confirmLatency = null; } - // the metrics name contains the tags, if any, - // so we extract the suffix to use it later when looking up the metrics - String key = metricRegistry.getMeters().keySet().iterator().next(); - int index = key.indexOf("."); - this.metricsSuffix = index == -1 ? "" : key.substring(index); } private long getPublishedCount() { - return this.metricRegistry - .getMeters() - .get("rabbitmqStreamPublished" + metricsSuffix) - .getCount(); + return (long) this.meterRegistry.get(metricsName("published")).counter().count(); } private long getConsumedCount() { - return this.metricRegistry.getMeters().get("rabbitmqStreamConsumed" + metricsSuffix).getCount(); + return (long) this.meterRegistry.get(metricsName("consumed")).counter().count(); } @Override public void start(String description) throws Exception { long startTime = System.nanoTime(); - String metricPublished = "rabbitmqStreamPublished" + metricsSuffix; - String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" + metricsSuffix; - String metricConsumed = "rabbitmqStreamConsumed" + metricsSuffix; - String metricChunkSize = "rabbitmqStreamChunk_size" + metricsSuffix; - String metricLatency = "rabbitmqStreamLatency" + metricsSuffix; - String metricConfirmLatency = "rabbitmqStreamConfirm_latency" + metricsSuffix; - String metricWrittenBytes = "rabbitmqStreamWritten_bytes" + metricsSuffix; - String metricReadBytes = "rabbitmqStreamRead_bytes" + metricsSuffix; + String metricPublished = metricsName("published"); + String metricProducerConfirmed = metricsName("confirmed"); + String metricConsumed = metricsName("consumed"); + String metricChunkSize = metricsName("chunk_size"); + String metricLatency = metricsName("latency"); + String metricConfirmLatency = metricsName("confirm_latency"); + String metricWrittenBytes = metricsName("written_bytes"); + String metricReadBytes = metricsName("read_bytes"); Set allMetrics = new HashSet<>( @@ -156,16 +129,16 @@ public void start(String description) throws Exception { allMetrics.add(metricConfirmLatency); } - Map metersNamesAndLabels = new LinkedHashMap<>(); - metersNamesAndLabels.put(metricPublished, "published"); - metersNamesAndLabels.put(metricProducerConfirmed, "confirmed"); - metersNamesAndLabels.put(metricConsumed, "consumed"); + Map countersNamesAndLabels = new LinkedHashMap<>(); + countersNamesAndLabels.put(metricPublished, "published"); + countersNamesAndLabels.put(metricProducerConfirmed, "confirmed"); + countersNamesAndLabels.put(metricConsumed, "consumed"); if (this.includeByteRates) { allMetrics.add(metricWrittenBytes); allMetrics.add(metricReadBytes); - metersNamesAndLabels.put(metricWrittenBytes, "written bytes"); - metersNamesAndLabels.put(metricReadBytes, "read bytes"); + countersNamesAndLabels.put(metricWrittenBytes, "written bytes"); + countersNamesAndLabels.put(metricReadBytes, "read bytes"); } ScheduledExecutorService scheduledExecutorService = @@ -174,19 +147,18 @@ public void start(String description) throws Exception { Closeable summaryFileClosingSequence = maybeSetSummaryFile(description, allMetrics, scheduledExecutorService); - SortedMap registryMeters = metricRegistry.getMeters(); - - Map meters = new LinkedHashMap<>(metersNamesAndLabels.size()); - metersNamesAndLabels + Map counters = new LinkedHashMap<>(countersNamesAndLabels.size()); + countersNamesAndLabels .entrySet() - .forEach(entry -> meters.put(entry.getValue(), registryMeters.get(entry.getKey()))); + .forEach( + entry -> counters.put(entry.getValue(), meterRegistry.get(entry.getKey()).counter())); - Map formatMeter = new HashMap<>(); - metersNamesAndLabels.entrySet().stream() + Map formatCounter = new HashMap<>(); + countersNamesAndLabels.entrySet().stream() .filter(entry -> !entry.getKey().contains("bytes")) .forEach( entry -> { - formatMeter.put( + formatCounter.put( entry.getValue(), (lastValue, currentValue, duration) -> { long rate = 1000 * (currentValue - lastValue) / duration.toMillis(); @@ -194,11 +166,11 @@ public void start(String description) throws Exception { }); }); - metersNamesAndLabels.entrySet().stream() + countersNamesAndLabels.entrySet().stream() .filter(entry -> entry.getKey().contains("bytes")) .forEach( entry -> { - formatMeter.put( + formatCounter.put( entry.getValue(), (lastValue, currentValue, duration) -> { long rate = 1000 * (currentValue - lastValue) / duration.toMillis(); @@ -206,33 +178,29 @@ public void start(String description) throws Exception { }); }); - Histogram chunkSize = metricRegistry.getHistograms().get(metricChunkSize); - Function formatChunkSize = - histogram -> String.format("chunk size %.0f", histogram.getSnapshot().getMean()); - - com.codahale.metrics.Timer latency = metricRegistry.getTimers().get(metricLatency); - com.codahale.metrics.Timer confirmLatency = - confirmLatency() ? metricRegistry.getTimers().get(metricConfirmLatency) : null; + HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary(); + Function formatChunkSize = + histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean()); Function convertDuration = in -> in instanceof Long ? in.longValue() / 1_000_000 : in.doubleValue() / 1_000_000; - BiFunction formatLatency = + BiFunction formatLatency = (name, timer) -> { - Snapshot snapshot = timer.getSnapshot(); + HistogramSnapshot snapshot = timer.takeSnapshot(); + return String.format( - name + " min/median/75th/95th/99th %.0f/%.0f/%.0f/%.0f/%.0f ms", - convertDuration.apply(snapshot.getMin()), - convertDuration.apply(snapshot.getMedian()), - convertDuration.apply(snapshot.get75thPercentile()), - convertDuration.apply(snapshot.get95thPercentile()), - convertDuration.apply(snapshot.get99thPercentile())); + name + " median/75th/95th/99th %.0f/%.0f/%.0f/%.0f ms", + convertDuration.apply(percentile(snapshot, 0.5).value()), + convertDuration.apply(percentile(snapshot, 0.75).value()), + convertDuration.apply(percentile(snapshot, 0.95).value()), + convertDuration.apply(percentile(snapshot, 0.99).value())); }; AtomicInteger reportCount = new AtomicInteger(1); AtomicLong lastTick = new AtomicLong(startTime); - Map lastMetersValues = new ConcurrentHashMap<>(meters.size()); - meters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), e.getValue().getCount())); + Map lastMetersValues = new ConcurrentHashMap<>(counters.size()); + counters.entrySet().forEach(e -> lastMetersValues.put(e.getKey(), (long) e.getValue().count())); ScheduledFuture consoleReportingTask = scheduledExecutorService.scheduleAtFixedRate( @@ -244,16 +212,16 @@ public void start(String description) throws Exception { lastTick.set(currentTime); StringBuilder builder = new StringBuilder(); builder.append(reportCount.get()).append(", "); - meters + counters .entrySet() .forEach( entry -> { String meterName = entry.getKey(); - Meter meter = entry.getValue(); + Counter counter = entry.getValue(); long lastValue = lastMetersValues.get(meterName); - long currentValue = meter.getCount(); + long currentValue = (long) counter.count(); builder.append( - formatMeter + formatCounter .get(meterName) .compute(lastValue, currentValue, duration)); lastMetersValues.put(meterName, currentValue); @@ -291,27 +259,28 @@ public void start(String description) throws Exception { Duration d = Duration.ofNanos(System.nanoTime() - startTime); Duration duration = d.getSeconds() <= 0 ? Duration.ofSeconds(1) : d; - Function, String> formatMeterSummary = + Function, String> formatMeterSummary = entry -> { if (entry.getKey().contains("bytes")) { return formatByteRate( - entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis()) + entry.getKey(), + 1000 * (long) entry.getValue().count() / duration.toMillis()) + ", "; } else { return String.format( "%s %d msg/s, ", - entry.getKey(), 1000 * entry.getValue().getCount() / duration.toMillis()); + entry.getKey(), 1000 * (long) entry.getValue().count() / duration.toMillis()); } }; - BiFunction formatLatencySummary = + BiFunction formatLatencySummary = (name, histogram) -> String.format( name + " 95th %.0f ms", - convertDuration.apply(histogram.getSnapshot().get95thPercentile())); + convertDuration.apply(percentile(histogram.takeSnapshot(), 0.95).value())); StringBuilder builder = new StringBuilder("Summary: "); - meters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry))); + counters.entrySet().forEach(entry -> builder.append(formatMeterSummary.apply(entry))); if (confirmLatency()) { builder .append(formatLatencySummary.apply("confirm latency", confirmLatency)) @@ -360,8 +329,19 @@ private Closeable maybeSetSummaryFile( printStream.println(description); } + DropwizardMeterRegistry dropwizardMeterRegistry = + this.meterRegistry.getRegistries().stream() + .filter(r -> r instanceof DropwizardMeterRegistry) + .map(r -> (DropwizardMeterRegistry) r) + .findAny() + .orElseGet(() -> Utils.dropwizardMeterRegistry()); + + if (!this.meterRegistry.getRegistries().contains(dropwizardMeterRegistry)) { + this.meterRegistry.add(dropwizardMeterRegistry); + } + ConsoleReporter fileReporter = - ConsoleReporter.forRegistry(metricRegistry) + ConsoleReporter.forRegistry(dropwizardMeterRegistry.getDropwizardRegistry()) .filter((name, metric) -> allMetrics.contains(name)) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) @@ -431,4 +411,17 @@ private interface FormatCallback { String compute(long lastValue, long currentValue, Duration duration); } + + private String metricsName(String name) { + return this.metricsPrefix + "." + name; + } + + private static ValueAtPercentile percentile(HistogramSnapshot snapshot, double expected) { + for (ValueAtPercentile percentile : snapshot.percentileValues()) { + if (percentile.percentile() == expected) { + return percentile; + } + } + return null; + } } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 1ee7d0aa74..1c8eda7e74 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -575,16 +575,6 @@ public Integer call() throws Exception { memoryReportSupplier = () -> ""; } - this.performanceMetrics = - new DefaultPerformanceMetrics( - meterRegistry, - metricsPrefix, - this.summaryFile, - this.includeByteRates, - this.confirmLatency, - memoryReportSupplier, - this.out); - this.messageSize = this.messageSize < 8 ? 8 : this.messageSize; // we need to store a long in it ShutdownService shutdownService = new ShutdownService(); @@ -598,6 +588,21 @@ public Integer call() throws Exception { this.monitorings.forEach(m -> m.configure(monitoringContext)); monitoringContext.start(); + if (meterRegistry.getRegistries().isEmpty()) { + // we need at least one to do the calculations + meterRegistry.add(Utils.dropwizardMeterRegistry()); + } + + this.performanceMetrics = + new DefaultPerformanceMetrics( + meterRegistry, + metricsPrefix, + this.summaryFile, + this.includeByteRates, + this.confirmLatency, + memoryReportSupplier, + this.out); + shutdownService.wrap(closeStep("Closing monitoring context", monitoringContext::close)); // FIXME add confirm latency diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index cb0e482b26..3224a4ea65 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -13,6 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.perf; +import com.codahale.metrics.MetricRegistry; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -29,6 +30,9 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.dropwizard.DropwizardConfig; +import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry; +import io.micrometer.core.instrument.util.HierarchicalNameMapper; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -793,4 +797,32 @@ public void chunk(int entriesCount) { this.chunkSize.record(entriesCount); } } + + static DropwizardMeterRegistry dropwizardMeterRegistry() { + DropwizardConfig dropwizardConfig = + new DropwizardConfig() { + @Override + public String prefix() { + return ""; + } + + @Override + public String get(String key) { + return null; + } + }; + MetricRegistry metricRegistry = new MetricRegistry(); + DropwizardMeterRegistry dropwizardMeterRegistry = + new DropwizardMeterRegistry( + dropwizardConfig, + metricRegistry, + HierarchicalNameMapper.DEFAULT, + io.micrometer.core.instrument.Clock.SYSTEM) { + @Override + protected Double nullGaugeValue() { + return null; + } + }; + return dropwizardMeterRegistry; + } }