Skip to content

Commit 7ac6d4f

Browse files
committed
spring-projectsGH-3019: seek even in case of commit failure
1 parent 3367e3e commit 7ac6d4f

File tree

1 file changed

+39
-35
lines changed

1 file changed

+39
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -229,49 +229,53 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
229229
remaining.add(datum);
230230
}
231231
}
232-
if (offsets.size() > 0) {
233-
commit(consumer, container, offsets);
234-
}
235-
if (isSeekAfterError()) {
236-
if (remaining.size() > 0) {
237-
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
238-
getFailureTracker(), this.logger, getLogLevel());
239-
ConsumerRecord<?, ?> recovered = remaining.get(0);
240-
commit(consumer, container,
241-
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
242-
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
243-
if (remaining.size() > 1) {
244-
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
245-
}
232+
try {
233+
if (offsets.size() > 0) {
234+
commit(consumer, container, offsets);
246235
}
247-
return ConsumerRecords.empty();
248236
}
249-
else {
250-
if (remaining.size() > 0) {
251-
try {
252-
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
253-
consumer)) {
254-
remaining.remove(0);
237+
finally {
238+
if (isSeekAfterError()) {
239+
if (remaining.size() > 0) {
240+
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
241+
getFailureTracker(), this.logger, getLogLevel());
242+
ConsumerRecord<?, ?> recovered = remaining.get(0);
243+
commit(consumer, container,
244+
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
245+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
246+
if (remaining.size() > 1) {
247+
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
255248
}
256249
}
257-
catch (Exception e) {
258-
if (SeekUtils.isBackoffException(thrownException)) {
259-
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
260-
+ " included in remaining due to retry back off " + thrownException);
250+
return ConsumerRecords.empty();
251+
}
252+
else {
253+
if (remaining.size() > 0) {
254+
try {
255+
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
256+
consumer)) {
257+
remaining.remove(0);
258+
}
261259
}
262-
else {
263-
this.logger.error(e, KafkaUtils.format(remaining.get(0))
264-
+ " included in remaining due to " + thrownException);
260+
catch (Exception e) {
261+
if (SeekUtils.isBackoffException(thrownException)) {
262+
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
263+
+ " included in remaining due to retry back off " + thrownException);
264+
}
265+
else {
266+
this.logger.error(e, KafkaUtils.format(remaining.get(0))
267+
+ " included in remaining due to " + thrownException);
268+
}
265269
}
266270
}
271+
if (remaining.isEmpty()) {
272+
return ConsumerRecords.empty();
273+
}
274+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
275+
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
276+
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
277+
return new ConsumerRecords<>(remains);
267278
}
268-
if (remaining.isEmpty()) {
269-
return ConsumerRecords.empty();
270-
}
271-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
272-
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
273-
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
274-
return new ConsumerRecords<>(remains);
275279
}
276280
}
277281

0 commit comments

Comments
 (0)