Skip to content

Commit 4ec6061

Browse files
author
Zhiyang.Wang1
committed
GH-2588: support batch recoverable DefaultAfterRollbackProcessor
* add method processBatch at `AfterRollbackProcessor` * add opt-in property `batchRecoverAfterRollback` at `ContainerProperties` * change format to `BatchListenerFailedException.getMessage` * add batch recoverable after rollback unit test
1 parent c8e0c33 commit 4ec6061

11 files changed

+347
-80
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2324

2425
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
2526

@@ -63,6 +64,12 @@ public interface AfterRollbackProcessor<K, V> {
6364
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
6465
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
6566

67+
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
68+
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
69+
boolean recoverable, ContainerProperties.EOSMode eosMode) {
70+
process(recordList, consumer, container, exception, recoverable, eosMode);
71+
}
72+
6673
/**
6774
* Optional method to clear thread state; will be called just before a consumer
6875
* thread terminates.

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public int getIndex() {
9898

9999
@Override
100100
public String getMessage() {
101-
return super.getMessage() + (this.record != null
101+
return super.getMessage() + " " + (this.record != null
102102
? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
103-
: (" @-" + this.index));
103+
: ("@-" + this.index));
104104
}
105105

106106
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ public enum EOSMode {
258258

259259
private PlatformTransactionManager transactionManager;
260260

261+
private boolean batchRecoverAfterRollback = false;
262+
261263
private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
262264

263265
private TaskScheduler scheduler;
@@ -543,6 +545,14 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
543545
this.transactionManager = transactionManager;
544546
}
545547

548+
public boolean isBatchRecoverAfterRollback() {
549+
return this.batchRecoverAfterRollback;
550+
}
551+
552+
public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) {
553+
this.batchRecoverAfterRollback = batchRecoverAfterRollback;
554+
}
555+
546556
public int getMonitorInterval() {
547557
return this.monitorInterval;
548558
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.util.Collections;
20+
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.function.BiConsumer;
2425

2526
import org.apache.kafka.clients.consumer.Consumer;
2627
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2729
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2830
import org.apache.kafka.common.TopicPartition;
2931

32+
import org.springframework.kafka.KafkaException;
3033
import org.springframework.kafka.core.KafkaOperations;
3134
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
3235
import org.springframework.lang.Nullable;
@@ -60,7 +63,9 @@ public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
6063

6164
private final BackOff backOff;
6265

63-
private KafkaOperations<?, ?> kafkaTemplate;
66+
private final KafkaOperations<?, ?> kafkaTemplate;
67+
68+
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
6469

6570
/**
6671
* Construct an instance with the default recoverer which simply logs the record after
@@ -143,14 +148,19 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
143148
super.setCommitRecovered(commitRecovered);
144149
checkConfig();
145150
this.backOff = backOff;
151+
this.recoverer = (crs, ex) -> {
152+
if (recoverer != null && !crs.isEmpty()) {
153+
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
154+
}
155+
};
146156
}
147157

148158
private void checkConfig() {
149159
Assert.isTrue(!isCommitRecovered() || this.kafkaTemplate != null,
150160
"A KafkaOperations is required when 'commitRecovered' is true");
151161
}
152162

153-
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
163+
@SuppressWarnings({ "unchecked", "rawtypes"})
154164
@Override
155165
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
156166
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
@@ -176,6 +186,53 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
176186

177187
}
178188

189+
@SuppressWarnings({ "unchecked", "rawtypes"})
190+
@Override
191+
public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
192+
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
193+
194+
if (recoverable && isCommitRecovered()) {
195+
long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs);
196+
if (nextBackOff != BackOffExecution.STOP) {
197+
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
198+
try {
199+
ListenerUtils.stoppableSleep(container, nextBackOff);
200+
}
201+
catch (InterruptedException e) {
202+
Thread.currentThread().interrupt();
203+
}
204+
return;
205+
}
206+
207+
try {
208+
this.recoverer.accept(records, exception);
209+
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
210+
records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()),
211+
ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
212+
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
213+
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
214+
}
215+
clearThreadState();
216+
}
217+
catch (Exception ex) {
218+
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
219+
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
220+
throw ex;
221+
}
222+
return;
223+
}
224+
225+
try {
226+
process(recordList, consumer, container, exception, false, eosMode);
227+
}
228+
catch (KafkaException ke) {
229+
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
230+
}
231+
catch (Exception ex) {
232+
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
233+
}
234+
}
235+
179236
@Override
180237
public boolean isProcessInTransaction() {
181238
return isCommitRecovered();

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
120120
@Override
121121
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
122122
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
123-
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
123+
notRetryable.forEach(handler::addNotRetryableExceptions);
124124
}
125125
}
126126

@@ -201,11 +201,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
201201
return -1;
202202
}
203203
int i = 0;
204-
Iterator<?> iterator = data.iterator();
205-
while (iterator.hasNext()) {
206-
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
207-
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
208-
&& candidate.offset() == record.offset()) {
204+
for (ConsumerRecord<?, ?> datum : data) {
205+
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
206+
&& datum.offset() == record.offset()) {
209207
break;
210208
}
211209
i++;

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
181181
.stream()
182182
.collect(
183183
Collectors.toMap(tp -> tp,
184-
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
184+
tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new))
185185
.forEach(consumer::seek);
186186

187187
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Map.Entry;
35+
import java.util.Objects;
3536
import java.util.Properties;
3637
import java.util.Set;
3738
import java.util.concurrent.BlockingQueue;
@@ -659,6 +660,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
659660

660661
private final boolean wantsFullRecords;
661662

663+
private final boolean wantsBatchRecoverAfterRollback;
664+
662665
private final boolean autoCommit;
663666

664667
private final boolean isManualAck;
@@ -881,6 +884,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
881884

882885
this.clientId = determineClientId();
883886
this.transactionTemplate = determineTransactionTemplate();
887+
this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback();
884888
this.genericListener = listener;
885889
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
886890
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
@@ -2198,37 +2202,20 @@ private void batchRollback(final ConsumerRecords<K, V> records,
21982202

21992203
@Override
22002204
protected void doInTransactionWithoutResult(TransactionStatus status) {
2201-
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
2205+
afterRollbackProcessorToUse.processBatch(records,
2206+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
2207+
ListenerConsumer.this.consumer,
2208+
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
2209+
ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode);
22022210
}
22032211

22042212
});
22052213
}
22062214
else {
2207-
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
2208-
}
2209-
}
2210-
2211-
private void batchAfterRollback(final ConsumerRecords<K, V> records,
2212-
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
2213-
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
2214-
2215-
try {
2216-
if (recordList == null) {
2217-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
2218-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2219-
this.eosMode);
2220-
}
2221-
else {
2222-
afterRollbackProcessorToUse.process(recordList, this.consumer,
2223-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2224-
this.eosMode);
2225-
}
2226-
}
2227-
catch (KafkaException ke) {
2228-
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
2229-
}
2230-
catch (Exception ex) {
2231-
this.logger.error(ex, "AfterRollbackProcessor threw an exception");
2215+
afterRollbackProcessorToUse.processBatch(records,
2216+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
2217+
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
2218+
this.wantsBatchRecoverAfterRollback, this.eosMode);
22322219
}
22332220
}
22342221

@@ -3879,16 +3866,13 @@ private class StopCallback implements BiConsumer<Object, Throwable> {
38793866
public void accept(Object result, @Nullable Throwable throwable) {
38803867
if (throwable != null) {
38813868
KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container");
3882-
if (this.callback != null) {
3883-
this.callback.run();
3884-
}
38853869
}
38863870
else {
38873871
KafkaMessageListenerContainer.this.logger
38883872
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
3889-
if (this.callback != null) {
3890-
this.callback.run();
3891-
}
3873+
}
3874+
if (this.callback != null) {
3875+
this.callback.run();
38923876
}
38933877
}
38943878

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
126126
Map<Thread, Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {
127127

128128
Thread currentThread = Thread.currentThread();
129-
BackOffExecution backOffExecution = executions.get(currentThread);
130-
if (backOffExecution == null) {
131-
backOffExecution = backOff.start();
132-
executions.put(currentThread, backOffExecution);
133-
}
134-
Long interval = backOffExecution.nextBackOff();
129+
Long interval = nextBackOff(backOff, executions);
135130
if (interval == BackOffExecution.STOP) {
136131
interval = lastIntervals.get(currentThread);
137132
if (interval == null) {
@@ -144,6 +139,17 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
144139
}
145140
}
146141

142+
public static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
143+
144+
Thread currentThread = Thread.currentThread();
145+
BackOffExecution backOffExecution = executions.get(currentThread);
146+
if (backOffExecution == null) {
147+
backOffExecution = backOff.start();
148+
executions.put(currentThread, backOffExecution);
149+
}
150+
return backOffExecution.nextBackOff();
151+
}
152+
147153
/**
148154
* Sleep for the desired timeout, as long as the container continues to run.
149155
* @param container the container.

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -133,6 +133,17 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
133133
return skipped.get();
134134
}
135135

136+
public static void doSeeksToBegin(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
137+
LogAccessor logger) {
138+
139+
Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
140+
records.forEach(record -> {
141+
partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
142+
offset -> record.offset());
143+
});
144+
seekPartitions(consumer, partitions, logger);
145+
}
146+
136147
/**
137148
* Perform seek operations on each partition.
138149
* @param consumer the consumer.

0 commit comments

Comments
 (0)