Skip to content

Commit 36e2364

Browse files
committed
spring-projectsGH-615: Add CommonLoggingErrorHandler
See spring-projects#615
1 parent 8e5c5a7 commit 36e2364

12 files changed

+179
-39
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.kafka.listener.AfterRollbackProcessor;
3838
import org.springframework.kafka.listener.BatchErrorHandler;
3939
import org.springframework.kafka.listener.BatchInterceptor;
40+
import org.springframework.kafka.listener.CommonErrorHandler;
4041
import org.springframework.kafka.listener.ContainerProperties;
4142
import org.springframework.kafka.listener.ErrorHandler;
4243
import org.springframework.kafka.listener.GenericErrorHandler;
@@ -75,6 +76,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
7576

7677
private GenericErrorHandler<?> errorHandler;
7778

79+
private CommonErrorHandler commonErrorHandler;
80+
7881
private ConsumerFactory<? super K, ? super V> consumerFactory;
7982

8083
private Boolean autoStartup;
@@ -262,6 +265,16 @@ public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
262265
this.errorHandler = errorHandler;
263266
}
264267

268+
/**
269+
* Set the {@link CommonErrorHandler} which can handle errors for both record
270+
* and batch listeners. Replaces the use of {@link GenericErrorHandler}s.
271+
* @param commonErrorHandler the handler.
272+
* @since 2.8
273+
*/
274+
public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
275+
this.commonErrorHandler = commonErrorHandler;
276+
}
277+
265278
/**
266279
* Set a processor to invoke after a transaction rollback; typically will
267280
* seek the unprocessed topic/partition to reprocess the records.
@@ -342,7 +355,7 @@ public void setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomi
342355

343356
@Override
344357
public void afterPropertiesSet() {
345-
if (this.errorHandler != null) {
358+
if (this.commonErrorHandler == null && this.errorHandler != null) {
346359
if (Boolean.TRUE.equals(this.batchListener)) {
347360
Assert.state(this.errorHandler instanceof BatchErrorHandler,
348361
() -> "The error handler must be a BatchErrorHandler, not " +
@@ -412,6 +425,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
412425
.acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
413426
properties::setSubBatchPerPartition)
414427
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
428+
.acceptIfNotNull(this.commonErrorHandler, instance::setCommonErrorHandler)
415429
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
416430
Boolean autoStart = endpoint.getAutoStartup();
417431
if (autoStart != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8888

8989
private GenericErrorHandler<?> errorHandler;
9090

91+
private CommonErrorHandler commonErrorHandler;
92+
9193
private boolean autoStartup = true;
9294

9395
private int phase = DEFAULT_PHASE;
@@ -236,6 +238,26 @@ public GenericErrorHandler<?> getGenericErrorHandler() {
236238
return this.errorHandler;
237239
}
238240

241+
/**
242+
* Get the {@link CommonErrorHandler}.
243+
* @return the handler.
244+
* @since 2.8
245+
*/
246+
@Nullable
247+
public CommonErrorHandler getCommonErrorHandler() {
248+
return this.commonErrorHandler;
249+
}
250+
251+
/**
252+
* Set the {@link CommonErrorHandler} which can handle errors for both record
253+
* and batch listeners. Replaces the use of {@link GenericErrorHandler}s.
254+
* @param commonErrorHandler the handler.
255+
* @since 2.8
256+
*/
257+
public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
258+
this.commonErrorHandler = commonErrorHandler;
259+
}
260+
239261
@Override
240262
public boolean isAutoStartup() {
241263
return this.autoStartup;

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchLoggingErrorHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,11 +25,14 @@
2525
import org.springframework.lang.Nullable;
2626

2727
/**
28-
* Simple handler that invokes a {@link LoggingErrorHandler} for each record.
28+
* Simple handler that logs each record.
29+
*
30+
* @deprecated - use the {@link CommonLoggingErrorHandler} instead.
2931
*
3032
* @author Gary Russell
3133
* @since 1.1
3234
*/
35+
@Deprecated
3336
public class BatchLoggingErrorHandler implements BatchErrorHandler {
3437

3538
private static final LogAccessor LOGGER =
@@ -43,7 +46,7 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> da
4346
}
4447
else {
4548
for (ConsumerRecord<?, ?> record : data) {
46-
message.append(record).append('\n');
49+
message.append(ListenerUtils.recordToString(record)).append('\n');
4750
}
4851
}
4952
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.List;
2020

21+
import org.apache.commons.logging.LogFactory;
2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -34,14 +35,6 @@
3435
*/
3536
public interface CommonErrorHandler extends DeliveryAttemptAware {
3637

37-
/**
38-
* Return true if this error handler is for a batch listener.
39-
* @return true for batch.
40-
*/
41-
default boolean isBatch() {
42-
return false;
43-
}
44-
4538
/**
4639
* Return false if this error handler should only receive the current failed record;
4740
* remaining records will be passed to the listener after the error handler returns.
@@ -52,7 +45,7 @@ default boolean isBatch() {
5245
* @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer)
5346
*/
5447
default boolean remainingRecords() {
55-
return true;
48+
return false;
5649
}
5750

5851
/**
@@ -72,6 +65,9 @@ default boolean deliveryAttemptHeader() {
7265
*/
7366
default void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
7467
MessageListenerContainer container) {
68+
69+
LogFactory.getLog(getClass()).error("'handleOtherException' is not implemented by this handler",
70+
thrownException);
7571
}
7672

7773
/**
@@ -86,6 +82,8 @@ default void handleOtherException(Exception thrownException, Consumer<?, ?> cons
8682
*/
8783
default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
8884
MessageListenerContainer container) {
85+
86+
LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
8987
}
9088

