Skip to content

Commit 92c77b3

Browse files
garyrussellartembilan
authored andcommitted
GH-666: KafkaEmbedded doWithAdmin, addTopics
Resolves #666 Allow arbitrary `AdminClient` operations and adding topics. Polishing - PR Comments
1 parent 75375f0 commit 92c77b3

File tree

2 files changed

+84
-9
lines changed

2 files changed

+84
-9
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.springframework.retry.support.RetryTemplate;
6161
import org.springframework.util.Assert;
6262

63+
import kafka.common.KafkaException;
6364
import kafka.server.KafkaConfig;
6465
import kafka.server.KafkaServer;
6566
import kafka.server.NotRunning;
@@ -233,19 +234,45 @@ public void before() throws Exception { //NOSONAR
233234
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
234235
}
235236
}
236-
Map<String, Object> adminConfigs = new HashMap<>();
237-
adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
238-
AdminClient admin = AdminClient.create(adminConfigs);
239-
List<NewTopic> newTopics = Arrays.stream(this.topics)
240-
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
241-
.collect(Collectors.toList());
242-
CreateTopicsResult createTopics = admin.createTopics(newTopics);
243-
createTopics.all().get();
244-
admin.close();
237+
addTopics(this.topics);
245238
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
246239
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
247240
}
248241

242+
/**
243+
* Add topics to the existing broker(s) using the configured number of partitions.
244+
* @param topics the topics.
245+
* @since 2.1
246+
*/
247+
public void addTopics(String... topics) {
248+
doWithAdmin(admin -> {
249+
List<NewTopic> newTopics = Arrays.stream(topics)
250+
.map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count))
251+
.collect(Collectors.toList());
252+
CreateTopicsResult createTopics = admin.createTopics(newTopics);
253+
try {
254+
createTopics.all().get();
255+
}
256+
catch (Exception e) {
257+
throw new KafkaException(e);
258+
}
259+
});
260+
}
261+
262+
/**
263+
* Create an {@link AdminClient} invoke the callback and reliable close the
264+
* admin.
265+
* @param callback the callback.
266+
* @since 2.1
267+
*/
268+
public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
269+
Map<String, Object> adminConfigs = new HashMap<>();
270+
adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString());
271+
try (AdminClient admin = AdminClient.create(adminConfigs)) {
272+
callback.accept(admin);
273+
}
274+
}
275+
249276
public Properties createBrokerProperties(int i) {
250277
if (testUtilsCreateBrokerConfigMethod == null) {
251278
return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,

src/reference/asciidoc/testing.adoc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,54 @@ Convenient constants `KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS` and `KafkaEmb
102102
With the `KafkaEmbedded.brokerProperties(Map<String, String>)` you can provide additional properties for the Kafka server(s).
103103
See https://kafka.apache.org/documentation/#brokerconfigs[Kafka Config] for more information about possible broker properties.
104104

105+
106+
==== Using the Same Broker(s) for Multiple Test Classes
107+
108+
There is no built-in support for this, but it can be achieved with something similar to the following:
109+
110+
[source, java]
111+
----
112+
public final class KafkaEmbeddedHolder {
113+
114+
private static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, false);
115+
116+
private static boolean started;
117+
118+
public static KafkaEmbedded getKafkaEmbedded() {
119+
if (!started) {
120+
try {
121+
kafkaEmbedded.before();
122+
}
123+
catch (Exception e) {
124+
throw new KafkaException(e);
125+
}
126+
started = true;
127+
}
128+
return kafkaEmbedded;
129+
}
130+
131+
private KafkaEmbeddedHolder() {
132+
super();
133+
}
134+
135+
}
136+
----
137+
138+
And then, in each test class:
139+
140+
[source, java]
141+
----
142+
static {
143+
KafkaEmbeddedHolder.getKafkaEmbedded().addTopics(topic1, topic2);
144+
}
145+
146+
private static KafkaEmbedded embeddedKafka = KafkaEmbeddedHolder.getKafkaEmbedded();
147+
----
148+
149+
IMPORTANT: This example provides no mechanism for shutting down the broker(s) when all tests are complete.
150+
This could be a problem if, say, you run your tests in a gradle daemon.
151+
You should not use this technique in such a situation, or use something to call `destroy()` on the `KafkaEmbedded` when your tests are complete.
152+
105153
==== @EmbeddedKafka Annotation
106154
It is generally recommended to use the rule as a `@ClassRule` to avoid starting/stopping the broker between tests (and use a different topic for each test).
107155
Starting with _version 2.0_, if you are using Spring's test application context caching, you can also declare a `KafkaEmbedded` bean, so a single broker can be used across multiple test classes.

0 commit comments

Comments
 (0)