Skip to content

Commit e2b4a57

Browse files
committed
spring-projectsGH-3019: seek even in case of commit failure
1 parent 61bc219 commit e2b4a57

File tree

2 files changed

+69
-48
lines changed

2 files changed

+69
-48
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -236,40 +236,44 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
236236
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
237237
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
238238
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
239-
if (offsets.size() > 0) {
240-
commit(consumer, container, offsets);
241-
}
242-
if (isSeekAfterError()) {
243-
if (remaining.size() > 0) {
244-
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
245-
getFailureTracker()::recovered, this.logger, getLogLevel());
246-
ConsumerRecord<?, ?> recovered = remaining.get(0);
247-
commit(consumer, container,
248-
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
249-
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
250-
if (remaining.size() > 1) {
251-
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
252-
}
239+
try {
240+
if (offsets.size() > 0) {
241+
commit(consumer, container, offsets);
253242
}
254-
return ConsumerRecords.empty();
255243
}
256-
else {
257-
if (indexArg == 0) {
258-
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
244+
finally {
245+
if (isSeekAfterError()) {
246+
if (remaining.size() > 0) {
247+
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
248+
getFailureTracker()::recovered, this.logger, getLogLevel());
249+
ConsumerRecord<?, ?> recovered = remaining.get(0);
250+
commit(consumer, container,
251+
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
252+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
253+
if (remaining.size() > 1) {
254+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
255+
}
256+
}
257+
return ConsumerRecords.empty();
259258
}
260259
else {
261-
try {
262-
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
263-
consumer)) {
264-
remaining.remove(0);
265-
}
260+
if (indexArg == 0) {
261+
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
266262
}
267-
catch (Exception e) {
263+
else {
264+
try {
265+
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
266+
consumer)) {
267+
remaining.remove(0);
268+
}
269+
}
270+
catch (Exception e) {
271+
}
272+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
273+
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
274+
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
275+
return new ConsumerRecords<>(remains);
268276
}
269-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
270-
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
271-
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
272-
return new ConsumerRecords<>(remains);
273277
}
274278
}
275279
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2122
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyMap;
24+
import static org.mockito.BDDMockito.willReturn;
2225
import static org.mockito.BDDMockito.willThrow;
2326
import static org.mockito.Mockito.mock;
2427
import static org.mockito.Mockito.spy;
@@ -34,11 +37,13 @@
3437
import org.apache.kafka.clients.consumer.ConsumerRecord;
3538
import org.apache.kafka.clients.consumer.ConsumerRecords;
3639
import org.apache.kafka.common.TopicPartition;
40+
import org.apache.kafka.common.errors.RebalanceInProgressException;
3741
import org.junit.jupiter.api.Test;
3842
import org.mockito.ArgumentCaptor;
3943

4044
import org.springframework.core.log.LogAccessor;
4145
import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper;
46+
import org.springframework.kafka.KafkaException;
4247
import org.springframework.util.backoff.BackOff;
4348
import org.springframework.util.backoff.FixedBackOff;
4449

@@ -52,15 +57,6 @@ public class FailedBatchProcessorTests {
5257
@SuppressWarnings({ "rawtypes", "unchecked" })
5358
@Test
5459
void indexOutOfBounds() {
55-
class TestFBP extends FailedBatchProcessor {
56-
57-
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
58-
CommonErrorHandler fallbackHandler) {
59-
60-
super(recoverer, backOff, fallbackHandler);
61-
}
62-
63-
}
6460
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
6561
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
6662

@@ -83,15 +79,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
8379
@SuppressWarnings({ "rawtypes", "unchecked" })
8480
@Test
8581
void recordNotPresent() {
86-
class TestFBP extends FailedBatchProcessor {
87-
88-
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
89-
CommonErrorHandler fallbackHandler) {
90-
91-
super(recoverer, backOff, fallbackHandler);
92-
}
93-
94-
}
9582
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
9683
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
9784

@@ -114,4 +101,34 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
114101
assertThat(output).contains("Record not found in batch: topic-42@123;");
115102
}
116103

104+
@Test
105+
void testExceptionDuringCommit() {
106+
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
107+
willThrow(new IllegalStateException("ise")).given(mockEH).handleBatch(any(), any(), any(), any(), any());
108+
109+
ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null);
110+
ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null);
111+
ConsumerRecord rec3 = new ConsumerRecord("topic", 0, 2L, null, null);
112+
113+
ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2, rec3)));
114+
TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(2L, 2L), mockEH);
115+
final Consumer consumer = mock(Consumer.class);
116+
willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any());
117+
final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class);
118+
willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties();
119+
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
120+
testFBP.handle(new BatchListenerFailedException("topic", rec2),
121+
records, consumer, mockMLC, mock(Runnable.class))
122+
).withMessage("Seek to current after exception");
123+
}
124+
125+
static class TestFBP extends FailedBatchProcessor {
126+
127+
TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
128+
CommonErrorHandler fallbackHandler) {
129+
130+
super(recoverer, backOff, fallbackHandler);
131+
}
132+
133+
}
117134
}

0 commit comments

Comments
 (0)