Skip to content

Commit fea221f

Browse files
garyrussellartembilan
authored andcommitted
GH-2197: Fix Container State After Fatal Stop
Resolves #2197 When a container is stopped due to any fatal error, `isInExpectedState()` should return false. **cherry-pick to 2.8.x, 2.7.x**
1 parent 94721d5 commit fea221f

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
169169
private String clientIdSuffix;
170170

171171
private Runnable emergencyStop = () -> stopAbnormally(() -> {
172-
// NOSONAR
173172
});
174173

175174
private volatile ListenerConsumer listenerConsumer;
@@ -226,7 +225,7 @@ public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consu
226225
}
227226

228227
/**
229-
* Set a {@link Runnable} to call whenever an {@link Error} occurs on a listener
228+
* Set a {@link Runnable} to call whenever a fatal error occurs on the listener
230229
* thread.
231230
* @param emergencyStop the Runnable.
232231
* @since 2.2.1
@@ -1290,11 +1289,8 @@ public void run() {
12901289
exitThrowable = e;
12911290
}
12921291
catch (Error e) { // NOSONAR - rethrown
1293-
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
1294-
if (runnable != null) {
1295-
runnable.run();
1296-
}
12971292
this.logger.error(e, "Stopping container due to an Error");
1293+
this.fatalError = true;
12981294
wrapUp(e);
12991295
throw e;
13001296
}
@@ -1758,8 +1754,10 @@ private void wrapUp(@Nullable Throwable throwable) {
17581754
}
17591755
}
17601756
else {
1761-
this.logger.error("Fatal consumer exception; stopping container");
1762-
KafkaMessageListenerContainer.this.stop(false);
1757+
if (!(throwable instanceof Error)) {
1758+
this.logger.error("Fatal consumer exception; stopping container");
1759+
}
1760+
KafkaMessageListenerContainer.this.emergencyStop.run();
17631761
}
17641762
this.monitorTask.cancel(true);
17651763
if (!this.taskSchedulerExplicitlySet) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.springframework.kafka.event.ConsumerStoppedEvent;
100100
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
101101
import org.springframework.kafka.event.ConsumerStoppingEvent;
102+
import org.springframework.kafka.event.ContainerStoppedEvent;
102103
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
103104
import org.springframework.kafka.listener.ContainerProperties.AckMode;
104105
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
@@ -3049,38 +3050,63 @@ public void testCommitErrorHandlerCalled() throws Exception {
30493050
container.stop();
30503051
}
30513052

3052-
@SuppressWarnings({ "unchecked", "rawtypes" })
30533053
@Test
3054-
void testFatalErrorOnAuthenticationException() throws Exception {
3054+
void testFatalErrorOnAuthenticationException() throws InterruptedException {
30553055
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3056-
Consumer<Integer, String> consumer = mock(Consumer.class);
3057-
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3058-
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
3059-
3060-
willThrow(AuthenticationException.class)
3061-
.given(consumer).poll(any());
3062-
30633056
ContainerProperties containerProps = new ContainerProperties(topic1);
30643057
containerProps.setGroupId("grp");
30653058
containerProps.setClientId("clientId");
30663059
containerProps.setMessageListener((MessageListener) r -> { });
30673060
KafkaMessageListenerContainer<Integer, String> container =
30683061
new KafkaMessageListenerContainer<>(cf, containerProps);
3062+
testFatalErrorOnAuthenticationException(container, cf);
3063+
}
3064+
3065+
@Test
3066+
void testFatalErrorOnAuthenticationExceptionConcurrent() throws InterruptedException {
3067+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3068+
ContainerProperties containerProps = new ContainerProperties(topic1);
3069+
containerProps.setGroupId("grp");
3070+
containerProps.setClientId("clientId");
3071+
containerProps.setMessageListener((MessageListener) r -> { });
3072+
ConcurrentMessageListenerContainer<Integer, String> container =
3073+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
3074+
testFatalErrorOnAuthenticationException(container, cf);
3075+
}
3076+
3077+
@SuppressWarnings({ "unchecked", "rawtypes" })
3078+
private void testFatalErrorOnAuthenticationException(AbstractMessageListenerContainer container,
3079+
ConsumerFactory<Integer, String> cf) throws InterruptedException {
3080+
3081+
Consumer<Integer, String> consumer = mock(Consumer.class);
3082+
given(cf.createConsumer(eq("grp"), eq("clientId"),
3083+
container instanceof ConcurrentMessageListenerContainer ? eq("-0") : isNull(), any()))
3084+
.willReturn(consumer);
3085+
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
3086+
3087+
willThrow(AuthenticationException.class)
3088+
.given(consumer).poll(any());
30693089

30703090
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
3071-
CountDownLatch stopped = new CountDownLatch(1);
3091+
CountDownLatch consumerStopped = new CountDownLatch(1);
3092+
CountDownLatch containerStopped = new CountDownLatch(1);
30723093

30733094
container.setApplicationEventPublisher(e -> {
30743095
if (e instanceof ConsumerStoppedEvent) {
30753096
reason.set(((ConsumerStoppedEvent) e).getReason());
3076-
stopped.countDown();
3097+
consumerStopped.countDown();
3098+
}
3099+
else if (e instanceof ContainerStoppedEvent) {
3100+
containerStopped.countDown();
30773101
}
30783102
});
30793103

30803104
container.start();
30813105
try {
3082-
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3106+
assertThat(consumerStopped.await(10, TimeUnit.SECONDS)).isTrue();
30833107
assertThat(reason.get()).isEqualTo(Reason.AUTH);
3108+
assertThat(containerStopped.await(10, TimeUnit.SECONDS)).isTrue();
3109+
assertThat(container.isInExpectedState()).isFalse();
30843110
}
30853111
finally {
30863112
container.stop();
@@ -3106,18 +3132,23 @@ void testFatalErrorOnAuthorizationException() throws Exception {
31063132
new KafkaMessageListenerContainer<>(cf, containerProps);
31073133

31083134
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
3109-
CountDownLatch stopped = new CountDownLatch(1);
3135+
CountDownLatch consumerStopped = new CountDownLatch(1);
3136+
CountDownLatch containerStopped = new CountDownLatch(1);
31103137

31113138
container.setApplicationEventPublisher(e -> {
31123139
if (e instanceof ConsumerStoppedEvent) {
31133140
reason.set(((ConsumerStoppedEvent) e).getReason());
3114-
stopped.countDown();
3141+
consumerStopped.countDown();
3142+
}
3143+
else if (e instanceof ContainerStoppedEvent) {
3144+
containerStopped.countDown();
31153145
}
31163146
});
31173147

31183148
container.start();
3119-
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3149+
assertThat(consumerStopped.await(10, TimeUnit.SECONDS)).isTrue();
31203150
assertThat(reason.get()).isEqualTo(Reason.AUTH);
3151+
assertThat(container.isInExpectedState()).isFalse();
31213152
container.stop();
31223153
}
31233154

@@ -3144,6 +3175,7 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
31443175
container.start();
31453176
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
31463177
container.stop();
3178+
assertThat(container.isInExpectedState()).isTrue();
31473179
}
31483180

31493181
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -3167,13 +3199,14 @@ void testFatalErrorOnFencedInstanceException() throws Exception {
31673199
CountDownLatch stopped = new CountDownLatch(1);
31683200

31693201
container.setApplicationEventPublisher(e -> {
3170-
if (e instanceof ConsumerStoppedEvent) {
3202+
if (e instanceof ContainerStoppedEvent) {
31713203
stopped.countDown();
31723204
}
31733205
});
31743206

31753207
container.start();
31763208
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3209+
assertThat(container.isInExpectedState()).isFalse();
31773210
container.stop();
31783211
}
31793212

spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -127,6 +127,7 @@ public void testOOMKMLC() throws Exception {
127127
container.start();
128128
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
129129
assertThat(container.isRunning()).isFalse();
130+
assertThat(container.isInExpectedState()).isFalse();
130131
}
131132

132133
}

0 commit comments

Comments
 (0)