27
27
import io .micrometer .core .instrument .util .NamedThreadFactory ;
28
28
import org .apache .kafka .common .Metric ;
29
29
import org .apache .kafka .common .MetricName ;
30
+ import org .apache .kafka .common .metrics .KafkaMetric ;
31
+ import org .apache .kafka .common .metrics .Measurable ;
30
32
31
33
import java .time .Duration ;
32
34
import java .util .*;
@@ -69,6 +71,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
69
71
static final String KAFKA_VERSION_TAG_NAME = "kafka.version" ;
70
72
static final String DEFAULT_VALUE = "unknown" ;
71
73
74
+ private static final Set <Class <?>> counterMeasurableClasses = new HashSet <>();
75
+
76
+ static {
77
+ Set <String > classNames = new HashSet <>();
78
+ classNames .add ("org.apache.kafka.common.metrics.stats.CumulativeSum" );
79
+ classNames .add ("org.apache.kafka.common.metrics.stats.CumulativeCount" );
80
+
81
+ for (String className : classNames ) {
82
+ try {
83
+ counterMeasurableClasses .add (Class .forName (className ));
84
+ }
85
+ catch (ClassNotFoundException e ) {
86
+ // Class doesn't exist in this version of kafka client - skip
87
+ }
88
+ }
89
+ }
90
+
72
91
private final Supplier <Map <MetricName , ? extends Metric >> metricsSupplier ;
73
92
74
93
private final AtomicReference <Map <MetricName , ? extends Metric >> metrics = new AtomicReference <>();
@@ -135,7 +154,7 @@ void prepareToBindMetrics(MeterRegistry registry) {
135
154
this .metrics .set (this .metricsSupplier .get ());
136
155
Map <MetricName , ? extends Metric > metrics = this .metrics .get ();
137
156
// Collect static metrics and tags
138
- MetricName startTime = null ;
157
+ Metric startTimeMetric = null ;
139
158
140
159
for (Map .Entry <MetricName , ? extends Metric > entry : metrics .entrySet ()) {
141
160
MetricName name = entry .getKey ();
@@ -144,12 +163,13 @@ void prepareToBindMetrics(MeterRegistry registry) {
144
163
kafkaVersion = (String ) entry .getValue ().metricValue ();
145
164
}
146
165
else if (START_TIME_METRIC_NAME .equals (name .name ())) {
147
- startTime = entry .getKey ();
166
+ startTimeMetric = entry .getValue ();
148
167
}
149
168
}
150
169
151
- if (startTime != null ) {
152
- bindMeter (registry , startTime , meterName (startTime ), meterTags (startTime ));
170
+ if (startTimeMetric != null ) {
171
+ MetricName startTimeMetricName = startTimeMetric .metricName ();
172
+ bindMeter (registry , startTimeMetric , meterName (startTimeMetricName ), meterTags (startTimeMetricName ));
153
173
}
154
174
}
155
175
@@ -220,7 +240,7 @@ else if (tags.size() == meterTagsWithCommonTags.size())
220
240
221
241
List <Tag > tags = meterTags (name );
222
242
try {
223
- Meter meter = bindMeter (registry , metric . metricName () , meterName , tags );
243
+ Meter meter = bindMeter (registry , metric , meterName , tags );
224
244
List <Meter > meters = registryMetersByNames .computeIfAbsent (meterName , k -> new ArrayList <>());
225
245
meters .add (meter );
226
246
}
@@ -242,18 +262,34 @@ else if (tags.size() == meterTagsWithCommonTags.size())
242
262
}
243
263
}
244
264
245
- private Meter bindMeter (MeterRegistry registry , MetricName metricName , String meterName , Iterable <Tag > tags ) {
246
- Meter meter = registerMeter (registry , metricName , meterName , tags );
265
+ private Meter bindMeter (MeterRegistry registry , Metric metric , String meterName , Iterable <Tag > tags ) {
266
+ Meter meter = registerMeter (registry , metric , meterName , tags );
247
267
registeredMeterIds .add (meter .getId ());
248
268
return meter ;
249
269
}
250
270
251
- private Meter registerMeter (MeterRegistry registry , MetricName metricName , String meterName , Iterable <Tag > tags ) {
252
- if (meterName .endsWith ("total" )) {
271
+ private Meter registerMeter (MeterRegistry registry , Metric metric , String meterName , Iterable <Tag > tags ) {
272
+ MetricName metricName = metric .metricName ();
273
+ Class <? extends Measurable > measurableClass = getMeasurableClass (metric );
274
+ if ((measurableClass == null && meterName .endsWith ("total" ))
275
+ || (measurableClass != null && counterMeasurableClasses .contains (measurableClass ))) {
253
276
return registerCounter (registry , metricName , meterName , tags );
254
277
}
255
- else {
256
- return registerGauge (registry , metricName , meterName , tags );
278
+
279
+ return registerGauge (registry , metricName , meterName , tags );
280
+ }
281
+
282
+ @ Nullable
283
+ private static Class <? extends Measurable > getMeasurableClass (Metric metric ) {
284
+ if (!(metric instanceof KafkaMetric )) {
285
+ return null ;
286
+ }
287
+
288
+ try {
289
+ return ((KafkaMetric ) metric ).measurable ().getClass ();
290
+ }
291
+ catch (IllegalStateException ex ) {
292
+ return null ;
257
293
}
258
294
}
259
295
0 commit comments