Skip to content

Commit e0fee7e

Browse files
GH-2212: Log listener exception in retry topic flow (#2213)
* GH-2212: Log listener ex. in retry topic flow Resolves #2212 As in Retry Topic flow recovery is successful, the exception thrown by the listener was never being logged. Now it logs at DEBUG level for intermediate retries and ERROR level when retries are exhausted. Also some polishing in TimestampedException as it was cluttering the stacktrace with duplicated information from cause.getMessage(). Now it logs the time when the exception occurred. * Address TimestampedException review suggestions Add ListenerExceptionLoggingStrategy Add unit tests * Address review comments
1 parent 0278465 commit e0fee7e

File tree

5 files changed

+295
-10
lines changed

5 files changed

+295
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/TimestampedException.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,14 +34,41 @@ public class TimestampedException extends KafkaException {
3434

3535
private final long timestamp;
3636

37+
@Deprecated
3738
public TimestampedException(Exception ex, Clock clock) {
3839
super(ex.getMessage(), ex);
3940
this.timestamp = Instant.now(clock).toEpochMilli();
4041
}
4142

43+
/**
44+
* Constructs an instance with the provided cause
45+
* and the current time.
46+
* @param ex the exception cause.
47+
*/
4248
public TimestampedException(Exception ex) {
43-
super(ex.getMessage(), ex);
44-
this.timestamp = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
49+
this(ex, Instant.now());
50+
}
51+
52+
/**
53+
* Creates an instance with the timestamp of when it was thrown and its cause.
54+
* @param ex the exception cause.
55+
* @param timestamp the millis from epoch of when the exception was thrown.
56+
* @since 2.7.13
57+
*/
58+
public TimestampedException(Exception ex, long timestamp) {
59+
super("Exception thrown at " + Instant.ofEpochMilli(timestamp), ex);
60+
this.timestamp = timestamp;
61+
}
62+
63+
/**
64+
* Creates an instance with the Instant of when it was thrown and its cause.
65+
* @param ex the exception cause.
66+
* @param now the Instant of when the exception was thrown.
67+
* @since 2.7.13
68+
*/
69+
public TimestampedException(Exception ex, Instant now) {
70+
super("Exception thrown at " + now, ex);
71+
this.timestamp = now.toEpochMilli();
4572
}
4673

4774
public long getTimestamp() {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.math.BigInteger;
2020
import java.time.Clock;
21+
import java.time.Instant;
2122
import java.util.Optional;
2223

2324
import org.apache.kafka.clients.consumer.Consumer;
@@ -96,7 +97,7 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
9697
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
9798
}
9899
catch (Exception ex) {
99-
throw new TimestampedException(ex, this.clock);
100+
throw new TimestampedException(ex, Instant.now(this.clock));
100101
}
101102
}
102103

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
*/
5555
public class DeadLetterPublishingRecovererFactory {
5656

57-
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));
57+
private static final LogAccessor LOGGER =
58+
new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));
5859

5960
private final DestinationTopicResolver destinationTopicResolver;
6061

@@ -66,6 +67,8 @@ public class DeadLetterPublishingRecovererFactory {
6667

6768
private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;
6869

70+
private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;
71+
6972
public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
7073
this.destinationTopicResolver = destinationTopicResolver;
7174
}
@@ -121,6 +124,24 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
121124
return this.nonFatalExceptions.add(exceptionType);
122125
}
123126

127+
/**
128+
* Never logs the listener exception.
129+
* The default is logging only after retries are exhausted.
130+
* @since 2.7.13
131+
*/
132+
public void neverLogListenerException() {
133+
this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER;
134+
}
135+
136+
/**
137+
* Logs the listener exception at each attempt.
138+
* The default is logging only after retries are exhausted.
139+
* @since 2.7.13
140+
*/
141+
public void alwaysLogListenerException() {
142+
this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT;
143+
}
144+
124145
@SuppressWarnings("unchecked")
125146
public DeadLetterPublishingRecoverer create() {
126147
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
@@ -185,11 +206,45 @@ private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e)
185206
? "none"
186207
: nextDestination.getDestinationName()));
187208

209+
maybeLogListenerException(e, cr, nextDestination);
210+
188211
return nextDestination.isNoOpsTopic()
189212
? null
190213
: resolveTopicPartition(cr, nextDestination);
191214
}
192215

216+
private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
217+
if (nextDestination.isDltTopic()
218+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
219+
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
220+
+ "Sending to DLT with name " + nextDestination.getDestinationName() + ".");
221+
}
222+
else if (nextDestination.isNoOpsTopic()
223+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
224+
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
225+
+ "No further action will be taken with this record.");
226+
}
227+
else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) {
228+
LOGGER.error(e, () -> getErrorMessage(cr) + ". "
229+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
230+
}
231+
else {
232+
LOGGER.debug(e, () -> getErrorMessage(cr) + ". "
233+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
234+
}
235+
}
236+
237+
private static String getErrorMessage(ConsumerRecord<?, ?> cr) {
238+
return "Record: " + getRecordInfo(cr) + " threw an error at topic " + cr.topic();
239+
}
240+
241+
private static String getRecordInfo(ConsumerRecord<?, ?> cr) {
242+
Header originalTopicHeader = cr.headers().lastHeader(KafkaHeaders.ORIGINAL_TOPIC);
243+
return String.format("topic = %s, partition = %s, offset = %s, main topic = %s",
244+
cr.topic(), cr.partition(), cr.offset(),
245+
originalTopicHeader != null ? new String(originalTopicHeader.value()) : cr.topic());
246+
}
247+
193248
/**
194249
* Creates and returns the {@link TopicPartition}, where the original record should be forwarded.
195250
* By default, it will use the partition same as original record's partition, in the next destination topic.
@@ -284,6 +339,23 @@ private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
284339
return consumerRecord.headers()
285340
.lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
286341
}
287-
}
288342

343+
private enum ListenerExceptionLoggingStrategy {
344+
345+
/**
346+
* Never log the listener exception.
347+
*/
348+
NEVER,
349+
350+
/**
351+
* Log the listener exception after each attempt.
352+
*/
353+
EACH_ATTEMPT,
289354

355+
/**
356+
* Log the listener only after retries are exhausted.
357+
*/
358+
AFTER_RETRIES_EXHAUSTED
359+
360+
}
361+
}

0 commit comments

Comments
 (0)