Skip to content

Commit 8ad588e

Browse files
garyrussellartembilan
authored andcommitted
GH-2395: RetryListener - Add Batch Methods
Resolves #2395 Previously, `RetryListener` only supported record listeners; add methods for retrying batches. **cherry-pick to 2.9.x, 2.8.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java
1 parent f584d8f commit 8ad588e

File tree

6 files changed

+148
-19
lines changed

6 files changed

+148
-19
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5295,6 +5295,7 @@ public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
52955295
====
52965296

52975297
The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress.
5298+
Starting with version 2.8.10, methods for batch listeners were added.
52985299

52995300
====
53005301
[source, java]
@@ -5310,6 +5311,15 @@ public interface RetryListener {
53105311
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
53115312
}
53125313
5314+
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
5315+
}
5316+
5317+
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
5318+
}
5319+
5320+
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
5321+
}
5322+
53135323
}
53145324
----
53155325
====

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.List;
2021
import java.util.Set;
2122
import java.util.function.BiConsumer;
2223

@@ -26,6 +27,7 @@
2627

2728
import org.springframework.core.log.LogAccessor;
2829
import org.springframework.kafka.KafkaException;
30+
import org.springframework.lang.Nullable;
2931
import org.springframework.util.backoff.BackOff;
3032
import org.springframework.util.backoff.BackOffExecution;
3133

@@ -38,9 +40,28 @@
3840
*/
3941
public final class ErrorHandlingUtils {
4042

43+
private static final ThreadLocal<List<RetryListener>> retryListeners = new ThreadLocal<>();
44+
4145
private ErrorHandlingUtils() {
4246
}
4347

48+
/**
49+
* Set the retry listeners.
50+
* @param listeners the listeners.
51+
* @since 2.8.10
52+
*/
53+
public static void setRetryListeners(List<RetryListener> listeners) {
54+
retryListeners.set(listeners);
55+
}
56+
57+
/**
58+
* Clear the retry listeners.
59+
* @since 2.8.10
60+
*/
61+
public static void clearRetryListeners() {
62+
retryListeners.remove();
63+
}
64+
4465
/**
4566
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
4667
* consumer, wait for the next back off, then call the listener. When retries are
@@ -66,6 +87,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
6687
String failed = null;
6788
Set<TopicPartition> assignment = consumer.assignment();
6889
consumer.pause(assignment);
90+
List<RetryListener> listeners = retryListeners.get();
91+
int attempt = 1;
92+
listen(listeners, records, thrownException, attempt++);
6993
if (container instanceof KafkaMessageListenerContainer) {
7094
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
7195
}
@@ -87,20 +111,27 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
87111
invokeListener.run();
88112
return;
89113
}
90-
catch (Exception e) {
114+
catch (Exception ex) {
115+
listen(listeners, records, ex, attempt++);
91116
if (failed == null) {
92117
failed = recordsToString(records);
93118
}
94119
String toLog = failed;
95-
logger.debug(e, () -> "Retry failed for: " + toLog);
120+
logger.debug(ex, () -> "Retry failed for: " + toLog);
96121
}
97122
nextBackOff = execution.nextBackOff();
98123
}
99124
try {
100125
recoverer.accept(records, thrownException);
126+
if (listeners != null) {
127+
listeners.forEach(listener -> listener.recovered(records, thrownException));
128+
}
101129
}
102-
catch (Exception e) {
103-
logger.error(e, () -> "Recoverer threw an exception; re-seeking batch");
130+
catch (Exception ex) {
131+
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
132+
if (listeners != null) {
133+
listeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
134+
}
104135
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
105136
}
106137
}
@@ -113,6 +144,14 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
113144
}
114145
}
115146

147+
private static void listen(@Nullable List<RetryListener> listeners, ConsumerRecords<?, ?> records,
148+
Exception thrownException, int attempt) {
149+
150+
if (listeners != null) {
151+
listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt));
152+
}
153+
}
154+
116155
/**
117156
* Represent the records as a comma-delimited String of {@code topic-part@offset}.
118157
* @param records the records.

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,6 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
6969
this(recoverer, backOff, null, fallbackHandler);
7070
}
7171

72-
/**
73-
* Return the fallback batch error handler.
74-
* @return the handler.
75-
* @since 2.8.8
76-
*/
77-
protected CommonErrorHandler getFallbackBatchHandler() {
78-
return this.fallbackBatchHandler;
79-
}
80-
81-
8272
/**
8373
* Construct an instance with the provided properties.
8474
* @param recoverer the recoverer.
@@ -94,6 +84,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
9484
this.fallbackBatchHandler = fallbackHandler;
9585
}
9686

87+
/**
88+
* Return the fallback batch error handler.
89+
* @return the handler.
90+
* @since 2.8.8
91+
*/
92+
protected CommonErrorHandler getFallbackBatchHandler() {
93+
return this.fallbackBatchHandler;
94+
}
95+
9796
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
9897
MessageListenerContainer container, Runnable invokeListener) {
9998

@@ -105,17 +104,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
105104

106105
BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
107106
if (batchListenerFailedException == null) {
108-
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
109-
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
107+
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch");
108+
fallback(thrownException, data, consumer, container, invokeListener);
110109
}
111110
else {
111+
getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1));
112112
ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
113113
int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex();
114114
if (index < 0 || index >= data.count()) {
115115
this.logger.warn(batchListenerFailedException, () ->
116116
String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
117117
record.topic(), record.partition(), record.offset()));
118-
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
118+
fallback(thrownException, data, consumer, container, invokeListener);
119119
}
120120
else {
121121
return seekOrRecover(thrownException, data, consumer, container, index);
@@ -124,6 +124,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
124124
return ConsumerRecords.empty();
125125
}
126126