9189
/**
@@ -101,6 +99,8 @@ default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record
10199
*/
102100
default void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
103101
MessageListenerContainer container) {
102+
103+
LogFactory.getLog(getClass()).error("'handleRemaining' is not implemented by this handler", thrownException);
104104
}
105105

106106
/**
@@ -115,6 +115,8 @@ default void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?
115115
*/
116116
default void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
117117
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
118+
119+
LogFactory.getLog(getClass()).error("'handleBatch' is not implemented by this handler", thrownException);
118120
}
119121

120122
@Override
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.commons.logging.LogFactory;
20+
import org.apache.kafka.clients.consumer.Consumer;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
24+
import org.springframework.core.log.LogAccessor;
25+
26+
/**
27+
* The {@link CommonErrorHandler} implementation for logging exceptions.
28+
*
29+
* @author Gary Russell
30+
* @since 2.8
31+
*
32+
*/
33+
public class CommonLoggingErrorHandler implements CommonErrorHandler {
34+
35+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(CommonLoggingErrorHandler.class));
36+
37+
private boolean ackAfterHandle = true;
38+
39+
@Override
40+
public boolean isAckAfterHandle() {
41+
return this.ackAfterHandle;
42+
}
43+
44+
@Override
45+
public void setAckAfterHandle(boolean ackAfterHandle) {
46+
this.ackAfterHandle = ackAfterHandle;
47+
}
48+
49+
@Override
50+
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
51+
MessageListenerContainer container) {
52+
53+
LOGGER.error(thrownException, () -> "Error occured while processing: " + ListenerUtils.recordToString(record));
54+
}
55+
56+
@Override
57+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
58+
MessageListenerContainer container, Runnable invokeListener) {
59+
60+
StringBuilder message = new StringBuilder("Error occurred while processing:\n");
61+
for (ConsumerRecord<?, ?> record : data) {
62+
message.append(ListenerUtils.recordToString(record)).append('\n');
63+
}
64+
LOGGER.error(thrownException, () -> message.substring(0, message.length() - 1));
65+
}
66+
67+
@Override
68+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
69+
MessageListenerContainer container) {
70+
71+
LOGGER.error(thrownException, () -> "Error occurred while not processing records");
72+
}
73+
74+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
221221
}
222222
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
223223
container.setGenericErrorHandler(getGenericErrorHandler());
224+
container.setCommonErrorHandler(getCommonErrorHandler());
224225
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
225226
container.setRecordInterceptor(getRecordInterceptor());
226227
container.setBatchInterceptor(getBatchInterceptor());

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* @since 2.7.4
3434
*
3535
*/
36-
public class ErrorHandlerAdapter implements CommonErrorHandler {
36+
class ErrorHandlerAdapter implements CommonErrorHandler {
3737

3838
@SuppressWarnings({ "rawtypes", "unchecked" })
3939
private static final ConsumerRecords EMPTY_BATCH = new ConsumerRecords(Collections.emptyMap());
@@ -46,7 +46,7 @@ public class ErrorHandlerAdapter implements CommonErrorHandler {
4646
* Adapt an {@link ErrorHandler}.
4747
* @param errorHandler the handler.
4848
*/
49-
public ErrorHandlerAdapter(ErrorHandler errorHandler) {
49+
ErrorHandlerAdapter(ErrorHandler errorHandler) {
5050
Assert.notNull(errorHandler, "'errorHandler' cannot be null");
5151
this.errorHandler = errorHandler;
5252
this.batchErrorHandler = null;
@@ -56,17 +56,12 @@ public ErrorHandlerAdapter(ErrorHandler errorHandler) {
5656
* Adapt a {@link BatchErrorHandler}.
5757
* @param batchErrorHandler the handler.
5858
*/
59-
public ErrorHandlerAdapter(BatchErrorHandler batchErrorHandler) {
59+
ErrorHandlerAdapter(BatchErrorHandler batchErrorHandler) {
6060
Assert.notNull(batchErrorHandler, "'batchErrorHandler' cannot be null");
6161
this.errorHandler = null;
6262
this.batchErrorHandler = batchErrorHandler;
6363
}
6464

65-
@Override
66-
public boolean isBatch() {
67-
return this.batchErrorHandler != null;
68-
}
69-
7065
@Override
7166
public boolean remainingRecords() {
7267
return this.errorHandler instanceof RemainingRecordsErrorHandler;
@@ -130,21 +125,36 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
130125
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
131126
MessageListenerContainer container) {
132127

133-
this.errorHandler.handle(thrownException, record, consumer);
128+
if (this.errorHandler != null) {
129+
this.errorHandler.handle(thrownException, record, consumer);
130+
}
131+
else {
132+
CommonErrorHandler.super.handleRecord(thrownException, record, consumer, container);
133+
}
134134
}
135135

136136
@Override
137137
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
138138
MessageListenerContainer container) {
139139

140-
this.errorHandler.handle(thrownException, records, consumer, container);
140+
if (this.errorHandler != null) {
141+
this.errorHandler.handle(thrownException, records, consumer, container);
142+
}
143+
else {
144+
CommonErrorHandler.super.handleRemaining(thrownException, records, consumer, container);
145+
}
141146
}
142147

143148
@Override
144149
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
145150
MessageListenerContainer container, Runnable invokeListener) {
146151

147-
this.batchErrorHandler.handle(thrownException, data, consumer, container, invokeListener);
152+
if (this.batchErrorHandler != null) {
153+
this.batchErrorHandler.handle(thrownException, data, consumer, container, invokeListener);
154+
}
155+
else {
156+
CommonErrorHandler.super.handleBatch(thrownException, data, consumer, container, invokeListener);
157+
}
148158
}
149159

150160
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
734734
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
735735
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
736736
subscribeOrAssignTopics(this.consumer);
737-
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
737+
GenericErrorHandler<?> errHandler = getGenericErrorHandler();
738738
if (listener instanceof BatchMessageListener) {
739739
this.listener = null;
740740
this.batchListener = (BatchMessageListener<K, V>) listener;
@@ -798,6 +798,13 @@ else if (listener instanceof MessageListener) {
798798

799799
@Nullable
800800
private CommonErrorHandler determineCommonErrorHandler(GenericErrorHandler<?> errHandler) {
801+
CommonErrorHandler common = getCommonErrorHandler();
802+
if (common != null) {
803+
if (errHandler != null) {
804+
this.logger.debug("GenericErrorHandler is ignored when a CommonErrorHandler is provided");
805+
}
806+
return common;
807+
}
801808
if (this.isBatchListener) {
802809
validateErrorHandler(true);
803810
BatchErrorHandler batchErrorHandler = determineBatchErrorHandler(errHandler);
@@ -1111,13 +1118,13 @@ protected void checkConsumer() {
11111118
}
11121119

11131120
@Nullable
1114-
protected BatchErrorHandler determineBatchErrorHandler(GenericErrorHandler<?> errHandler) {
1121+
protected BatchErrorHandler determineBatchErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
11151122
return errHandler != null ? (BatchErrorHandler) errHandler
11161123
: this.transactionManager != null ? null : new RecoveringBatchErrorHandler();
11171124
}
11181125

11191126
@Nullable
1120-
protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
1127+
protected ErrorHandler determineErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
11211128
return errHandler != null ? (ErrorHandler) errHandler
11221129
: this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
11231130
}

0 commit comments

Comments
 (0)