Skip to content

Commit 8380958

Browse files
authored
GH-2891: Always MANUAL with null group.id
Resolves #2891 When using manual partition assignment with `null` `group.id` always coerce `AckMode` to `MANUAL`.
1 parent 9f1050a commit 8380958

File tree

3 files changed

+42
-17
lines changed

3 files changed

+42
-17
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/tips.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public static class PartitionFinder {
4444

4545
Using this in conjunction with `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest` will load all records each time the application is started.
4646
You should also set the container's `AckMode` to `MANUAL` to prevent the container from committing offsets for a `null` consumer group.
47+
Starting with version 3.1, the container will automatically coerce the `AckMode` to `MANUAL` when manual topic assignment is used with no consumer `group.id`.
4748
However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[Explicit Partition Assignment] for more information.
4849

4950
[[ex-jdbc-sync]]

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,10 @@ See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeser
4343
=== Retryable Topics
4444
Change suffix `-retry-5000` to `-retry` when `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)`.
4545
If you want to keep suffix `-retry-5000`, use `@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")`.
46-
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
46+
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
47+
48+
[[x31-c]]
49+
=== Listener Container Changes
50+
51+
When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
52+
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -661,19 +661,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
661661

662662
private final boolean autoCommit;
663663

664-
private final boolean isManualAck = this.containerProperties.getAckMode().equals(AckMode.MANUAL);
664+
private final boolean isManualAck;
665665

666-
private final boolean isCountAck = this.containerProperties.getAckMode().equals(AckMode.COUNT)
667-
|| this.containerProperties.getAckMode().equals(AckMode.COUNT_TIME);
666+
private final boolean isCountAck;
668667

669-
private final boolean isTimeOnlyAck = this.containerProperties.getAckMode().equals(AckMode.TIME);
668+
private final boolean isTimeOnlyAck;
670669

671-
private final boolean isManualImmediateAck =
672-
this.containerProperties.getAckMode().equals(AckMode.MANUAL_IMMEDIATE);
670+
private final boolean isManualImmediateAck;
673671

674-
private final boolean isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
672+
private final boolean isAnyManualAck;
675673

676-
private final boolean isRecordAck = this.containerProperties.getAckMode().equals(AckMode.RECORD);
674+
private final boolean isRecordAck;
677675

678676
private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();
679677

@@ -768,15 +766,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
768766

769767
private final Set<TopicPartition> pausedPartitions = new HashSet<>();
770768

771-
private final Map<TopicPartition, List<Long>> offsetsInThisBatch =
772-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
773-
? new HashMap<>()
774-
: null;
769+
private final Map<TopicPartition, List<Long>> offsetsInThisBatch;
775770

776-
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> deferredOffsets =
777-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
778-
? new HashMap<>()
779-
: null;
771+
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> deferredOffsets;
780772

781773
private final Map<TopicPartition, Long> lastReceivePartition;
782774

@@ -857,6 +849,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
857849
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
858850
ObservationRegistry observationRegistry) {
859851

852+
AckMode ackMode = determineAckMode();
853+
this.isManualAck = ackMode.equals(AckMode.MANUAL);
854+
this.isCountAck = ackMode.equals(AckMode.COUNT)
855+
|| ackMode.equals(AckMode.COUNT_TIME);
856+
this.isTimeOnlyAck = ackMode.equals(AckMode.TIME);
857+
this.isManualImmediateAck =
858+
ackMode.equals(AckMode.MANUAL_IMMEDIATE);
859+
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
860+
this.isRecordAck = ackMode.equals(AckMode.RECORD);
861+
this.offsetsInThisBatch =
862+
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
863+
? new HashMap<>()
864+
: null;
865+
this.deferredOffsets =
866+
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
867+
? new HashMap<>()
868+
: null;
869+
860870
this.observationRegistry = observationRegistry;
861871
Properties consumerProperties = propertiesFromConsumerPropertyOverrides();
862872
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
@@ -950,6 +960,14 @@ else if (listener instanceof MessageListener) {
950960
this.kafkaAdmin = obtainAdmin();
951961
}
952962

963+
private AckMode determineAckMode() {
964+
AckMode ackMode = this.containerProperties.getAckMode();
965+
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
966+
ackMode = AckMode.MANUAL;
967+
}
968+
return ackMode;
969+
}
970+
953971
@Nullable
954972
private Object determineBootstrapServers(Properties consumerProperties) {
955973
Object servers = consumerProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);

0 commit comments

Comments
 (0)