Skip to content

Commit ee835ce

Browse files
authored
CMLC#isInExpectedState consistency problem
Fixes: #3063 * change stream to for loops, prevent `ConcurrentModificationException` from simultaneously invoking `KafkaMessageListenerContainer#setStoppedNormally` and `CMLC#isInExpectedState` * modify property isInExpectedState when invoking stream.allMatch. This was causing list size to change and thus throwing `ConcurrentModificationException` by `ArrayList$ArrayListSpliterator#tryAdvance`. **Auto-cherry-pick to `3.1.x`**
1 parent 9260145 commit ee835ce

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Map;
2626
import java.util.Objects;
2727
import java.util.concurrent.atomic.AtomicInteger;
28-
import java.util.stream.Collectors;
2928

3029
import org.apache.kafka.common.Metric;
3130
import org.apache.kafka.common.MetricName;
@@ -121,12 +120,12 @@ public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
121120
public List<KafkaMessageListenerContainer<K, V>> getContainers() {
122121
this.lifecycleLock.lock();
123122
try {
124-
return Collections.unmodifiableList(new ArrayList<>(this.containers));
123+
return List.copyOf(this.containers);
125124
}
126125
finally {
127126
this.lifecycleLock.unlock();
128127
}
129-
}
128+
}
130129

131130
@Override
132131
public MessageListenerContainer getContainerFor(String topic, int partition) {
@@ -157,7 +156,7 @@ public Collection<TopicPartition> getAssignedPartitions() {
157156
.map(KafkaMessageListenerContainer::getAssignedPartitions)
158157
.filter(Objects::nonNull)
159158
.flatMap(Collection::stream)
160-
.collect(Collectors.toList());
159+
.toList();
161160
}
162161
finally {
163162
this.lifecycleLock.unlock();
@@ -259,7 +258,6 @@ protected void doStart() {
259258
}
260259
}
261260

262-
@SuppressWarnings("deprecation")
263261
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
264262
String beanName = getBeanName();
265263
beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
@@ -308,13 +306,17 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
308306
return container;
309307
}
310308

309+
@Nullable
311310
private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) {
312311
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
312+
if (topicPartitions == null) {
313+
return null;
314+
}
313315
if (this.concurrency == 1) {
314-
return topicPartitions; // NOSONAR
316+
return topicPartitions;
315317
}
316318
else {
317-
int numPartitions = topicPartitions.length; // NOSONAR
319+
int numPartitions = topicPartitions.length;
318320
if (numPartitions == this.concurrency) {
319321
return new TopicPartitionOffset[] { topicPartitions[index] };
320322
}
@@ -389,7 +391,7 @@ && getContainerProperties().isRestartAfterAuthExceptions()
389391
if (exec == null) {
390392
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
391393
}
392-
exec.execute(() -> start());
394+
exec.execute(this::start);
393395
}
394396
}
395397

@@ -477,10 +479,15 @@ public boolean isPartitionPaused(TopicPartition topicPartition) {
477479
public boolean isInExpectedState() {
478480
this.lifecycleLock.lock();
479481
try {
480-
return (isRunning() || isStoppedNormally()) && this.containers
481-
.stream()
482-
.map(container -> container.isInExpectedState())
483-
.allMatch(bool -> Boolean.TRUE.equals(bool));
482+
boolean isInExpectedState = isRunning() || isStoppedNormally();
483+
if (isInExpectedState) {
484+
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
485+
if (!container.isInExpectedState()) {
486+
return false;
487+
}
488+
}
489+
}
490+
return isInExpectedState;
484491
}
485492
finally {
486493
this.lifecycleLock.unlock();

0 commit comments

Comments
 (0)