Skip to content

Commit 5b63191

Browse files
frosiere authored and spring-builds committed
GH-3019: seek even in case of commit failure
Fixes: #3019 * DefaultErrorHandler is not able to seek in case of an exception during the commit (cherry picked from commit cfa369b)
1 parent 2c9aacc commit 5b63191

File tree

2 files changed

+77
-55
lines changed

2 files changed

+77
-55
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -229,49 +229,53 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
229229
remaining.add(datum);
230230
}
231231
}
232-
if (offsets.size() > 0) {
233-
commit(consumer, container, offsets);
234-
}
235-
if (isSeekAfterError()) {
236-
if (remaining.size() > 0) {
237-
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
238-
getFailureTracker(), this.logger, getLogLevel());
239-
ConsumerRecord<?, ?> recovered = remaining.get(0);
240-
commit(consumer, container,
241-
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
242-
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
243-
if (remaining.size() > 1) {
244-
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
245-
}
232+
try {
233+
if (offsets.size() > 0) {
234+
commit(consumer, container, offsets);
246235
}
247-
return ConsumerRecords.empty();
248236
}
249-
else {
250-
if (remaining.size() > 0) {
251-
try {
252-
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
253-
consumer)) {
254-
remaining.remove(0);
237+
finally {
238+
if (isSeekAfterError()) {
239+
if (remaining.size() > 0) {
240+
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
241+
getFailureTracker(), this.logger, getLogLevel());
242+
ConsumerRecord<?, ?> recovered = remaining.get(0);
243+
commit(consumer, container,
244+
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
245+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
246+
if (remaining.size() > 1) {
247+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
255248
}
256249
}
257-
catch (Exception e) {
258-
if (SeekUtils.isBackoffException(thrownException)) {
259-
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
260-
+ " included in remaining due to retry back off " + thrownException);
250+
return ConsumerRecords.empty();
251+
}
252+
else {
253+
if (remaining.size() > 0) {
254+
try {
255+
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
256+
consumer)) {
257+
remaining.remove(0);
258+
}
261259
}
262-
else {
263-
this.logger.error(e, KafkaUtils.format(remaining.get(0))
264-
+ " included in remaining due to " + thrownException);
260+
catch (Exception e) {
261+
if (SeekUtils.isBackoffException(thrownException)) {
262+
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
263+
+ " included in remaining due to retry back off " + thrownException);
264+
}
265+
else {
266+
this.logger.error(e, KafkaUtils.format(remaining.get(0))
267+
+ " included in remaining due to " + thrownException);
268+
}
265269
}
266270
}
271+
if (remaining.isEmpty()) {
272+
return ConsumerRecords.empty();
273+
}
274+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
275+
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
276+
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
277+
return new ConsumerRecords<>(remains);
267278
}
268-
if (remaining.isEmpty()) {
269-
return ConsumerRecords.empty();
270-
}
271-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
272-
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
273-
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
274-
return new ConsumerRecords<>(remains);
275279
}
276280
}
277281

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2122
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyMap;
24+
import static org.mockito.BDDMockito.willReturn;
2225
import static org.mockito.BDDMockito.willThrow;
2326
import static org.mockito.Mockito.mock;
2427
import static org.mockito.Mockito.spy;
@@ -34,16 +37,19 @@
3437
import org.apache.kafka.clients.consumer.ConsumerRecord;
3538
import org.apache.kafka.clients.consumer.ConsumerRecords;
3639
import org.apache.kafka.common.TopicPartition;
40+
import org.apache.kafka.common.errors.RebalanceInProgressException;
3741
import org.junit.jupiter.api.Test;
3842
import org.mockito.ArgumentCaptor;
3943

4044
import org.springframework.core.log.LogAccessor;
4145
import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper;
46+
import org.springframework.kafka.KafkaException;
4247
import org.springframework.util.backoff.BackOff;
4348
import org.springframework.util.backoff.FixedBackOff;
4449

4550
/**
4651
* @author Gary Russell
52+
* @author Francois Rosiere
4753
* @since 3.0.3
4854
*
4955
*/
@@ -52,15 +58,6 @@ public class FailedBatchProcessorTests {
5258
@SuppressWarnings({ "rawtypes", "unchecked" })
5359
@Test
5460
void indexOutOfBounds() {
55-
class TestFBP extends FailedBatchProcessor {
56-
57-
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
58-
CommonErrorHandler fallbackHandler) {
59-
60-
super(recoverer, backOff, fallbackHandler);
61-
}
62-
63-
}
6461
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
6562
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
6663

@@ -83,15 +80,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
8380
@SuppressWarnings({ "rawtypes", "unchecked" })
8481
@Test
8582
void recordNotPresent() {
86-
class TestFBP extends FailedBatchProcessor {
87-
88-
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
89-
CommonErrorHandler fallbackHandler) {
90-
91-
super(recoverer, backOff, fallbackHandler);
92-
}
93-
94-
}
9583
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
9684
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
9785

@@ -114,4 +102,34 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
114102
assertThat(output).contains("Record not found in batch: topic-42@123;");
115103
}
116104

105+
@Test
106+
void testExceptionDuringCommit() {
107+
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
108+
willThrow(new IllegalStateException("ise")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
109+
110+
ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null);
111+
ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null);
112+
ConsumerRecord rec3 = new ConsumerRecord("topic", 0, 2L, null, null);
113+
114+
ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2, rec3)));
115+
TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(2L, 2L), mockEH);
116+
final Consumer consumer = mock(Consumer.class);
117+
willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any());
118+
final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class);
119+
willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties();
120+
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
121+
testFBP.handle(new BatchListenerFailedException("topic", rec2),
122+
records, consumer, mockMLC, mock(Runnable.class))
123+
).withMessage("Seek to current after exception");
124+
}
125+
126+
static class TestFBP extends FailedBatchProcessor {
127+
128+
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
129+
CommonErrorHandler fallbackHandler) {
130+
131+
super(recoverer, backOff, fallbackHandler);
132+
}
133+
134+
}
117135
}

0 commit comments

Comments (0)