Skip to content

Commit 7c52853

Browse files
marincekgaryrussell
marincek
authored andcommitted
GH-2262: Add BackOffHander
With default implementation (suspend thread) and container pausing/resuming implementation. init commit adding default impl changes add Apache licence pr fix style fixes added ListenerContainerPauseService
1 parent 6fbc6cb commit 7c52853

File tree

8 files changed

+376
-20
lines changed

8 files changed

+376
-20
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
21+
/**
22+
* Handler for the provided back off time, listener container and exception.
23+
*
24+
* @author Jan Marincek
25+
* @since 2.9
26+
*/
27+
@FunctionalInterface
28+
public interface BackOffHandler {
29+
30+
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);
31+
32+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,30 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
107107
/**
108108
* Construct an instance with the provided recoverer which will be called after the
109109
* backOff returns STOP for a topic/partition/offset.
110-
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
111-
* @param backOff the {@link BackOff}.
110+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
111+
* @param backOff the {@link BackOff}.
112112
* @param kafkaOperations for sending the recovered offset to the transaction.
113113
* @param commitRecovered true to commit the recovered record's offset; requires a
114-
* {@link KafkaOperations}.
115-
* @since 2.5.3
114+
* {@link KafkaOperations}.
115+
* @since 2.9
116116
*/
117-
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
118-
BackOff backOff, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
117+
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
118+
this(recoverer, backOff, null, kafkaOperations, commitRecovered);
119+
}
119120

120-
super(recoverer, backOff);
121+
/**
122+
* Construct an instance with the provided recoverer which will be called after the
123+
* backOff returns STOP for a topic/partition/offset.
124+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
125+
* @param backOff the {@link BackOff}.
126+
* @param backOffHandler the {@link BackOffHandler}.
127+
* @param kafkaOperations for sending the recovered offset to the transaction.
128+
* @param commitRecovered true to commit the recovered record's offset; requires a
129+
* {@link KafkaOperations}.
130+
* @since 2.5.9
131+
*/
132+
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, @Nullable KafkaOperations<?, ?> kafkaOperations, boolean commitRecovered) {
133+
super(recoverer, backOff, backOffHandler);
121134
this.kafkaTemplate = kafkaOperations;
122135
super.setCommitRecovered(commitRecovered);
123136
checkConfig();

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,18 @@ public DefaultErrorHandler(ConsumerRecordRecoverer recoverer) {
8686
* @param backOff the {@link BackOff}.
8787
*/
8888
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff) {
89-
super(recoverer, backOff, createFallback(backOff, recoverer));
89+
this(recoverer, backOff, null);
90+
}
91+
92+
/**
93+
* Construct an instance with the provided recoverer which will be called after
94+
* the backOff returns STOP for a topic/partition/offset.
95+
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
96+
* @param backOff the {@link BackOff}.
97+
* @param backOffHandler the {@link BackOffHandler}.
98+
*/
99+
public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler) {
100+
super(recoverer, backOff, backOffHandler, createFallback(backOff, recoverer));
90101
}
91102

92103
private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,22 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
6464
* @param fallbackHandler the fall back handler.
6565
*/
6666
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
67-
CommonErrorHandler fallbackHandler) {
67+
CommonErrorHandler fallbackHandler) {
6868

69-
super(recoverer, backOff);
69+
this(recoverer, backOff, null, fallbackHandler);
70+
}
71+
72+
/**
73+
* Construct an instance with the provided properties.
74+
* @param recoverer the recoverer.
75+
* @param backOff the back off.
76+
* @param backOffHandler the {@link BackOffHandler}
77+
* @param fallbackHandler the fall back handler.
78+
*/
79+
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
80+
@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
81+
82+
super(recoverer, backOff, backOffHandler);
7083
this.fallbackBatchHandler = fallbackHandler;
7184
}
7285

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
6161
private boolean seekAfterError = true;
6262

6363
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
64-
this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger);
64+
this(recoverer, backOff, null);
65+
}
66+
67+
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler) {
68+
this.failureTracker = new FailedRecordTracker(recoverer, backOff, backOffHandler, this.logger);
6569
this.failureTracker.setBackOffFunction(this.noRetriesForClassified);
6670
}
6771

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ class FailedRecordTracker implements RecoveryStrategy {
5858

5959
private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction;
6060

61+
private final BackOffHandler backOffHandler;
62+
6163
private boolean resetStateOnRecoveryFailure = true;
6264

6365
private boolean resetStateOnExceptionChange = true;
6466

6567
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
66-
LogAccessor logger) {
68+
@Nullable BackOffHandler backOffHandler, LogAccessor logger) {
6769

6870
Assert.notNull(backOff, "'backOff' cannot be null");
6971
if (recoverer == null) {
@@ -74,10 +76,10 @@ class FailedRecordTracker implements RecoveryStrategy {
7476
failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
7577
}
7678
logger.error(thr, "Backoff "
77-
+ (failedRecord == null
79+
+ (failedRecord == null
7880
? "none"
7981
: failedRecord.getBackOffExecution())
80-
+ " exhausted for " + KafkaUtils.format(rec));
82+
+ " exhausted for " + KafkaUtils.format(rec));
8183
};
8284
}
8385
else {
@@ -90,6 +92,14 @@ class FailedRecordTracker implements RecoveryStrategy {
9092
}
9193
this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
9294
this.backOff = backOff;
95+
96+
this.backOffHandler = backOffHandler == null ? new DefaultBackOffHandler() : backOffHandler;
97+
98+
}
99+
100+
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
101+
LogAccessor logger) {
102+
this(recoverer, backOff, null, logger);
93103
}
94104

