Skip to content

Commit 2dea91b

Browse files
committed
GH-8631: Add retry options to JMS inbound components
Fixes: #8631 The `ChannelPublishingJmsMessageListener` (and respective Java DSL specs) can now be supplied with a `RetryTemplate` & `RecoveryCallback` for retying `send` & `send-and-receive` operations in the internal gateway implementation. The `JmsMessageHeaderErrorMessageStrategy` was introduced to have access into a raw JMS message from retry context. The functionality is modeled after `AmqpInboundChannelAdapter`
1 parent ffe3605 commit 2dea91b

File tree

8 files changed

+276
-17
lines changed

8 files changed

+276
-17
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 120 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.jms;
1818

1919
import java.util.Map;
20+
import java.util.concurrent.atomic.AtomicInteger;
2021

2122
import io.micrometer.observation.ObservationRegistry;
2223
import jakarta.jms.DeliveryMode;
@@ -30,13 +31,18 @@
3031
import org.springframework.beans.factory.BeanFactory;
3132
import org.springframework.beans.factory.BeanFactoryAware;
3233
import org.springframework.beans.factory.InitializingBean;
34+
import org.springframework.core.AttributeAccessor;
3335
import org.springframework.core.log.LogAccessor;
3436
import org.springframework.expression.Expression;
3537
import org.springframework.expression.spel.support.StandardEvaluationContext;
38+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
39+
import org.springframework.integration.StaticMessageHeaderAccessor;
3640
import org.springframework.integration.core.MessagingTemplate;
3741
import org.springframework.integration.expression.ExpressionUtils;
3842
import org.springframework.integration.gateway.MessagingGatewaySupport;
43+
import org.springframework.integration.jms.support.JmsMessageHeaderErrorMessageStrategy;
3944
import org.springframework.integration.support.DefaultMessageBuilderFactory;
45+
import org.springframework.integration.support.ErrorMessageUtils;
4046
import org.springframework.integration.support.MessageBuilderFactory;
4147
import org.springframework.integration.support.management.TrackableComponent;
4248
import org.springframework.integration.support.management.metrics.MetricsCaptor;
@@ -54,6 +60,10 @@
5460
import org.springframework.messaging.MessageChannel;
5561
import org.springframework.messaging.MessagingException;
5662
import org.springframework.messaging.support.ErrorMessage;
63+
import org.springframework.retry.RecoveryCallback;
64+
import org.springframework.retry.RetryOperations;
65+
import org.springframework.retry.support.RetrySynchronizationManager;
66+
import org.springframework.retry.support.RetryTemplate;
5767
import org.springframework.util.Assert;
5868

5969
/**
@@ -347,6 +357,30 @@ public void setReceiverObservationConvention(
347357
this.gatewayDelegate.setReceiverObservationConvention(observationConvention);
348358
}
349359

360+
/**
361+
* Set a {@link RetryTemplate} to use for retrying a message delivery within the
362+
* adapter. Unlike adding retry at the container level, this can be used with an
363+
* {@code ErrorMessageSendingRecoverer} {@link RecoveryCallback} to publish to the
364+
* error channel after retries are exhausted. You generally should not configure an
365+
* error channel when using retry here, use a {@link RecoveryCallback} instead.
366+
* @param retryTemplate the template.
367+
* @since 6.3
368+
* @see #setRecoveryCallback(RecoveryCallback)
369+
*/
370+
public void setRetryTemplate(RetryTemplate retryTemplate) {
371+
this.gatewayDelegate.retryTemplate = retryTemplate;
372+
}
373+
374+
/**
375+
* Set a {@link RecoveryCallback} when using retry within the adapter.
376+
* @param recoveryCallback the callback.
377+
* @since 6.3
378+
* @see #setRetryTemplate(RetryTemplate)
379+
*/
380+
public void setRecoveryCallback(RecoveryCallback<Message<?>> recoveryCallback) {
381+
this.gatewayDelegate.recoveryCallback = recoveryCallback;
382+
}
383+
350384
@Override
351385
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
352386
this.beanFactory = beanFactory;
@@ -367,6 +401,9 @@ public void onMessage(jakarta.jms.Message jmsMessage, Session session) throws JM
367401
}
368402

369403
Map<String, Object> headers = this.headerMapper.toHeaders(jmsMessage);
404+
if (this.gatewayDelegate.retryTemplate != null) {
405+
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
406+
}
370407
requestMessage =
371408
(result instanceof Message<?>) ?
372409
this.messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
@@ -385,10 +422,10 @@ public void onMessage(jakarta.jms.Message jmsMessage, Session session) throws JM
385422
}
386423

