Skip to content

Commit 7998944

Browse files
GH-2116: Add blocking retries to RT
Before we hardcoded a no-ops back off in the DefaultErrorHandler used in the Retryable Topics feature. Adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT.
1 parent c27de92 commit 7998944

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,37 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat
329329
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
330330

331331

332+
[[retry-topic-combine-blocking]]
333+
===== Combine blocking and non-blocking retries
334+
335+
Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction.
336+
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
337+
338+
You can configure the blocking retries as follows:
339+
340+
====
341+
[source, java]
342+
----
343+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
344+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
345+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
346+
@Qualifier(RetryTopicInternalBeanNames
347+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
348+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
349+
lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
350+
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
351+
.addNotRetryableExceptions(MyFatalException.class);
352+
return lcfc;
353+
}
354+
----
355+
====
356+
357+
NOTE: If you set a blocking retry back off, the default is to retry on all exceptions except the fatal ones in <<default-eh>>.
358+
You can add or remove exceptions using the `addNotRetryableException` and `removeNotRetryableException` methods in the `ListenerContainerFactoryConfigurer`.
359+
360+
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
361+
362+
332363
===== Include and Exclude Topics
333364

334365
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics) .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.then;
2525
import static org.mockito.BDDMockito.willReturn;
26+
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
2829

@@ -59,6 +60,8 @@
5960
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
6061
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
6162
import org.springframework.kafka.support.Acknowledgment;
63+
import org.springframework.util.backoff.BackOff;
64+
import org.springframework.util.backoff.BackOffExecution;
6265

6366
/**
6467
* @author Tomaz Fernandes
@@ -408,6 +411,34 @@ void shouldDecorateFactory() {
408411
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
409412
}
410413

414+
@Test
415+
void shouldUseGivenBackOff() {
416+
417+
// given
418+
given(container.getContainerProperties()).willReturn(containerProperties);
419+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
420+
given(containerProperties.getMessageListener()).willReturn(listener);
421+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
422+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
423+
BackOff backOffMock = mock(BackOff.class);
424+
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
425+
given(backOffMock.start()).willReturn(backOffExecutionMock);
426+
427+
ListenerContainerFactoryConfigurer configurer =
428+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
429+
deadLetterPublishingRecovererFactory, clock);
430+
431+
configurer.setBlockingRetriesBackOff(backOffMock);
432+
433+
// when
434+
KafkaListenerContainerFactory<?> decoratedFactory =
435+
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
436+
decoratedFactory.createListenerContainer(endpoint);
437+
438+
// then
439+
then(backOffMock).should().start();
440+
}
441+
411442
@Test
412443
void shouldCacheFactoryInstances() {
413444

0 commit comments

Comments
 (0)