Skip to content

GH-3049: Listener exceptions are not saved to the observation. #3080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2762,37 +2762,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
try {
try {
observation.observe(() -> {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
});
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
return null;
});
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
return null;
}

private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;

Expand All @@ -54,7 +55,6 @@
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
Expand Down Expand Up @@ -85,14 +85,20 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Wang Zhiyang
*
* @since 3.0
*/
@SpringJUnitConfig
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
@DirtiesContext
public class ObservationTests {

public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";

public final static String OBSERVATION_ERROR = "observation.error";

@Test
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
Expand All @@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.record).isNotNull();
Headers headers = listener.record.headers();
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(4);
SimpleSpan span = spans.poll();
Expand Down Expand Up @@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
}

});

rler.getListenerContainer("obs1").stop();
rler.getListenerContainer("obs1").start();
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.record).isNotNull();
headers = listener.record.headers();
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
assertThat(spans).hasSize(4);
span = spans.poll();
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
Expand Down Expand Up @@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
}

@Test
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
endpointRegistry.getListenerContainer("obs4").stop();

Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(2);
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
assertThat(span.getError().getCause())
.isInstanceOf(IllegalStateException.class)
.hasMessage("obs4 run time exception");
}

@Test
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
endpointRegistry.getListenerContainer("obs5").stop();

Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(2);
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("obs5 error");
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -276,6 +325,13 @@ KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> p
return template;
}

@Bean
KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
return template;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like both of these new KafkaTemplate beans are just exactly the same what is template bean.

Any particular reason why do we need these two?
Why that template is not enough for new tests?

Thanks

Copy link
Contributor Author

@Wzy19930507 Wzy19930507 Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for test tag spring.kafka.template.name, is doesn't matter.
Is ok to unify the runtimeExceptionTemplate and errorTemplate into a throwableTemplate?

}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
Expand All @@ -286,7 +342,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
factory.getContainerProperties().setObservationEnabled(true);
factory.setContainerCustomizer(container -> {
if (container.getListenerId().equals("obs3")) {
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
container.setKafkaAdmin(this.mockAdmin);
}
});
return factory;
Expand Down Expand Up @@ -352,6 +408,11 @@ Listener listener(KafkaTemplate<Integer, String> template) {
return new Listener(template);
}

@Bean
ExceptionListener exceptionListener() {
return new ExceptionListener();
}

}

public static class Listener {
Expand Down Expand Up @@ -387,4 +448,24 @@ void listen3(ConsumerRecord<Integer, String> in) {

}

public static class ExceptionListener {

final CountDownLatch latch4 = new CountDownLatch(1);

final CountDownLatch latch5 = new CountDownLatch(1);

@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
this.latch4.countDown();
throw new IllegalStateException("obs4 run time exception");
}

@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
void listenError(ConsumerRecord<Integer, String> in) {
this.latch5.countDown();
throw new Error("obs5 error");
}

}

}