Skip to content

Commit e9f7780

Browse files
committed
Fix race condition in DebeziumMessProducerTests
The `then(debeziumEngineMock).should().run()` cannot be just checked after `debeziumMessageProducer.start()`: the `DebeziumEngine` is really started on a separate thread. * Check for a `run()` interaction with the mock already after calling `debeziumMessageProducer.stop()`. The `stop()` waits for an internal `latch` which is fulfilled when `DebeziumEngine` exists from its `run()` cycle * Rename `DebeziumMessageProducer.latch` to `lifecycleLatch` to give it more sense.
1 parent c24d10c commit e9f7780

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

spring-integration-debezium/src/main/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class DebeziumMessageProducer extends MessageProducerSupport {
7777

7878
private ThreadFactory threadFactory;
7979

80-
private volatile CountDownLatch latch = new CountDownLatch(0);
80+
private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0);
8181

8282
/**
8383
* Create new Debezium message producer inbound channel adapter.
@@ -174,10 +174,10 @@ protected void onInit() {
174174

175175
@Override
176176
protected void doStart() {
177-
if (this.latch.getCount() > 0) {
177+
if (this.lifecycleLatch.getCount() > 0) {
178178
return;
179179
}
180-
this.latch = new CountDownLatch(1);
180+
this.lifecycleLatch = new CountDownLatch(1);
181181
this.executorService.execute(() -> {
182182
try {
183183
// Runs the debezium connector and deliver database changes to the registered consumer. This method
@@ -191,7 +191,7 @@ protected void doStart() {
191191
this.debeziumEngine.run();
192192
}
193193
finally {
194-
this.latch.countDown();
194+
this.lifecycleLatch.countDown();
195195
}
196196
});
197197
}
@@ -205,7 +205,7 @@ protected void doStop() {
205205
logger.warn(e, "Debezium failed to close!");
206206
}
207207
try {
208-
if (!this.latch.await(5, TimeUnit.SECONDS)) {
208+
if (!this.lifecycleLatch.await(5, TimeUnit.SECONDS)) {
209209
throw new IllegalStateException("Failed to stop " + this);
210210
}
211211
}

spring-integration-debezium/src/test/java/org/springframework/integration/debezium/inbound/DebeziumMessageProducerTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
/**
4040
* @author Christian Tzolov
41+
* @author Artem Bilan
4142
*
4243
* @since 6.2
4344
*/
@@ -76,12 +77,15 @@ public void testDebeziumMessageProducerLifecycle() throws IOException {
7677

7778
await().atMost(5, TimeUnit.SECONDS).until(() -> debeziumMessageProducer.isRunning());
7879
assertThat(debeziumMessageProducer.isActive()).isEqualTo(true);
79-
then(debeziumEngineMock).should().run();
8080

8181
debeziumMessageProducer.stop(); // STOP
8282

8383
assertThat(debeziumMessageProducer.isActive()).isEqualTo(false);
8484
assertThat(debeziumMessageProducer.isRunning()).isEqualTo(false);
85+
86+
// The DebeziumEngine is started on a different thread.
87+
// Only the way to catch the run() mock is to stop DebeziumMessageProducer and wait for its internal latch
88+
then(debeziumEngineMock).should().run();
8589
then(debeziumEngineMock).should().close();
8690

8791
reset(debeziumEngineMock);
@@ -90,10 +94,10 @@ public void testDebeziumMessageProducerLifecycle() throws IOException {
9094

9195
await().atMost(5, TimeUnit.SECONDS).until(() -> debeziumMessageProducer.isRunning());
9296
assertThat(debeziumMessageProducer.isActive()).isEqualTo(true);
93-
then(debeziumEngineMock).should().run();
9497

9598
debeziumMessageProducer.destroy(); // DESTROY
9699

100+
then(debeziumEngineMock).should().run();
97101
then(debeziumEngineMock).should().close();
98102
}
99103

0 commit comments

Comments
 (0)