Skip to content

Commit e29ec92

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-1863 - Configure listener error handler and type converter for @RetryableTopics
1 parent c0303d3 commit e29ec92

File tree

2 files changed

+54
-13
lines changed

2 files changed

+54
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
465465
}
466466

467467
RetryTopicConfigurer.EndpointProcessor endpointProcessor = endpointToProcess ->
468-
this.doProcessKafkaListenerAnnotation(endpointToProcess, kafkaListener, bean);
468+
this.processKafkaListenerAnnotationForRetryTopic(endpointToProcess, kafkaListener, bean);
469469

470470
String beanRef = kafkaListener.beanRef();
471471
this.listenerScope.addListener(beanRef, bean);
@@ -546,28 +546,26 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
546546
this.listenerScope.addListener(beanRef, bean);
547547
}
548548

549-
doProcessKafkaListenerAnnotation(endpoint, kafkaListener, bean);
549+
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
550550

551551
String containerFactory = resolve(kafkaListener.containerFactory());
552552
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
553553

554554
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
555555

556-
endpoint.setBeanFactory(this.beanFactory);
557-
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
558-
if (StringUtils.hasText(errorHandlerBeanName)) {
559-
resolveErrorHandler(endpoint, kafkaListener);
560-
}
561-
String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(), "contentTypeConverter");
562-
if (StringUtils.hasText(converterBeanName)) {
563-
resolveContentTypeConverter(endpoint, kafkaListener);
564-
}
556+
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
557+
565558
if (StringUtils.hasText(beanRef)) {
566559
this.listenerScope.removeListener(beanRef);
567560
}
568561
}
569562

570-
private void doProcessKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
563+
private void processKafkaListenerAnnotationForRetryTopic(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
564+
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean);
565+
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
566+
}
567+
568+
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean) {
571569
endpoint.setBean(bean);
572570
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
573571
endpoint.setId(getEndpointId(kafkaListener));
@@ -595,6 +593,18 @@ private void doProcessKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?>
595593
endpoint.setSplitIterables(kafkaListener.splitIterables());
596594
}
597595

596+
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
597+
endpoint.setBeanFactory(this.beanFactory);
598+
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
599+
if (StringUtils.hasText(errorHandlerBeanName)) {
600+
resolveErrorHandler(endpoint, kafkaListener);
601+
}
602+
String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(), "contentTypeConverter");
603+
if (StringUtils.hasText(converterBeanName)) {
604+
resolveContentTypeConverter(endpoint, kafkaListener);
605+
}
606+
}
607+
598608
private void resolveErrorHandler(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
599609
Object errorHandler = resolveExpression(kafkaListener.errorHandler());
600610
if (errorHandler instanceof KafkaListenerErrorHandler) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Clock;
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -53,9 +54,14 @@
5354
import org.springframework.kafka.core.KafkaTemplate;
5455
import org.springframework.kafka.core.ProducerFactory;
5556
import org.springframework.kafka.listener.ContainerProperties;
57+
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
5658
import org.springframework.kafka.support.KafkaHeaders;
5759
import org.springframework.kafka.test.EmbeddedKafkaBroker;
5860
import org.springframework.kafka.test.context.EmbeddedKafka;
61+
import org.springframework.messaging.Message;
62+
import org.springframework.messaging.converter.CompositeMessageConverter;
63+
import org.springframework.messaging.converter.GenericMessageConverter;
64+
import org.springframework.messaging.converter.SmartMessageConverter;
5965
import org.springframework.messaging.handler.annotation.Header;
6066
import org.springframework.retry.annotation.Backoff;
6167
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -105,6 +111,8 @@ void shouldRetryFirstTopic() {
105111
kafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
106112
assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue();
107113
assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue();
114+
assertThat(awaitLatch(latchContainer.customErrorHandlerCountdownLatch)).isTrue();
115+
assertThat(awaitLatch(latchContainer.customMessageConverterCountdownLatch)).isTrue();
108116
}
109117

110118
@Test
@@ -154,7 +162,8 @@ static class FirstTopicListener {
154162
@Autowired
155163
CountDownLatchContainer container;
156164

157-
@KafkaListener(id = "firstTopicId", topics = FIRST_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
165+
@KafkaListener(id = "firstTopicId", topics = FIRST_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY,
166+
errorHandler = "myCustomErrorHandler", contentTypeConverter = "myCustomMessageConverter")
158167
public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
159168
logger.debug("Message {} received in topic {}", message, receivedTopic);
160169
container.countDownLatch1.countDown();
@@ -256,6 +265,8 @@ static class CountDownLatchContainer {
256265
CountDownLatch countDownLatchDltOne = new CountDownLatch(1);
257266
CountDownLatch countDownLatchDltTwo = new CountDownLatch(1);
258267
CountDownLatch customDltCountdownLatch = new CountDownLatch(1);
268+
CountDownLatch customErrorHandlerCountdownLatch = new CountDownLatch(6);
269+
CountDownLatch customMessageConverterCountdownLatch = new CountDownLatch(6);
259270

260271
List<String> knownTopics = new ArrayList<>();
261272

@@ -334,6 +345,26 @@ public FirstTopicListener firstTopicListener() {
334345
return new FirstTopicListener();
335346
}
336347

348+
@Bean
349+
public KafkaListenerErrorHandler myCustomErrorHandler(CountDownLatchContainer container) {
350+
return (message, exception) -> {
351+
container.customErrorHandlerCountdownLatch.countDown();
352+
throw exception;
353+
};
354+
}
355+
356+
@Bean
357+
public SmartMessageConverter myCustomMessageConverter(CountDownLatchContainer container) {
358+
return new CompositeMessageConverter(Collections.singletonList(new GenericMessageConverter())) {
359+
360+
@Override
361+
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
362+
container.customMessageConverterCountdownLatch.countDown();
363+
return super.fromMessage(message, targetClass, conversionHint);
364+
}
365+
};
366+
}
367+
337368
@Bean
338369
public SecondTopicListener secondTopicListener() {
339370
return new SecondTopicListener();

0 commit comments

Comments
 (0)