diff --git a/build.gradle b/build.gradle index 6c58ac42cb..15977502eb 100644 --- a/build.gradle +++ b/build.gradle @@ -59,7 +59,7 @@ ext { jaywayJsonPathVersion = '2.8.0' junit4Version = '4.13.2' junitJupiterVersion = '5.10.2' - kafkaVersion = '3.6.1' + kafkaVersion = '3.7.0' kotlinCoroutinesVersion = '1.7.3' log4jVersion = '2.22.1' micrometerDocsVersion = '1.0.2' diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index c22b60913c..7cce3be3d3 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,14 +7,17 @@ This section covers the changes made from version 3.1 to version 3.2. For changes in earlier version, see xref:appendix/change-history.adoc[Change History]. +[[x32-kafka-client-version]] +=== Kafka Client Version + +This version requires 3.7.0 `kafka-clients`. + [[x32-kafka-streams-iqs-support]] === Kafka Streams Interactive Query Support A new API `KafkaStreamsInteractiveQuerySupport` for accessing queryable stores used in Kafka Streams interactive queries. See xref:streams.adoc#kafka-streams-iq-support[Kafka Streams Interactive Support] for more details. - - [[x32-tiss]] === TransactionIdSuffixStrategy diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 1271228742..5a9792cd1c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -47,6 +47,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; @@ -67,6 +68,8 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; + + /** * The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance. *

@@ -113,6 +116,7 @@ * @author Chris Gilbert * @author Thomas Strauß * @author Adrian Gygax + * @author Soby Chacko */ public class DefaultKafkaProducerFactory extends KafkaResourceFactory implements ProducerFactory, ApplicationContextAware, @@ -1128,6 +1132,11 @@ public List partitionsFor(String topic) { return this.delegate.metrics(); } + @Override + public Uuid clientInstanceId(Duration timeout) { + return this.delegate.clientInstanceId(timeout); + } + @Override public void initTransactions() { this.delegate.initTransactions(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 23df25d831..d1c9191c7d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -340,14 +340,14 @@ public void manyTests() throws Exception { "listenerConsumer.consumer")); assertThat( KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, - "fetcher.fetchConfig.maxPollRecords", Integer.class)) + "delegate.fetcher.fetchConfig.maxPollRecords", Integer.class)) .isEqualTo(100); assertThat(this.quxGroup).hasSize(1); assertThat(this.quxGroup.get(0)).isSameAs(manualContainer); List containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class); assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId")) .isEqualTo("qux"); - assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId")) + assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.delegate.clientId")) .isEqualTo("clientIdViaProps3-0"); template.send("annotated4", 0, "foo"); @@ -371,15 +371,15 @@ public void manyTests() throws Exception { TopicPartitionOffset[].class)[3]; assertThat(offset.isRelativeToCurrent()).isTrue(); assertThat(KafkaTestUtils.getPropertyValue(fizContainer, - "listenerConsumer.consumer.groupId", Optional.class).get()) + "listenerConsumer.consumer.delegate.groupId", Optional.class).get()) .isEqualTo("fiz"); - assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId")) + assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.delegate.clientId")) .isEqualTo("clientIdViaAnnotation-0"); assertThat(KafkaTestUtils.getPropertyValue(fizContainer, - "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords")) + "listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords")) .isEqualTo(10); assertThat(KafkaTestUtils.getPropertyValue(fizContainer, - "listenerConsumer.consumer.fetcher.fetchConfig.minBytes")) + "listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes")) .isEqualTo(420000); MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener"); @@ -402,9 +402,9 @@ public void manyTests() throws Exception { MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils .getPropertyValue(rebalanceConcurrentContainer, "containers", List.class).get(0); - assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.groupId")) + assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.groupId")) .isNotEqualTo("rebalanceListener"); - String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId", + String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.clientId", String.class); assertThat(clientId).startsWith("rebal-"); assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-')); @@ -535,13 +535,13 @@ public void testJson() throws Exception { MessageListenerContainer buzContainer = (MessageListenerContainer) KafkaTestUtils .getPropertyValue(buzConcurrentContainer, "containers", List.class).get(0); assertThat(KafkaTestUtils.getPropertyValue(buzContainer, - "listenerConsumer.consumer.groupId", Optional.class).get()) + "listenerConsumer.consumer.delegate.groupId", Optional.class).get()) .isEqualTo("buz.explicitGroupId"); assertThat(KafkaTestUtils.getPropertyValue(buzContainer, - "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords")) + "listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords")) .isEqualTo(5); assertThat(KafkaTestUtils.getPropertyValue(buzContainer, - "listenerConsumer.consumer.fetcher.fetchConfig.minBytes")) + "listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes")) .isEqualTo(123456); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index 064839f0bd..30002846a5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import java.time.Duration; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; @@ -64,6 +65,7 @@ * @author Chris Gilbert * @author Artem Bilan * @author Adrian Gygax + * @author Soby Chacko * * @since 1.0.6 */ @@ -484,7 +486,7 @@ public void consumerRemoved(String id, Consumer consumer) { assertThat(adds).hasSize(1); assertThat(adds.get(0)).isEqualTo("cf.foo-0"); assertThat(removals).hasSize(0); - consumer.close(); + consumer.close(Duration.ofSeconds(10)); assertThat(removals).hasSize(1); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index bfff9e77dc..4db13f4734 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.lang.Nullable; /** * @author Gary Russell @@ -75,6 +76,7 @@ * @author Marius Bogoevici * @author Artem Yakshin * @author Vladimir Tsanev + * @author Soby Chacko */ @EmbeddedKafka(topics = { ConcurrentMessageListenerContainerTests.topic1, ConcurrentMessageListenerContainerTests.topic2, @@ -230,13 +232,13 @@ public void testAutoCommitWithRebalanceListener() throws Exception { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { @Override - protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, - String clientIdSuffixArg, Properties properties) { + protected Consumer createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, + @Nullable String clientIdSuffixArg, @Nullable Properties properties) { overrides.set(properties); Consumer created = super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); - assertThat(KafkaTestUtils.getPropertyValue(created, "requestTimeoutMs", Long.class)).isEqualTo(23000L); + assertThat(KafkaTestUtils.getPropertyValue(created, "delegate.requestTimeoutMs", Integer.class)).isEqualTo(23000); return created; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java index 5b71aed27c..dc819ea957 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,8 @@ void configurePreLoadedDelegates() { props.setMessageListener(mock(MessageListener.class)); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cFact, props); container.start(); - assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer.valueDeserializer")) + assertThat(KafkaTestUtils.getPropertyValue(container, + "listenerConsumer.consumer.delegate.deserializers.valueDeserializer")) .isSameAs(delegating); Map delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class); assertThat(delegates).hasSize(1);