Skip to content

GH-2355: Add ManualAckListenerErrorHandler #2356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 4 additions & 43 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5049,56 +5049,17 @@ Object handleError(Message<?> message, ListenerExecutionFailedException exceptio
----
====

If your error handler implements this interface, you can, for example, adjust the offsets accordingly.
For example, to reset the offset to replay the failed message, you could do something like the following:
Another sub-interface ()`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode` s.

====
[source, java]
----
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
----
====

Similarly, you could do something like the following for a batch listener:

====
[source, java]
----
@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
return (m, e, c) -> {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> c.seek(k, v));
return null;
};
}
----
====

This resets each topic/partition in the batch to the lowest offset in the batch.

NOTE: The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler.
In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.

[[error-handlers]]
===== Container Error Handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface ConsumerAwareListenerErrorHandler extends KafkaListenerErrorHan

@Override
default Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
throw new UnsupportedOperationException("Container should never call this");
throw new UnsupportedOperationException("Adapter should never call this");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

/**
Expand Down Expand Up @@ -60,4 +62,20 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
return handleError(message, exception);
}

/**
* Handle the error.
* @param message the spring-messaging message.
* @param exception the exception the listener threw, wrapped in a
* {@link ListenerExecutionFailedException}.
* @param consumer the consumer.
* @param ack the {@link Acknowledgment}.
* @return the return value is ignored unless the annotated method has a
* {@code @SendTo} annotation.
*/
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {

return handleError(message, exception, consumer);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

/**
* A {@link KafkaListenerErrorHandler} that supports manual acks.
*
* @author Gary Russell
* @since 2.9
*
*/
@FunctionalInterface
public interface ManualAckListenerErrorHandler extends KafkaListenerErrorHandler {

@Override
default Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
throw new UnsupportedOperationException("Adapter should never call this");
}

@Override
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer,
@Nullable Acknowledgment ack);

}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
if (message.equals(NULL_MESSAGE)) {
message = new GenericMessage<>(records);
}
Object result = this.errorHandler.handleError(message, e, consumer);
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
if (message.equals(NULL_MESSAGE)) {
message = new GenericMessage<>(record);
}
Object result = this.errorHandler.handleError(message, e, consumer);
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, record, message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;

/**
* @author Gary Russell
* @since 2.9
*
*/
public class ListenerErrorHandlerTests {

private static Method test1;

private static Method test2;

static {
try {
test1 = TestListener.class.getDeclaredMethod("test1", String.class, Acknowledgment.class);
test2 = TestListener.class.getDeclaredMethod("test2", List.class, Acknowledgment.class);
}
catch (NoSuchMethodException | SecurityException e) {
throw new IllegalStateException(e);
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void record() throws Exception {
RecordMessagingMessageListenerAdapter adapter = new RecordMessagingMessageListenerAdapter(getClass(), test1,
(ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> {
ack.acknowledge();
return null;
});
HandlerAdapter handler = mock(HandlerAdapter.class);
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
adapter.setHandlerMethod(handler);
Acknowledgment ack = mock(Acknowledgment.class);
adapter.onMessage(mock(ConsumerRecord.class), ack, mock(Consumer.class));
verify(ack).acknowledge();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void batch() throws Exception {
BatchMessagingMessageListenerAdapter adapter = new BatchMessagingMessageListenerAdapter(getClass(), test2,
(ManualAckListenerErrorHandler) (msg, ex, cons, ack) -> {
ack.acknowledge();
return null;
});
HandlerAdapter handler = mock(HandlerAdapter.class);
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
adapter.setHandlerMethod(handler);
Acknowledgment ack = mock(Acknowledgment.class);
adapter.onMessage(Collections.emptyList(), ack, mock(Consumer.class));
verify(ack).acknowledge();
}

private static class TestListener {

void test1(String foo, Acknowledgment ack) {
}

void test2(List<String> foo, Acknowledgment ack) {
}

}

}