Skip to content

Commit cd661dc

Browse files
authored
GH-615: Error Handler Evolution Part 1
Flatten `GenericErrorHandler` into a single interface. Create an adapter for legacy error handlers. See #615 * Fix since; expand javadocs; improve method names.
1 parent 2cab170 commit cd661dc

File tree

3 files changed

+348
-34
lines changed

3 files changed

+348
-34
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import org.springframework.kafka.support.TopicPartitionOffset;
26+
27+
/**
28+
* Replacement for {@link ErrorHandler} and {@link BatchErrorHandler} and their
29+
* sub-interfaces.
30+
*
31+
* @author Gary Russell
32+
* @since 2.8
33+
*
34+
*/
35+
public interface CommonErrorHandler extends DeliveryAttemptAware {
36+
37+
/**
38+
* Return true if this error handler is for a batch listener.
39+
* @return true for batch.
40+
*/
41+
default boolean isBatch() {
42+
return false;
43+
}
44+
45+
/**
46+
* Return false if this error handler should only receive the current failed record;
47+
* remaining records will be passed to the listener after the error handler returns.
48+
* When true (default), all remaining records including the failed record are passed
49+
* to the error handler.
50+
* @return false to receive only the failed record.
51+
* @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
52+
* @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer)
53+
*/
54+
default boolean remainingRecords() {
55+
return true;
56+
}
57+
58+
/**
59+
* Return true if this error handler supports delivery attempts headers.
60+
* @return true if capable.
61+
*/
62+
default boolean deliveryAttemptHeader() {
63+
return false;
64+
}
65+
66+
/**
67+
* Called when an exception is thrown with no records available, e.g. if the consumer
68+
* poll throws an exception.
69+
* @param thrownException the exception.
70+
* @param consumer the consumer.
71+
* @param container the container.
72+
*/
73+
default void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
74+
MessageListenerContainer container) {
75+
}
76+
77+
/**
78+
* Handle the exception for a record listener when {@link #remainingRecords()} returns
79+
* false. Use this to handle just the single failed record; remaining records from the
80+
* poll will be sent to the listener.
81+
* @param thrownException the exception.
82+
* @param record the record.
83+
* @param consumer the consumer.
84+
* @param container the container.
85+
* @see #remainingRecords()
86+
*/
87+
default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
88+
MessageListenerContainer container) {
89+
}
90+
91+
/**
92+
* Handle the exception for a record listener when {@link #remainingRecords()} returns
93+
* true. The failed record and all the remaining records from the poll are passed in.
94+
* Usually used when the error handler performs seeks so that the remaining records
95+
* will be redelivered on the next poll.
96+
* @param thrownException the exception.
97+
* @param records the remaining records including the one that failed.
98+
* @param consumer the consumer.
99+
* @param container the container.
100+
* @see #remainingRecords()
101+
*/
102+
default void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
103+
MessageListenerContainer container) {
104+
}
105+
106+
/**
107+
* Handle the exception for a batch listener. The complete {@link ConsumerRecords}
108+
* from the poll is supplied. The error handler needs to perform seeks if you wish to
109+
* reprocess the records in the batch.
110+
* @param thrownException the exception.
111+
* @param data the consumer records.
112+
* @param consumer the consumer.
113+
* @param container the container.
114+
* @param invokeListener a callback to re-invoke the listener.
115+
*/
116+
default void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
117+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
118+
}
119+
120+
@Override
121+
default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
122+
return 0;
123+
}
124+
125+
/**
126+
* Optional method to clear thread state; will be called just before a consumer
127+
* thread terminates.
128+
*/
129+
default void clearThreadState() {
130+
}
131+
132+
/**
133+
* Return true if the offset should be committed for a handled error (no exception
134+
* thrown).
135+
* @return true to commit.
136+
*/
137+
default boolean isAckAfterHandle() {
138+
return true;
139+
}
140+
141+
/**
142+
* Set to false to prevent the container from committing the offset of a recovered
143+
* record (when the error handler does not itself throw an exception).
144+
* @param ack false to not commit.
145+
*/
146+
default void setAckAfterHandle(boolean ack) {
147+
throw new UnsupportedOperationException("This error handler does not support setting this property");
148+
}
149+
150+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import org.springframework.kafka.support.TopicPartitionOffset;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Adapts a legacy {@link ErrorHandler} or {@link BatchErrorHandler}.
31+
*
32+
* @author Gary Russell
33+
* @since 2.7.4
34+
*
35+
*/
36+
public class ErrorHandlerAdapter implements CommonErrorHandler {
37+
38+
@SuppressWarnings({ "rawtypes", "unchecked" })
39+
private static final ConsumerRecords EMPTY_BATCH = new ConsumerRecords(Collections.emptyMap());
40+
41+
private final ErrorHandler errorHandler;
42+
43+
private final BatchErrorHandler batchErrorHandler;
44+
45+
/**
46+
* Adapt an {@link ErrorHandler}.
47+
* @param errorHandler the handler.
48+
*/
49+
public ErrorHandlerAdapter(ErrorHandler errorHandler) {
50+
Assert.notNull(errorHandler, "'errorHandler' cannot be null");
51+
this.errorHandler = errorHandler;
52+
this.batchErrorHandler = null;
53+
}
54+
55+
/**
56+
* Adapt a {@link BatchErrorHandler}.
57+
* @param batchErrorHandler the handler.
58+
*/
59+
public ErrorHandlerAdapter(BatchErrorHandler batchErrorHandler) {
60+
Assert.notNull(batchErrorHandler, "'batchErrorHandler' cannot be null");
61+
this.errorHandler = null;
62+
this.batchErrorHandler = batchErrorHandler;
63+
}
64+
65+
@Override
66+
public boolean isBatch() {
67+
return this.batchErrorHandler != null;
68+
}
69+
70+
@Override
71+
public boolean remainingRecords() {
72+
return this.errorHandler instanceof RemainingRecordsErrorHandler;
73+
}
74+
75+
@Override
76+
public boolean deliveryAttemptHeader() {
77+
return this.errorHandler instanceof DeliveryAttemptAware;
78+
}
79+
80+
@Override
81+
public void clearThreadState() {
82+
if (this.errorHandler != null) {
83+
this.errorHandler.clearThreadState();
84+
}
85+
else {
86+
this.batchErrorHandler.clearThreadState();
87+
}
88+
}
89+
90+
@Override
91+
public boolean isAckAfterHandle() {
92+
if (this.errorHandler != null) {
93+
return this.errorHandler.isAckAfterHandle();
94+
}
95+
else {
96+
return this.batchErrorHandler.isAckAfterHandle();
97+
}
98+
}
99+
100+
@Override
101+
public void setAckAfterHandle(boolean ack) {
102+
if (this.errorHandler != null) {
103+
this.errorHandler.setAckAfterHandle(ack);
104+
}
105+
else {
106+
this.batchErrorHandler.setAckAfterHandle(ack);
107+
}
108+
}
109+
110+
@Override
111+
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
112+
Assert.state(deliveryAttemptHeader(), "This method should not be called by the container");
113+
return ((DeliveryAttemptAware) this.errorHandler).deliveryAttempt(topicPartitionOffset);
114+
}
115+
116+
@SuppressWarnings({ "unchecked" })
117+
@Override
118+
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
119+
MessageListenerContainer container) {
120+
121+
if (this.errorHandler != null) {
122+
this.errorHandler.handle(thrownException, Collections.EMPTY_LIST, consumer, container);
123+
}
124+
else {
125+
this.batchErrorHandler.handle(thrownException, EMPTY_BATCH, consumer, container, () -> { });
126+
}
127+
}
128+
129+
@Override
130+
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
131+
MessageListenerContainer container) {
132+
133+
this.errorHandler.handle(thrownException, record, consumer);
134+
}
135+
136+
@Override
137+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
138+
MessageListenerContainer container) {
139+
140+
this.errorHandler.handle(thrownException, records, consumer, container);
141+
}
142+
143+
@Override
144+
public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
145+
MessageListenerContainer container, Runnable invokeListener) {
146+
147+
this.batchErrorHandler.handle(thrownException, data, consumer, container, invokeListener);
148+
}
149+
150+
}
151+

0 commit comments

Comments
 (0)