Skip to content

Commit ec7cde6

Browse files
author
Zhiyang.Wang1
committed
GH-2588: ARBP support BatchListenerFailedException
Resolves #2588 * add default function `processBatch` in AfterRollbackProcessor * support BatchListenerFailedException in DefaultAfterRollbackProcessor * Add unit tests
1 parent b868682 commit ec7cde6

File tree

7 files changed

+320
-52
lines changed

7 files changed

+320
-52
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2324

2425
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
2526

@@ -63,6 +64,12 @@ public interface AfterRollbackProcessor<K, V> {
6364
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
6465
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
6566

67+
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
68+
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
69+
boolean recoverable, ContainerProperties.EOSMode eosMode) {
70+
process(recordList, consumer, container, exception, recoverable, eosMode);
71+
}
72+
6673
/**
6774
* Optional method to clear thread state; will be called just before a consumer
6875
* thread terminates.

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public int getIndex() {
9898

9999
@Override
100100
public String getMessage() {
101-
return super.getMessage() + (this.record != null
101+
return super.getMessage() + " " + (this.record != null
102102
? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
103-
: (" @-" + this.index));
103+
: ("@-" + this.index));
104104
}
105105

106106
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
1920
import java.util.Collections;
21+
import java.util.HashMap;
2022
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
2325
import java.util.function.BiConsumer;
2426

2527
import org.apache.kafka.clients.consumer.Consumer;
2628
import org.apache.kafka.clients.consumer.ConsumerRecord;
29+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2730
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2831
import org.apache.kafka.common.TopicPartition;
2932

@@ -51,7 +54,7 @@
5154
* @since 1.3.5
5255
*
5356
*/
54-
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
57+
public class DefaultAfterRollbackProcessor<K, V> extends FailedBatchProcessor
5558
implements AfterRollbackProcessor<K, V> {
5659

5760
private final Map<Thread, BackOffExecution> backOffs = new ConcurrentHashMap<>();
@@ -138,7 +141,7 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
138141
BackOff backOff, @Nullable BackOffHandler backOffHandler, @Nullable KafkaOperations<?, ?> kafkaOperations,
139142
boolean commitRecovered) {
140143

141-
super(recoverer, backOff, backOffHandler);
144+
super(recoverer, backOff, backOffHandler, null);
142145
this.kafkaTemplate = kafkaOperations;
143146
super.setCommitRecovered(commitRecovered);
144147
checkConfig();
@@ -176,6 +179,46 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
176179

177180
}
178181

182+
@Override
183+
public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList, Consumer<K, V> consumer,
184+
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
185+
186+
int index = handlerBatchListenerFailedException(exception, records);
187+
if (index > -1) {
188+
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
189+
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
190+
for (ConsumerRecord<?, ?> datum : records) {
191+
if (index-- > 0) {
192+
toCommit.add(datum);
193+
}
194+
else {
195+
remaining.add(datum);
196+
}
197+
}
198+
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
199+
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
200+
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
201+
if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
202+
this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
203+
}
204+
205+
// skip first record
206+
if (SeekUtils.doSeeks(remaining, consumer, exception, recoverable,
207+
getFailureTracker()::recovered, container, this.logger)
208+
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
209+
ConsumerRecord<?, ?> skipped = remaining.get(0);
210+
this.kafkaTemplate.sendOffsetsToTransaction(
211+
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
212+
createOffsetAndMetadata(container, skipped.offset() + 1)
213+
), consumer.groupMetadata());
214+
}
215+
}
216+
else {
217+
process(recordList, consumer, container, exception, false, eosMode);
218+
}
219+
220+
}
221+
179222
@Override
180223
public boolean isProcessInTransaction() {
181224
return isCommitRecovered();

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
120120
@Override
121121
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
122122
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
123-
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
123+
notRetryable.forEach(handler::addNotRetryableExceptions);
124124
}
125125
}
126126

@@ -159,11 +159,19 @@ protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, C
159159

