Skip to content

Commit f90c8e1

Browse files
gurpiarbassi authored and garyrussell committed
GH-2240: Bug fix for KafkaTemplate.receive(..)
Fixes GH-2240 (#2240) - Ensures that we don't get ConsumerRecords with nulls in the list - Ensures that we do not allow null offsets
1 parent 63e56a7 commit f90c8e1

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.springframework.util.concurrent.ListenableFuture;
7171
import org.springframework.util.concurrent.SettableListenableFuture;
7272

73-
7473
/**
7574
* A template for executing high-level operations. When used with a
7675
* {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory
@@ -87,6 +86,7 @@
8786
* @author Biju Kunjummen
8887
* @author Endika Gutierrez
8988
* @author Thomas Strauß
89+
* @author Gurps Bassi
9090
*/
9191
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
9292
ApplicationListener<ContextStoppedEvent>, DisposableBean {
@@ -583,8 +583,14 @@ public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested,
583583
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
584584
try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
585585
requested.forEach(tpo -> {
586+
if (tpo.getOffset() == null || tpo.getOffset() < 0) {
587+
throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + tpo);
588+
}
586589
ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
587-
records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
590+
List<ConsumerRecord<K, V>> consumerRecords = records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>());
591+
if (one != null) {
592+
consumerRecords.add(one);
593+
}
588594
});
589595
return new ConsumerRecords<>(records);
590596
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
import org.junit.jupiter.api.AfterAll;
6666
import org.junit.jupiter.api.BeforeAll;
6767
import org.junit.jupiter.api.Test;
68+
import org.junit.jupiter.params.ParameterizedTest;
69+
import org.junit.jupiter.params.provider.NullSource;
70+
import org.junit.jupiter.params.provider.ValueSource;
6871

6972
import org.springframework.aop.framework.ProxyFactory;
7073
import org.springframework.kafka.KafkaException;
@@ -94,6 +97,7 @@
9497
* @author Biju Kunjummen
9598
* @author Endika Gutierrez
9699
* @author Thomas Strauß
100+
* @author Gurps Bassi
97101
*/
98102
@EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC })
99103
public class KafkaTemplateTests {
@@ -153,6 +157,12 @@ void testTemplate() {
153157

154158
template.setDefaultTopic(INT_KEY_TOPIC);
155159

160+
template.setConsumerFactory(
161+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
162+
ConsumerRecords<Integer, String> initialRecords =
163+
template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L)));
164+
assertThat(initialRecords).isEmpty();
165+
156166
template.sendDefault("foo");
157167
assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo"));
158168

@@ -191,8 +201,7 @@ void testTemplate() {
191201
assertThat(partitions).isNotNull();
192202
assertThat(partitions).hasSize(2);
193203
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
194-
template.setConsumerFactory(
195-
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
204+
196205
ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 1, received.offset());
197206
assertThat(receive).has(allOf(keyValue(2, "buz"), partition(1)))
198207
.extracting(ConsumerRecord::offset)
@@ -554,4 +563,21 @@ void testFutureFailureOnSend() {
554563
pf.destroy();
555564
}
556565

566+
@ParameterizedTest(name = "{0} is invalid")
567+
@NullSource
568+
@ValueSource(longs = -1)
569+
void testReceiveWhenOffsetIsInvalid(Long offset) {
570+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
571+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
572+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
573+
574+
template.setConsumerFactory(
575+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
576+
TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset);
577+
578+
assertThatExceptionOfType(KafkaException.class)
579+
.isThrownBy(() -> template.receive(Collections.singleton(tpoWithNullOffset)))
580+
.withMessage("Offset supplied in TopicPartitionOffset is invalid: " + tpoWithNullOffset);
581+
}
582+
557583
}

0 commit comments

Comments (0)