387424
if (!this.expectReply) {
388-
this.gatewayDelegate.send(requestMessage);
425+
this.gatewayDelegate.send(jmsMessage, requestMessage);
389426
}
390427
else {
391-
Message<?> replyMessage = this.gatewayDelegate.sendAndReceiveMessage(requestMessage);
428+
Message<?> replyMessage = this.gatewayDelegate.sendAndReceiveMessage(jmsMessage, requestMessage);
392429
if (replyMessage != null) {
393430
Destination destination = getReplyDestination(jmsMessage, session);
394431
this.logger.debug(() -> "Reply destination: " + destination);
@@ -424,6 +461,12 @@ public void afterPropertiesSet() {
424461
this.gatewayDelegate.setBeanFactory(this.beanFactory);
425462
}
426463
this.gatewayDelegate.afterPropertiesSet();
464+
if (this.gatewayDelegate.retryTemplate != null) {
465+
Assert.state(this.gatewayDelegate.getErrorChannel() == null,
466+
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
467+
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
468+
+ "send an error message when retries are exhausted");
469+
}
427470
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
428471
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
429472
}
@@ -551,21 +594,65 @@ private record DestinationNameHolder(String name, boolean isTopic) {
551594

552595
private class GatewayDelegate extends MessagingGatewaySupport {
553596

597+
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
598+
599+
@Nullable
600+
private RetryOperations retryTemplate;
601+
602+
@Nullable
603+
private RecoveryCallback<Message<?>> recoveryCallback;
604+
554605
GatewayDelegate() {
606+
setErrorMessageStrategy(new JmsMessageHeaderErrorMessageStrategy());
555607
}
556608

557-
@Override
558-
protected void send(Object request) { // NOSONAR - not useless, increases visibility
559-
super.send(request);
609+
private void send(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
610+
try {
611+
if (this.retryTemplate == null) {
612+
setAttributesIfNecessary(jmsMessage, requestMessage);
613+
send(requestMessage);
614+
}
615+
else {
616+
this.retryTemplate.execute(
617+
context -> {
618+
StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet();
619+
setAttributesIfNecessary(jmsMessage, requestMessage);
620+
send(requestMessage);
621+
return null;
622+
}, this.recoveryCallback);
623+
}
624+
}
625+
finally {
626+
if (this.retryTemplate == null) {
627+
ATTRIBUTES_HOLDER.remove();
628+
}
629+
}
560630
}
561631

562-
@Override
563-
protected Message<?> sendAndReceiveMessage(Object request) { // NOSONAR - not useless, increases visibility
564-
return super.sendAndReceiveMessage(request);
632+
private Message<?> sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
633+
try {
634+
if (this.retryTemplate == null) {
635+
setAttributesIfNecessary(jmsMessage, requestMessage);
636+
return sendAndReceiveMessage(requestMessage);
637+
}
638+
else {
639+
return this.retryTemplate.execute(
640+
context -> {
641+
StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet();
642+
setAttributesIfNecessary(jmsMessage, requestMessage);
643+
return sendAndReceiveMessage(requestMessage);
644+
}, this.recoveryCallback);
645+
}
646+
}
647+
finally {
648+
if (this.retryTemplate == null) {
649+
ATTRIBUTES_HOLDER.remove();
650+
}
651+
}
565652
}
566653

567654
protected ErrorMessage buildErrorMessage(Throwable throwable) {
568-
return super.buildErrorMessage(null, throwable);
655+
return buildErrorMessage(null, throwable);
569656
}
570657

571658
protected MessagingTemplate getMessagingTemplate() {
@@ -582,6 +669,29 @@ public String getComponentType() {
582669
}
583670
}
584671

672+
@Override
673+
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
674+
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
675+
return (attributes != null) ? attributes : super.getErrorMessageAttributes(message);
676+
}
677+
678+
private void setAttributesIfNecessary(Object jmsMessage, Message<?> message) {
679+
boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
680+
boolean needAttributes = needHolder || this.retryTemplate != null;
681+
if (needHolder) {
682+
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
683+
}
684+
if (needAttributes) {
685+
AttributeAccessor attributes =
686+
this.retryTemplate != null
687+
? RetrySynchronizationManager.getContext()
688+
: ATTRIBUTES_HOLDER.get();
689+
if (attributes != null) {
690+
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
691+
attributes.setAttribute(JmsMessageHeaderErrorMessageStrategy.JMS_RAW_MESSAGE, jmsMessage);
692+
}
693+
}
694+
}
585695
}
586696

587697
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.jms.listener.AbstractMessageListenerContainer;
3333
import org.springframework.jms.support.converter.MessageConverter;
3434
import org.springframework.jms.support.destination.DestinationResolver;
35+
import org.springframework.retry.RecoveryCallback;
36+
import org.springframework.retry.support.RetryTemplate;
3537
import org.springframework.util.Assert;
3638

3739
/**
@@ -221,9 +223,35 @@ public S extractReplyPayload(boolean extractReplyPayload) {
221223
}
222224

223225
/**
224-
* Set to false to prevent listener container shutdown when the endpoint is stopped.
226+
* Set a {@link RetryTemplate} to use for retrying a message delivery within the
227+
* adapter. Unlike adding retry at the container level, this can be used with an
228+
* {@code ErrorMessageSendingRecoverer} {@link RecoveryCallback} to publish to the
229+
* error channel after retries are exhausted. You generally should not configure an
230+
* error channel when using retry here, use a {@link RecoveryCallback} instead.
231+
* @param retryTemplate the template.
232+
* @since 6.3
233+
* @see #recoveryCallback(RecoveryCallback)
234+
*/
235+
public S retryTemplate(RetryTemplate retryTemplate) {
236+
this.target.getListener().setRetryTemplate(retryTemplate);
237+
return _this();
238+
}
239+
240+
/**
241+
* Set a {@link RecoveryCallback} when using retry within the adapter.
242+
* @param recoveryCallback the callback.
243+
* @since 6.3
244+
* @see #retryTemplate(RetryTemplate)
245+
*/
246+
public S recoveryCallback(RecoveryCallback<org.springframework.messaging.Message<?>> recoveryCallback) {
247+
this.target.getListener().setRecoveryCallback(recoveryCallback);
248+
return _this();
249+
}
250+
251+
/**
252+
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
225253
* Then, if so configured, any cached consumer(s) in the container will remain.
226-
* Otherwise the shared connection and will be closed and the listener invokers shut
254+
* Otherwise, the shared connection and will be closed and the listener invokers shut
227255
* down; this behavior is new starting with version 5.1. Default: true.
228256
* @param shutdown false to not shutdown.
229257
* @return the spec.

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsMessageDrivenChannelAdapterSpec.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,9 @@
2929
import org.springframework.integration.jms.JmsMessageDrivenEndpoint;
3030
import org.springframework.jms.listener.AbstractMessageListenerContainer;
3131
import org.springframework.jms.support.converter.MessageConverter;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.retry.RecoveryCallback;
34+
import org.springframework.retry.support.RetryTemplate;
3235
import org.springframework.util.Assert;
3336

3437
/**
@@ -93,6 +96,32 @@ public S shutdownContainerOnStop(boolean shutdown) {
9396
return _this();
9497
}
9598

99+
/**
100+
* Set a {@link RetryTemplate} to use for retrying a message delivery within the
101+
* adapter. Unlike adding retry at the container level, this can be used with an
102+
* {@code ErrorMessageSendingRecoverer} {@link RecoveryCallback} to publish to the
103+
* error channel after retries are exhausted. You generally should not configure an
104+
* error channel when using retry here, use a {@link RecoveryCallback} instead.
105+
* @param retryTemplate the template.
106+
* @since 6.3
107+
* @see #recoveryCallback(RecoveryCallback)
108+
*/
109+
public S retryTemplate(RetryTemplate retryTemplate) {
110+
this.target.getListener().setRetryTemplate(retryTemplate);
111+
return _this();
112+
}
113+
114+
/**
115+
* Set a {@link RecoveryCallback} when using retry within the adapter.
116+
* @param recoveryCallback the callback.
117+
* @since 6.3
118+
* @see #retryTemplate(RetryTemplate)
119+
*/
120+
public S recoveryCallback(RecoveryCallback<Message<?>> recoveryCallback) {
121+
this.target.getListener().setRecoveryCallback(recoveryCallback);
122+
return _this();
123+
}
124+
96125
/**
97126
*
98127
* @param <S> the target {@link JmsListenerContainerSpec} implementation type.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jms.support;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.core.AttributeAccessor;
23+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
24+
import org.springframework.integration.support.ErrorMessageStrategy;
25+
import org.springframework.integration.support.ErrorMessageUtils;
26+
import org.springframework.lang.Nullable;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.messaging.support.ErrorMessage;
29+
30+
/**
31+
* An {@link ErrorMessageStrategy} extension that adds the raw JMS message as
32+
* a header to the {@link ErrorMessage}.
33+
*
34+
* @author Artem Bilan
35+
*
36+
* @since 6.3
37+
*
38+
*/
39+
public class JmsMessageHeaderErrorMessageStrategy implements ErrorMessageStrategy {
40+
41+
/**
42+
* Header name/retry context variable for the raw received message.
43+
*/
44+
public static final String JMS_RAW_MESSAGE = "jms_raw_message";
45+
46+
@Override
47+
public ErrorMessage buildErrorMessage(Throwable throwable, @Nullable AttributeAccessor context) {
48+
Object inputMessage = context == null ? null
49+
: context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
50+
Map<String, Object> headers = new HashMap<>();
51+
if (context != null) {
52+
headers.put(JMS_RAW_MESSAGE, context.getAttribute(JMS_RAW_MESSAGE));
53+
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, context.getAttribute(JMS_RAW_MESSAGE));
54+
}
55+
if (inputMessage instanceof Message) {
56+
return new ErrorMessage(throwable, headers, (Message<?>) inputMessage);
57+
}
58+
else {
59+
return new ErrorMessage(throwable, headers);
60+
}
61+
}
62+
63+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* Provides JMS Components support classes.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
@org.springframework.lang.NonNullFields
6+
package org.springframework.integration.jms.support;

0 commit comments

Comments
 (0)