Skip to content

Commit bbefd04

Browse files
authored
GH-2702: RetryableTopic with asyncAcks
Resolves #2702 When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler` must have `seekAfterError` set to `false`; this required user configuration. The framework now unconditionally sets the property when it configures a container using a manual ack mode. In addition, the default DLT handler was not compatible with any manual ack mode, regardless of the `asyncAcks` setting. Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`. Also tested with reporter's reproducer. **cherry-pick to 2.9.x (will require instanceof polishing for Java 8)** * Only supply `NoOpAck` with explicit `@NonNull` param annotation.
1 parent 8efd15f commit bbefd04

File tree

7 files changed

+145
-20
lines changed

7 files changed

+145
-20
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,22 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
2424

2525
IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations
2626

27+
When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
28+
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
29+
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.
30+
31+
====
32+
[source, java]
33+
----
34+
@Override
35+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
36+
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
37+
}
38+
----
39+
====
40+
41+
In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual `AckMode`, regardless of the `asyncAcks` property.
42+
2743
==== Back Off Delay Precision
2844

2945
===== Overview and Guarantees

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.kafka.support.KafkaUtils;
5353
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5454
import org.springframework.kafka.support.converter.RecordMessageConverter;
55+
import org.springframework.lang.NonNull;
5556
import org.springframework.lang.Nullable;
5657
import org.springframework.messaging.Message;
5758
import org.springframework.messaging.MessageHeaders;
@@ -84,6 +85,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
8485

8586
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
8687

88+
private static final Acknowledgment NO_OP_ACK = new NoOpAck();
89+
8790
/**
8891
* Message used when no conversion is needed.
8992
*/
@@ -120,6 +123,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
120123

121124
private boolean hasAckParameter;
122125

126+
private boolean noOpAck;
127+
123128
private boolean hasMetadataParameter;
124129

125130
private boolean messageReturnType;
@@ -353,25 +358,29 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
353358
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
354359
Consumer<?, ?> consumer) {
355360

361+
Acknowledgment ack = acknowledgment;
362+
if (ack == null && this.noOpAck) {
363+
ack = NO_OP_ACK;
364+
}
356365
try {
357366
if (data instanceof List && !this.isConsumerRecordList) {
358-
return this.handlerMethod.invoke(message, acknowledgment, consumer);
367+
return this.handlerMethod.invoke(message, ack, consumer);
359368
}
360369
else {
361370
if (this.hasMetadataParameter) {
362-
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
371+
return this.handlerMethod.invoke(message, data, ack, consumer,
363372
AdapterUtils.buildConsumerRecordMetadata(data));
364373
}
365374
else {
366-
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
375+
return this.handlerMethod.invoke(message, data, ack, consumer);
367376
}
368377
}
369378
}
370379
catch (org.springframework.messaging.converter.MessageConversionException ex) {
371-
throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex));
380+
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
372381
}
373382
catch (MethodArgumentNotValidException ex) {
374-
throw checkAckArg(acknowledgment, message, ex);
383+
throw checkAckArg(ack, message, ex);
375384
}
376385
catch (MessagingException ex) {
377386
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
@@ -607,6 +616,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
607616
boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class);
608617
boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
609618
this.hasAckParameter |= isAck;
619+
if (isAck) {
620+
this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
621+
}
610622
isNotConvertible |= isAck;
611623
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
612624
isNotConvertible |= isConsumer;
@@ -759,4 +771,12 @@ private boolean parameterIsType(Type parameterType, Type type) {
759771
public record ReplyExpressionRoot(Object request, Object source, Object result) {
760772
}
761773

774+
static class NoOpAck implements Acknowledgment {
775+
776+
@Override
777+
public void acknowledge() {
778+
}
779+
780+
}
781+
762782
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.listener.CommonErrorHandler;
3030
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3131
import org.springframework.kafka.listener.ContainerProperties;
32+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
3233
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
3334
import org.springframework.kafka.listener.DefaultErrorHandler;
3435
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
@@ -233,11 +234,19 @@ private class RetryTopicListenerContainerFactoryDecorator
233234
if (mainListenerId == null) {
234235
mainListenerId = listenerContainer.getListenerId();
235236
}
237+
CommonErrorHandler errorHandler = createErrorHandler(
238+
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory
239+
.create(mainListenerId),
240+
this.configuration);
241+
if (listenerContainer.getContainerProperties().isAsyncAcks()) {
242+
AckMode ackMode = listenerContainer.getContainerProperties().getAckMode();
243+
if ((AckMode.MANUAL.equals(ackMode) || AckMode.MANUAL_IMMEDIATE.equals(ackMode))
244+
&& errorHandler instanceof DefaultErrorHandler deh) {
245+
deh.setSeekAfterError(false);
246+
}
247+
}
236248
listenerContainer
237-
.setCommonErrorHandler(createErrorHandler(
238-
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory
239-
.create(mainListenerId),
240-
this.configuration));
249+
.setCommonErrorHandler(errorHandler);
241250
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration,
242251
this.isSetContainerProperties);
243252
return listenerContainer;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
3535
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
3636
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
37+
import org.springframework.kafka.support.Acknowledgment;
3738
import org.springframework.kafka.support.EndpointHandlerMethod;
3839
import org.springframework.kafka.support.KafkaUtils;
3940
import org.springframework.kafka.support.TopicForRetryable;
41+
import org.springframework.lang.NonNull;
4042
import org.springframework.lang.Nullable;
4143

