Skip to content

Commit 690743e

Browse files
artembilangaryrussell
authored andcommitted
Use Global Embedded Kafka whenever possible
For better test suite lifecycle (higher performance) reuse one global embedded Kafka broker introduced in Spring for Apache Kafka `3.0` Some tests have left with their own `EmbeddedKafkaBroker` definitions since they rely on different partitions
1 parent 5a178de commit 690743e

File tree

9 files changed

+176
-176
lines changed

9 files changed

+176
-176
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.junit.jupiter.api.Test;
2828

2929
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.beans.factory.annotation.Value;
3031
import org.springframework.context.annotation.Bean;
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.integration.channel.NullChannel;
@@ -44,8 +45,6 @@
4445
import org.springframework.kafka.core.ProducerFactory;
4546
import org.springframework.kafka.listener.ConsumerProperties;
4647
import org.springframework.kafka.support.KafkaHeaders;
47-
import org.springframework.kafka.test.EmbeddedKafkaBroker;
48-
import org.springframework.kafka.test.context.EmbeddedKafka;
4948
import org.springframework.kafka.test.utils.KafkaTestUtils;
5049
import org.springframework.messaging.Message;
5150
import org.springframework.messaging.PollableChannel;
@@ -61,7 +60,6 @@
6160
*
6261
*/
6362
@SpringJUnitConfig
64-
@EmbeddedKafka(topics = { "channel.1", "channel.2", "channel.3" }, partitions = 1)
6563
public class ChannelTests {
6664

6765
@Test
@@ -118,17 +116,18 @@ void pollable(@Autowired PollableChannel pollable) {
118116
@Configuration
119117
public static class Config {
120118

121-
@Autowired
122-
private EmbeddedKafkaBroker broker;
119+
@Value("${spring.global.embedded.kafka.brokers}")
120+
String embeddedKafkaBrokers;
123121

124122
@Bean
125123
public ProducerFactory<Integer, String> pf() {
126-
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
124+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.embeddedKafkaBrokers));
127125
}
128126

129127
@Bean
130128
public ConsumerFactory<Integer, String> cf() {
131-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("channelTests", "false", this.broker);
129+
Map<String, Object> consumerProps =
130+
KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "channelTests", "false");
132131
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
133132
return new DefaultKafkaConsumerFactory<>(consumerProps);
134133
}

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/AllXmlTests-context.xml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xmlns:int="http://www.springframework.org/schema/integration"
55
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
6+
xmlns:context="http://www.springframework.org/schema/context"
67
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
78
http://www.springframework.org/schema/integration/kafka https://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
8-
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
9+
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
10+
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
11+
12+
<context:property-placeholder />
913

