Skip to content

Commit 45ba99d

Browse files
committed
Listener exceptions are not saved to the observation.
* Embedded the (original) exception into the observation, allowing downstream tracing code to handle it. * Add unit test for observation Error and RuntimeException. Resolves spring-projects#3049
1 parent cd4341c commit 45ba99d

File tree

2 files changed

+147
-36
lines changed

2 files changed

+147
-36
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2763,6 +2763,7 @@ private void pauseForNackSleep() {
27632763
* @throws Error an error.
27642764
*/
27652765
@Nullable
2766+
@SuppressWarnings("try")
27662767
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
27672768
Iterator<ConsumerRecord<K, V>> iterator) {
27682769

@@ -2772,37 +2773,59 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27722773
DefaultKafkaListenerObservationConvention.INSTANCE,
27732774
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
27742775
this.observationRegistry);
2775-
return observation.observe(() -> {
2776+
RuntimeException runtimeException;
2777+
observation.start();
2778+
try {
2779+
try (Observation.Scope ignored = observation.openScope()) {
2780+
runtimeException = doInvokeRecordListener(cRecord, iterator, sample);
2781+
}
2782+
catch (Throwable error) {
2783+
observation.error(error);
2784+
throw error;
2785+
}
2786+
if (runtimeException != null) {
2787+
observation.error(runtimeException);
2788+
}
2789+
}
2790+
finally {
2791+
observation.stop();
2792+
}
2793+
return runtimeException;
2794+
}
2795+
2796+
@Nullable
2797+
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
2798+
Iterator<ConsumerRecord<K, V>> iterator, @Nullable Object sample) {
2799+
2800+
try {
2801+
invokeOnMessage(cRecord);
2802+
successTimer(sample, cRecord);
2803+
recordInterceptAfter(cRecord, null);
2804+
}
2805+
catch (RuntimeException e) {
2806+
failureTimer(sample, cRecord);
2807+
recordInterceptAfter(cRecord, e);
2808+
if (this.commonErrorHandler == null) {
2809+
throw e;
2810+
}
27762811
try {
2777-
invokeOnMessage(cRecord);
2778-
successTimer(sample, cRecord);
2779-
recordInterceptAfter(cRecord, null);
2812+
invokeErrorHandler(cRecord, iterator, e);
2813+
commitOffsetsIfNeededAfterHandlingError(cRecord);
27802814
}
2781-
catch (RuntimeException e) {
2782-
failureTimer(sample, cRecord);
2783-
recordInterceptAfter(cRecord, e);
2784-
if (this.commonErrorHandler == null) {
2785-
throw e;
2786-
}
2787-
try {
2788-
invokeErrorHandler(cRecord, iterator, e);
2789-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2790-
}
2791-
catch (KafkaException ke) {
2792-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2793-
return ke;
2794-
}
2795-
catch (RuntimeException ee) {
2796-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2797-
return ee;
2798-
}
2799-
catch (Error er) { // NOSONAR
2800-
this.logger.error(er, "Error handler threw an error");
2801-
throw er;
2802-
}
2815+
catch (KafkaException ke) {
2816+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2817+
return ke;
28032818
}
2804-
return null;
2805-
});
2819+
catch (RuntimeException ee) {
2820+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2821+
return ee;
2822+
}
2823+
catch (Error er) { // NOSONAR
2824+
this.logger.error(er, "Error handler threw an error");
2825+
throw er;
2826+
}
2827+
}
2828+
return null;
28062829
}
28072830

28082831
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.clients.consumer.ConsumerRecord;
3636
import org.apache.kafka.clients.producer.ProducerConfig;
3737
import org.apache.kafka.common.errors.InvalidTopicException;
38+
import org.apache.kafka.common.header.Header;
3839
import org.apache.kafka.common.header.Headers;
3940
import org.junit.jupiter.api.Test;
4041

@@ -54,7 +55,6 @@
5455
import org.springframework.kafka.core.KafkaAdmin;
5556
import org.springframework.kafka.core.KafkaTemplate;
5657
import org.springframework.kafka.core.ProducerFactory;
57-
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
5858
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
5959
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
6060
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -85,14 +85,20 @@
8585
/**
8686
* @author Gary Russell
8787
* @author Artem Bilan
88+
* @author Wang Zhiyang
8889
*
8990
* @since 3.0
9091
*/
9192
@SpringJUnitConfig
92-
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
93+
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
94+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
9395
@DirtiesContext
9496
public class ObservationTests {
9597

98+
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
99+
100+
public final static String OBSERVATION_ERROR = "observation.error";
101+
96102
@Test
97103
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
98104
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
106112
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
107113
assertThat(listener.record).isNotNull();
108114
Headers headers = listener.record.headers();
109-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
110-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
115+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
116+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
111117
Deque<SimpleSpan> spans = tracer.getSpans();
112118
assertThat(spans).hasSize(4);
113119
SimpleSpan span = spans.poll();
@@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
148154
}
149155

