Skip to content

Commit 73bee66

Browse files
authored
GH-2612: Add Option To Restart Container After Auth
Resolves #2612 Add a container property to enable restarting the container after an emergency stop due to authentication/authorization exceptions, perhaps due to credentials expiring. **cherry-pick to 2.9.x** * Add unsaved change.
1 parent 426f11f commit 73bee66

File tree

8 files changed

+130
-12
lines changed

8 files changed

+130
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2556,7 +2556,15 @@ See `monitorInterval`.
25562556

25572557
|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
25582558
|5000
2559-
|The timeout passed into `Consumer.poll()`.
2559+
|The timeout passed into `Consumer.poll()` in milliseconds.
2560+
2561+
|[[pollTimeoutWhilePaused]]<<pollTimeoutWhilePaused,`pollTimeoutWhilePaused`>>
2562+
|100
2563+
|The timeout passed into `Consumer.poll()` (in milliseconds) when the container is in a paused state.
2564+
2565+
|[[restartAfterAuthExceptions]]<<restartAfterAuthExceptions,`restartAfterAuthExceptions`>>
2566+
|false
2567+
|True to restart the container if it is stopped due to authorization/authentication exceptions.
25602568

25612569
|[[scheduler]]<<scheduler,`scheduler`>>
25622570
|`ThreadPoolTaskScheduler`

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ See <<events>> for more information.
6767
You can now customize the thread names used by consumer threads.
6868
See <<container-thread-naming>> for more information.
6969

70+
The container property `restartAfterAuthException` has been added.
71+
See <<container-props>> for more information.
72+
7073
[[x30-template-changes]]
7174
==== `KafkaTemplate` Changes
7275

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.core.task.AsyncTaskExecutor;
3737
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3838
import org.springframework.kafka.core.ConsumerFactory;
39+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
3940
import org.springframework.kafka.support.TopicPartitionOffset;
4041
import org.springframework.lang.Nullable;
4142
import org.springframework.util.Assert;
@@ -64,10 +65,14 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
6465

6566
private final List<AsyncTaskExecutor> executors = new ArrayList<>();
6667

68+
private final AtomicInteger stoppedContainers = new AtomicInteger();
69+
6770
private int concurrency = 1;
6871

6972
private boolean alwaysClientIdSuffix = true;
7073

74+
private volatile Reason reason;
75+
7176
/**
7277
* Construct an instance with the supplied configuration properties.
7378
* The topic partitions are distributed evenly across the delegate
@@ -343,6 +348,23 @@ protected void doStop(final Runnable callback, boolean normal) {
343348
}
344349
}
345350

351+
@Override
352+
public void childStopped(MessageListenerContainer child, Reason reason) {
353+
synchronized (this.lifecycleMonitor) {
354+
if (this.reason == null || reason.equals(Reason.AUTH)) {
355+
this.reason = reason;
356+
}
357+
if (Reason.AUTH.equals(this.reason)
358+
&& getContainerProperties().isRestartAfterAuthExceptions()
359+
&& this.concurrency == this.stoppedContainers.incrementAndGet()) {
360+
361+
this.reason = null;
362+
this.stoppedContainers.set(0);
363+
doStart();
364+
}
365+
}
366+
}
367+
346368
@Override
347369
public void pause() {
348370
synchronized (this.lifecycleMonitor) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ public enum EOSMode {
293293

294294
private Duration pollTimeoutWhilePaused = DEFAULT_PAUSED_POLL_TIMEOUT;
295295

296+
private boolean restartAfterAuthExceptions;
297+
296298
/**
297299
* Create properties for a container that will subscribe to the specified topics.
298300
* @param topics the topics.
@@ -972,6 +974,25 @@ public void setPollTimeoutWhilePaused(Duration pollTimeoutWhilePaused) {
972974
this.pollTimeoutWhilePaused = pollTimeoutWhilePaused;
973975
}
974976

977+
/**
978+
* Restart the container if stopped due to an auth exception.
979+
* @return the restartAfterAuthExceptions
980+
* @since 2.9.7
981+
*/
982+
public boolean isRestartAfterAuthExceptions() {
983+
return this.restartAfterAuthExceptions;
984+
}
985+
986+
/**
987+
* Set to true to automatically restart the container if an auth exception is
988+
* detected by the container (or all child containers).
989+
* @param restartAfterAuthExceptions true to restart.
990+
* @since 2.9.7
991+
*/
992+
public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
993+
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
994+
}
995+
975996
@Override
976997
public String toString() {
977998
return "ContainerProperties ["
@@ -1009,6 +1030,7 @@ public String toString() {
10091030
+ (this.observationConvention != null
10101031
? "\n observationConvention=" + this.observationConvention
10111032
: "")
1033+
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
10121034
+ "\n]";
10131035
}
10141036

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,15 @@ protected void doStop(final Runnable callback, boolean normal) {
431431
}
432432
}
433433

