Skip to content

Commit bbaecbc

Browse files
tbarabanovartembilan
authored andcommitted
GH-3726: Fix KafkaMessageListenerContainer for ConcurrentModificationException
Fixes: #3726 Issue link: #3726 `KafkaMessageListenerContainer.getAssignedPartitions()` is not safe due to the fact that different threads can iterate/modify any of the fields `partitionsListenerConsumer.definedPartitions` or `partitionsListenerConsumer.assignedPartitions` simultaneously, but collection types of these fields are not designed for such scenarios. Thus at least `ConcurrentModificationException` can be thrown. * Wrap `partitionsListenerConsumer.definedPartitions` and `partitionsListenerConsumer.assignedPartitions` into `Collections.synchronizedSet()` Signed-off-by: Tim Barabanov <[email protected]> [[email protected] Fix commit message] # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java Signed-off-by: Artem Bilan <[email protected]>
1 parent a628704 commit bbaecbc

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -164,6 +164,7 @@
164164
* @author Raphael Rösch
165165
* @author Christian Mergenthaler
166166
* @author Mikael Carlstedt
167+
* @author Timofey Barabanov
167168
*/
168169
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
169170
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -617,7 +618,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
617618

618619
private final Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
619620

620-
private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<>();
621+
private final Collection<TopicPartition> assignedPartitions = Collections.synchronizedSet(new LinkedHashSet<>());
621622

622623
private final Map<TopicPartition, OffsetAndMetadata> lastCommits = new HashMap<>();
623624

@@ -1219,7 +1220,8 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
12191220
else {
12201221
List<TopicPartitionOffset> topicPartitionsToAssign =
12211222
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
1222-
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
1223+
this.definedPartitions = Collections.synchronizedMap(
1224+
new LinkedHashMap<>(topicPartitionsToAssign.size()));
12231225
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
12241226
this.definedPartitions.put(topicPartition.getTopicPartition(),
12251227
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),

0 commit comments

Comments
 (0)