Skip to content

Commit 2647e63

Browse files
committed
spring-projectsGH-2478: Improve RetryTopicComponentFactory Conf
Resolves spring-projects#2478 Allow users to define a `RetryTopicComponentFactory` bean for use in infrastructure bean definitions. Previously, although `createComponentFactory()` can be overridden, it is called early in the class lifecycle and can't have any dependencies on Spring beans. Now, if the application context contains a unique bean of this type, it is used instead. **Cannot back port due to breaking API changes; work around for 2.9.x is to override one or more of `destinationTopicResolver`, `kafkaConsumerBackoffManager` and/or `retryTopicConfigurer` bean definition methods.**
1 parent 2f9a7f7 commit 2647e63

File tree

6 files changed

+98
-23
lines changed

6 files changed

+98
-23
lines changed

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ You can now configure multiple `@RetryableTopic` listeners on the same topic in
4949
Previously, this was not possible.
5050
See <<multi-retry>> for more information.
5151

52+
There are breaking API changes in `RetryTopicConfigurationSupport`; specifically, if you override the bean definition methods for `destinationTopicResolver`, `kafkaConsumerBackoffManager` and/or `retryTopicConfigurer`;
53+
these methods now require an `ObjectProvider<RetryTopicComponentFactory>` parameter.
54+
5255
[[x30-lc-changes]]
5356
==== Listener Container Changes
5457

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.beans.factory.ListableBeanFactory;
5252
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
5353
import org.springframework.beans.factory.ObjectFactory;
54+
import org.springframework.beans.factory.ObjectProvider;
5455
import org.springframework.beans.factory.SmartInitializingSingleton;
5556
import org.springframework.beans.factory.config.BeanExpressionContext;
5657
import org.springframework.beans.factory.config.BeanExpressionResolver;
@@ -89,6 +90,7 @@
8990
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
9091
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
9192
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
93+
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
9294
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
9395
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
9496
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
@@ -549,7 +551,8 @@ private RetryTopicConfigurer createDefaultConfigurer() {
549551
RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
550552
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
551553
RetryTopicConfigurationSupport.class);
552-
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
554+
ObjectProvider<RetryTopicComponentFactory> provider = gac.getBeanProvider(RetryTopicComponentFactory.class);
555+
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver(provider);
553556
RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
554557
TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
555558
if (schedW == null && sched == null) {
@@ -560,8 +563,8 @@ private RetryTopicConfigurer createDefaultConfigurer() {
560563
}
561564
KafkaConsumerBackoffManager bom =
562565
rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(), // NOSONAR
563-
schedW, sched);
564-
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);
566+
provider, schedW, sched);
567+
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, provider, this.beanFactory);
565568

