Skip to content

Commit 144fa1e

Browse files
Wzy19930507spring-builds
authored andcommitted
CMLC#isInExpectedState consistency problem
Fixes: #3063 * change stream to for loops, prevent `ConcurrentModificationException` from simultaneously invoking `KafkaMessageListenerContainer#setStoppedNormally` and `CMLC#isInExpectedState` * modify property isInExpectedState when invoking stream.allMatch. This was causing list size to change and thus throwing `ConcurrentModificationException` by `ArrayList$ArrayListSpliterator#tryAdvance`. (cherry picked from commit ee835ce)
1 parent 978a0c5 commit 144fa1e

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Map;
2626
import java.util.Objects;
2727
import java.util.concurrent.atomic.AtomicInteger;
28-
import java.util.stream.Collectors;
2928

3029
import org.apache.kafka.common.Metric;
3130
import org.apache.kafka.common.MetricName;
@@ -120,12 +119,12 @@ public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
120119
public List<KafkaMessageListenerContainer<K, V>> getContainers() {
121120
this.lifecycleLock.lock();
122121
try {
123-
return Collections.unmodifiableList(new ArrayList<>(this.containers));
122+
return List.copyOf(this.containers);
124123
}
125124
finally {
126125
this.lifecycleLock.unlock();
127126
}
128-
}
127+
}
129128

130129
@Override
131130
public MessageListenerContainer getContainerFor(String topic, int partition) {
@@ -156,7 +155,7 @@ public Collection<TopicPartition> getAssignedPartitions() {
156155
.map(KafkaMessageListenerContainer::getAssignedPartitions)
157156
.filter(Objects::nonNull)
158157
.flatMap(Collection::stream)
159-
.collect(Collectors.toList());
158+
.toList();
160159
}
161160
finally {
162161
this.lifecycleLock.unlock();
@@ -258,7 +257,6 @@ protected void doStart() {
258257
}
259258
}
260259

261-
@SuppressWarnings("deprecation")
262260
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
263261
String beanName = getBeanName();
264262
beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
@@ -307,13 +305,17 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
307305
return container;
308306
}
309307

308+
@Nullable
310309
private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) {
311310
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
311+
if (topicPartitions == null) {
312+
return null;
313+
}
312314
if (this.concurrency == 1) {
313-
return topicPartitions; // NOSONAR
315+
return topicPartitions;
314316
}
315317
else {
316-
int numPartitions = topicPartitions.length; // NOSONAR
318+
int numPartitions = topicPartitions.length;
317319
if (numPartitions == this.concurrency) {
318320
return new TopicPartitionOffset[] { topicPartitions[index] };
319321
}
@@ -388,7 +390,7 @@ && getContainerProperties().isRestartAfterAuthExceptions()
388390
if (exec == null) {
389391
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
390392
}
391-
exec.execute(() -> start());
393+
exec.execute(this::start);
392394
}
393395
}
394396

@@ -476,10 +478,15 @@ public boolean isPartitionPaused(TopicPartition topicPartition) {
476478
public boolean isInExpectedState() {
477479
this.lifecycleLock.lock();
478480
try {
479-
return (isRunning() || isStoppedNormally()) && this.containers
480-
.stream()
481-
.map(container -> container.isInExpectedState())
482-
.allMatch(bool -> Boolean.TRUE.equals(bool));
481+
boolean isInExpectedState = isRunning() || isStoppedNormally();
482+
if (isInExpectedState) {
483+
for (KafkaMessageListenerContainer<K, V> container : this.containers) {
484+
if (!container.isInExpectedState()) {
485+
return false;
486+
}
487+
}
488+
}
489+
return isInExpectedState;
483490
}
484491
finally {
485492
this.lifecycleLock.unlock();

0 commit comments

Comments
 (0)