
Commit 060c96f

Move KafkaItemReader integration tests to a separate class

Tests requiring a Kafka broker should be part of the integration test suite, not the unit test suite.

1 parent d6904ba · commit 060c96f
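For context, one generic way to draw that unit/integration line is with JUnit 5 tags plus group filtering in the build. The sketch below is an assumption-laden illustration only: this repository appears to rely on the *IntegrationTests class-name suffix instead, and its actual build wiring is not shown in this commit.

// A generic sketch of unit/integration test separation with JUnit 5 tags.
// Assumptions: JUnit 5 on the classpath, and a build that excludes the
// "integration" group from the default test run (e.g. Maven Surefire's
// <excludedGroups>integration</excludedGroups>). Illustrative only; not
// necessarily the convention used by this repository's build.
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration") // needs a running Kafka broker, so keep it out of the unit suite
class BrokerBackedIntegrationTests {

    @Test
    void readsFromRealBroker() {
        // exercises a real broker, as the tests in this commit do with @EmbeddedKafka
    }

}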

2 files changed: +335 −303 lines changed

@@ -0,0 +1,329 @@
/*
 * Copyright 2019-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
 * @author Mathieu Ouellet
 * @author Mahmoud Ben Hassine
 */
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
class KafkaItemReaderIntegrationTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    private KafkaItemReader<String, String> reader;

    private KafkaTemplate<String, String> template;

    private Properties consumerProperties;

    @BeforeAll
    static void setUpTopics(@Autowired EmbeddedKafkaBroker embeddedKafka) {
        embeddedKafka.addTopics(new NewTopic("topic1", 1, (short) 1), new NewTopic("topic2", 2, (short) 1),
                new NewTopic("topic3", 1, (short) 1), new NewTopic("topic4", 2, (short) 1),
                new NewTopic("topic5", 1, (short) 1), new NewTopic("topic6", 1, (short) 1));
    }

    @BeforeEach
    void setUp() {
        Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProperties);
        this.template = new KafkaTemplate<>(producerFactory);

        this.consumerProperties = new Properties();
        this.consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        this.consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "1");
        this.consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        this.consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
    }

    @Test
    void testReadFromSinglePartition() throws ExecutionException, InterruptedException {
        this.template.setDefaultTopic("topic1");
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.sendDefault("val0"));
        futures.add(this.template.sendDefault("val1"));
        futures.add(this.template.sendDefault("val2"));
        futures.add(this.template.sendDefault("val3"));
        for (var future : futures) {
            future.get();
        }

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic1", 0);
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(new ExecutionContext());

        String item = this.reader.read();
        assertThat(item, is("val0"));

        item = this.reader.read();
        assertThat(item, is("val1"));

        item = this.reader.read();
        assertThat(item, is("val2"));

        item = this.reader.read();
        assertThat(item, is("val3"));

        item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

    @Test
    void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException {
        this.template.setDefaultTopic("topic5");
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.sendDefault("val0")); // <-- offset 0
        futures.add(this.template.sendDefault("val1")); // <-- offset 1
        futures.add(this.template.sendDefault("val2")); // <-- offset 2
        futures.add(this.template.sendDefault("val3")); // <-- offset 3
        for (var future : futures) {
            future.get();
        }

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic5", 0);

        // specify which offset to start from
        Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
        partitionOffsets.put(new TopicPartition("topic5", 0), 2L);
        this.reader.setPartitionOffsets(partitionOffsets);

        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(new ExecutionContext());

        String item = this.reader.read();
        assertThat(item, is("val2"));

        item = this.reader.read();
        assertThat(item, is("val3"));

        item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

    @Test
    void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
        // first run: read a topic from the beginning

        this.template.setDefaultTopic("topic6");
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.sendDefault("val0")); // <-- offset 0
        futures.add(this.template.sendDefault("val1")); // <-- offset 1
        for (var future : futures) {
            future.get();
        }
        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(new ExecutionContext());

        String item = this.reader.read();
        assertThat(item, is("val0"));

        item = this.reader.read();
        assertThat(item, is("val1"));

        item = this.reader.read();
        assertNull(item);

        this.reader.close();

        // The offset stored in Kafka should be equal to 2 at this point
        OffsetAndMetadata currentOffset = KafkaTestUtils.getCurrentOffset(embeddedKafka.getBrokersAsString(), "1",
                "topic6", 0);
        assertEquals(2, currentOffset.offset());

        // second run (with same consumer group ID): new messages arrived since the last
        // run.

        this.template.sendDefault("val2"); // <-- offset 2
        this.template.sendDefault("val3"); // <-- offset 3

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
        // Passing an empty map means the reader should start from the offset stored in
        // Kafka (offset 2 in this case)
        this.reader.setPartitionOffsets(new HashMap<>());
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(new ExecutionContext());

        item = this.reader.read();
        assertThat(item, is("val2"));

        item = this.reader.read();
        assertThat(item, is("val3"));

        item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

    @Test
    void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException {
        this.template.setDefaultTopic("topic2");
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.sendDefault("val0"));
        futures.add(this.template.sendDefault("val1"));
        futures.add(this.template.sendDefault("val2"));
        futures.add(this.template.sendDefault("val3"));
        for (var future : futures) {
            future.get();
        }

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic2", 0, 1);
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(new ExecutionContext());

        List<String> items = new ArrayList<>();
        items.add(this.reader.read());
        items.add(this.reader.read());
        items.add(this.reader.read());
        items.add(this.reader.read());
        assertThat(items, containsInAnyOrder("val0", "val1", "val2", "val3"));
        String item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

    @Test
    void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
        this.template.setDefaultTopic("topic3");
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.sendDefault("val0"));
        futures.add(this.template.sendDefault("val1"));
        futures.add(this.template.sendDefault("val2"));
        futures.add(this.template.sendDefault("val3"));
        futures.add(this.template.sendDefault("val4"));
        for (var future : futures) {
            future.get();
        }
        ExecutionContext executionContext = new ExecutionContext();
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic3", 0), 1L);
        executionContext.put("topic.partition.offsets", offsets);

        // topic3-0: val0, val1, val2, val3, val4
        //                  ^
        //                  |
        //   last committed offset = 1 (should restart from offset = 2)

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic3", 0);
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(executionContext);

        List<String> items = new ArrayList<>();
        items.add(this.reader.read());
        items.add(this.reader.read());
        items.add(this.reader.read());
        assertThat(items, containsInAnyOrder("val2", "val3", "val4"));
        String item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

    @Test
    void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
        var futures = new ArrayList<CompletableFuture<?>>();
        futures.add(this.template.send("topic4", 0, null, "val0"));
        futures.add(this.template.send("topic4", 0, null, "val2"));
        futures.add(this.template.send("topic4", 0, null, "val4"));
        futures.add(this.template.send("topic4", 0, null, "val6"));
        futures.add(this.template.send("topic4", 1, null, "val1"));
        futures.add(this.template.send("topic4", 1, null, "val3"));
        futures.add(this.template.send("topic4", 1, null, "val5"));
        futures.add(this.template.send("topic4", 1, null, "val7"));

        for (var future : futures) {
            future.get();
        }

        ExecutionContext executionContext = new ExecutionContext();
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic4", 0), 1L);
        offsets.put(new TopicPartition("topic4", 1), 2L);
        executionContext.put("topic.partition.offsets", offsets);

        // topic4-0: val0, val2, val4, val6
        //                  ^
        //                  |
        //   last committed offset = 1 (should restart from offset = 2)
        // topic4-1: val1, val3, val5, val7
        //                        ^
        //                        |
        //   last committed offset = 2 (should restart from offset = 3)

        this.reader = new KafkaItemReader<>(this.consumerProperties, "topic4", 0, 1);
        this.reader.setPollTimeout(Duration.ofSeconds(1));
        this.reader.open(executionContext);

        List<String> items = new ArrayList<>();
        items.add(this.reader.read());
        items.add(this.reader.read());
        items.add(this.reader.read());
        assertThat(items, containsInAnyOrder("val4", "val6", "val7"));
        String item = this.reader.read();
        assertNull(item);

        this.reader.close();
    }

}
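As a side note, the reader API exercised by these tests works the same way outside a test. Below is a minimal standalone sketch using only the calls that appear in this diff; the broker address, group ID, and topic name are placeholders, not values from the commit.

import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.kafka.KafkaItemReader;

public class KafkaItemReaderUsageSketch {

    public static void main(String[] args) {
        // Consumer configuration, mirroring the setUp() method above.
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // placeholder
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Read partition 0 of "my-topic" (placeholder).
        KafkaItemReader<String, String> reader = new KafkaItemReader<>(props, "my-topic", 0);
        reader.setPollTimeout(Duration.ofSeconds(1));
        // An empty offsets map means: resume from the offsets committed for this
        // consumer group, as testReadFromSinglePartitionFromTheOffsetStoredInKafka shows.
        reader.setPartitionOffsets(new HashMap<>());
        reader.open(new ExecutionContext());
        String item;
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
        reader.close();
    }

}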