160160
protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, ConsumerRecords<?, ?> data,
161161
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
162+
int index = handlerBatchListenerFailedException(thrownException, data);
163+
if (index > -1) {
164+
return seekOrRecover(thrownException, data, consumer, container, index);
165+
}
166+
fallback(thrownException, data, consumer, container, invokeListener);
167+
return ConsumerRecords.empty();
168+
}
169+
170+
protected int handlerBatchListenerFailedException(Exception thrownException, ConsumerRecords<?, ?> data) {
162171

163172
BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
164173
if (batchListenerFailedException == null) {
165174
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch");
166-
fallback(thrownException, data, consumer, container, invokeListener);
167175
}
168176
else {
169177
getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1));
@@ -181,13 +189,12 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
181189

182190
}
183191
});
184-
fallback(thrownException, data, consumer, container, invokeListener);
185192
}
186193
else {
187-
return seekOrRecover(thrownException, data, consumer, container, index);
194+
return index;
188195
}
189196
}
190-
return ConsumerRecords.empty();
197+
return -1;
191198
}
192199

193200
private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
@@ -201,11 +208,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
201208
return -1;
202209
}
203210
int i = 0;
204-
Iterator<?> iterator = data.iterator();
205-
while (iterator.hasNext()) {
206-
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
207-
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
208-
&& candidate.offset() == record.offset()) {
211+
for (ConsumerRecord<?, ?> datum : data) {
212+
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
213+
&& datum.offset() == record.offset()) {
209214
break;
210215
}
211216
i++;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Map.Entry;
35+
import java.util.Objects;
3536
import java.util.Properties;
3637
import java.util.Set;
3738
import java.util.concurrent.BlockingQueue;
@@ -916,7 +917,6 @@ else if (listener instanceof MessageListener) {
916917
if (this.containerProperties.isLogContainerConfig()) {
917918
this.logger.info(toString());
918919
}
919-
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
920920
ApplicationContext applicationContext = getApplicationContext();
921921
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
922922
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
@@ -2180,7 +2180,11 @@ private void batchRollback(final ConsumerRecords<K, V> records,
21802180

21812181
@Override
21822182
protected void doInTransactionWithoutResult(TransactionStatus status) {
2183-
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
2183+
afterRollbackProcessorToUse.processBatch(records,
2184+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)),
2185+
ListenerConsumer.this.consumer,
2186+
KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
2187+
ListenerConsumer.this.eosMode);
21842188
}
21852189

21862190
});
@@ -2195,16 +2199,10 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
21952199
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
21962200

