Skip to content

Commit d8bcab0

Browse files
Wzy19930507spring-builds
authored andcommitted
GH-3049: Fix Listener exceptions for observations
Fixes: #3049 Listener exceptions are not saved to the observation. * Embedded the (original) exception into the observation, allowing downstream tracing code to handle it. * Add unit test for observation `Error` and `RuntimeException`. * Unify the `runtimeExceptionTemplate` and `errorTemplate` into a `throwableTemplate`. (cherry picked from commit 61016db)
1 parent ef8c040 commit d8bcab0

File tree

2 files changed

+115
-34
lines changed

2 files changed

+115
-34
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2793,37 +2793,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27932793
DefaultKafkaListenerObservationConvention.INSTANCE,
27942794
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
27952795
this.observationRegistry);
2796-
return observation.observe(() -> {
2797-
try {
2796+
try {
2797+
observation.observe(() -> {
27982798
invokeOnMessage(cRecord);
27992799
successTimer(sample, cRecord);
28002800
recordInterceptAfter(cRecord, null);
2801+
});
2802+
}
2803+
catch (RuntimeException e) {
2804+
failureTimer(sample, cRecord);
2805+
recordInterceptAfter(cRecord, e);
2806+
if (this.commonErrorHandler == null) {
2807+
throw e;
28012808
}
2802-
catch (RuntimeException e) {
2803-
failureTimer(sample, cRecord);
2804-
recordInterceptAfter(cRecord, e);
2805-
if (this.commonErrorHandler == null) {
2806-
throw e;
2807-
}
2808-
try {
2809-
invokeErrorHandler(cRecord, iterator, e);
2810-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2811-
}
2812-
catch (KafkaException ke) {
2813-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2814-
return ke;
2815-
}
2816-
catch (RuntimeException ee) {
2817-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2818-
return ee;
2819-
}
2820-
catch (Error er) { // NOSONAR
2821-
this.logger.error(er, "Error handler threw an error");
2822-
throw er;
2823-
}
2809+
try {
2810+
invokeErrorHandler(cRecord, iterator, e);
2811+
commitOffsetsIfNeededAfterHandlingError(cRecord);
28242812
}
2825-
return null;
2826-
});
2813+
catch (KafkaException ke) {
2814+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2815+
return ke;
2816+
}
2817+
catch (RuntimeException ee) {
2818+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2819+
return ee;
2820+
}
2821+
catch (Error er) { // NOSONAR
2822+
this.logger.error(er, "Error handler threw an error");
2823+
throw er;
2824+
}
2825+
}
2826+
return null;
28272827
}
28282828

28292829
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.clients.consumer.ConsumerRecord;
3636
import org.apache.kafka.clients.producer.ProducerConfig;
3737
import org.apache.kafka.common.errors.InvalidTopicException;
38+
import org.apache.kafka.common.header.Header;
3839
import org.apache.kafka.common.header.Headers;
3940
import org.junit.jupiter.api.Test;
4041

@@ -52,7 +53,6 @@
5253
import org.springframework.kafka.core.KafkaAdmin;
5354
import org.springframework.kafka.core.KafkaTemplate;
5455
import org.springframework.kafka.core.ProducerFactory;
55-
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
5656
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
5757
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
5858
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -83,14 +83,20 @@
8383
/**
8484
* @author Gary Russell
8585
* @author Artem Bilan
86+
* @author Wang Zhiyang
8687
*
8788
* @since 3.0
8889
*/
8990
@SpringJUnitConfig
90-
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
91+
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
92+
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
9193
@DirtiesContext
9294
public class ObservationTests {
9395

96+
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
97+
98+
public final static String OBSERVATION_ERROR = "observation.error";
99+
94100
@Test
95101
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
96102
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -103,8 +109,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
103109
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
104110
assertThat(listener.record).isNotNull();
105111
Headers headers = listener.record.headers();
106-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
107-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
112+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
113+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
108114
Deque<SimpleSpan> spans = tracer.getSpans();
109115
assertThat(spans).hasSize(4);
110116
SimpleSpan span = spans.poll();
@@ -145,14 +151,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
145151
}
146152

147153
});
154+
148155
rler.getListenerContainer("obs1").stop();
149156
rler.getListenerContainer("obs1").start();
150157
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
151158
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
152159
assertThat(listener.record).isNotNull();
153160
headers = listener.record.headers();
154-
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
155-
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
161+
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
162+
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
156163
assertThat(spans).hasSize(4);
157164
span = spans.poll();
158165
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
@@ -227,6 +234,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
227234
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
228235
}
229236

237+
@Test
238+
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
239+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
240+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
241+
throws ExecutionException, InterruptedException, TimeoutException {
242+
243+
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
244+
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
245+
endpointRegistry.getListenerContainer("obs4").stop();
246+
247+
Deque<SimpleSpan> spans = tracer.getSpans();
248+
assertThat(spans).hasSize(2);
249+
SimpleSpan span = spans.poll();
250+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
251+
span = spans.poll();
252+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
253+
assertThat(span.getError().getCause())
254+
.isInstanceOf(IllegalStateException.class)
255+
.hasMessage("obs4 run time exception");
256+
}
257+
258+
@Test
259+
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
260+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
261+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
262+
throws ExecutionException, InterruptedException, TimeoutException {
263+
264+
errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
265+
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
266+
endpointRegistry.getListenerContainer("obs5").stop();
267+
268+
Deque<SimpleSpan> spans = tracer.getSpans();
269+
assertThat(spans).hasSize(2);
270+
SimpleSpan span = spans.poll();
271+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
272+
span = spans.poll();
273+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0");
274+
assertThat(span.getError())
275+
.isInstanceOf(Error.class)
276+
.hasMessage("obs5 error");
277+
}
278+
230279
@Configuration
231280
@EnableKafka
232281
public static class Config {
@@ -272,6 +321,13 @@ KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> p
272321
return template;
273322
}
274323

324+
@Bean
325+
KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String> pf) {
326+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
327+
template.setObservationEnabled(true);
328+
return template;
329+
}
330+
275331
@Bean
276332
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
277333
ConsumerFactory<Integer, String> cf) {
@@ -282,7 +338,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
282338
factory.getContainerProperties().setObservationEnabled(true);
283339
factory.setContainerCustomizer(container -> {
284340
if (container.getListenerId().equals("obs3")) {
285-
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
341+
container.setKafkaAdmin(this.mockAdmin);
286342
}
287343
});
288344
return factory;
@@ -348,6 +404,11 @@ Listener listener(KafkaTemplate<Integer, String> template) {
348404
return new Listener(template);
349405
}
350406

407+
@Bean
408+
ExceptionListener exceptionListener() {
409+
return new ExceptionListener();
410+
}
411+
351412
}
352413

353414
public static class Listener {
@@ -383,4 +444,24 @@ void listen3(ConsumerRecord<Integer, String> in) {
383444

384445
}
385446

447+
public static class ExceptionListener {
448+
449+
final CountDownLatch latch4 = new CountDownLatch(1);
450+
451+
final CountDownLatch latch5 = new CountDownLatch(1);
452+
453+
@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
454+
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
455+
this.latch4.countDown();
456+
throw new IllegalStateException("obs4 run time exception");
457+
}
458+
459+
@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
460+
void listenError(ConsumerRecord<Integer, String> in) {
461+
this.latch5.countDown();
462+
throw new Error("obs5 error");
463+
}
464+
465+
}
466+
386467
}

0 commit comments

Comments
 (0)