diff --git a/build.gradle b/build.gradle
index 6c58ac42cb..15977502eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -59,7 +59,7 @@ ext {
jaywayJsonPathVersion = '2.8.0'
junit4Version = '4.13.2'
junitJupiterVersion = '5.10.2'
- kafkaVersion = '3.6.1'
+ kafkaVersion = '3.7.0'
kotlinCoroutinesVersion = '1.7.3'
log4jVersion = '2.22.1'
micrometerDocsVersion = '1.0.2'
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index c22b60913c..7cce3be3d3 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -7,14 +7,17 @@
This section covers the changes made from version 3.1 to version 3.2.
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
+[[x32-kafka-client-version]]
+=== Kafka Client Version
+
+This version requires 3.7.0 `kafka-clients`.
+
[[x32-kafka-streams-iqs-support]]
=== Kafka Streams Interactive Query Support
A new API `KafkaStreamsInteractiveQuerySupport` for accessing queryable stores used in Kafka Streams interactive queries.
See xref:streams.adoc#kafka-streams-iq-support[Kafka Streams Interactive Support] for more details.
-
-
[[x32-tiss]]
=== TransactionIdSuffixStrategy
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index 1271228742..5a9792cd1c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -47,6 +47,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -67,6 +68,8 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
+
+
/**
* The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
*
@@ -113,6 +116,7 @@
* @author Chris Gilbert
* @author Thomas Strauß
* @author Adrian Gygax
+ * @author Soby Chacko
*/
public class DefaultKafkaProducerFactory extends KafkaResourceFactory
implements ProducerFactory, ApplicationContextAware,
@@ -1128,6 +1132,11 @@ public List partitionsFor(String topic) {
return this.delegate.metrics();
}
+ @Override
+ public Uuid clientInstanceId(Duration timeout) {
+ return this.delegate.clientInstanceId(timeout);
+ }
+
@Override
public void initTransactions() {
this.delegate.initTransactions();
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
index 23df25d831..d1c9191c7d 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
@@ -340,14 +340,14 @@ public void manyTests() throws Exception {
"listenerConsumer.consumer"));
assertThat(
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer,
- "fetcher.fetchConfig.maxPollRecords", Integer.class))
+ "delegate.fetcher.fetchConfig.maxPollRecords", Integer.class))
.isEqualTo(100);
assertThat(this.quxGroup).hasSize(1);
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
List> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
.isEqualTo("qux");
- assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId"))
+ assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.delegate.clientId"))
.isEqualTo("clientIdViaProps3-0");
template.send("annotated4", 0, "foo");
@@ -371,15 +371,15 @@ public void manyTests() throws Exception {
TopicPartitionOffset[].class)[3];
assertThat(offset.isRelativeToCurrent()).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
- "listenerConsumer.consumer.groupId", Optional.class).get())
+ "listenerConsumer.consumer.delegate.groupId", Optional.class).get())
.isEqualTo("fiz");
- assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
+ assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.delegate.clientId"))
.isEqualTo("clientIdViaAnnotation-0");
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
- "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
+ "listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords"))
.isEqualTo(10);
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
- "listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
+ "listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes"))
.isEqualTo(420000);
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
@@ -402,9 +402,9 @@ public void manyTests() throws Exception {
MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils
.getPropertyValue(rebalanceConcurrentContainer, "containers", List.class).get(0);
- assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.groupId"))
+ assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.groupId"))
.isNotEqualTo("rebalanceListener");
- String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId",
+ String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.clientId",
String.class);
assertThat(clientId).startsWith("rebal-");
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
@@ -535,13 +535,13 @@ public void testJson() throws Exception {
MessageListenerContainer buzContainer = (MessageListenerContainer) KafkaTestUtils
.getPropertyValue(buzConcurrentContainer, "containers", List.class).get(0);
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
- "listenerConsumer.consumer.groupId", Optional.class).get())
+ "listenerConsumer.consumer.delegate.groupId", Optional.class).get())
.isEqualTo("buz.explicitGroupId");
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
- "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
+ "listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords"))
.isEqualTo(5);
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
- "listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
+ "listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes"))
.isEqualTo(123456);
}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
index 064839f0bd..30002846a5 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
@@ -64,6 +65,7 @@
* @author Chris Gilbert
* @author Artem Bilan
* @author Adrian Gygax
+ * @author Soby Chacko
*
* @since 1.0.6
*/
@@ -484,7 +486,7 @@ public void consumerRemoved(String id, Consumer consumer) {
assertThat(adds).hasSize(1);
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
assertThat(removals).hasSize(0);
- consumer.close();
+ consumer.close(Duration.ofSeconds(10));
assertThat(removals).hasSize(1);
}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
index bfff9e77dc..4db13f4734 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,6 +67,7 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.lang.Nullable;
/**
* @author Gary Russell
@@ -75,6 +76,7 @@
* @author Marius Bogoevici
* @author Artem Yakshin
* @author Vladimir Tsanev
+ * @author Soby Chacko
*/
@EmbeddedKafka(topics = { ConcurrentMessageListenerContainerTests.topic1,
ConcurrentMessageListenerContainerTests.topic2,
@@ -230,13 +232,13 @@ public void testAutoCommitWithRebalanceListener() throws Exception {
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) {
@Override
- protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix,
- String clientIdSuffixArg, Properties properties) {
+ protected Consumer createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
+ @Nullable String clientIdSuffixArg, @Nullable Properties properties) {
overrides.set(properties);
Consumer created = super.createKafkaConsumer(groupId, clientIdPrefix,
clientIdSuffixArg, properties);
- assertThat(KafkaTestUtils.getPropertyValue(created, "requestTimeoutMs", Long.class)).isEqualTo(23000L);
+ assertThat(KafkaTestUtils.getPropertyValue(created, "delegate.requestTimeoutMs", Integer.class)).isEqualTo(23000);
return created;
}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java
index 5b71aed27c..dc819ea957 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 the original author or authors.
+ * Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,7 +63,8 @@ void configurePreLoadedDelegates() {
props.setMessageListener(mock(MessageListener.class));
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cFact, props);
container.start();
- assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer.valueDeserializer"))
+ assertThat(KafkaTestUtils.getPropertyValue(container,
+ "listenerConsumer.consumer.delegate.deserializers.valueDeserializer"))
.isSameAs(delegating);
Map, ?> delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class);
assertThat(delegates).hasSize(1);