Skip to content

Commit 7ecef8b

Browse files
authored
GH-3052: RetryableTopic SITRS default changes
Fixes: #3052 * Change `RetryableTopic#SameIntervalTopicReuseStrategy` default behavior to SINGLE_TOPIC. * Change unit test for SameIntervalTopicReuseStrategy. * Add doc.
1 parent 3927d64 commit 7ecef8b

File tree

8 files changed

+46
-33
lines changed

8 files changed

+46
-33
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
1616
IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations.
1717

1818
When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
19-
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
20-
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.
19+
Starting with versions 2.9.10, 3.0.8, this will be set to `false` unconditionally for such configurations.
20+
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `false`.
2121

2222
[source, java]
2323
----

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,35 +111,35 @@ This "final" retry topic will be suffixed with the provided or default suffix, a
111111

112112
NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.
113113

114-
The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index.
114+
Starting 3.2, the default behavior is reuses the retry topic for the same intervals, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic reuses for the same intervals(corresponding to the `maxInterval` delay).
115115

116116
For instance, when configuring the exponential backoff with `initialInterval=1_000`, `multiplier=2`, and `maxInterval=16_000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be:
117117

118118
* -retry-1000
119119
* -retry-2000
120120
* -retry-4000
121121
* -retry-8000
122-
* -retry-16000-0
123-
* -retry-16000-1
124-
* -retry-16000-2
125-
* ...
126-
* -retry-16000-224
122+
* -retry-16000
127123

128-
When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:
124+
When using the strategy that work with the number of retry topics equal to the configured `maxAttempts` minus 1, the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index would be:
129125

130126
* -retry-1000
131127
* -retry-2000
132128
* -retry-4000
133129
* -retry-8000
134-
* -retry-16000
130+
* -retry-16000-0
131+
* -retry-16000-1
132+
* -retry-16000-2
133+
* ...
134+
* -retry-16000-224
135135

136-
This will be the default in a future release.
136+
If multiple topics are required, then that can be done using the following configuration.
137137

138138
[source, java]
139139
----
140140
@RetryableTopic(attempts = 230,
141141
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
142-
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
142+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
143143
@KafkaListener(topics = "my-annotated-topic")
144144
public void processMessage(MyPojo message) {
145145
// ... message processing

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,9 @@ Deprecating the `transactionManager` property in `ContainerProperties` in favor
4545
=== After Rollback Processing
4646

4747
A new `AfterRollbackProcessor` API `processBatch` is provided.
48-
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.
48+
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.
49+
50+
[[x32-retry-topic]]
51+
=== Change @RetryableTopic SameIntervalTopicReuseStrategy default value
52+
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
53+
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
* @author Fabio da Silva Jr.
4242
* @author João Lima
4343
* @author Adrian Chlebosz
44+
* @author Wang Zhiyang
45+
*
4446
* @since 2.7
4547
*
4648
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
@@ -191,10 +193,12 @@
191193

192194
/**
193195
* Topic reuse strategy for sequential attempts made with a same backoff interval.
196+
* Starting 3.2, change default behavior to {@code SameIntervalTopicReuseStrategy.SINGLE_TOPIC}.
197+
*
194198
* @return the strategy.
195199
* @since 3.0.4
196200
*/
197-
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS;
201+
SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.SINGLE_TOPIC;
198202

199203
/**
200204
* Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up.

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,6 +65,8 @@
6565

6666
/**
6767
* @author Tomaz Fernandes
68+
* @author Wang Zhiyang
69+
*
6870
* @since 2.8.4
6971
*/
7072
@SpringJUnitConfig
@@ -247,9 +249,7 @@ static class FrameworkFatalTopicListener {
247249
@Autowired
248250
CountDownLatchContainer container;
249251

250-
@SuppressWarnings("deprecation")
251-
@RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
252-
backoff = @Backoff(50))
252+
@RetryableTopic(backoff = @Backoff(50))
253253
@KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC)
254254
public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
255255
container.fatalFrameworkLatch.countDown();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -384,6 +384,7 @@ static class FourthTopicListener {
384384
CountDownLatchContainer container;
385385

386386
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300),
387+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS,
387388
kafkaTemplate = "${kafka.template}")
388389
@KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
389390
public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -410,6 +411,7 @@ static class FifthTopicListener1 {
410411
numPartitions = "2",
411412
retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt",
412413
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
414+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS,
413415
kafkaTemplate = "${kafka.template}")
414416
@KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
415417
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
@@ -442,6 +444,7 @@ static class FifthTopicListener2 {
442444
numPartitions = "2",
443445
retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt",
444446
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
447+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS,
445448
kafkaTemplate = "${kafka.template}")
446449
@KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
447450
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))},
@@ -468,7 +471,8 @@ static class SixthTopicDefaultDLTListener {
468471
@Autowired
469472
CountDownLatchContainer container;
470473

471-
@RetryableTopic(attempts = "4", backoff = @Backoff(50))
474+
@RetryableTopic(attempts = "4", backoff = @Backoff(50),
475+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
472476
@KafkaListener(id = "manual", topics = MANUAL_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
473477
public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
474478
@SuppressWarnings("unused") Acknowledgment ack) {
@@ -511,8 +515,7 @@ static class FirstReuseRetryTopicListener {
511515
@Autowired
512516
CountDownLatchContainer container;
513517

514-
@RetryableTopic(attempts = "2", backoff = @Backoff(50),
515-
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
518+
@RetryableTopic(attempts = "2", backoff = @Backoff(50))
516519
@KafkaListener(id = "reuseRetry1", topics = FIRST_REUSE_RETRY_TOPIC,
517520
containerFactory = "retryTopicListenerContainerFactory")
518521
public void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -532,8 +535,7 @@ static class SecondReuseRetryTopicListener {
532535
@Autowired
533536
CountDownLatchContainer container;
534537

535-
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2),
536-
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
538+
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2))
537539
@KafkaListener(id = "reuseRetry2", topics = SECOND_REUSE_RETRY_TOPIC,
538540
containerFactory = "retryTopicListenerContainerFactory")
539541
public void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -553,8 +555,7 @@ static class ThirdReuseRetryTopicListener {
553555
@Autowired
554556
CountDownLatchContainer container;
555557

556-
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4),
557-
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
558+
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4))
558559
@KafkaListener(id = "reuseRetry3", topics = THIRD_REUSE_RETRY_TOPIC,
559560
containerFactory = "retryTopicListenerContainerFactory")
560561
public void listen3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,8 @@
6262
/**
6363
* @author Tomaz Fernandes
6464
* @author Cenk Akin
65+
* @author Wang Zhiyang
66+
*
6567
* @since 2.8.3
6668
*/
6769
@SpringJUnitConfig
@@ -121,7 +123,8 @@ static class FirstRetryableKafkaListener {
121123
attempts = "4",
122124
backoff = @Backoff(delay = 1000, multiplier = 2.0),
123125
autoCreateTopics = "false",
124-
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
126+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
127+
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
125128
@KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC)
126129
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
127130
countDownLatchContainer.countDownLatchFirstRetryable.countDown();
@@ -142,7 +145,7 @@ static class SecondRetryableKafkaListener {
142145
@Autowired
143146
CountDownLatchContainer countDownLatchContainer;
144147

145-
@RetryableTopic
148+
@RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
146149
@KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC)
147150
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
148151
countDownLatchContainer.countDownLatchSecondRetryable.countDown();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
* @author Tomaz Fernandes
5353
* @author Gary Russell
5454
* @author Adrian Chlebosz
55+
* @author Wang Zhiyang
56+
*
5557
* @since 2.7
5658
*/
5759
@SuppressWarnings("deprecation")
@@ -314,15 +316,13 @@ void shouldCreateFixedBackoff() {
314316

315317
// then
316318
List<DestinationTopic.Properties> destinationTopicProperties = configuration.getDestinationTopicProperties();
319+
assertThat(destinationTopicProperties.size()).isEqualTo(3);
317320
DestinationTopic destinationTopic = new DestinationTopic("", destinationTopicProperties.get(0));
318321
assertThat(destinationTopic.getDestinationDelay()).isEqualTo(0);
319322
DestinationTopic destinationTopic2 = new DestinationTopic("", destinationTopicProperties.get(1));
320323
assertThat(destinationTopic2.getDestinationDelay()).isEqualTo(1000);
321324
DestinationTopic destinationTopic3 = new DestinationTopic("", destinationTopicProperties.get(2));
322-
assertThat(destinationTopic3.getDestinationDelay()).isEqualTo(1000);
323-
DestinationTopic destinationTopic4 = new DestinationTopic("", destinationTopicProperties.get(3));
324-
assertThat(destinationTopic4.getDestinationDelay()).isEqualTo(0);
325-
325+
assertThat(destinationTopic3.getDestinationDelay()).isEqualTo(0);
326326
}
327327

328328
@Test

0 commit comments

Comments
 (0)