Skip to content

Commit 9924210

Browse files
garyrussellartembilan
authored andcommitted
GH-1827: Fix Race
- increase idle event interval to prevent sequencer stopping the container too soon. - tighten up synchronization in the sequencer **cherry-pick to 2.7.x**
1 parent 852c447 commit 9924210

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,22 @@ public synchronized void onApplicationEvent(ListenerContainerIdleEvent event) {
140140
this.executor.execute(() -> {
141141
LOGGER.debug(() -> "Stopping: " + container);
142142
container.stop(() -> {
143-
if (!parent.isChildRunning()) {
144-
this.executor.execute(() -> {
145-
stopParentAndCheckGroup(parent);
146-
});
143+
synchronized (this) {
144+
if (!parent.isChildRunning()) {
145+
this.executor.execute(() -> {
146+
stopParentAndCheckGroup(parent);
147+
});
148+
}
147149
}
148150
});
149151
});
150152
}
151153
}
152154

153-
private void stopParentAndCheckGroup(MessageListenerContainer parent) {
154-
LOGGER.debug(() -> "Stopping: " + parent);
155-
parent.stop(() -> {
156-
synchronized (this) {
155+
private synchronized void stopParentAndCheckGroup(MessageListenerContainer parent) {
156+
if (parent.isRunning()) {
157+
LOGGER.debug(() -> "Stopping: " + parent);
158+
parent.stop(() -> {
157159
if (this.currentGroup != null) {
158160
LOGGER.debug(() -> "Checking group: " + this.currentGroup.toString());
159161
if (this.currentGroup.allStopped()) {
@@ -167,8 +169,8 @@ private void stopParentAndCheckGroup(MessageListenerContainer parent) {
167169
}
168170
}
169171
}
170-
}
171-
});
172+
});
173+
}
172174
}
173175

174176
@Override

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerGroupSequencerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void stopped(ContainerStoppedEvent event) {
137137

138138
@Bean
139139
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
140-
ContainerGroupSequencer sequencer = new ContainerGroupSequencer(registry, 1000, "g1", "g2");
140+
ContainerGroupSequencer sequencer = new ContainerGroupSequencer(registry, 3000, "g1", "g2");
141141
sequencer.setStopLastGroupWhenIdle(true);
142142
sequencer.setAutoStartup(false);
143143
return sequencer;

0 commit comments

Comments
 (0)