Skip to content

Commit 0cf5f8e

Browse files
committed
spring-projectsGH-3116: Upgrade kafka-client to 3.7.0
Fixes: spring-projects#3116 * Upgrade kafka-client to 3.7.0 * Necessary test changes due to `KafkaConsumer` now uses a delegate based model for consumers to create legacy vs consumer-group based consumers See the following link for more details. https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes * Implement Kafka producer `clientInstanceId()` method in `CloseSafeProducer` * whats-new changes
1 parent 4e9c099 commit 0cf5f8e

File tree

7 files changed

+38
-21
lines changed

7 files changed

+38
-21
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ext {
5959
jaywayJsonPathVersion = '2.8.0'
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.10.2'
62-
kafkaVersion = '3.6.1'
62+
kafkaVersion = '3.7.0'
6363
kotlinCoroutinesVersion = '1.7.3'
6464
log4jVersion = '2.22.1'
6565
micrometerDocsVersion = '1.0.2'

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@
77
This section covers the changes made from version 3.1 to version 3.2.
88
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
99

10+
[[x32-kafka-client-version]]
11+
=== Kafka Client Version
12+
13+
This version requires 3.7.0 `kafka-clients`.
14+
1015
[[x32-kafka-streams-iqs-support]]
1116
=== Kafka Streams Interactive Query Support
1217

1318
A new API `KafkaStreamsInteractiveQuerySupport` for accessing queryable stores used in Kafka Streams interactive queries.
1419
See xref:streams.adoc#kafka-streams-iq-support[Kafka Streams Interactive Support] for more details.
1520

16-
17-
1821
[[x32-tiss]]
1922
=== TransactionIdSuffixStrategy
2023

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.common.MetricName;
4848
import org.apache.kafka.common.PartitionInfo;
4949
import org.apache.kafka.common.TopicPartition;
50+
import org.apache.kafka.common.Uuid;
5051
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
5152
import org.apache.kafka.common.errors.ProducerFencedException;
5253
import org.apache.kafka.common.errors.TimeoutException;
@@ -67,6 +68,8 @@
6768
import org.springframework.util.Assert;
6869
import org.springframework.util.StringUtils;
6970

71+
72+
7073
/**
7174
* The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
7275
* <p>
@@ -113,6 +116,7 @@
113116
* @author Chris Gilbert
114117
* @author Thomas Strauß
115118
* @author Adrian Gygax
119+
* @author Soby Chacko
116120
*/
117121
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
118122
implements ProducerFactory<K, V>, ApplicationContextAware,
@@ -1128,6 +1132,11 @@ public List<PartitionInfo> partitionsFor(String topic) {
11281132
return this.delegate.metrics();
11291133
}
11301134

1135+
@Override
1136+
public Uuid clientInstanceId(Duration timeout) {
1137+
return this.delegate.clientInstanceId(timeout);
1138+
}
1139+
11311140
@Override
11321141
public void initTransactions() {
11331142
this.delegate.initTransactions();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,14 @@ public void manyTests() throws Exception {
340340
"listenerConsumer.consumer"));
341341
assertThat(
342342
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer,
343-
"fetcher.fetchConfig.maxPollRecords", Integer.class))
343+
"delegate.fetcher.fetchConfig.maxPollRecords", Integer.class))
344344
.isEqualTo(100);
345345
assertThat(this.quxGroup).hasSize(1);
346346
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
347347
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
348348
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
349349
.isEqualTo("qux");
350-
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId"))
350+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.delegate.clientId"))
351351
.isEqualTo("clientIdViaProps3-0");
352352

353353
template.send("annotated4", 0, "foo");
@@ -371,15 +371,15 @@ public void manyTests() throws Exception {
371371
TopicPartitionOffset[].class)[3];
372372
assertThat(offset.isRelativeToCurrent()).isTrue();
373373
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
374-
"listenerConsumer.consumer.groupId", Optional.class).get())
374+
"listenerConsumer.consumer.delegate.groupId", Optional.class).get())
375375
.isEqualTo("fiz");
376-
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
376+
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.delegate.clientId"))
377377
.isEqualTo("clientIdViaAnnotation-0");
378378
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
379-
"listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
379+
"listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords"))
380380
.isEqualTo(10);
381381
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
382-
"listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
382+
"listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes"))
383383
.isEqualTo(420000);
384384