95105
/**
@@ -172,12 +182,7 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
172182
rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
173183
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
174184
if (nextBackOff != BackOffExecution.STOP) {
175-
if (container == null) {
176-
Thread.sleep(nextBackOff);
177-
}
178-
else {
179-
ListenerUtils.stoppableSleep(container, nextBackOff);
180-
}
185+
this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
181186
return false;
182187
}
183188
else {
@@ -301,4 +306,20 @@ void setLastException(Exception lastException) {
301306

302307
}
303308

309+
static class DefaultBackOffHandler implements BackOffHandler {
310+
public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
311+
try {
312+
if (container == null) {
313+
Thread.sleep(nextBackOff);
314+
}
315+
else {
316+
ListenerUtils.stoppableSleep(container, nextBackOff);
317+
}
318+
}
319+
catch (InterruptedException e) {
320+
throw new RuntimeException(e);
321+
}
322+
}
323+
}
324+
304325
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2014-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.time.Duration;
20+
import java.time.LocalDateTime;
21+
import java.util.Optional;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.commons.logging.LogFactory;
27+
28+
import org.springframework.core.log.LogAccessor;
29+
import org.springframework.lang.NonNull;
30+
import org.springframework.lang.Nullable;
31+
32+
/**
33+
* Service for pausing and resuming of {@link MessageListenerContainer}.
34+
*
35+
* @author Jan Marincek
36+
* @since 2.9
37+
*/
38+
public class ListenerContainerPauseService {
39+
40+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ListenerContainerPauseService.class));
41+
private final ListenerContainerRegistry registry;
42+
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
43+
44+
public ListenerContainerPauseService(ListenerContainerRegistry registry) {
45+
this.registry = registry;
46+
}
47+
48+
/**
49+
* Pause the listener by given id.
50+
* Checks if the listener has already been requested to pause.
51+
*
52+
* @param listenerId the id of the listener
53+
*/
54+
public void pause(String listenerId) {
55+
getListenerContainer(listenerId).ifPresent(this::pause);
56+
}
57+
58+
/**
59+
* Pause the listener by given id.
60+
* Checks if the listener has already been requested to pause.
61+
* Sets executor schedule for resuming the same listener after pauseDuration.
62+
*
63+
* @param listenerId the id of the listener
64+
* @param pauseDuration duration between pause() and resume() actions
65+
*/
66+
public void pause(String listenerId, Duration pauseDuration) {
67+
getListenerContainer(listenerId)
68+
.ifPresent(messageListenerContainer -> pause(messageListenerContainer, pauseDuration));
69+
}
70+
71+
/**
72+
* Pause the listener by given container instance.
73+
* Checks if the listener has already been requested to pause.
74+
*
75+
* @param messageListenerContainer the listener container
76+
*/
77+
public void pause(@NonNull MessageListenerContainer messageListenerContainer) {
78+
pause(messageListenerContainer, null);
79+
}
80+
81+
/**
82+
* Pause the listener by given container instance.
83+
* Checks if the listener has already been requested to pause.
84+
* Sets executor schedule for resuming the same listener after pauseDuration.
85+
*
86+
* @param messageListenerContainer the listener container
87+
* @param pauseDuration duration between pause() and resume() actions
88+
*/
89+
public void pause(@NonNull MessageListenerContainer messageListenerContainer, @Nullable Duration pauseDuration) {
90+
if (messageListenerContainer.isPauseRequested()) {
91+
LOGGER.debug(() -> "Container " + messageListenerContainer + " already has pause requested");
92+
}
93+
else {
94+
LOGGER.debug(() -> "Pausing container " + messageListenerContainer);
95+
messageListenerContainer.pause();
96+
if (messageListenerContainer.getListenerId() != null && pauseDuration != null) {
97+
LOGGER.debug(() -> "Resuming of container " + messageListenerContainer + " scheduled for " + LocalDateTime.now().plus(pauseDuration));
98+
this.executor.schedule(() -> resume(messageListenerContainer.getListenerId()), pauseDuration.toMillis(), TimeUnit.MILLISECONDS);
99+
}
100+
}
101+
}
102+
103+
/**
104+
* Resume the listener by given id.
105+
*
106+
* @param listenerId the id of the listener
107+
*/
108+
public void resume(@NonNull String listenerId) {
109+
getListenerContainer(listenerId).ifPresent(this::resume);
110+
}
111+
112+
/**
113+
* Resume the listener.
114+
*
115+
* @param messageListenerContainer the listener container
116+
*/
117+
public void resume(@NonNull MessageListenerContainer messageListenerContainer) {
118+
if (messageListenerContainer.isContainerPaused()) {
119+
LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
120+
messageListenerContainer.resume();
121+
}
122+
else {
123+
LOGGER.debug(() -> "Container " + messageListenerContainer + " was not paused");
124+
}
125+
}
126+
127+
private Optional<MessageListenerContainer> getListenerContainer(String listenerId) {
128+
MessageListenerContainer messageListenerContainer = this.registry.getListenerContainer(listenerId);
129+
if (messageListenerContainer == null) {
130+
LOGGER.warn(() -> "MessageListenerContainer " + listenerId + " does not exists");
131+
}
132+
133+
return Optional.ofNullable(messageListenerContainer);
134+
}
135+
136+
}

0 commit comments

Comments
 (0)