Skip to content

Commit 4401f96

Browse files
committed
GH-2702: RetryableTopic with asyncAcks Back Port
Resolves #2707 When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler` must have `seekAfterError` set to `false`; this required user configuration. The framework now unconditionally sets the property when it configures a container using a manual ack mode. In addition, the default DLT handler was not compatible with any manual ack mode, regardless of the `asyncAcks` setting. Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`. Also tested with reporter's reproducer. **cherry-pick to 2.9.x (will require instanceof polishing for Java 8)** * Only supply `NoOpAck` with explicit `@NonNull` param annotation.
1 parent 8e1575b commit 4401f96

File tree

7 files changed

+152
-22
lines changed

7 files changed

+152
-22
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,22 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
2828

2929
IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations
3030

31+
When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
32+
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
33+
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.
34+
35+
====
36+
[source, java]
37+
----
38+
@Override
39+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
40+
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
41+
}
42+
----
43+
====
44+
45+
In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual `AckMode`, regardless of the `asyncAcks` property.
46+
3147
==== Back Off Delay Precision
3248

3349
===== Overview and Guarantees

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.kafka.support.KafkaUtils;
5353
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5454
import org.springframework.kafka.support.converter.RecordMessageConverter;
55+
import org.springframework.lang.NonNull;
5556
import org.springframework.lang.Nullable;
5657
import org.springframework.messaging.Message;
5758
import org.springframework.messaging.MessageHeaders;
@@ -84,6 +85,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
8485

8586
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
8687

88+
private static final Acknowledgment NO_OP_ACK = new NoOpAck();
89+
8790
/**
8891
* Message used when no conversion is needed.
8992
*/
@@ -120,6 +123,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
120123

121124
private boolean hasAckParameter;
122125

126+
private boolean noOpAck;
127+
123128
private boolean hasMetadataParameter;
124129

125130
private boolean messageReturnType;
@@ -334,25 +339,29 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, @Nullable A
334339
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
335340
Consumer<?, ?> consumer) {
336341

342+
Acknowledgment ack = acknowledgment;
343+
if (ack == null && this.noOpAck) {
344+
ack = NO_OP_ACK;
345+
}
337346
try {
338347
if (data instanceof List && !this.isConsumerRecordList) {
339-
return this.handlerMethod.invoke(message, acknowledgment, consumer);
348+
return this.handlerMethod.invoke(message, ack, consumer);
340349
}
341350
else {
342351
if (this.hasMetadataParameter) {
343-
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
352+
return this.handlerMethod.invoke(message, data, ack, consumer,
344353
AdapterUtils.buildConsumerRecordMetadata(data));
345354
}
346355
else {
347-
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
356+
return this.handlerMethod.invoke(message, data, ack, consumer);
348357
}
349358
}
350359
}
351360
catch (org.springframework.messaging.converter.MessageConversionException ex) {
352-
throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex));
361+
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
353362
}
354363
catch (MethodArgumentNotValidException ex) {
355-
throw checkAckArg(acknowledgment, message, ex);
364+
throw checkAckArg(ack, message, ex);
356365
}
357366
catch (MessagingException ex) {
358367
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
@@ -588,6 +597,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
588597
boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class);
589598
boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
590599
this.hasAckParameter |= isAck;
600+
if (isAck) {
601+
this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
602+
}
591603
isNotConvertible |= isAck;
592604
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
593605
isNotConvertible |= isConsumer;
@@ -767,4 +779,12 @@ public Object getResult() {
767779

768780
}
769781

782+
static class NoOpAck implements Acknowledgment {
783+
784+
@Override
785+
public void acknowledge() {
786+
}
787+
788+
}
789+
770790
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.listener.CommonErrorHandler;
3030
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3131
import org.springframework.kafka.listener.ContainerProperties;
32+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
3233
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
3334
import org.springframework.kafka.listener.DefaultErrorHandler;
3435
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
@@ -208,12 +209,23 @@ private class RetryTopicListenerContainerFactoryDecorator
208209
return decorate(this.delegate.createListenerContainer(endpoint));
209210
}
210211

211-
private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
212+
private ConcurrentMessageListenerContainer<?, ?> decorate(
213+
ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
214+
215+
CommonErrorHandler errorHandler = createErrorHandler(
216+
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(),
217+
this.configuration);
218+
if (listenerContainer.getContainerProperties().isAsyncAcks()) {
219+
AckMode ackMode = listenerContainer.getContainerProperties().getAckMode();
220+
if ((AckMode.MANUAL.equals(ackMode) || AckMode.MANUAL_IMMEDIATE.equals(ackMode))
221+
&& errorHandler instanceof DefaultErrorHandler) {
222+
((DefaultErrorHandler) errorHandler).setSeekAfterError(false);
223+
}
224+
}
212225
listenerContainer
213-
.setCommonErrorHandler(createErrorHandler(
214-
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(),
215-
this.configuration));
216-
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration, this.isSetContainerProperties);
226+
.setCommonErrorHandler(errorHandler);
227+
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration,
228+
this.isSetContainerProperties);
217229
return listenerContainer;
218230
}
219231

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
3636
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3737
import org.springframework.kafka.listener.ListenerUtils;
38+
import org.springframework.kafka.support.Acknowledgment;
3839
import org.springframework.kafka.support.EndpointHandlerMethod;
3940
import org.springframework.kafka.support.TopicForRetryable;
41+
import org.springframework.lang.NonNull;
4042
import org.springframework.lang.Nullable;
4143