21972201
try {
2198-
if (recordList == null) {
2199-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
2200-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2201-
this.eosMode);
2202-
}
2203-
else {
2204-
afterRollbackProcessorToUse.process(recordList, this.consumer,
2205-
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false,
2206-
this.eosMode);
2207-
}
2202+
afterRollbackProcessorToUse.processBatch(records,
2203+
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
2204+
KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, true,
2205+
this.eosMode);
22082206
}
22092207
catch (KafkaException ke) {
22102208
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
@@ -2285,9 +2283,7 @@ private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
22852283
|| this.producer != null) {
22862284
if (this.remainingRecords != null) {
22872285
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
2288-
Iterator<ConsumerRecord<K, V>> it = records.iterator();
2289-
while (it.hasNext()) {
2290-
ConsumerRecord<K, V> next = it.next();
2286+
for (ConsumerRecord<K, V> next : records) {
22912287
if (!next.equals(firstUncommitted)) {
22922288
this.acks.add(next);
22932289
}
@@ -2716,8 +2712,8 @@ private void pauseForNackSleep() {
27162712
/**
27172713
* Actually invoke the listener.
27182714
* @param cRecord the record.
2719-
* @param iterator the {@link ConsumerRecords} iterator - used only if a
2720-
* {@link RemainingRecordsErrorHandler} is being used.
2715+
* @param iterator the {@link ConsumerRecords} iterator - only used by
2716+
* {@link DefaultErrorHandler#handleRemaining}.
27212717
* @return an exception.
27222718
* @throws Error an error.
27232719
*/

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,14 +27,18 @@
2727
import static org.mockito.Mockito.times;
2828
import static org.mockito.Mockito.verify;
2929

30+
import java.util.ArrayList;
3031
import java.util.Arrays;
32+
import java.util.HashMap;
3133
import java.util.List;
34+
import java.util.Map;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336
import java.util.concurrent.atomic.AtomicReference;
3437

3538
import org.apache.kafka.clients.consumer.Consumer;
3639
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
3740
import org.apache.kafka.clients.consumer.ConsumerRecord;
41+
import org.apache.kafka.clients.consumer.ConsumerRecords;
3842
import org.apache.kafka.common.TopicPartition;
3943
import org.junit.jupiter.api.Test;
4044
import org.mockito.InOrder;
@@ -101,7 +105,6 @@ void testClassifier() {
101105

102106
@Test
103107
void testBatchBackOff() {
104-
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
105108
@SuppressWarnings("unchecked")
106109
KafkaOperations<String, String> template = mock(KafkaOperations.class);
107110
given(template.isTransactional()).willReturn(true);
@@ -118,35 +121,64 @@ void testBatchBackOff() {
118121
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
119122
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
120123
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
124+
Map<TopicPartition, List<ConsumerRecord<String, String>>> map = new HashMap<>();
125+
records.forEach(rec -> map.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
126+
tp -> new ArrayList<>()).add(rec));
127+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(map);
121128
IllegalStateException illegalState = new IllegalStateException();
122129
@SuppressWarnings("unchecked")
123130
Consumer<String, String> consumer = mock(Consumer.class);
124131
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
125132
MessageListenerContainer container = mock(MessageListenerContainer.class);
126133
given(container.isRunning()).willReturn(true);
127-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
128-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
134+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
135+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
129136
verify(backOff, times(2)).start();
130137
verify(execution.get(), times(2)).nextBackOff();
131138
processor.clearThreadState();
132-
processor.process(records, consumer, container, illegalState, false, EOSMode.V2);
139+
processor.processBatch(consumerRecords, records, consumer, container, illegalState, true, EOSMode.V2);
133140
verify(backOff, times(3)).start();
141+
134142
}
135143

136-
void testEarlyExitBackOff() {
137-
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(
138-
new FixedBackOff(1, 10_000));
144+
@Test
145+
void testBatchListenerFailedException() {
139146
@SuppressWarnings("unchecked")
140-
Consumer<String, String> consumer = mock(Consumer.class);
147+
KafkaOperations<String, String> template = mock(KafkaOperations.class);
148+
given(template.isTransactional()).willReturn(true);
149+
BackOff backOff = spy(new FixedBackOff(0, 1));
150+
AtomicReference<BackOffExecution> execution = new AtomicReference<>();
151+
willAnswer(inv -> {
152+
BackOffExecution exec = spy((BackOffExecution) inv.callRealMethod());
153+
execution.set(exec);
154+
return exec;
155+
}).given(backOff).start();
156+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
157+
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(recoverer,
158+
backOff, template, false);
141159
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
160+
ConsumerRecord<String, String> record3 = new ConsumerRecord<>("foo", 0, 1L, "foo", "bar");
142161
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
143-
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
144-
IllegalStateException illegalState = new IllegalStateException();
162+
ConsumerRecord<String, String> record4 = new ConsumerRecord<>("foo", 1, 2L, "foo", "bar");
163+
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2, record3, record4);
164+
Map<TopicPartition, List<ConsumerRecord<String, String>>> map = new HashMap<>();
165+
records.forEach(rec -> map.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
166+
tp -> new ArrayList<>()).add(rec));
167+
ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(map);
168+
@SuppressWarnings("unchecked")
169+
Consumer<String, String> consumer = mock(Consumer.class);
170+
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
145171
MessageListenerContainer container = mock(MessageListenerContainer.class);
146-
given(container.isRunning()).willReturn(false);
147-
long t1 = System.currentTimeMillis();
148-
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
149-
assertThat(System.currentTimeMillis() < t1 + 5_000);
172+
ContainerProperties containerProperties = mock(ContainerProperties.class);
173+
given(container.getContainerProperties()).willReturn(containerProperties);
174+
given(container.isRunning()).willReturn(true);
175+
BatchListenerFailedException batchFailed = new BatchListenerFailedException("", 2);
176+
processor.processBatch(consumerRecords, records, consumer, container, batchFailed, true, EOSMode.V2);
177+
verify(consumer).seek(new TopicPartition("foo", 1), 1);
178+
BatchListenerFailedException batchFailed2 = new BatchListenerFailedException("", record1);
179+
processor.processBatch(consumerRecords, records, consumer, container, batchFailed2, true, EOSMode.V2);
180+
verify(consumer).seek(new TopicPartition("foo", 0), 0);
181+
verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1);
150182
}
151183

152184
@Test

0 commit comments

Comments
 (0)