566569
gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
567570
() -> destResolver);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.springframework.beans.BeansException;
3030
import org.springframework.beans.factory.BeanFactory;
31+
import org.springframework.beans.factory.ObjectProvider;
3132
import org.springframework.beans.factory.SmartInitializingSingleton;
3233
import org.springframework.beans.factory.annotation.Qualifier;
3334
import org.springframework.context.ApplicationContext;
@@ -104,6 +105,7 @@ public void afterSingletonsInstantiated() {
104105
* To configure it, consider overriding the {@link #configureRetryTopicConfigurer()}.
105106
* @param kafkaConsumerBackoffManager the global {@link KafkaConsumerBackoffManager}.
106107
* @param destinationTopicResolver the global {@link DestinationTopicResolver}.
108+
* @param componentFactoryProvider the component factory provider.
107109
* @param beanFactory the {@link BeanFactory}.
108110
* @return the instance.
109111
* @see KafkaListenerAnnotationBeanPostProcessor
@@ -113,24 +115,26 @@ public RetryTopicConfigurer retryTopicConfigurer(@Qualifier(KafkaListenerConfigU
113115
KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
114116
@Qualifier(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
115117
DestinationTopicResolver destinationTopicResolver,
118+
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
116119
BeanFactory beanFactory) {
117120

118-
DestinationTopicProcessor destinationTopicProcessor = this.componentFactory
121+
RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
122+
DestinationTopicProcessor destinationTopicProcessor = compFactory
119123
.destinationTopicProcessor(destinationTopicResolver);
120-
DeadLetterPublishingRecovererFactory dlprf = this.componentFactory
124+
DeadLetterPublishingRecovererFactory dlprf = compFactory
121125
.deadLetterPublishingRecovererFactory(destinationTopicResolver);
122-
ListenerContainerFactoryConfigurer lcfc = this.componentFactory
126+
ListenerContainerFactoryConfigurer lcfc = compFactory
123127
.listenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
124-
dlprf, this.componentFactory.internalRetryTopicClock());
125-
ListenerContainerFactoryResolver factoryResolver = this.componentFactory
128+
dlprf, compFactory.internalRetryTopicClock());
129+
ListenerContainerFactoryResolver factoryResolver = compFactory
126130
.listenerContainerFactoryResolver(beanFactory);
127131
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory =
128-
this.componentFactory.retryTopicNamesProviderFactory();
132+
compFactory.retryTopicNamesProviderFactory();
129133

130134
processDeadLetterPublishingContainerFactory(dlprf);
131135
processListenerContainerFactoryConfigurer(lcfc);
132136

133-
RetryTopicConfigurer retryTopicConfigurer = this.componentFactory
137+
RetryTopicConfigurer retryTopicConfigurer = compFactory
134138
.retryTopicConfigurer(destinationTopicProcessor, lcfc,
135139
factoryResolver, retryTopicNamesProviderFactory);
136140

@@ -255,11 +259,15 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
255259
* <li>{@link #configureDestinationTopicResolver} to further customize the component.
256260
* <li>{@link #createComponentFactory} to provide a subclass instance.
257261
* </ul>
262+
* @param componentFactoryProvider the component factory provider.
258263
* @return the instance.
259264
*/
260265
@Bean(name = RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME)
261-
public DestinationTopicResolver destinationTopicResolver() {
262-
DestinationTopicResolver destinationTopicResolver = this.componentFactory.destinationTopicResolver();
266+
public DestinationTopicResolver destinationTopicResolver(
267+
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider) {
268+
269+
RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
270+
DestinationTopicResolver destinationTopicResolver = compFactory.destinationTopicResolver();
263271
JavaUtils.INSTANCE.acceptIfInstanceOf(DefaultDestinationTopicResolver.class, destinationTopicResolver,
264272
this::configureNonBlockingFatalExceptions);
265273
Consumer<DestinationTopicResolver> resolverConsumer = configureDestinationTopicResolver();
@@ -294,18 +302,22 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
294302
* @param applicationContext the application context.
295303
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
296304
* {@link MessageListenerContainer} at runtime to be backed off.
305+
* @param componentFactoryProvider the component factory provider.
297306
* @param wrapper a {@link RetryTopicSchedulerWrapper}.
298307
* @param taskScheduler a {@link TaskScheduler}.
299308
* @return the instance.
300309
*/
301310
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
302311
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
303312
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
304-
ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper,
313+
ListenerContainerRegistry registry,
314+
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
315+
@Nullable RetryTopicSchedulerWrapper wrapper,
305316
@Nullable TaskScheduler taskScheduler) {
306317

318+
RetryTopicComponentFactory compFactory = componentFactoryProvider.getIfUnique(() -> this.componentFactory);
307319
KafkaBackOffManagerFactory backOffManagerFactory =
308-
this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext);
320+
compFactory.kafkaBackOffManagerFactory(registry, applicationContext);
309321
JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
310322
factory -> configurePartitionPausingFactory(factory, registry,
311323
wrapper != null ? wrapper.getScheduler() : taskScheduler));

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.kafka.retrytopic;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.spy;
21+
import static org.mockito.Mockito.verify;
2022

2123
import java.util.List;
2224
import java.util.Map;
@@ -63,7 +65,8 @@ class RetryTopicConfigurationIntegrationTests {
6365

6466
@Test
6567
void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
66-
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config) throws InterruptedException {
68+
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config,
69+
@Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException {
6770

6871
Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
6972
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
@@ -72,6 +75,7 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
7275
"RetryTopicConfigurationIntegrationTests.1-retry-110");
7376
template.send(TOPIC1, "foo");
7477
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
78+
verify(componentFactory).destinationTopicResolver();
7579
}
7680

7781
@Configuration(proxyBeanMethods = false)
@@ -89,6 +93,11 @@ void dlt(String in) {
8993
this.latch.countDown();
9094
}
9195

96+
@Bean
97+
RetryTopicComponentFactory componentFactory() {
98+
return spy(new RetryTopicComponentFactory());
99+
}
100+
92101
@Bean
93102
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaTemplate<Integer, String> template,
94103
ConsumerFactory<Integer, String> consumerFactory) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.BDDMockito.then;
25+
import static org.mockito.BDDMockito.willAnswer;
2426
import static org.mockito.Mockito.mock;
2527
import static org.mockito.Mockito.spy;
2628
import static org.mockito.Mockito.verify;
@@ -37,6 +39,7 @@
3739

3840
import org.springframework.beans.DirectFieldAccessor;
3941
import org.springframework.beans.factory.BeanFactory;
42+
import org.springframework.beans.factory.ObjectProvider;
4043
import org.springframework.context.ApplicationContext;
4144
import org.springframework.core.log.LogAccessor;
4245
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -125,7 +128,14 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie
125128
}
126129
};
127130

128-
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
131+
@SuppressWarnings("unchecked")
132+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
133+
willAnswer(inv -> {
134+
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
135+
return sup.get();
136+
}).given(prov).getIfUnique(any());
137+
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver,
138+
prov, beanFactory);
129139
assertThat(retryTopicConfigurer).isNotNull();
130140

131141
then(componentFactory).should().destinationTopicProcessor(resolver);
@@ -153,7 +163,14 @@ void testRetryTopicConfigurerNoConfiguration() {
153163
DestinationTopicResolver resolver = mock(DestinationTopicResolver.class);
154164
BeanFactory beanFactory = mock(BeanFactory.class);
155165
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
156-
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, beanFactory);
166+
@SuppressWarnings("unchecked")
167+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
168+
willAnswer(inv -> {
169+
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
170+
return sup.get();
171+
}).given(prov).getIfUnique(any());
172+
RetryTopicConfigurer retryTopicConfigurer = support.retryTopicConfigurer(backoffManager, resolver, prov,
173+
beanFactory);
157174
assertThat(retryTopicConfigurer).isNotNull();
158175
}
159176

