Skip to content

Commit 9136832

Browse files
committed
Fix ConcurrentMessageListenerContainer.isInExpectedState consistency problem
`ConcurrentMessageListenerContainer#isInExpectedState` throw `ConcurrentModificationException` needs simultaneously invoke `KafkaMessageListenerContainer#setStoppedNormally`, `ConcurrentMessageListenerContainer#isInExpectedState`, modify property in stream. `TestOOMError#testOOMCMLC` throw `ConcurrentModificationException`, when assertThat container.isInExpectedState(), because of container maybe is not in expected state. concurrent container publishes one time, child container publishes concurrency time, `CountDownLatch` needs to modify to 2. * Fix `ConcurrentMessageListenerContainer.isInExpectedState` consistency problem. * Fix `TestOOMError#testOOMCMLC` throw `ConcurrentModificationException`.
1 parent 0b321cf commit 9136832

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ && getContainerProperties().isRestartAfterAuthExceptions()
389389
if (exec == null) {
390390
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
391391
}
392-
exec.execute(() -> start());
392+
exec.execute(this::start);
393393
}
394394
}
395395

@@ -477,16 +477,23 @@ public boolean isPartitionPaused(TopicPartition topicPartition) {
477477
public boolean isInExpectedState() {
478478
this.lifecycleLock.lock();
479479
try {
480-
return (isRunning() || isStoppedNormally()) && this.containers
481-
.stream()
482-
.map(container -> container.isInExpectedState())
483-
.allMatch(bool -> Boolean.TRUE.equals(bool));
480+
return (isRunning() || isStoppedNormally()) && isAllMatchInExpectedState();
484481
}
485482
finally {
486483
this.lifecycleLock.unlock();
487484
}
488485
}
489486

487+
// See https://github.com/spring-projects/spring-kafka/pull/3059.
488+
private boolean isAllMatchInExpectedState() {
489+
for (KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer : this.containers) {
490+
if (!kafkaMessageListenerContainer.isInExpectedState()) {
491+
return false;
492+
}
493+
}
494+
return true;
495+
}
496+
490497
private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> container) {
491498
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
492499
return assignedPartitions != null && assignedPartitions.contains(topicPartition);

spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,7 +77,8 @@ public void testOOMCMLC() throws Exception {
7777
containerProps.setClientId("clientId");
7878
ConcurrentMessageListenerContainer<Integer, String> container =
7979
new ConcurrentMessageListenerContainer<>(cf, containerProps);
80-
CountDownLatch stopLatch = new CountDownLatch(1);
80+
// concurrent container publishes one time, child container publishes concurrency time.
81+
CountDownLatch stopLatch = new CountDownLatch(2);
8182
container.setApplicationEventPublisher(e -> {
8283
if (e instanceof ContainerStoppedEvent) {
8384
stopLatch.countDown();

0 commit comments

Comments
 (0)