diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 8429eec81a..77c95dbe47 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -5049,56 +5049,17 @@ Object handleError(Message message, ListenerExecutionFailedException exceptio ---- ==== -If your error handler implements this interface, you can, for example, adjust the offsets accordingly. -For example, to reset the offset to replay the failed message, you could do something like the following: +Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode` s. ==== [source, java] ---- -@Bean -public ConsumerAwareListenerErrorHandler listen3ErrorHandler() { - return (m, e, c) -> { - this.listen3Exception = e; - MessageHeaders headers = m.getHeaders(); - c.seek(new org.apache.kafka.common.TopicPartition( - headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), - headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)), - headers.get(KafkaHeaders.OFFSET, Long.class)); - return null; - }; -} +Object handleError(Message message, ListenerExecutionFailedException exception, + Consumer consumer, @Nullable Acknowledgment ack); ---- ==== -Similarly, you could do something like the following for a batch listener: - -==== -[source, java] ----- -@Bean -public ConsumerAwareListenerErrorHandler listen10ErrorHandler() { - return (m, e, c) -> { - this.listen10Exception = e; - MessageHeaders headers = m.getHeaders(); - List topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); - List partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class); - List offsets = headers.get(KafkaHeaders.OFFSET, List.class); - Map offsetsToReset = new HashMap<>(); - for (int i = 0; i < topics.size(); i++) { - int index = i; - offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)), - (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index))); - } - offsetsToReset.forEach((k, v) -> c.seek(k, v)); - return null; - }; -} ----- -==== - -This resets each topic/partition in the batch to the lowest offset in the batch. - -NOTE: The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler. +In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them. [[error-handlers]] ===== Container Error Handlers diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java index 3031d9aaa0..d016d04a69 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java @@ -34,7 +34,7 @@ public interface ConsumerAwareListenerErrorHandler extends KafkaListenerErrorHan @Override default Object handleError(Message message, ListenerExecutionFailedException exception) { - throw new UnsupportedOperationException("Container should never call this"); + throw new UnsupportedOperationException("Adapter should never call this"); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java index dc7e6835d4..91715b1710 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java @@ -18,6 +18,8 @@ import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; /** @@ -60,4 +62,20 @@ default Object handleError(Message message, ListenerExecutionFailedException return handleError(message, exception); } + /** + * Handle the error. + * @param message the spring-messaging message. + * @param exception the exception the listener threw, wrapped in a + * {@link ListenerExecutionFailedException}. + * @param consumer the consumer. + * @param ack the {@link Acknowledgment}. + * @return the return value is ignored unless the annotated method has a + * {@code @SendTo} annotation. + */ + default Object handleError(Message message, ListenerExecutionFailedException exception, + Consumer consumer, @Nullable Acknowledgment ack) { + + return handleError(message, exception, consumer); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java new file mode 100644 index 0000000000..274f5331a1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.Consumer; + +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; + +/** + * A {@link KafkaListenerErrorHandler} that supports manual acks. + * + * @author Gary Russell + * @since 2.9 + * + */ +@FunctionalInterface +public interface ManualAckListenerErrorHandler extends KafkaListenerErrorHandler { + + @Override + default Object handleError(Message message, ListenerExecutionFailedException exception) { + throw new UnsupportedOperationException("Adapter should never call this"); + } + + @Override + Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer, + @Nullable Acknowledgment ack); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index e7f77e410e..66701736fb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -188,7 +188,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C if (message.equals(NULL_MESSAGE)) { message = new GenericMessage<>(records); } - Object result = this.errorHandler.handleError(message, e, consumer); + Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); if (result != null) { handleResult(result, records, message); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index 47758bede9..d0b2ca5474 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -100,7 +100,7 @@ public void onMessage(ConsumerRecord record, @Nullable Acknowledgment ackn if (message.equals(NULL_MESSAGE)) { message = new GenericMessage<>(record); } - Object result = this.errorHandler.handleError(message, e, consumer); + Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); if (result != null) { handleResult(result, record, message); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerErrorHandlerTests.java new file mode 100644 index 0000000000..02cf046621 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerErrorHandlerTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.HandlerAdapter; +import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.support.Acknowledgment; + +/** + * @author Gary Russell + * @since 2.9 + * + */ +public class ListenerErrorHandlerTests { + + private static Method test1; + + private static Method test2; + + static { + try { + test1 = TestListener.class.getDeclaredMethod("test1", String.class, Acknowledgment.class); + test2 = TestListener.class.getDeclaredMethod("test2", List.class, Acknowledgment.class); + } + catch (NoSuchMethodException | SecurityException e) { + throw new IllegalStateException(e); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void record() throws Exception { + RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(getClass(), test1, + (ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> { + ack.acknowledge(); + return null; + }); + HandlerAdapter handler = mock(HandlerAdapter.class); + willThrow(new RuntimeException("test")).given(handler).invoke(any(), any()); + adapter.setHandlerMethod(handler); + Acknowledgment ack = mock(Acknowledgment.class); + adapter.onMessage(mock(ConsumerRecord.class), ack, mock(Consumer.class)); + verify(ack).acknowledge(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void batch() throws Exception { + BatchMessagingMessageListenerAdapter adapter = new BatchMessagingMessageListenerAdapter(getClass(), test2, + (ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> { + ack.acknowledge(); + return null; + }); + HandlerAdapter handler = mock(HandlerAdapter.class); + willThrow(new RuntimeException("test")).given(handler).invoke(any(), any()); + adapter.setHandlerMethod(handler); + Acknowledgment ack = mock(Acknowledgment.class); + adapter.onMessage(Collections.emptyList(), ack, mock(Consumer.class)); + verify(ack).acknowledge(); + } + + private static class TestListener { + + void test1(String foo, Acknowledgment ack) { + } + + void test2(List foo, Acknowledgment ack) { + } + + } + +}