Skip to content

Commit 3769c24

Browse files
authored
GH-3557: Adjust the replication factor for transactions topic on @embeddedkafka
Fixes: #3557 #3557 * Adjust the replication factor for the transaction state topic on `EmbeddedKafka` based on the broker count in `EmbeddedKafka`. * Keep the default replication factor of 3. * Adding tests to verify
1 parent 202a6e5 commit 3769c24

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,13 +47,16 @@
4747
* @author Oleg Artyomov
4848
* @author Sergio Lourenco
4949
* @author Pawel Lozinski
50+
* @author Seonghwan Lee
5051
*
5152
* @since 1.3
5253
*/
5354
class EmbeddedKafkaContextCustomizer implements ContextCustomizer {
5455

5556
private final EmbeddedKafka embeddedKafka;
5657

58+
private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor";
59+
5760
EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) {
5861
this.embeddedKafka = embeddedKafka;
5962
}
@@ -121,6 +124,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
121124
}
122125
}
123126

127+
properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.count())));
128+
124129
embeddedKafkaBroker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
125130
if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) {
126131
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());

spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.test.context;
1818

19+
import java.util.Map;
20+
1921
import org.junit.jupiter.api.BeforeEach;
2022
import org.junit.jupiter.api.Test;
2123

@@ -25,13 +27,15 @@
2527
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2628
import org.springframework.kafka.test.utils.KafkaTestUtils;
2729

30+
2831
import static org.assertj.core.api.Assertions.assertThat;
2932

3033
/**
3134
* @author Oleg Artyomov
3235
* @author Sergio Lourenco
3336
* @author Artem Bilan
3437
* @author Gary Russell
38+
* @author Seonghwan Lee
3539
*
3640
* @since 1.3
3741
*/
@@ -91,6 +95,21 @@ void testMulti() {
9195
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
9296
}
9397

98+
@Test
99+
void testTransactionReplicationFactor() {
100+
EmbeddedKafka annotationWithPorts =
101+
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class);
102+
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
103+
ConfigurableApplicationContext context = new GenericApplicationContext();
104+
customizer.customizeContext(context, null);
105+
context.refresh();
106+
107+
EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
108+
Map<String, Object> properties = (Map<String, Object>) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties");
109+
110+
assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2");
111+
}
112+
94113
@EmbeddedKafka(kraft = false)
95114
private static final class TestWithEmbeddedKafka {
96115

@@ -111,4 +130,9 @@ private static final class TestWithEmbeddedKafkaMulti {
111130

112131
}
113132

133+
@EmbeddedKafka(kraft = false, count = 2)
134+
private static final class TestWithEmbeddedKafkaTransactionFactor {
135+
136+
}
137+
114138
}

0 commit comments

Comments
 (0)