Skip to content

GH-2471: RetryableTopic Default Replication Factor #2473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic`
Use the simple `@EnableKafka` annotation instead.

When `autoCreateTopics` is true, the main and retry topics will be created with the specified number of partitions and replication factor.
Starting with version 3.0, the default replication factor is `-1`, meaning use the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.
To override these values for a particular topic (e.g. the main topic or DLT), simply add a `NewTopic` `@Bean` with the required properties; that will override the auto creation properties.

IMPORTANT: By default, records are published to the retry topic(s) using the original partition of the received record.
Expand Down Expand Up @@ -546,6 +548,8 @@ NOTE: The default behavior is to include all topics.

Unless otherwise specified the framework will auto create the required topics using `NewTopic` beans that are consumed by the `KafkaAdmin` bean.
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
Starting with version 3.0, the default replication factor is `-1`, meaning use the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.

IMPORTANT: Note that if you're not using Spring Boot you'll have to provide a KafkaAdmin bean in order to use this feature.

Expand Down Expand Up @@ -584,7 +588,8 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
----
====

NOTE: By default the topics are autocreated with one partition and a replication factor of one.
NOTE: By default the topics are autocreated with one partition and a replication factor of -1 (meaning use the broker default).
If your broker version is earlier than 2.4, you will need to set an explicit value.

[[retry-headers]]
===== Failure Header Management
Expand Down
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ You can now set a different `concurrency` for the retry containers; by default,

See <<retry-config>> for more information.

The default replication factor for the retry topics is now `-1` (use broker default).
If your broker is earlier that version 2.4, you will now need to explicitly set the property.

You can now configure multiple `@RetryableTopic` listeners on the same topic in the same application context.
Previously, this was not possible.
See <<multi-retry>> for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@

/**
* The replication factor for the automatically created topics. Expressions must
* resolve to a short or a String that can be parsed as such.
* resolve to a short or a String that can be parsed as such. Default is -1 to use the
* broker default if the broker is earlier than version 2.4, an explicit value is
* required.
*
* @return the replication factor.
*/
String replicationFactor() default "1";
String replicationFactor() default "-1";

/**
* The exception types that should be retried.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ static class TopicCreation {
TopicCreation() {
this.shouldCreateTopics = true;
this.numPartitions = 1;
this.replicationFactor = 1;
this.replicationFactor = -1;
}

TopicCreation(boolean shouldCreateTopics) {
this.shouldCreateTopics = shouldCreateTopics;
this.numPartitions = 1;
this.replicationFactor = 1;
this.replicationFactor = -1;
}

public int getNumPartitions() {
Expand All @@ -130,4 +130,5 @@ public boolean shouldCreateTopics() {
return this.shouldCreateTopics;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ public RetryTopicConfigurationBuilder doNotAutoCreateRetryTopics() {
* Configure the topic creation behavior to auto create topics with the provided
* properties.
* @param numPartitions the number of partitions.
* @param replicationFactor the replication factor.
* @param replicationFactor the replication factor (-1 to use the broker default if the
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, short replicationFactor) {
Expand All @@ -421,7 +422,8 @@ public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, sh
* properties.
* @param shouldCreate true to auto create.
* @param numPartitions the number of partitions.
* @param replicationFactor the replication factor.
* @param replicationFactor the replication factor (-1 to use the broker default if the
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopics(boolean shouldCreate, int numPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.util.ReflectionTestUtils;

/**
Expand Down Expand Up @@ -59,6 +60,8 @@ void shouldExcludeTopics() {

// then
assertThat(configuration.hasConfigurationForTopics(topicNames)).isFalse();
assertThat(KafkaTestUtils.getPropertyValue(builder, "topicCreationConfiguration.replicationFactor"))
.isEqualTo((short) -1);
}

@Test
Expand Down