385385
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
@@ -402,9 +402,9 @@ public void manyTests() throws Exception {
402402

403403
MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils
404404
.getPropertyValue(rebalanceConcurrentContainer, "containers", List.class).get(0);
405-
assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.groupId"))
405+
assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.groupId"))
406406
.isNotEqualTo("rebalanceListener");
407-
String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId",
407+
String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.delegate.clientId",
408408
String.class);
409409
assertThat(clientId).startsWith("rebal-");
410410
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
@@ -535,13 +535,13 @@ public void testJson() throws Exception {
535535
MessageListenerContainer buzContainer = (MessageListenerContainer) KafkaTestUtils
536536
.getPropertyValue(buzConcurrentContainer, "containers", List.class).get(0);
537537
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
538-
"listenerConsumer.consumer.groupId", Optional.class).get())
538+
"listenerConsumer.consumer.delegate.groupId", Optional.class).get())
539539
.isEqualTo("buz.explicitGroupId");
540540
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
541-
"listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
541+
"listenerConsumer.consumer.delegate.fetcher.fetchConfig.maxPollRecords"))
542542
.isEqualTo(5);
543543
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
544-
"listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
544+
"listenerConsumer.consumer.delegate.fetcher.fetchConfig.minBytes"))
545545
.isEqualTo(123456);
546546
}
547547

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.Mockito.mock;
2222
import static org.mockito.Mockito.verify;
2323

24+
import java.time.Duration;
2425
import java.util.AbstractMap;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
@@ -64,6 +65,7 @@
6465
* @author Chris Gilbert
6566
* @author Artem Bilan
6667
* @author Adrian Gygax
68+
* @author Soby Chacko
6769
*
6870
* @since 1.0.6
6971
*/
@@ -484,7 +486,7 @@ public void consumerRemoved(String id, Consumer consumer) {
484486
assertThat(adds).hasSize(1);
485487
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
486488
assertThat(removals).hasSize(0);
487-
consumer.close();
489+
consumer.close(Duration.ofSeconds(10));
488490
assertThat(removals).hasSize(1);
489491
}
490492

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,7 @@
6767
import org.springframework.kafka.test.context.EmbeddedKafka;
6868
import org.springframework.kafka.test.utils.ContainerTestUtils;
6969
import org.springframework.kafka.test.utils.KafkaTestUtils;
70+
import org.springframework.lang.Nullable;
7071

7172
/**
7273
* @author Gary Russell
@@ -75,6 +76,7 @@
7576
* @author Marius Bogoevici
7677
* @author Artem Yakshin
7778
* @author Vladimir Tsanev
79+
* @author Soby Chacko
7880
*/
7981
@EmbeddedKafka(topics = { ConcurrentMessageListenerContainerTests.topic1,
8082
ConcurrentMessageListenerContainerTests.topic2,
@@ -230,13 +232,13 @@ public void testAutoCommitWithRebalanceListener() throws Exception {
230232
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
231233

232234
@Override
233-
protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
234-
String clientIdSuffixArg, Properties properties) {
235+
protected Consumer<Integer, String> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
236+
@Nullable String clientIdSuffixArg, @Nullable Properties properties) {
235237

236238
overrides.set(properties);
237239
Consumer<Integer, String> created = super.createKafkaConsumer(groupId, clientIdPrefix,
238240
clientIdSuffixArg, properties);
239-
assertThat(KafkaTestUtils.getPropertyValue(created, "requestTimeoutMs", Long.class)).isEqualTo(23000L);
241+
assertThat(KafkaTestUtils.getPropertyValue(created, "delegate.requestTimeoutMs", Integer.class)).isEqualTo(23000);
240242
return created;
241243
}
242244

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,7 +63,8 @@ void configurePreLoadedDelegates() {
6363
props.setMessageListener(mock(MessageListener.class));
6464
KafkaMessageListenerContainer<String, Object> container = new KafkaMessageListenerContainer<>(cFact, props);
6565
container.start();
66-
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer.valueDeserializer"))
66+
assertThat(KafkaTestUtils.getPropertyValue(container,
67+
"listenerConsumer.consumer.delegate.deserializers.valueDeserializer"))
6768
.isSameAs(delegating);
6869
Map<?, ?> delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class);
6970
assertThat(delegates).hasSize(1);

0 commit comments

Comments
 (0)