150156
});
157+
151158
rler.getListenerContainer("obs1").stop();
152159
rler.getListenerContainer("obs1").start();
153160
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
154161
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
155162
assertThat(listener.record).isNotNull();
156163
headers = listener.record.headers();
157-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
158-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
164+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
165+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
159166
assertThat(spans).hasSize(4);
160167
span = spans.poll();
161168
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
@@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
230237
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
231238
}
232239

240+
@Test
241+
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
242+
@Autowired @Qualifier("runtimeExceptionTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
243+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
244+
throws ExecutionException, InterruptedException, TimeoutException {
245+
246+
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
247+
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
248+
endpointRegistry.getListenerContainer("obs4").stop();
249+
250+
Deque<SimpleSpan> spans = tracer.getSpans();
251+
assertThat(spans).hasSize(2);
252+
SimpleSpan span = spans.poll();
253+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("runtimeExceptionTemplate");
254+
span = spans.poll();
255+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
256+
assertThat(span.getError().getCause().getCause())
257+
.isInstanceOf(IllegalStateException.class)
258+
.hasMessage("obs4 run time exception");
259+
}
260+
261+
@Test
262+
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
263+
@Autowired @Qualifier("errorTemplate") KafkaTemplate<Integer, String> errorTemplate,
264+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
265+
throws ExecutionException, InterruptedException, TimeoutException {
266+
267+
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
268+
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
269+
endpointRegistry.getListenerContainer("obs5").stop();
270+
271+
Deque<SimpleSpan> spans = tracer.getSpans();
272+
assertThat(spans).hasSize(2);
273+
SimpleSpan span = spans.poll();
274+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("errorTemplate");
275+
span = spans.poll();
276+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0");
277+
assertThat(span.getError())
278+
.isInstanceOf(Error.class)
279+
.hasMessage("obs5 error");
280+
}
281+
233282
@Configuration
234283
@EnableKafka
235284
public static class Config {
@@ -276,6 +325,20 @@ KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> p
276325
return template;
277326
}
278327

328+
@Bean
329+
KafkaTemplate<Integer, String> runtimeExceptionTemplate(ProducerFactory<Integer, String> pf) {
330+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
331+
template.setObservationEnabled(true);
332+
return template;
333+
}
334+
335+
@Bean
336+
KafkaTemplate<Integer, String> errorTemplate(ProducerFactory<Integer, String> pf) {
337+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
338+
template.setObservationEnabled(true);
339+
return template;
340+
}
341+
279342
@Bean
280343
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
281344
ConsumerFactory<Integer, String> cf) {
@@ -286,7 +349,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
286349
factory.getContainerProperties().setObservationEnabled(true);
287350
factory.setContainerCustomizer(container -> {
288351
if (container.getListenerId().equals("obs3")) {
289-
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
352+
container.setKafkaAdmin(this.mockAdmin);
290353
}
291354
});
292355
return factory;
@@ -352,6 +415,11 @@ Listener listener(KafkaTemplate<Integer, String> template) {
352415
return new Listener(template);
353416
}
354417

418+
@Bean
419+
ExceptionListener exceptionListener() {
420+
return new ExceptionListener();
421+
}
422+
355423
}
356424

357425
public static class Listener {
@@ -387,4 +455,24 @@ void listen3(ConsumerRecord<Integer, String> in) {
387455

388456
}
389457

458+
public static class ExceptionListener {
459+
460+
final CountDownLatch latch4 = new CountDownLatch(1);
461+
462+
final CountDownLatch latch5 = new CountDownLatch(1);
463+
464+
@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
465+
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
466+
this.latch4.countDown();
467+
throw new IllegalStateException("obs4 run time exception");
468+
}
469+
470+
@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
471+
void listenError(ConsumerRecord<Integer, String> in) {
472+
this.latch5.countDown();
473+
throw new Error("obs5 error");
474+
}
475+
476+
}
477+
390478
}

0 commit comments

Comments
 (0)