@@ -177,7 +194,13 @@ protected RetryTopicComponentFactory createComponentFactory() {
177194
}
178195

179196
};
180-
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
197+
@SuppressWarnings("unchecked")
198+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
199+
willAnswer(inv -> {
200+
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
201+
return sup.get();
202+
}).given(prov).getIfUnique(any());
203+
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null,
181204
taskSchedulerMock);
182205
assertThat(backoffManager).isEqualTo(backoffManagerMock);
183206
then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx);
@@ -190,7 +213,13 @@ void testCreateBackOffManagerNoConfiguration() {
190213
TaskScheduler scheduler = mock(TaskScheduler.class);
191214
ApplicationContext ctx = mock(ApplicationContext.class);
192215
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
193-
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
216+
@SuppressWarnings("unchecked")
217+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
218+
willAnswer(inv -> {
219+
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
220+
return sup.get();
221+
}).given(prov).getIfUnique(any());
222+
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, prov, null,
194223
scheduler);
195224
assertThat(backoffManager).isNotNull();
196225
}
@@ -218,7 +247,14 @@ protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>>
218247
nonBlockingRetries.remove(ConversionException.class);
219248
}
220249
};
221-
DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support.destinationTopicResolver();
250+
@SuppressWarnings("unchecked")
251+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
252+
willAnswer(inv -> {
253+
Supplier<RetryTopicComponentFactory> sup = inv.getArgument(0);
254+
return sup.get();
255+
}).given(prov).getIfUnique(any());
256+
DefaultDestinationTopicResolver resolver = (DefaultDestinationTopicResolver) support
257+
.destinationTopicResolver(prov);
222258
assertThat(resolver).isEqualTo(resolverMock);
223259
then(dtrConsumer).should().accept(resolverMock);
224260
ArgumentCaptor<Map<Class<? extends Throwable>, Boolean>> captor = ArgumentCaptor.forClass(Map.class);
@@ -229,8 +265,13 @@ protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>>
229265
@Test
230266
void testCreateDestinationTopicResolverNoConfiguration() {
231267
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
232-
DestinationTopicResolver resolver = support.destinationTopicResolver();
268+
@SuppressWarnings("unchecked")
269+
ObjectProvider<RetryTopicComponentFactory> prov = mock(ObjectProvider.class);
270+
RetryTopicComponentFactory factory = spy(new RetryTopicComponentFactory());
271+
given(prov.getIfUnique(any())).willReturn(factory);
272+
DestinationTopicResolver resolver = support.destinationTopicResolver(prov);
233273
assertThat(resolver).isNotNull();
274+
verify(factory).destinationTopicResolver();
234275
}
235276

236277
@Test
@@ -257,5 +298,4 @@ void twoSupports() {
257298
assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found "
258299
+ "[foo, bar]; this may result in unexpected behavior");
259300
}
260-
261301
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.fail;
21+
import static org.mockito.Mockito.spy;
22+
import static org.mockito.Mockito.verify;
2123

2224
import java.util.HashMap;
2325
import java.util.Map;
@@ -82,7 +84,7 @@ public class RetryTopicSameContainerFactoryIntegrationTests {
8284
private CountDownLatchContainer latchContainer;
8385

8486
@Test
85-
void shouldRetryFirstAndSecondTopics() {
87+
void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) {
8688
logger.debug("Sending message to topic " + FIRST_TOPIC);
8789
sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
8890
logger.debug("Sending message to topic " + SECOND_TOPIC);
@@ -91,6 +93,7 @@ void shouldRetryFirstAndSecondTopics() {
9193
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
9294
assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue();
9395
assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue();
96+
verify(componentFactory).destinationTopicResolver();
9497
}
9598

9699
private boolean awaitLatch(CountDownLatch latch) {
@@ -240,6 +243,11 @@ TaskScheduler sched() {
240243
return new ThreadPoolTaskScheduler();
241244
}
242245

246+
@Bean
247+
RetryTopicComponentFactory componentFactory() {
248+
return spy(new RetryTopicComponentFactory());
249+
}
250+
243251
}
244252

245253
}

0 commit comments

Comments
 (0)