Skip to content

Commit e6f3b7f

Browse files
authored
Improve implementation of Kafka metrics type mapping (#3513)
* Improve implementation of Kafka metrics type mapping Closes gh-3499 * Assert that type-based improved counter detection produces the same set with ".total"-ending detection
1 parent 266cd4e commit e6f3b7f

File tree

2 files changed

+57
-11
lines changed

2 files changed

+57
-11
lines changed

micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;
2828
import org.apache.kafka.common.Metric;
2929
import org.apache.kafka.common.MetricName;
30+
import org.apache.kafka.common.metrics.KafkaMetric;
31+
import org.apache.kafka.common.metrics.Measurable;
3032

3133
import java.time.Duration;
3234
import java.util.*;
@@ -69,6 +71,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
6971
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
7072
static final String DEFAULT_VALUE = "unknown";
7173

74+
private static final Set<Class<?>> counterMeasurableClasses = new HashSet<>();
75+
76+
static {
77+
Set<String> classNames = new HashSet<>();
78+
classNames.add("org.apache.kafka.common.metrics.stats.CumulativeSum");
79+
classNames.add("org.apache.kafka.common.metrics.stats.CumulativeCount");
80+
81+
for (String className : classNames) {
82+
try {
83+
counterMeasurableClasses.add(Class.forName(className));
84+
}
85+
catch (ClassNotFoundException e) {
86+
// Class doesn't exist in this version of kafka client - skip
87+
}
88+
}
89+
}
90+
7291
private final Supplier<Map<MetricName, ? extends Metric>> metricsSupplier;
7392

7493
private final AtomicReference<Map<MetricName, ? extends Metric>> metrics = new AtomicReference<>();
@@ -135,7 +154,7 @@ void prepareToBindMetrics(MeterRegistry registry) {
135154
this.metrics.set(this.metricsSupplier.get());
136155
Map<MetricName, ? extends Metric> metrics = this.metrics.get();
137156
// Collect static metrics and tags
138-
MetricName startTime = null;
157+
Metric startTimeMetric = null;
139158

140159
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
141160
MetricName name = entry.getKey();
@@ -144,12 +163,13 @@ void prepareToBindMetrics(MeterRegistry registry) {
144163
kafkaVersion = (String) entry.getValue().metricValue();
145164
}
146165
else if (START_TIME_METRIC_NAME.equals(name.name())) {
147-
startTime = entry.getKey();
166+
startTimeMetric = entry.getValue();
148167
}
149168
}
150169

151-
if (startTime != null) {
152-
bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
170+
if (startTimeMetric != null) {
171+
MetricName startTimeMetricName = startTimeMetric.metricName();
172+
bindMeter(registry, startTimeMetric, meterName(startTimeMetricName), meterTags(startTimeMetricName));
153173
}
154174
}
155175

@@ -220,7 +240,7 @@ else if (tags.size() == meterTagsWithCommonTags.size())
220240

221241
List<Tag> tags = meterTags(name);
222242
try {
223-
Meter meter = bindMeter(registry, metric.metricName(), meterName, tags);
243+
Meter meter = bindMeter(registry, metric, meterName, tags);
224244
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
225245
meters.add(meter);
226246
}
@@ -242,18 +262,34 @@ else if (tags.size() == meterTagsWithCommonTags.size())
242262
}
243263
}
244264

245-
private Meter bindMeter(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
246-
Meter meter = registerMeter(registry, metricName, meterName, tags);
265+
private Meter bindMeter(MeterRegistry registry, Metric metric, String meterName, Iterable<Tag> tags) {
266+
Meter meter = registerMeter(registry, metric, meterName, tags);
247267
registeredMeterIds.add(meter.getId());
248268
return meter;
249269
}
250270

251-
private Meter registerMeter(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
252-
if (meterName.endsWith("total")) {
271+
private Meter registerMeter(MeterRegistry registry, Metric metric, String meterName, Iterable<Tag> tags) {
272+
MetricName metricName = metric.metricName();
273+
Class<? extends Measurable> measurableClass = getMeasurableClass(metric);
274+
if ((measurableClass == null && meterName.endsWith("total"))
275+
|| (measurableClass != null && counterMeasurableClasses.contains(measurableClass))) {
253276
return registerCounter(registry, metricName, meterName, tags);
254277
}
255-
else {
256-
return registerGauge(registry, metricName, meterName, tags);
278+
279+
return registerGauge(registry, metricName, meterName, tags);
280+
}
281+
282+
@Nullable
283+
private static Class<? extends Measurable> getMeasurableClass(Metric metric) {
284+
if (!(metric instanceof KafkaMetric)) {
285+
return null;
286+
}
287+
288+
try {
289+
return ((KafkaMetric) metric).measurable().getClass();
290+
}
291+
catch (IllegalStateException ex) {
292+
return null;
257293
}
258294
}
259295

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.micrometer.core.instrument.binder.kafka;
1717

18+
import io.micrometer.core.instrument.FunctionCounter;
1819
import io.micrometer.core.instrument.Gauge;
20+
import io.micrometer.core.instrument.Meter;
1921
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
2022
import org.apache.kafka.clients.consumer.Consumer;
2123
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -35,7 +37,9 @@
3537

3638
import java.time.Duration;
3739
import java.util.Collections;
40+
import java.util.List;
3841
import java.util.Properties;
42+
import java.util.stream.Collectors;
3943

4044
import static java.lang.System.out;
4145
import static org.assertj.core.api.Assertions.assertThat;
@@ -117,6 +121,12 @@ void shouldManageProducerAndConsumerMetrics() {
117121
out.println("All meters from producer and consumer:");
118122
printMeters(registry);
119123

124+
List<Meter> metersEndingWithTotal = registry.getMeters().stream()
125+
.filter(meter -> meter.getId().getName().endsWith(".total")).collect(Collectors.toList());
126+
List<Meter> functionCounters = registry.getMeters().stream().filter(meter -> meter instanceof FunctionCounter)
127+
.collect(Collectors.toList());
128+
assertThat(metersEndingWithTotal).isEqualTo(functionCounters);
129+
120130
producerKafkaMetrics.close();
121131
consumerKafkaMetrics.close();
122132
}

0 commit comments

Comments
 (0)