Skip to content

Commit f55bbbb

Browse files
committed
Fix ConcurrentMessageListenerContainer.isInExpectedState consistency problem
`ConcurrentMessageListenerContainer#isInExpectedState` throw `ConcurrentModificationException` needs simultaneously invoke `KafkaMessageListenerContainer#setStoppedNormally`, `ConcurrentMessageListenerContainer#isInExpectedState`, modify property in stream. `TestOOMError#testOOMCMLC` throw `ConcurrentModificationException`, when assertThat container.isInExpectedState(), because of container maybe is not in expected state. concurrent container publishes one time, child container publishes concurrency time, `CountDownLatch` needs to modify to 2. * Fix `ConcurrentMessageListenerContainer.isInExpectedState` consistency problem * Fix `TestOOMError#testOOMCMLC` throw `ConcurrentModificationException`
1 parent 0b321cf commit f55bbbb

File tree

2 files changed

+12
-6
lines changed

2 files changed

+12
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,13 @@ protected void doStop(final Runnable callback, boolean normal) {
368368
}
369369
}
370370
this.containers.clear();
371-
setStoppedNormally(normal);
371+
this.lifecycleLock.lock();
372+
try {
373+
setStoppedNormally(normal);
374+
}
375+
finally {
376+
this.lifecycleLock.unlock();
377+
}
372378
}
373379
}
374380

@@ -389,7 +395,7 @@ && getContainerProperties().isRestartAfterAuthExceptions()
389395
if (exec == null) {
390396
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
391397
}
392-
exec.execute(() -> start());
398+
exec.execute(this::start);
393399
}
394400
}
395401

@@ -479,8 +485,7 @@ public boolean isInExpectedState() {
479485
try {
480486
return (isRunning() || isStoppedNormally()) && this.containers
481487
.stream()
482-
.map(container -> container.isInExpectedState())
483-
.allMatch(bool -> Boolean.TRUE.equals(bool));
488+
.allMatch(KafkaMessageListenerContainer::isInExpectedState);
484489
}
485490
finally {
486491
this.lifecycleLock.unlock();

spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,7 +77,8 @@ public void testOOMCMLC() throws Exception {
7777
containerProps.setClientId("clientId");
7878
ConcurrentMessageListenerContainer<Integer, String> container =
7979
new ConcurrentMessageListenerContainer<>(cf, containerProps);
80-
CountDownLatch stopLatch = new CountDownLatch(1);
80+
// concurrent container publishes one time, child container publishes concurrency time.
81+
CountDownLatch stopLatch = new CountDownLatch(2);
8182
container.setApplicationEventPublisher(e -> {
8283
if (e instanceof ContainerStoppedEvent) {
8384
stopLatch.countDown();

0 commit comments

Comments
 (0)