Skip to content

GH-2478: Improve RetryTopicComponentFactory Conf #2480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ You can now configure multiple `@RetryableTopic` listeners on the same topic in
Previously, this was not possible.
See <<multi-retry>> for more information.

There are breaking API changes in `RetryTopicConfigurationSupport`; specifically, if you override the bean definition methods for `destinationTopicResolver`, `kafkaConsumerBackoffManager` and/or `retryTopicConfigurer`;
these methods now require an `ObjectProvider<RetryTopicComponentFactory>` parameter.

[[x30-lc-changes]]
==== Listener Container Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
Expand Down Expand Up @@ -549,7 +551,8 @@ private RetryTopicConfigurer createDefaultConfigurer() {
RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
RetryTopicConfigurationSupport.class);
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
ObjectProvider<RetryTopicComponentFactory> provider = gac.getBeanProvider(RetryTopicComponentFactory.class);
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver(provider);
RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
if (schedW == null && sched == null) {
Expand All @@ -560,8 +563,8 @@ private RetryTopicConfigurer createDefaultConfigurer() {
}
KafkaConsumerBackoffManager bom =
rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(), // NOSONAR
schedW, sched);
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);
provider, schedW, sched);
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, provider, this.beanFactory);

gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
() -> destResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -104,6 +105,7 @@ public void afterSingletonsInstantiated() {
* To configure it, consider overriding the {@link #configureRetryTopicConfigurer()}.
* @param kafkaConsumerBackoffManager the global {@link KafkaConsumerBackoffManager}.
* @param destinationTopicResolver the global {@link DestinationTopicResolver}.
* @param componentFactoryProvider the component factory provider.
* @param beanFactory the {@link BeanFactory}.
* @return the instance.
* @see KafkaListenerAnnotationBeanPostProcessor
Expand All @@ -113,24 +115,26 @@ public RetryTopicConfigurer retryTopicConfigurer(@Qualifier(KafkaListenerConfigU
KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
@Qualifier(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
DestinationTopicResolver destinationTopicResolver,
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
BeanFactory beanFactory) {

DestinationTopicProcessor destinationTopicProcessor = this.componentFactory
RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
DestinationTopicProcessor destinationTopicProcessor = compFactory
.destinationTopicProcessor(destinationTopicResolver);
DeadLetterPublishingRecovererFactory dlprf = this.componentFactory
DeadLetterPublishingRecovererFactory dlprf = compFactory
.deadLetterPublishingRecovererFactory(destinationTopicResolver);
ListenerContainerFactoryConfigurer lcfc = this.componentFactory
ListenerContainerFactoryConfigurer lcfc = compFactory
.listenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
dlprf, this.componentFactory.internalRetryTopicClock());
ListenerContainerFactoryResolver factoryResolver = this.componentFactory
dlprf, compFactory.internalRetryTopicClock());
ListenerContainerFactoryResolver factoryResolver = compFactory
.listenerContainerFactoryResolver(beanFactory);
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory =
this.componentFactory.retryTopicNamesProviderFactory();
compFactory.retryTopicNamesProviderFactory();

processDeadLetterPublishingContainerFactory(dlprf);
processListenerContainerFactoryConfigurer(lcfc);

RetryTopicConfigurer retryTopicConfigurer = this.componentFactory
RetryTopicConfigurer retryTopicConfigurer = compFactory
.retryTopicConfigurer(destinationTopicProcessor, lcfc,
factoryResolver, retryTopicNamesProviderFactory);

Expand Down Expand Up @@ -255,11 +259,15 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
* <li>{@link #configureDestinationTopicResolver} to further customize the component.
* <li>{@link #createComponentFactory} to provide a subclass instance.
* </ul>
* @param componentFactoryProvider the component factory provider.
* @return the instance.
*/
@Bean(name = RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
public DestinationTopicResolver destinationTopicResolver() {
DestinationTopicResolver destinationTopicResolver = this.componentFactory.destinationTopicResolver();
public DestinationTopicResolver destinationTopicResolver(
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) {

RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
DestinationTopicResolver destinationTopicResolver = compFactory.destinationTopicResolver();
JavaUtils.INSTANCE.acceptIfInstanceOf(DefaultDestinationTopicResolver.class, destinationTopicResolver,
this::configureNonBlockingFatalExceptions);
Consumer<DestinationTopicResolver> resolverConsumer = configureDestinationTopicResolver();
Expand Down Expand Up @@ -294,18 +302,22 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
* @param applicationContext the application context.
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
* {@link MessageListenerContainer} at runtime to be backed off.
* @param componentFactoryProvider the component factory provider.
* @param wrapper a {@link RetryTopicSchedulerWrapper}.
* @param taskScheduler a {@link TaskScheduler}.
* @return the instance.
*/
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper,
ListenerContainerRegistry registry,
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
@Nullable RetryTopicSchedulerWrapper wrapper,
@Nullable TaskScheduler taskScheduler) {

RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
KafkaBackOffManagerFactory backOffManagerFactory =
this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext);
compFactory.kafkaBackOffManagerFactory(registry, applicationContext);
JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
factory -> configurePartitionPausingFactory(factory, registry,
wrapper != null ? wrapper.getScheduler() : taskScheduler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.springframework.kafka.retrytopic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +65,8 @@ class RetryTopicConfigurationIntegrationTests {

@Test
void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config) throws InterruptedException {
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config,
@Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException {

Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
Expand All @@ -72,6 +75,7 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
"RetryTopicConfigurationIntegrationTests.1-retry-110");
template.send(TOPIC1, "foo");
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(componentFactory).destinationTopicResolver();
}

@Configuration(proxyBeanMethods = false)
Expand All @@ -89,6 +93,11 @@ void dlt(String in) {
this.latch.countDown();
}

@Bean
RetryTopicComponentFactory componentFactory() {
return spy(new RetryTopicComponentFactory());
}

@Bean
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaTemplate<Integer, String> template,
ConsumerFactory<Integer, String> consumerFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand All @@ -37,6 +39,7 @@

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
Expand Down Expand Up @@ -125,7 +128,14 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie
}
};

RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
willAnswer(inv -> {
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
return sup.get();
}).given(prov).getIfUnique(any());
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver,
prov, beanFactory);
assertThat(retryTopicConfigurer).isNotNull();

then(componentFactory).should().destinationTopicProcessor(resolver);
Expand Down Expand Up @@ -153,7 +163,14 @@ void testRetryTopicConfigurerNoConfiguration() {
DestinationTopicResolver resolver = mock(DestinationTopicResolver.class);
BeanFactory beanFactory = mock(BeanFactory.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
willAnswer(inv -> {
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
return sup.get();
}).given(prov).getIfUnique(any());
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, prov,
beanFactory);
assertThat(retryTopicConfigurer).isNotNull();
}

Expand All @@ -177,7 +194,13 @@ protected RetryTopicComponentFactory createComponentFactory() {
}

};
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
willAnswer(inv -> {
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
return sup.get();
}).given(prov).getIfUnique(any());
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null,
taskSchedulerMock);
assertThat(backoffManager).isEqualTo(backoffManagerMock);
then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx);
Expand All @@ -190,7 +213,13 @@ void testCreateBackOffManagerNoConfiguration() {
TaskScheduler scheduler = mock(TaskScheduler.class);
ApplicationContext ctx = mock(ApplicationContext.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
willAnswer(inv -> {
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
return sup.get();
}).given(prov).getIfUnique(any());
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null,
scheduler);
assertThat(backoffManager).isNotNull();
}
Expand Down Expand Up @@ -218,7 +247,14 @@ protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>>
nonBlockingRetries.remove(ConversionException.class);
}
};
DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support.destinationTopicResolver();
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
willAnswer(inv -> {
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
return sup.get();
}).given(prov).getIfUnique(any());
DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support
.destinationTopicResolver(prov);
assertThat(resolver).isEqualTo(resolverMock);
then(dtrConsumer).should().accept(resolverMock);
ArgumentCaptor<Map<Class<? extends Throwable>, Boolean>> captor = ArgumentCaptor.forClass(Map.class);
Expand All @@ -229,8 +265,13 @@ protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>>
@Test
void testCreateDestinationTopicResolverNoConfiguration() {
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
DestinationTopicResolver resolver = support.destinationTopicResolver();
@SuppressWarnings("unchecked")
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
RetryTopicComponentFactory factory = spy(new RetryTopicComponentFactory());
given(prov.getIfUnique(any())).willReturn(factory);
DestinationTopicResolver resolver = support.destinationTopicResolver(prov);
assertThat(resolver).isNotNull();
verify(factory).destinationTopicResolver();
}

@Test
Expand All @@ -257,5 +298,4 @@ void twoSupports() {
assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found "
+ "[foo, bar]; this may result in unexpected behavior");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -82,7 +84,7 @@ public class RetryTopicSameContainerFactoryIntegrationTests {
private CountDownLatchContainer latchContainer;

@Test
void shouldRetryFirstAndSecondTopics() {
void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) {
logger.debug("Sending message to topic " + FIRST_TOPIC);
sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
logger.debug("Sending message to topic " + SECOND_TOPIC);
Expand All @@ -91,6 +93,7 @@ void shouldRetryFirstAndSecondTopics() {
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue();
assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue();
verify(componentFactory).destinationTopicResolver();
}

private boolean awaitLatch(CountDownLatch latch) {
Expand Down Expand Up @@ -240,6 +243,11 @@ TaskScheduler sched() {
return new ThreadPoolTaskScheduler();
}

@Bean
RetryTopicComponentFactory componentFactory() {
return spy(new RetryTopicComponentFactory());
}

}

}