Skip to content

Commit cd2ea4c

Browse files
authored
Code Refactoring KafkaMessageListenerContainer
Fixes: spring-projects#3090 * remove `checkAckMode` because `ContainerProperties.ackCount` and `ContainerProperties.ackTime` have init value. * change `ListenerConsumer.offsets` type to Map<TopicPartition, Long>. * replace `ListenerConsumer.setupSubBatchPerPartition` to `ContainerProperties.isSubBatchPerPartition`. * add method `commitOffsets` and `commitOffsetsInTransactions` * remove properties this.commitRecovered. * refactor method `processCommits`. * minor modification in `ackCurrent` * remove the duplicate call to `updatePendingOffsets()` which is already handled in `processAcks()`
1 parent 29f8d18 commit cd2ea4c

File tree

3 files changed

+126
-189
lines changed

3 files changed

+126
-189
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ public void setConsumerStartTimeout(Duration consumerStartTimeout) {
783783
* @since 2.3.2
784784
*/
785785
public boolean isSubBatchPerPartition() {
786-
return this.subBatchPerPartition == null ? false : this.subBatchPerPartition;
786+
return this.subBatchPerPartition != null && this.subBatchPerPartition;
787787
}
788788

789789
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,8 @@
4949
* @author Gary Russell
5050
* @author Andrii Pelesh
5151
* @author Antonio Tomac
52+
* @author Wang Zhiyang
53+
*
5254
* @since 2.8
5355
*
5456
*/
@@ -245,7 +247,7 @@ public static Exception findRootCause(Exception exception) {
245247
* @since 3.0.10
246248
*/
247249
public static <K, V> boolean checkDeserializer(ConsumerFactory<K, V> consumerFactory,
248-
Properties consumerOverrides, boolean isValue, ClassLoader classLoader) {
250+
Properties consumerOverrides, boolean isValue, @Nullable ClassLoader classLoader) {
249251

250252
Object deser = findDeserializerClass(consumerFactory, consumerOverrides, isValue);
251253
Class<?> deserializer = null;

0 commit comments

Comments
 (0)