Skip to content

Commit c3d9779

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-2212: Log listener exception in retry topic flow (#2213)
* GH-2212: Log listener ex. in retry topic flow Resolves #2212 As in Retry Topic flow recovery is successful, the exception thrown by the listener was never being logged. Now it logs at DEBUG level for intermediate retries and ERROR level when retries are exhausted. Also some polishing in TimestampedException as it was cluttering the stacktrace with duplicated information from cause.getMessage(). Now it logs the time when the exception occurred. * Address TimestampedException review suggestions Add ListenerExceptionLoggingStrategy Add unit tests * Address review comments
1 parent a89895b commit c3d9779

File tree

5 files changed

+297
-10
lines changed

5 files changed

+297
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/TimestampedException.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,14 +34,41 @@ public class TimestampedException extends KafkaException {
3434

3535
private final long timestamp;
3636

37+
@Deprecated
3738
public TimestampedException(Exception ex, Clock clock) {
3839
super(ex.getMessage(), ex);
3940
this.timestamp = Instant.now(clock).toEpochMilli();
4041
}
4142

43+
/**
44+
* Constructs an instance with the provided cause
45+
* and the current time.
46+
* @param ex the exception cause.
47+
*/
4248
public TimestampedException(Exception ex) {
43-
super(ex.getMessage(), ex);
44-
this.timestamp = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
49+
this(ex, Instant.now());
50+
}
51+
52+
/**
53+
* Creates an instance with the timestamp of when it was thrown and its cause.
54+
* @param ex the exception cause.
55+
* @param timestamp the millis from epoch of when the exception was thrown.
56+
* @since 2.7.13
57+
*/
58+
public TimestampedException(Exception ex, long timestamp) {
59+
super("Exception thrown at " + Instant.ofEpochMilli(timestamp), ex);
60+
this.timestamp = timestamp;
61+
}
62+
63+
/**
64+
* Creates an instance with the Instant of when it was thrown and its cause.
65+
* @param ex the exception cause.
66+
* @param now the Instant of when the exception was thrown.
67+
* @since 2.7.13
68+
*/
69+
public TimestampedException(Exception ex, Instant now) {
70+
super("Exception thrown at " + now, ex);
71+
this.timestamp = now.toEpochMilli();
4572
}
4673

4774
public long getTimestamp() {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.math.BigInteger;
2020
import java.time.Clock;
21+
import java.time.Instant;
2122
import java.util.Optional;
2223

2324
import org.apache.kafka.clients.consumer.Consumer;
@@ -96,7 +97,7 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
9697
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
9798
}
9899
catch (Exception ex) {
99-
throw new TimestampedException(ex, this.clock);
100+
throw new TimestampedException(ex, Instant.now(this.clock));
100101
}
101102
}
102103

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,37 @@
4848
*/
4949
public class DeadLetterPublishingRecovererFactory {
5050

51-
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));
51+
private static final LogAccessor LOGGER =
52+
new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));
5253

5354
private final DestinationTopicResolver destinationTopicResolver;
5455

5556
private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> { };
5657

58+
private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;
59+
5760
public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
5861
this.destinationTopicResolver = destinationTopicResolver;
5962
}
6063

64+
/**
65+
* Never logs the listener exception.
66+
* The default is logging only after retries are exhausted.
67+
* @since 2.7.13
68+
*/
69+
public void neverLogListenerException() {
70+
this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER;
71+
}
72+
73+
/**
74+
* Logs the listener exception at each attempt.
75+
* The default is logging only after retries are exhausted.
76+
* @since 2.7.13
77+
*/
78+
public void alwaysLogListenerException() {
79+
this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT;
80+
}
81+
6182
@SuppressWarnings("unchecked")
6283
public DeadLetterPublishingRecoverer create() {
6384
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
@@ -114,11 +135,45 @@ private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e)
114135
? "none"
115136
: nextDestination.getDestinationName()));
116137

138+
maybeLogListenerException(e, cr, nextDestination);
139+
117140
return nextDestination.isNoOpsTopic()
118141
? null
119142
: resolveTopicPartition(cr, nextDestination);
120143
}
121144

145+
private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
146+
if (nextDestination.isDltTopic()
147+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
148+
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
149+
+ "Sending to DLT with name " + nextDestination.getDestinationName() + ".");
150+
}
151+
else if (nextDestination.isNoOpsTopic()
152+
&& !ListenerExceptionLoggingStrategy.NEVER.equals(this.loggingStrategy)) {
153+
LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
154+
+ "No further action will be taken with this record.");
155+
}
156+
else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrategy)) {
157+
LOGGER.error(e, () -> getErrorMessage(cr) + ". "
158+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
159+
}
160+
else {
161+
LOGGER.debug(e, () -> getErrorMessage(cr) + ". "
162+
+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
163+
}
164+
}
165+
166+
private static String getErrorMessage(ConsumerRecord<?, ?> cr) {
167+
return "Record: " + getRecordInfo(cr) + " threw an error at topic " + cr.topic();
168+
}
169+
170+
private static String getRecordInfo(ConsumerRecord<?, ?> cr) {
171+
Header originalTopicHeader = cr.headers().lastHeader(KafkaHeaders.ORIGINAL_TOPIC);
172+
return String.format("topic = %s, partition = %s, offset = %s, main topic = %s",
173+
cr.topic(), cr.partition(), cr.offset(),
174+
originalTopicHeader != null ? new String(originalTopicHeader.value()) : cr.topic());
175+
}
176+
122177
/**
123178
* Creates and returns the {@link TopicPartition}, where the original record should be forwarded.
124179
* By default, it will use the partition same as original record's partition, in the next destination topic.
@@ -213,6 +268,23 @@ private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
213268
return consumerRecord.headers()
214269
.lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
215270
}
216-
}
217271

272+
private enum ListenerExceptionLoggingStrategy {
273+
274+
/**
275+
* Never log the listener exception.
276+
*/
277+
NEVER,
278+
279+
/**
280+
* Log the listener exception after each attempt.
281+
*/
282+
EACH_ATTEMPT,
218283

284+
/**
285+
* Log the listener only after retries are exhausted.
286+
*/
287+
AFTER_RETRIES_EXHAUSTED
288+
289+
}
290+
}

0 commit comments

Comments
 (0)