4244

@@ -448,15 +450,17 @@ static class LoggingDltListenerHandlerMethod {
448450
public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";
449451

450452
@SuppressWarnings("deprecation")
451-
public void logMessage(Object message) {
453+
public void logMessage(Object message, @NonNull Acknowledgment ack) {
452454
if (message instanceof ConsumerRecord) {
453455
LOGGER.info(() -> "Received message in dlt listener: "
454456
+ ListenerUtils.recordToString((ConsumerRecord<?, ?>) message));
455457
}
456458
else {
457459
LOGGER.info(() -> "Received message in dlt listener.");
458460
}
461+
ack.acknowledge();
459462
}
463+
460464
}
461465

462466
}

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.ArgumentMatchers.isNull;
2324
import static org.mockito.BDDMockito.given;
@@ -147,7 +148,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndFirstMethodIsCalled() {
147148
then(kafkaConsumerBackoffManager).should(times(1))
148149
.backOffIfNecessary(context);
149150

150-
then(delegate).should(times(1)).onMessage(data, null, null);
151+
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
151152
}
152153

153154
@Test
@@ -159,7 +160,7 @@ void shouldWrapExceptionInTimestampedException() {
159160
given(kafkaConsumerBackoffManager.createContext(originalTimestamp, listenerId, topicPartition, null))
160161
.willReturn(context);
161162
RuntimeException thrownException = new RuntimeException();
162-
willThrow(thrownException).given(delegate).onMessage(data, null, null);
163+
willThrow(thrownException).given(delegate).onMessage(eq(data), any(), isNull());
163164

164165
KafkaBackoffAwareMessageListenerAdapter<Object, Object> backoffAwareMessageListenerAdapter =
165166
new KafkaBackoffAwareMessageListenerAdapter<>(delegate, kafkaConsumerBackoffManager, listenerId, clock);
@@ -175,7 +176,7 @@ void shouldWrapExceptionInTimestampedException() {
175176
then(kafkaConsumerBackoffManager).should(times(1))
176177
.backOffIfNecessary(context);
177178

178-
then(delegate).should(times(1)).onMessage(data, null, null);
179+
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
179180
}
180181

181182
@Test
@@ -224,7 +225,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndThirdMethodIsCalled() {
224225
then(kafkaConsumerBackoffManager).should(times(1))
225226
.backOffIfNecessary(context);
226227

227-
then(delegate).should(times(1)).onMessage(data, null, consumer);
228+
then(delegate).should(times(1)).onMessage(eq(data), any(), eq(consumer));
228229
}
229230

230231
@Test

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.then;
2525
import static org.mockito.BDDMockito.willReturn;
2626
import static org.mockito.BDDMockito.willThrow;
27+
import static org.mockito.Mockito.mock;
2728
import static org.mockito.Mockito.times;
2829

2930
import java.lang.reflect.Method;
@@ -52,6 +53,7 @@
5253
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
5354
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
5455
import org.springframework.kafka.listener.ListenerUtils;
56+
import org.springframework.kafka.support.Acknowledgment;
5557
import org.springframework.kafka.support.EndpointHandlerMethod;
5658
import org.springframework.kafka.test.condition.LogLevels;
5759
import org.springframework.test.util.ReflectionTestUtils;
@@ -367,7 +369,7 @@ void shouldLogConsumerRecordMessage() {
367369
ListenerUtils.setLogOnlyMetadata(true);
368370
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
369371
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
370-
method.logMessage(consumerRecordMessage);
372+
method.logMessage(consumerRecordMessage, mock(Acknowledgment.class));
371373
then(consumerRecordMessage).should().topic();
372374
ListenerUtils.setLogOnlyMetadata(false);
373375
}
@@ -376,7 +378,7 @@ void shouldLogConsumerRecordMessage() {
376378
void shouldNotLogObjectMessage() {
377379
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
378380
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
379-
method.logMessage(objectMessage);
381+
method.logMessage(objectMessage, mock(Acknowledgment.class));
380382
then(objectMessage).shouldHaveNoInteractions();
381383
}
382384

0 commit comments

Comments
 (0)