Skip to content

Commit ea9cf27

Browse files
authored
GH-3741: Fix metric tag to show underlying exception type
Fixes: #3741 Issue: #3741 When exceptions occur in Kafka listeners, the metrics currently show `ListenerExecutionFailedException` in both the `error` tag (when using observation) and `exception` tag (when using micrometer without observation), rather than the actual underlying exception. * Modify ListenerContainer to pass actual exception to failure metrics * Update MessagingMessageListenerAdapter to report cause to observation * Add MicrometerMetricsTests to verify both observation and non-observation metrics * Fix ObservationTests to verify correct error reporting in metrics This ensures metrics show the actual underlying exception while maintaining existing span behavior. Signed-off-by: Soby Chacko <[email protected]> **Auto-cherry-pick to `3.3.x` & `3.2.x`** * Addressing PR review
1 parent 4b83480 commit ea9cf27

File tree

4 files changed

+213
-9
lines changed

4 files changed

+213
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2357,13 +2357,18 @@ private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?
23572357
}
23582358
}
23592359

2360-
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
2360+
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record,
2361+
Throwable exception) {
23612362
if (sample != null) {
2363+
String exceptionName = exception.getCause() != null
2364+
? exception.getCause().getClass().getSimpleName()
2365+
: exception.getClass().getSimpleName();
2366+
23622367
if (this.micrometerTagsProvider == null || record == null) {
2363-
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException");
2368+
this.micrometerHolder.failure(sample, exceptionName);
23642369
}
23652370
else {
2366-
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record);
2371+
this.micrometerHolder.failure(sample, exceptionName, record);
23672372
}
23682373
}
23692374
}
@@ -2441,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24412446
}
24422447
catch (RuntimeException e) {
24432448
this.batchFailed = true;
2444-
failureTimer(sample, null);
2449+
failureTimer(sample, null, e);
24452450
batchInterceptAfter(records, e);
24462451
throw e;
24472452
}
@@ -2776,7 +2781,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27762781
recordInterceptAfter(cRecord, null);
27772782
}
27782783
catch (RuntimeException e) {
2779-
failureTimer(sample, cRecord);
2784+
failureTimer(sample, cRecord, e);
27802785
recordInterceptAfter(cRecord, e);
27812786
if (!isListenerAdapterObservationAware()) {
27822787
observation.error(e);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
425425
}
426426
catch (ListenerExecutionFailedException e) {
427427
listenerError = e;
428-
currentObservation.error(e);
428+
currentObservation.error(e.getCause() != null ? e.getCause() : e);
429429
handleException(records, acknowledgment, consumer, message, e);
430430
}
431431
catch (Error e) {
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import io.micrometer.core.instrument.MeterRegistry;
23+
import io.micrometer.core.instrument.Timer;
24+
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
25+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
26+
import io.micrometer.observation.ObservationRegistry;
27+
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.kafka.annotation.EnableKafka;
34+
import org.springframework.kafka.annotation.KafkaListener;
35+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
36+
import org.springframework.kafka.core.ConsumerFactory;
37+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
38+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
39+
import org.springframework.kafka.core.KafkaTemplate;
40+
import org.springframework.kafka.core.ProducerFactory;
41+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
42+
import org.springframework.kafka.test.context.EmbeddedKafka;
43+
import org.springframework.kafka.test.utils.KafkaTestUtils;
44+
import org.springframework.test.annotation.DirtiesContext;
45+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
/**
50+
* @author Soby Chacko
51+
* @since 3.2.7
52+
*/
53+
@SpringJUnitConfig
54+
@EmbeddedKafka(topics = { MicrometerMetricsTests.METRICS_TEST_TOPIC }, partitions = 1)
55+
@DirtiesContext
56+
public class MicrometerMetricsTests {
57+
58+
public final static String METRICS_TEST_TOPIC = "metrics.test.topic";
59+
60+
@Test
61+
void verifyMetricsWithoutObservation(@Autowired MetricsListener listener,
62+
@Autowired MeterRegistry meterRegistry,
63+
@Autowired KafkaTemplate<Integer, String> template)
64+
throws Exception {
65+
66+
template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS);
67+
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
68+
69+
Timer timer = meterRegistry.find("spring.kafka.listener")
70+
.tags("name", "metricsTest-0")
71+
.tag("result", "failure")
72+
.timer();
73+
74+
assertThat(timer).isNotNull();
75+
assertThat(timer.getId().getTag("exception"))
76+
.isEqualTo("IllegalStateException");
77+
}
78+
79+
@Test
80+
void verifyMetricsWithObservation(@Autowired ObservationListener observationListener,
81+
@Autowired MeterRegistry meterRegistry,
82+
@Autowired KafkaTemplate<Integer, String> template)
83+
throws Exception {
84+
85+
template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS);
86+
assertThat(observationListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
87+
88+
Timer timer = meterRegistry.find("spring.kafka.listener")
89+
.tag("spring.kafka.listener.id", "observationTest-0")
90+
.tag("error", "IllegalStateException")
91+
.timer();
92+
93+
assertThat(timer).isNotNull();
94+
}
95+
96+
@Configuration
97+
@EnableKafka
98+
static class Config {
99+
100+
@Bean
101+
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
102+
return new DefaultKafkaProducerFactory<>(
103+
KafkaTestUtils.producerProps(broker));
104+
}
105+
106+
@Bean
107+
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
108+
return new DefaultKafkaConsumerFactory<>(
109+
KafkaTestUtils.consumerProps("metrics", "false", broker));
110+
}
111+
112+
@Bean
113+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
114+
return new KafkaTemplate<>(pf);
115+
}
116+
117+
@Bean
118+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
119+
ConsumerFactory<Integer, String> cf) {
120+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
121+
new ConcurrentKafkaListenerContainerFactory<>();
122+
factory.setConsumerFactory(cf);
123+
factory.getContainerProperties().setMicrometerEnabled(true);
124+
factory.getContainerProperties().setObservationEnabled(false);
125+
return factory;
126+
}
127+
128+
@Bean
129+
ConcurrentKafkaListenerContainerFactory<Integer, String> observationListenerContainerFactory(
130+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
131+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
132+
new ConcurrentKafkaListenerContainerFactory<>();
133+
factory.setConsumerFactory(cf);
134+
factory.getContainerProperties().setObservationEnabled(true);
135+
factory.getContainerProperties().setObservationRegistry(observationRegistry);
136+
return factory;
137+
}
138+
139+
@Bean
140+
MetricsListener metricsListener() {
141+
return new MetricsListener();
142+
}
143+
144+
@Bean
145+
MeterRegistry meterRegistry() {
146+
return new SimpleMeterRegistry();
147+
}
148+
149+
@Bean
150+
ObservationListener observationListener() {
151+
return new ObservationListener();
152+
}
153+
154+
@Bean
155+
ObservationRegistry observationRegistry(MeterRegistry meterRegistry) {
156+
ObservationRegistry observationRegistry = ObservationRegistry.create();
157+
observationRegistry.observationConfig()
158+
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
159+
return observationRegistry;
160+
}
161+
}
162+
163+
static class MetricsListener {
164+
final CountDownLatch latch = new CountDownLatch(1);
165+
166+
@KafkaListener(id = "metricsTest", topics = METRICS_TEST_TOPIC)
167+
void listen(ConsumerRecord<Integer, String> in) {
168+
try {
169+
throw new IllegalStateException("metrics test exception");
170+
}
171+
finally {
172+
latch.countDown();
173+
}
174+
}
175+
}
176+
177+
static class ObservationListener {
178+
final CountDownLatch latch = new CountDownLatch(1);
179+
180+
@KafkaListener(id = "observationTest",
181+
topics = METRICS_TEST_TOPIC,
182+
containerFactory = "observationListenerContainerFactory")
183+
void listen(ConsumerRecord<Integer, String> in) {
184+
try {
185+
throw new IllegalStateException("observation test exception");
186+
}
187+
finally {
188+
latch.countDown();
189+
}
190+
}
191+
}
192+
193+
}
194+

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,9 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
359359
@Test
360360
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
361361
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
362-
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config)
363-
throws ExecutionException, InterruptedException, TimeoutException {
362+
@Autowired KafkaListenerEndpointRegistry endpointRegistry,
363+
@Autowired MeterRegistry meterRegistry, @Autowired Config config)
364+
throws ExecutionException, InterruptedException, TimeoutException {
364365

365366
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
366367
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
@@ -372,10 +373,14 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
372373
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
373374
span = spans.poll();
374375
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
375-
assertThat(span.getError().getCause())
376+
assertThat(span.getError())
376377
.isInstanceOf(IllegalStateException.class)
377378
.hasMessage("obs4 run time exception");
378379

380+
assertThat(meterRegistry.get("spring.kafka.listener")
381+
.tag("error", "IllegalStateException")
382+
.timer().count()).isEqualTo(1);
383+
379384
assertThat(config.scopeInFailureReference.get()).isNotNull();
380385
}
381386

0 commit comments

Comments
 (0)