Skip to content

Commit ab871ab

Browse files
gurpiar-bassi authored and garyrussell committed
GH-2240: Bug fix for KafkaTemplate.receive(..)
Fixes GH-2240 (#2240) - Ensures that we do not get ConsumerRecords with nulls in the list - Ensures that we do not allow null offsets
1 parent 920089b commit ab871ab

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.springframework.util.concurrent.ListenableFuture;
7171
import org.springframework.util.concurrent.SettableListenableFuture;
7272

73-
7473
/**
7574
* A template for executing high-level operations. When used with a
7675
* {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory
@@ -88,6 +87,7 @@
8887
* @author Endika Gutierrez
8988
* @author Thomas Strauß
9089
* @author Soby Chacko
90+
* @author Gurps Bassi
9191
*/
9292
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
9393
ApplicationListener<ContextStoppedEvent>, DisposableBean {
@@ -570,8 +570,14 @@ public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested,
570570
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
571571
try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
572572
requested.forEach(tpo -> {
573+
if (tpo.getOffset() == null || tpo.getOffset() < 0) {
574+
throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + tpo);
575+
}
573576
ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
574-
records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
577+
List<ConsumerRecord<K, V>> consumerRecords = records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>());
578+
if (one != null) {
579+
consumerRecords.add(one);
580+
}
575581
});
576582
return new ConsumerRecords<>(records);
577583
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@
7171
import org.junit.jupiter.api.AfterAll;
7272
import org.junit.jupiter.api.BeforeAll;
7373
import org.junit.jupiter.api.Test;
74+
import org.junit.jupiter.params.ParameterizedTest;
75+
import org.junit.jupiter.params.provider.NullSource;
76+
import org.junit.jupiter.params.provider.ValueSource;
7477
import org.mockito.InOrder;
7578
import org.mockito.Mockito;
7679

@@ -104,6 +107,7 @@
104107
* @author Endika Gutierrez
105108
* @author Thomas Strauß
106109
* @author Soby Chacko
110+
* @author Gurps Bassi
107111
*/
108112
@EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC })
109113
public class KafkaTemplateTests {
@@ -163,6 +167,12 @@ void testTemplate() {
163167

164168
template.setDefaultTopic(INT_KEY_TOPIC);
165169

170+
template.setConsumerFactory(
171+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
172+
ConsumerRecords<Integer, String> initialRecords =
173+
template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L)));
174+
assertThat(initialRecords).isEmpty();
175+
166176
template.sendDefault("foo");
167177
assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo"));
168178

@@ -201,8 +211,7 @@ void testTemplate() {
201211
assertThat(partitions).isNotNull();
202212
assertThat(partitions).hasSize(2);
203213
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
204-
template.setConsumerFactory(
205-
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
214+
206215
ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 1, received.offset());
207216
assertThat(receive).has(allOf(keyValue(2, "buz"), partition(1)))
208217
.extracting(ConsumerRecord::offset)
@@ -621,4 +630,22 @@ void testCompositeProducerInterceptor() {
621630
inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
622631
}
623632

633+
@ParameterizedTest(name = "{0} is invalid")
634+
@NullSource
635+
@ValueSource(longs = -1)
636+
void testReceiveWhenOffsetIsInvalid(Long offset) {
637+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
638+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
639+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
640+
641+
template.setConsumerFactory(
642+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
643+
TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset);
644+
645+
assertThatExceptionOfType(KafkaException.class)
646+
.isThrownBy(() -> template.receive(Collections.singleton(tpoWithNullOffset)))
647+
.withMessage("Offset supplied in TopicPartitionOffset is invalid: " + tpoWithNullOffset);
648+
}
649+
650+
624651
}

0 commit comments

Comments
 (0)