Skip to content

Commit 61016db

Browse files
authored
GH-3049: Fix Listener exceptions for observations
Fixes: #3049 Listener exceptions are not saved to the observation. * Embedded the (original) exception into the observation, allowing downstream tracing code to handle it. * Add unit test for observation `Error` and `RuntimeException`. * Unify the `runtimeExceptionTemplate` and `errorTemplate` into a `throwableTemplate`. **Auto-cherry-pick to `3.1.x` & `3.0.x`**
1 parent 8c3d656 commit 61016db

File tree

2 files changed

+115
-34
lines changed

2 files changed

+115
-34
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2762,37 +2762,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27622762
DefaultKafkaListenerObservationConvention.INSTANCE,
27632763
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
27642764
this.observationRegistry);
2765-
return observation.observe(() -> {
2766-
try {
2765+
try {
2766+
observation.observe(() -> {
27672767
invokeOnMessage(cRecord);
27682768
successTimer(sample, cRecord);
27692769
recordInterceptAfter(cRecord, null);
2770+
});
2771+
}
2772+
catch (RuntimeException e) {
2773+
failureTimer(sample, cRecord);
2774+
recordInterceptAfter(cRecord, e);
2775+
if (this.commonErrorHandler == null) {
2776+
throw e;
27702777
}
2771-
catch (RuntimeException e) {
2772-
failureTimer(sample, cRecord);
2773-
recordInterceptAfter(cRecord, e);
2774-
if (this.commonErrorHandler == null) {
2775-
throw e;
2776-
}
2777-
try {
2778-
invokeErrorHandler(cRecord, iterator, e);
2779-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2780-
}
2781-
catch (KafkaException ke) {
2782-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2783-
return ke;
2784-
}
2785-
catch (RuntimeException ee) {
2786-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2787-
return ee;
2788-
}
2789-
catch (Error er) { // NOSONAR
2790-
this.logger.error(er, "Error handler threw an error");
2791-
throw er;
2792-
}
2778+
try {
2779+
invokeErrorHandler(cRecord, iterator, e);
2780+
commitOffsetsIfNeededAfterHandlingError(cRecord);
27932781
}
2794-
return null;
2795-
});
2782+
catch (KafkaException ke) {
2783+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2784+
return ke;
2785+
}
2786+
catch (RuntimeException ee) {
2787+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2788+
return ee;
2789+
}
2790+
catch (Error er) { // NOSONAR
2791+
this.logger.error(er, "Error handler threw an error");
2792+
throw er;
2793+
}
2794+
}
2795+
return null;
27962796
}
27972797

27982798
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.clients.consumer.ConsumerRecord;
3636
import org.apache.kafka.clients.producer.ProducerConfig;
3737
import org.apache.kafka.common.errors.InvalidTopicException;
38+
import org.apache.kafka.common.header.Header;
3839
import org.apache.kafka.common.header.Headers;
3940
import org.junit.jupiter.api.Test;
4041

@@ -54,7 +55,6 @@
5455
import org.springframework.kafka.core.KafkaAdmin;
5556
import org.springframework.kafka.core.KafkaTemplate;
5657
import org.springframework.kafka.core.ProducerFactory;
57-
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
5858
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
5959
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
6060
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -85,14 +85,20 @@
8585
/**
8686
* @author Gary Russell
8787
* @author Artem Bilan
88+
* @author Wang Zhiyang
8889
*
8990
* @since 3.0
9091
*/
9192
@SpringJUnitConfig
92-
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
93+
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
94+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
9395
@DirtiesContext
9496
public class ObservationTests {
9597

98+
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
99+
100+
public final static String OBSERVATION_ERROR = "observation.error";
101+
96102
@Test
97103
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
98104
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
106112
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
107113
assertThat(listener.record).isNotNull();
108114
Headers headers = listener.record.headers();
109-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
110-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
115+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
116+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
111117
Deque<SimpleSpan> spans = tracer.getSpans();
112118
assertThat(spans).hasSize(4);
113119
SimpleSpan span = spans.poll();
@@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
148154
}
149155

150156
});
157+
151158
rler.getListenerContainer("obs1").stop();
152159
rler.getListenerContainer("obs1").start();
153160
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
154161
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
155162
assertThat(listener.record).isNotNull();
156163
headers = listener.record.headers();
157-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
158-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
164+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
165+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
159166
assertThat(spans).hasSize(4);
160167
span = spans.poll();
161168
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
@@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
230237
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
231238
}
232239

240+
@Test
241+
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
242+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
243+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
244+
throws ExecutionException, InterruptedException, TimeoutException {
245+
246+
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
247+
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
248+
endpointRegistry.getListenerContainer("obs4").stop();
249+
250+
Deque<SimpleSpan> spans = tracer.getSpans();
251+
assertThat(spans).hasSize(2);
252+
SimpleSpan span = spans.poll();
253+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
254+
span = spans.poll();
255+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
256+
assertThat(span.getError().getCause())
257+
.isInstanceOf(IllegalStateException.class)
258+
.hasMessage("obs4 run time exception");
259+
}
260+
261+
@Test
262+
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
263+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
264+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
265+
throws ExecutionException, InterruptedException, TimeoutException {
266+
267+
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
268+
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
269+
endpointRegistry.getListenerContainer("obs5").stop();
270+
271+
Deque<SimpleSpan> spans = tracer.getSpans();
272+
assertThat(spans).hasSize(2);
273+
SimpleSpan span = spans.poll();
274+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
275+
span = spans.poll();
276+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0");
277+
assertThat(span.getError())
278+
.isInstanceOf(Error.class)
279+
.hasMessage("obs5 error");
280+
}
281+
233282
@Configuration
234283
@EnableKafka
235284
public static class Config {
@@ -276,6 +325,13 @@ KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> p
276325
return template;
277326
}
278327

328+
@Bean
329+
KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String> pf) {
330+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
331+
template.setObservationEnabled(true);
332+
return template;
333+
}
334+
279335
@Bean
280336
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
281337
ConsumerFactory<Integer, String> cf) {
@@ -286,7 +342,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
286342
factory.getContainerProperties().setObservationEnabled(true);
287343
factory.setContainerCustomizer(container -> {
288344
if (container.getListenerId().equals("obs3")) {
289-
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
345+
container.setKafkaAdmin(this.mockAdmin);
290346
}
291347
});
292348
return factory;
@@ -352,6 +408,11 @@ Listener listener(KafkaTemplate<Integer, String> template) {
352408
return new Listener(template);
353409
}
354410

411+
@Bean
412+
ExceptionListener exceptionListener() {
413+
return new ExceptionListener();
414+
}
415+
355416
}
356417

357418
public static class Listener {
@@ -387,4 +448,24 @@ void listen3(ConsumerRecord<Integer, String> in) {
387448

388449
}
389450

451+
public static class ExceptionListener {
452+
453+
final CountDownLatch latch4 = new CountDownLatch(1);
454+
455+
final CountDownLatch latch5 = new CountDownLatch(1);
456+
457+
@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
458+
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
459+
this.latch4.countDown();
460+
throw new IllegalStateException("obs4 run time exception");
461+
}
462+
463+
@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
464+
void listenError(ConsumerRecord<Integer, String> in) {
465+
this.latch5.countDown();
466+
throw new Error("obs5 error");
467+
}
468+
469+
}
470+
390471
}

0 commit comments

Comments
 (0)