diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index fbf82bcc76..c078a927c9 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -49,6 +49,9 @@ You can now configure multiple `@RetryableTopic` listeners on the same topic in Previously, this was not possible. See <> for more information. +There are breaking API changes in `RetryTopicConfigurationSupport`; specifically, if you override the bean definition methods for `destinationTopicResolver`, `kafkaConsumerBackoffManager` and/or `retryTopicConfigurer`; +these methods now require an `ObjectProvider` parameter. + [[x30-lc-changes]] ==== Listener Container Changes diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 9eb012c3aa..a350826264 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -51,6 +51,7 @@ import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.ObjectFactory; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.BeanExpressionContext; import org.springframework.beans.factory.config.BeanExpressionResolver; @@ -89,6 +90,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.retrytopic.DestinationTopicResolver; import org.springframework.kafka.retrytopic.RetryTopicBeanNames; +import org.springframework.kafka.retrytopic.RetryTopicComponentFactory; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; import org.springframework.kafka.retrytopic.RetryTopicConfigurer; @@ -549,7 +551,8 @@ private RetryTopicConfigurer createDefaultConfigurer() { RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean( RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME, RetryTopicConfigurationSupport.class); - DestinationTopicResolver destResolver = rtcs.destinationTopicResolver(); + ObjectProvider provider = gac.getBeanProvider(RetryTopicComponentFactory.class); + DestinationTopicResolver destResolver = rtcs.destinationTopicResolver(provider); RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique(); TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique(); if (schedW == null && sched == null) { @@ -560,8 +563,8 @@ private RetryTopicConfigurer createDefaultConfigurer() { } KafkaConsumerBackoffManager bom = rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(), // NOSONAR - schedW, sched); - RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory); + provider, schedW, sched); + RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, provider, this.beanFactory); gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class, () -> destResolver); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java index 8641228a49..4e79a851c8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java @@ -28,6 +28,7 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; @@ -104,6 +105,7 @@ public void afterSingletonsInstantiated() { * To configure it, consider overriding the {@link #configureRetryTopicConfigurer()}. * @param kafkaConsumerBackoffManager the global {@link KafkaConsumerBackoffManager}. * @param destinationTopicResolver the global {@link DestinationTopicResolver}. + * @param componentFactoryProvider the component factory provider. * @param beanFactory the {@link BeanFactory}. * @return the instance. * @see KafkaListenerAnnotationBeanPostProcessor @@ -113,24 +115,26 @@ public RetryTopicConfigurer retryTopicConfigurer(@Qualifier(KafkaListenerConfigU KafkaConsumerBackoffManager kafkaConsumerBackoffManager, @Qualifier(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME) DestinationTopicResolver destinationTopicResolver, + ObjectProvider componentFactoryProvider, BeanFactory beanFactory) { - DestinationTopicProcessor destinationTopicProcessor = this.componentFactory + RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory); + DestinationTopicProcessor destinationTopicProcessor = compFactory .destinationTopicProcessor(destinationTopicResolver); - DeadLetterPublishingRecovererFactory dlprf = this.componentFactory + DeadLetterPublishingRecovererFactory dlprf = compFactory .deadLetterPublishingRecovererFactory(destinationTopicResolver); - ListenerContainerFactoryConfigurer lcfc = this.componentFactory + ListenerContainerFactoryConfigurer lcfc = compFactory .listenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, - dlprf, this.componentFactory.internalRetryTopicClock()); - ListenerContainerFactoryResolver factoryResolver = this.componentFactory + dlprf, compFactory.internalRetryTopicClock()); + ListenerContainerFactoryResolver factoryResolver = compFactory .listenerContainerFactoryResolver(beanFactory); RetryTopicNamesProviderFactory retryTopicNamesProviderFactory = - this.componentFactory.retryTopicNamesProviderFactory(); + compFactory.retryTopicNamesProviderFactory(); processDeadLetterPublishingContainerFactory(dlprf); processListenerContainerFactoryConfigurer(lcfc); - RetryTopicConfigurer retryTopicConfigurer = this.componentFactory + RetryTopicConfigurer retryTopicConfigurer = compFactory .retryTopicConfigurer(destinationTopicProcessor, lcfc, factoryResolver, retryTopicNamesProviderFactory); @@ -255,11 +259,15 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) *
  • {@link #configureDestinationTopicResolver} to further customize the component. *
  • {@link #createComponentFactory} to provide a subclass instance. * + * @param componentFactoryProvider the component factory provider. * @return the instance. */ @Bean(name = RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME) - public DestinationTopicResolver destinationTopicResolver() { - DestinationTopicResolver destinationTopicResolver = this.componentFactory.destinationTopicResolver(); + public DestinationTopicResolver destinationTopicResolver( + ObjectProvider componentFactoryProvider) { + + RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory); + DestinationTopicResolver destinationTopicResolver = compFactory.destinationTopicResolver(); JavaUtils.INSTANCE.acceptIfInstanceOf(DefaultDestinationTopicResolver.class, destinationTopicResolver, this::configureNonBlockingFatalExceptions); Consumer resolverConsumer = configureDestinationTopicResolver(); @@ -294,6 +302,7 @@ protected Consumer configureDestinationTopicResolver() * @param applicationContext the application context. * @param registry the {@link ListenerContainerRegistry} to be used to fetch the * {@link MessageListenerContainer} at runtime to be backed off. + * @param componentFactoryProvider the component factory provider. * @param wrapper a {@link RetryTopicSchedulerWrapper}. * @param taskScheduler a {@link TaskScheduler}. * @return the instance. @@ -301,11 +310,14 @@ protected Consumer configureDestinationTopicResolver() @Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME) public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext, @Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) - ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper, + ListenerContainerRegistry registry, + ObjectProvider componentFactoryProvider, + @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable TaskScheduler taskScheduler) { + RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory); KafkaBackOffManagerFactory backOffManagerFactory = - this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext); + compFactory.kafkaBackOffManagerFactory(registry, applicationContext); JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory, factory -> configurePartitionPausingFactory(factory, registry, wrapper != null ? wrapper.getScheduler() : taskScheduler)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java index 1023e5196d..a74136923e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java @@ -17,6 +17,8 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.util.List; import java.util.Map; @@ -63,7 +65,8 @@ class RetryTopicConfigurationIntegrationTests { @Test void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory cf, - @Autowired KafkaTemplate template, @Autowired Config config) throws InterruptedException { + @Autowired KafkaTemplate template, @Autowired Config config, + @Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException { Consumer consumer = cf.createConsumer("grp2", ""); Map> topics = consumer.listTopics(); @@ -72,6 +75,7 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact "RetryTopicConfigurationIntegrationTests.1-retry-110"); template.send(TOPIC1, "foo"); assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(componentFactory).destinationTopicResolver(); } @Configuration(proxyBeanMethods = false) @@ -89,6 +93,11 @@ void dlt(String in) { this.latch.countDown(); } + @Bean + RetryTopicComponentFactory componentFactory() { + return spy(new RetryTopicComponentFactory()); + } + @Bean KafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaTemplate template, ConsumerFactory consumerFactory) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java index fd953c7371..c633b470e8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java @@ -18,9 +18,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -37,6 +39,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.ApplicationContext; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @@ -125,7 +128,14 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie } }; - RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory); + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + willAnswer(inv -> { + Supplier sup = inv.getArgument(0); + return sup.get(); + }).given(prov).getIfUnique(any()); + RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, + prov, beanFactory); assertThat(retryTopicConfigurer).isNotNull(); then(componentFactory).should().destinationTopicProcessor(resolver); @@ -153,7 +163,14 @@ void testRetryTopicConfigurerNoConfiguration() { DestinationTopicResolver resolver = mock(DestinationTopicResolver.class); BeanFactory beanFactory = mock(BeanFactory.class); RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport(); - RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory); + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + willAnswer(inv -> { + Supplier sup = inv.getArgument(0); + return sup.get(); + }).given(prov).getIfUnique(any()); + RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, prov, + beanFactory); assertThat(retryTopicConfigurer).isNotNull(); } @@ -177,7 +194,13 @@ protected RetryTopicComponentFactory createComponentFactory() { } }; - KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null, + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + willAnswer(inv -> { + Supplier sup = inv.getArgument(0); + return sup.get(); + }).given(prov).getIfUnique(any()); + KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null, taskSchedulerMock); assertThat(backoffManager).isEqualTo(backoffManagerMock); then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx); @@ -190,7 +213,13 @@ void testCreateBackOffManagerNoConfiguration() { TaskScheduler scheduler = mock(TaskScheduler.class); ApplicationContext ctx = mock(ApplicationContext.class); RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport(); - KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null, + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + willAnswer(inv -> { + Supplier sup = inv.getArgument(0); + return sup.get(); + }).given(prov).getIfUnique(any()); + KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null, scheduler); assertThat(backoffManager).isNotNull(); } @@ -218,7 +247,14 @@ protected void manageNonBlockingFatalExceptions(List> nonBlockingRetries.remove(ConversionException.class); } }; - DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support.destinationTopicResolver(); + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + willAnswer(inv -> { + Supplier sup = inv.getArgument(0); + return sup.get(); + }).given(prov).getIfUnique(any()); + DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support + .destinationTopicResolver(prov); assertThat(resolver).isEqualTo(resolverMock); then(dtrConsumer).should().accept(resolverMock); ArgumentCaptor, Boolean>> captor = ArgumentCaptor.forClass(Map.class); @@ -229,8 +265,13 @@ protected void manageNonBlockingFatalExceptions(List> @Test void testCreateDestinationTopicResolverNoConfiguration() { RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport(); - DestinationTopicResolver resolver = support.destinationTopicResolver(); + @SuppressWarnings("unchecked") + ObjectProvider prov = mock(ObjectProvider.class); + RetryTopicComponentFactory factory = spy(new RetryTopicComponentFactory()); + given(prov.getIfUnique(any())).willReturn(factory); + DestinationTopicResolver resolver = support.destinationTopicResolver(prov); assertThat(resolver).isNotNull(); + verify(factory).destinationTopicResolver(); } @Test @@ -257,5 +298,4 @@ void twoSupports() { assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found " + "[foo, bar]; this may result in unexpected behavior"); } - } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 4d70ffff1a..2e7d442364 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.util.HashMap; import java.util.Map; @@ -82,7 +84,7 @@ public class RetryTopicSameContainerFactoryIntegrationTests { private CountDownLatchContainer latchContainer; @Test - void shouldRetryFirstAndSecondTopics() { + void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) { logger.debug("Sending message to topic " + FIRST_TOPIC); sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); logger.debug("Sending message to topic " + SECOND_TOPIC); @@ -91,6 +93,7 @@ void shouldRetryFirstAndSecondTopics() { assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue(); assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue(); + verify(componentFactory).destinationTopicResolver(); } private boolean awaitLatch(CountDownLatch latch) { @@ -240,6 +243,11 @@ TaskScheduler sched() { return new ThreadPoolTaskScheduler(); } + @Bean + RetryTopicComponentFactory componentFactory() { + return spy(new RetryTopicComponentFactory()); + } + } }