127+
private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
128+
MessageListenerContainer container, Runnable invokeListener) {
129+
130+
ErrorHandlingUtils.setRetryListeners(getRetryListeners());
131+
try {
132+
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
133+
}
134+
finally {
135+
ErrorHandlingUtils.clearRetryListeners();
136+
}
137+
}
138+
127139
private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
128140
if (record == null) {
129141
return -1;

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
20+
import java.util.Arrays;
1921
import java.util.List;
2022
import java.util.function.BiConsumer;
2123
import java.util.function.BiFunction;
@@ -54,6 +56,8 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
5456

5557
private final FailedRecordTracker failureTracker;
5658

59+
private final List<RetryListener> retryListeners = new ArrayList<>();
60+
5761
private boolean commitRecovered;
5862

5963
private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
@@ -130,7 +134,14 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
130134
* @since 2.7
131135
*/
132136
public void setRetryListeners(RetryListener... listeners) {
137+
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
133138
this.failureTracker.setRetryListeners(listeners);
139+
this.retryListeners.clear();
140+
this.retryListeners.addAll(Arrays.asList(listeners));
141+
}
142+
143+
protected List<RetryListener> getRetryListeners() {
144+
return this.retryListeners;
134145
}
135146

136147
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2021

2122
/**
2223
* A listener for retry activity.
@@ -53,4 +54,31 @@ default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
5354
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
5455
}
5556

57+
/**
58+
* Called after a delivery failed for batch records.
59+
* @param records the records.
60+
* @param ex the exception.
61+
* @param deliveryAttempt the delivery attempt, if available.
62+
* @since 2.8.10
63+
*/
64+
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
65+
}
66+
67+
/**
68+
* Called after a failing record was successfully recovered.
69+
* @param records the record.
70+
* @param ex the exception.
71+
*/
72+
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
73+
}
74+
75+
/**
76+
* Called after a recovery attempt failed.
77+
* @param records the record.
78+
* @param original the original exception causing the recovery attempt.
79+
* @param failure the exception thrown by the recoverer.
80+
*/
81+
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
82+
}
83+
5684
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,10 +19,12 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.BDDMockito.willAnswer;
2425
import static org.mockito.Mockito.inOrder;
2526
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729

2830
import java.time.Duration;
@@ -205,6 +207,33 @@ records, mockConsumer, mock(MessageListenerContainer.class), () -> { }))
205207
verify(mockConsumer).seek(tp, 0L);
206208
}
207209

210+
@SuppressWarnings({ "unchecked", "rawtypes" })
211+
@Test
212+
void fallbackListener() {
213+
Consumer mockConsumer = mock(Consumer.class);
214+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
215+
DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
216+
RetryListener retryListener = mock(RetryListener.class);
217+
beh.setRetryListeners(retryListener);
218+
TopicPartition tp = new TopicPartition("foo", 0);
219+
ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
220+
List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
221+
new RecordHeaders(), Optional.empty()),
222+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
223+
new RecordHeaders(), Optional.empty()))));
224+
MessageListenerContainer container = mock(MessageListenerContainer.class);
225+
given(container.isRunning()).willReturn(true);
226+
beh.handleBatch(new ListenerExecutionFailedException("test"),
227+
records, mockConsumer, container, () -> {
228+
throw new ListenerExecutionFailedException("test");
229+
});
230+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
231+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
232+
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(3));
233+
verify(recoverer, times(2)).accept(any(), any()); // each record in batch
234+
verify(retryListener).recovered(any(ConsumerRecords.class), any());
235+
}
236+
208237
@Configuration
209238
@EnableKafka
210239
public static class Config {

0 commit comments

Comments
 (0)