434+
@Override
435+
public void childStopped(MessageListenerContainer child, Reason reason) {
436+
if (reason.equals(Reason.AUTH) && child.equals(this)
437+
&& getContainerProperties().isRestartAfterAuthExceptions()) {
438+
setStoppedNormally(true);
439+
start();
440+
}
441+
}
442+
434443
private void publishIdlePartitionEvent(long idleTime, TopicPartition topicPartition, Consumer<K, V> consumer, boolean paused) {
435444
ApplicationEventPublisher publisher = getApplicationEventPublisher();
436445
if (publisher != null) {
@@ -540,6 +549,7 @@ else if (throwable instanceof NoOffsetForPartitionException) {
540549
}
541550
publisher.publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
542551
reason));
552+
this.thisOrParentContainer.childStopped(this, reason);
543553
}
544554
}
545555

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.context.SmartLifecycle;
28+
import org.springframework.kafka.event.ConsumerStoppedEvent;
2829
import org.springframework.kafka.support.KafkaHeaders;
2930
import org.springframework.lang.Nullable;
3031

@@ -260,6 +261,15 @@ default MessageListenerContainer getContainerFor(String topic, int partition) {
260261
return this;
261262
}
262263

264+
/**
265+
* Notify a parent container that a child container has stopped.
266+
* @param child the container.
267+
* @param reason the reason.
268+
* @since 2.9.7
269+
*/
270+
default void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
271+
}
272+
263273
@Override
264274
default void destroy() {
265275
stop();

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler2Tests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -183,7 +183,7 @@ public Consumer consumer() {
183183
catch (InterruptedException e) {
184184
Thread.currentThread().interrupt();
185185
}
186-
return new ConsumerRecords(Collections.emptyMap());
186+
return ConsumerRecords.empty();
187187
}
188188
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
189189
willAnswer(i -> {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,7 +74,6 @@
7474
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7575
import org.apache.kafka.clients.producer.ProducerConfig;
7676
import org.apache.kafka.common.TopicPartition;
77-
import org.apache.kafka.common.errors.AuthenticationException;
7877
import org.apache.kafka.common.errors.AuthorizationException;
7978
import org.apache.kafka.common.errors.FencedInstanceIdException;
8079
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -105,6 +104,7 @@
105104
import org.springframework.kafka.event.ConsumerResumedEvent;
106105
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
107106
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
107+
import org.springframework.kafka.event.ConsumerStartedEvent;
108108
import org.springframework.kafka.event.ConsumerStoppedEvent;
109109
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
110110
import org.springframework.kafka.event.ConsumerStoppingEvent;
@@ -3231,7 +3231,7 @@ void testFatalErrorOnAuthenticationException() throws InterruptedException {
32313231
containerProps.setMessageListener((MessageListener) r -> { });
32323232
KafkaMessageListenerContainer<Integer, String> container =
32333233
new KafkaMessageListenerContainer<>(cf, containerProps);
3234-
testFatalErrorOnAuthenticationException(container, cf);
3234+
testFatalErrorOnAuthenticationException(container, cf, false);
32353235
}
32363236

32373237
@Test
@@ -3244,25 +3244,59 @@ void testFatalErrorOnAuthenticationExceptionConcurrent() throws InterruptedExcep
32443244
containerProps.setMessageListener((MessageListener) r -> { });
32453245
ConcurrentMessageListenerContainer<Integer, String> container =
32463246
new ConcurrentMessageListenerContainer<>(cf, containerProps);
3247-
testFatalErrorOnAuthenticationException(container, cf);
3247+
testFatalErrorOnAuthenticationException(container, cf, false);
3248+
}
3249+
3250+
@Test
3251+
@SuppressWarnings({ "unchecked", "rawtypes" })
3252+
void testFatalErrorOnAuthenticationExceptionRestart() throws InterruptedException {
3253+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3254+
ContainerProperties containerProps = new ContainerProperties(topic1);
3255+
containerProps.setGroupId("grp");
3256+
containerProps.setClientId("clientId");
3257+
containerProps.setMessageListener((MessageListener) r -> { });
3258+
containerProps.setRestartAfterAuthExceptions(true);
3259+
KafkaMessageListenerContainer<Integer, String> container =
3260+
new KafkaMessageListenerContainer<>(cf, containerProps);
3261+
testFatalErrorOnAuthenticationException(container, cf, true);
3262+
}
3263+
3264+
@Test
3265+
@SuppressWarnings({ "unchecked", "rawtypes" })
3266+
void testFatalErrorOnAuthenticationExceptionConcurrentRestart() throws InterruptedException {
3267+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3268+
ContainerProperties containerProps = new ContainerProperties(topic1);
3269+
containerProps.setGroupId("grp");
3270+
containerProps.setClientId("clientId");
3271+
containerProps.setMessageListener((MessageListener) r -> { });
3272+
containerProps.setRestartAfterAuthExceptions(true);
3273+
ConcurrentMessageListenerContainer<Integer, String> container =
3274+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
3275+
testFatalErrorOnAuthenticationException(container, cf, true);
32483276
}
32493277

32503278
@SuppressWarnings({ "unchecked", "rawtypes" })
32513279
private void testFatalErrorOnAuthenticationException(AbstractMessageListenerContainer container,
3252-
ConsumerFactory<Integer, String> cf) throws InterruptedException {
3280+
ConsumerFactory<Integer, String> cf, boolean restart) throws InterruptedException {
32533281

32543282
Consumer<Integer, String> consumer = mock(Consumer.class);
32553283
given(cf.createConsumer(eq("grp"), eq("clientId"),
32563284
container instanceof ConcurrentMessageListenerContainer ? eq("-0") : isNull(), any()))
32573285
.willReturn(consumer);
32583286
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
32593287

3260-
willThrow(AuthenticationException.class)
3261-
.given(consumer).poll(any());
3288+
AtomicBoolean first = new AtomicBoolean(true);
3289+
willAnswer(inv -> {
3290+
if (first.getAndSet(false)) {
3291+
throw new AuthorizationException("test");
3292+
}
3293+
return ConsumerRecords.empty();
3294+
}).given(consumer).poll(any());
32623295

32633296
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
32643297
CountDownLatch consumerStopped = new CountDownLatch(1);
32653298
CountDownLatch containerStopped = new CountDownLatch(1);
3299+
CountDownLatch containerStarted = new CountDownLatch(2);
32663300

32673301
container.setApplicationEventPublisher(e -> {
32683302
if (e instanceof ConsumerStoppedEvent) {
@@ -3272,14 +3306,23 @@ private void testFatalErrorOnAuthenticationException(AbstractMessageListenerCont
32723306
else if (e instanceof ContainerStoppedEvent) {
32733307
containerStopped.countDown();
32743308
}
3309+
else if (e instanceof ConsumerStartedEvent) {
3310+
containerStarted.countDown();
3311+
}
32753312
});
32763313

32773314
container.start();
32783315
try {
32793316
assertThat(consumerStopped.await(10, TimeUnit.SECONDS)).isTrue();
32803317
assertThat(reason.get()).isEqualTo(Reason.AUTH);
32813318
assertThat(containerStopped.await(10, TimeUnit.SECONDS)).isTrue();
3282-
assertThat(container.isInExpectedState()).isFalse();
3319+
if (!restart) {
3320+
assertThat(container.isInExpectedState()).isFalse();
3321+
}
3322+
else {
3323+
assertThat(containerStarted.await(10, TimeUnit.SECONDS)).isTrue();
3324+
assertThat(container.isInExpectedState()).isTrue();
3325+
}
32833326
}
32843327
finally {
32853328
container.stop();

0 commit comments

Comments
 (0)