4244

@@ -468,15 +470,17 @@ static class LoggingDltListenerHandlerMethod {
468470

469471
public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";
470472

471-
public void logMessage(Object message) {
473+
public void logMessage(Object message, @NonNull Acknowledgment ack) {
472474
if (message instanceof ConsumerRecord) {
473475
LOGGER.info(() -> "Received message in dlt listener: "
474476
+ KafkaUtils.format((ConsumerRecord<?, ?>) message));
475477
}
476478
else {
477479
LOGGER.info(() -> "Received message in dlt listener.");
478480
}
481+
ack.acknowledge();
479482
}
483+
480484
}
481485

482486
}

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.ArgumentMatchers.isNull;
2324
import static org.mockito.BDDMockito.given;
@@ -147,7 +148,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndFirstMethodIsCalled() {
147148
then(kafkaConsumerBackoffManager).should(times(1))
148149
.backOffIfNecessary(context);
149150

150-
then(delegate).should(times(1)).onMessage(data, null, null);
151+
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
151152
}
152153

153154
@Test
@@ -159,7 +160,7 @@ void shouldWrapExceptionInTimestampedException() {
159160
given(kafkaConsumerBackoffManager.createContext(originalTimestamp, listenerId, topicPartition, null))
160161
.willReturn(context);
161162
RuntimeException thrownException = new RuntimeException();
162-
willThrow(thrownException).given(delegate).onMessage(data, null, null);
163+
willThrow(thrownException).given(delegate).onMessage(eq(data), any(), isNull());
163164

164165
KafkaBackoffAwareMessageListenerAdapter<Object, Object> backoffAwareMessageListenerAdapter =
165166
new KafkaBackoffAwareMessageListenerAdapter<>(delegate, kafkaConsumerBackoffManager, listenerId, clock);
@@ -175,7 +176,7 @@ void shouldWrapExceptionInTimestampedException() {
175176
then(kafkaConsumerBackoffManager).should(times(1))
176177
.backOffIfNecessary(context);
177178

178-
then(delegate).should(times(1)).onMessage(data, null, null);
179+
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
179180
}
180181

181182
@Test
@@ -224,7 +225,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndThirdMethodIsCalled() {
224225
then(kafkaConsumerBackoffManager).should(times(1))
225226
.backOffIfNecessary(context);
226227

227-
then(delegate).should(times(1)).onMessage(data, null, consumer);
228+
then(delegate).should(times(1)).onMessage(eq(data), any(), eq(consumer));
228229
}
229230

230231
@Test

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.then;
2525
import static org.mockito.BDDMockito.willReturn;
2626
import static org.mockito.BDDMockito.willThrow;
27+
import static org.mockito.Mockito.mock;
2728
import static org.mockito.Mockito.times;
2829

2930
import java.lang.reflect.Method;
@@ -51,6 +52,7 @@
5152
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
5253
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
5354
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
55+
import org.springframework.kafka.support.Acknowledgment;
5456
import org.springframework.kafka.support.EndpointHandlerMethod;
5557
import org.springframework.kafka.test.condition.LogLevels;
5658
import org.springframework.test.util.ReflectionTestUtils;
@@ -365,15 +367,15 @@ void shouldInstantiateIfNotInContainer() {
365367
void shouldLogConsumerRecordMessage() {
366368
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
367369
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
368-
method.logMessage(consumerRecordMessage);
370+
method.logMessage(consumerRecordMessage, mock(Acknowledgment.class));
369371
then(consumerRecordMessage).should().topic();
370372
}
371373

372374
@Test
373375
void shouldNotLogObjectMessage() {
374376
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
375377
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
376-
method.logMessage(objectMessage);
378+
method.logMessage(objectMessage, mock(Acknowledgment.class));
377379
then(objectMessage).shouldHaveNoInteractions();
378380
}
379381

0 commit comments

Comments
 (0)