1014
<bean id="cf" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
1115
<constructor-arg>
1216
<map>
13-
<entry key="bootstrap.servers" value="#{embeddedKafka.brokersAsString}"/>
17+
<entry key="bootstrap.servers" value="${spring.global.embedded.kafka.brokers}"/>
1418
<entry key="auto.offset.reset" value="earliest"/>
1519
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
1620
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
@@ -22,7 +26,7 @@
2226
<bean id="pf" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
2327
<constructor-arg>
2428
<map>
25-
<entry key="bootstrap.servers" value="#{embeddedKafka.brokersAsString}"/>
29+
<entry key="bootstrap.servers" value="${spring.global.embedded.kafka.brokers}"/>
2630
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
2731
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
2832
</map>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/AllXmlTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import org.springframework.beans.factory.annotation.Autowired;
2424
import org.springframework.kafka.core.KafkaTemplate;
25-
import org.springframework.kafka.test.context.EmbeddedKafka;
2625
import org.springframework.messaging.Message;
2726
import org.springframework.messaging.PollableChannel;
2827
import org.springframework.test.annotation.DirtiesContext;
@@ -36,7 +35,6 @@
3635
*/
3736
@SpringJUnitConfig
3837
@DirtiesContext
39-
@EmbeddedKafka(topics = { "one", "two", "three", "four" })
4038
public class AllXmlTests {
4139

4240
@Autowired

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/ChannelParserTests.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,16 +28,17 @@
2828
import org.springframework.integration.kafka.channel.PollableKafkaChannel;
2929
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
3030
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
31+
import org.springframework.integration.test.util.TestUtils;
3132
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
3233
import org.springframework.kafka.config.KafkaListenerContainerFactory;
3334
import org.springframework.kafka.core.ConsumerFactory;
3435
import org.springframework.kafka.core.KafkaOperations;
3536
import org.springframework.kafka.listener.ConsumerProperties;
36-
import org.springframework.kafka.test.utils.KafkaTestUtils;
3737
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3838

3939
/**
4040
* @author Gary Russell
41+
* @author Artem Bilan
4142
*
4243
* @since 5.4
4344
*
@@ -65,17 +66,17 @@ public class ChannelParserTests {
6566

6667
@Test
6768
void testParser() {
68-
assertThat(KafkaTestUtils.getPropertyValue(this.ptp, "topic")).isEqualTo("ptpTopic");
69-
assertThat(KafkaTestUtils.getPropertyValue(this.pubSub, "topic")).isEqualTo("pubSubTopic");
70-
assertThat(KafkaTestUtils.getPropertyValue(this.ptp, "container")).isNotNull();
71-
assertThat(KafkaTestUtils.getPropertyValue(this.pubSub, "container")).isNotNull();
72-
assertThat(KafkaTestUtils.getPropertyValue(this.ptp, "template")).isSameAs(this.template);
73-
assertThat(KafkaTestUtils.getPropertyValue(this.pubSub, "template")).isSameAs(this.template);
74-
assertThat(KafkaTestUtils.getPropertyValue(this.pollable, "template")).isSameAs(this.template);
75-
assertThat(KafkaTestUtils.getPropertyValue(this.pollable, "source")).isSameAs(this.source);
76-
assertThat(KafkaTestUtils.getPropertyValue(this.ptp, "groupId")).isEqualTo("ptpGroup");
77-
assertThat(KafkaTestUtils.getPropertyValue(this.pubSub, "groupId")).isEqualTo("pubSubGroup");
78-
assertThat(KafkaTestUtils.getPropertyValue(this.pollable, "groupId")).isEqualTo("pollableGroup");
69+
assertThat(TestUtils.getPropertyValue(this.ptp, "topic")).isEqualTo("ptpTopic");
70+
assertThat(TestUtils.getPropertyValue(this.pubSub, "topic")).isEqualTo("pubSubTopic");
71+
assertThat(TestUtils.getPropertyValue(this.ptp, "container")).isNotNull();
72+
assertThat(TestUtils.getPropertyValue(this.pubSub, "container")).isNotNull();
73+
assertThat(TestUtils.getPropertyValue(this.ptp, "template")).isSameAs(this.template);
74+
assertThat(TestUtils.getPropertyValue(this.pubSub, "template")).isSameAs(this.template);
75+
assertThat(TestUtils.getPropertyValue(this.pollable, "template")).isSameAs(this.template);
76+
assertThat(TestUtils.getPropertyValue(this.pollable, "source")).isSameAs(this.source);
77+
assertThat(TestUtils.getPropertyValue(this.ptp, "groupId")).isEqualTo("ptpGroup");
78+
assertThat(TestUtils.getPropertyValue(this.pubSub, "groupId")).isEqualTo("pubSubGroup");
79+
assertThat(TestUtils.getPropertyValue(this.pollable, "groupId")).isEqualTo("pollableGroup");
7980
}
8081

8182
@Configuration

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import org.springframework.beans.factory.annotation.Autowired;
3737
import org.springframework.beans.factory.annotation.Qualifier;
38+
import org.springframework.beans.factory.annotation.Value;
3839
import org.springframework.context.annotation.Bean;
3940
import org.springframework.context.annotation.Configuration;
4041
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -71,8 +72,6 @@
7172
import org.springframework.kafka.support.Acknowledgment;
7273
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7374
import org.springframework.kafka.support.KafkaHeaders;
74-
import org.springframework.kafka.test.EmbeddedKafkaBroker;
75-
import org.springframework.kafka.test.context.EmbeddedKafka;
7675
import org.springframework.kafka.test.utils.KafkaTestUtils;
7776
import org.springframework.messaging.Message;
7877
import org.springframework.messaging.MessageChannel;
@@ -97,9 +96,6 @@
9796
*/
9897
@SpringJUnitConfig
9998
@DirtiesContext
100-
@EmbeddedKafka(topics = { KafkaDslTests.TEST_TOPIC1, KafkaDslTests.TEST_TOPIC2, KafkaDslTests.TEST_TOPIC3,
101-
KafkaDslTests.TEST_TOPIC4, KafkaDslTests.TEST_TOPIC5, KafkaDslTests.TEST_TOPIC6, KafkaDslTests.TEST_TOPIC7,
102-
KafkaDslTests.TEST_TOPIC8, KafkaDslTests.TEST_TOPIC9 })
10399
public class KafkaDslTests {
104100

105101
static final String TEST_TOPIC1 = "test-topic1";
@@ -118,8 +114,6 @@ public class KafkaDslTests {
118114

119115
static final String TEST_TOPIC8 = "test-topic8";
120116

121-
static final String TEST_TOPIC9 = "test-topic9";
122-
123117
@Autowired
124118
@Qualifier("sendToKafkaFlow.input")
125119
private MessageChannel sendToKafkaFlowInput;
@@ -266,7 +260,8 @@ void channels(@Autowired MessageChannel topic6Channel, @Autowired PollableKafkaC
266260
assertThat(received)
267261
.isNotNull()
268262
.extracting("payload")
269-
.isEqualTo("foo"); }
263+
.isEqualTo("foo");
264+
}
270265

271266
@Configuration
272267
@EnableIntegration
@@ -281,14 +276,12 @@ public static class ContextConfiguration {
281276

282277
private Object fromSource;
283278

284-
@Autowired
285-
private EmbeddedKafkaBroker embeddedKafka;
286-
279+
@Value("${spring.global.embedded.kafka.brokers}")
280+
String embeddedKafkaBrokers;
287281

288282
@Bean
289283
public ConsumerFactory<Integer, String> consumerFactory() {
290-
Map<String, Object> props = KafkaTestUtils
291-
.consumerProps("test1", "false", this.embeddedKafka);
284+
Map<String, Object> props = KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "test1", "false");
292285
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
293286
return new DefaultKafkaConsumerFactory<>(props);
294287
}
@@ -302,7 +295,7 @@ public PollableChannel errorChannel() {
302295
public IntegrationFlow topic1ListenerFromKafkaFlow() {
303296
return IntegrationFlow
304297
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
305-
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
298+
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
306299
.configureListenerContainer(c ->
307300
c.ackMode(ContainerProperties.AckMode.MANUAL)
308301
.idleEventInterval(100L)
@@ -344,7 +337,7 @@ public IntegrationFlow topic2ListenerFromKafkaFlow() {
344337

345338
@Bean
346339
public ProducerFactory<Integer, String> producerFactory() {
347-
Map<String, Object> props = KafkaTestUtils.producerProps(this.embeddedKafka);
340+
Map<String, Object> props = KafkaTestUtils.producerProps(this.embeddedKafkaBrokers);
348341
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
349342
return new DefaultKafkaProducerFactory<>(props);
350343
}
@@ -479,8 +472,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
479472
public IntegrationFlow serverGateway() {
480473
return IntegrationFlow
481474
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
482-
producerFactory())
483-
.configureListenerContainer(container -> container.errorHandler(eh())))
475+
producerFactory())
476+
.configureListenerContainer(container -> container.errorHandler(eh())))
484477
.<String, String>transform(String::toUpperCase)
485478
.get();
486479
}
@@ -495,9 +488,9 @@ IntegrationFlow withRecoveringErrorHandler() {
495488
ContainerProperties props = containerProperties();
496489
props.setGroupId("wreh");
497490
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), props)
498-
.configureListenerContainer(container -> {
499-
container.errorHandler(recoveringErrorHandler());
500-
}))
491+
.configureListenerContainer(container -> {
492+
container.errorHandler(recoveringErrorHandler());
493+
}))
501494
.handle(p -> {
502495
throw new RuntimeException("test");
503496
})

0 commit comments

Comments
 (0)