Skip to content

Commit 09b063f

Browse files
spring-projectsGH-2116: Add blocking retries to RT
Before we hardcoded a no-ops back off in the DefaultErrorHandler used in the Retryable Topics feature. Adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT.
1 parent 6f63a57 commit 09b063f

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
274274

275275
NOTE: The default is having no timeout set, which can also be achieved by providing -1 as the timout value.
276276

277+
[[retry-topic-exception-classifier]]
277278
===== Exception Classifier
278279

279280
You can specify which exceptions you want to retry on and which not to.
@@ -328,6 +329,37 @@ public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicat
328329
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
329330

330331

332+
[[retry-topic-combine-blocking]]
333+
===== Combine blocking and non-blocking retries
334+
335+
Starting in 2.8.3 you can configure the framework to use both blocking and non-blocking retries in conjunction.
336+
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
337+
338+
You can configure the blocking retries as follows:
339+
340+
====
341+
[source, java]
342+
----
343+
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
344+
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
345+
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
346+
@Qualifier(RetryTopicInternalBeanNames
347+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
348+
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
349+
lcfc.setBlockingRetriesBackOff(new FixedBackOff(50, 3));
350+
lcfc.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler)
351+
.addNotRetryableExceptions(MyFatalException.class);
352+
return lcfc;
353+
}
354+
----
355+
====
356+
357+
NOTE: If you set a blocking retry back off, the default is to retry on all exceptions except the fatal ones in <<default-eh>>.
358+
You can add or remove exceptions using the `addNotRetryableException` and `removeNotRetryableException` methods in the `ListenerContainerFactoryConfigurer`.
359+
360+
NOTE: In combination with the global retryable topic's fatal classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.
361+
362+
331363
===== Include and Exclude Topics
332364

333365
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics) .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
4444
import org.springframework.kafka.support.TopicPartitionOffset;
4545
import org.springframework.util.Assert;
46+
import org.springframework.util.backoff.BackOff;
4647
import org.springframework.util.backoff.FixedBackOff;
4748

4849
/**
@@ -82,6 +83,10 @@ public class ListenerContainerFactoryConfigurer {
8283

8384
private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;
8485

86+
private static final BackOff DEFAULT_BLOCKING_BACKOFF = new FixedBackOff(0, 0);
87+
88+
private BackOff blockingBackOff = DEFAULT_BLOCKING_BACKOFF;
89+
8590
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
8691
};
8792

@@ -159,6 +164,19 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
159164
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
160165
}
161166

167+
/**
168+
* Set a {@link BackOff} to be used by blocking retries.
169+
* You can add and remove exceptions to be retried this way using this class'
170+
* superclass
171+
* {@link org.springframework.kafka.listener.ExceptionClassifier#addNotRetryableExceptions(Class[])}
172+
* and
173+
* {@link org.springframework.kafka.listener.ExceptionClassifier#removeNotRetryableException(Class)}}
174+
* @param blockingBackOff the BackOff policy to be used
175+
*/
176+
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
177+
this.blockingBackOff = blockingBackOff;
178+
}
179+
162180
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
163181
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
164182

@@ -192,8 +210,7 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
192210
}
193211

194212
private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
195-
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
196-
new FixedBackOff(0, 0));
213+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, this.blockingBackOff);
197214
errorHandler.setCommitRecovered(true);
198215
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
199216
this.errorHandlerCustomizer.accept(errorHandler);

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.then;
2525
import static org.mockito.BDDMockito.willReturn;
26+
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
2829

@@ -59,6 +60,8 @@
5960
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
6061
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
6162
import org.springframework.kafka.support.Acknowledgment;
63+
import org.springframework.util.backoff.BackOff;
64+
import org.springframework.util.backoff.BackOffExecution;
6265

6366
/**
6467
* @author Tomaz Fernandes
@@ -408,6 +411,34 @@ void shouldDecorateFactory() {
408411
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
409412
}
410413

414+
@Test
415+
void shouldUseGivenBackOff() {
416+
417+
// given
418+
given(container.getContainerProperties()).willReturn(containerProperties);
419+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
420+
given(containerProperties.getMessageListener()).willReturn(listener);
421+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
422+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
423+
BackOff backOffMock = mock(BackOff.class);
424+
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
425+
given(backOffMock.start()).willReturn(backOffExecutionMock);
426+
427+
ListenerContainerFactoryConfigurer configurer =
428+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
429+
deadLetterPublishingRecovererFactory, clock);
430+
431+
configurer.setBlockingRetriesBackOff(backOffMock);
432+
433+
// when
434+
KafkaListenerContainerFactory<?> decoratedFactory =
435+
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
436+
decoratedFactory.createListenerContainer(endpoint);
437+
438+
// then
439+
then(backOffMock).should().start();
440+
}
441+
411442
@Test
412443
void shouldCacheFactoryInstances() {
413444

0 commit comments

Comments
 (0)