Skip to content

Commit 129ebdc

Browse files
artembilangaryrussell
authored andcommitted
INT-4207: Fallback for replyChannel Resolution
JIRA: https://jira.spring.io/browse/INT-4207 Enhance `AbstractMessageProducingHandler` to fallback for `replyChannel` to the `reply` if it is `Message`. That lets to avoid extra `bridge` configuration afterwards to make that `reply` as `request` for the same `replyChannel` resolution. This situation happens in case of error handling when the request message is `ErrorMessage`, typically without original headers to properly consult. But at the same time `failedMessage` in the `MessagingException` has all required headers.
1 parent fd082db commit 129ebdc

File tree

3 files changed

+56
-4
lines changed

3 files changed

+56
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2016 the original author or authors.
2+
* Copyright 2014-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -146,7 +146,8 @@ protected void produceOutput(Object reply, final Message<?> requestMessage) {
146146
if (getOutputChannel() == null) {
147147
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
148148
if (routingSlipHeader != null) {
149-
Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
149+
Assert.isTrue(routingSlipHeader.size() == 1,
150+
"The RoutingSlip header value must be a SingletonMap");
150151
Object key = routingSlipHeader.keySet().iterator().next();
151152
Object value = routingSlipHeader.values().iterator().next();
152153
Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
@@ -174,6 +175,9 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
174175

175176
if (replyChannel == null) {
176177
replyChannel = requestHeaders.getReplyChannel();
178+
if (replyChannel == null && reply instanceof Message) {
179+
replyChannel = ((Message<?>) reply).getHeaders().getReplyChannel();
180+
}
177181
}
178182
}
179183

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.integration.annotation.MessageEndpoint;
5050
import org.springframework.integration.annotation.MessagingGateway;
5151
import org.springframework.integration.annotation.ServiceActivator;
52+
import org.springframework.integration.channel.DirectChannel;
5253
import org.springframework.integration.channel.FixedSubscriberChannel;
5354
import org.springframework.integration.channel.QueueChannel;
5455
import org.springframework.integration.config.EnableIntegration;
@@ -58,7 +59,10 @@
5859
import org.springframework.integration.dsl.Pollers;
5960
import org.springframework.integration.dsl.channel.MessageChannels;
6061
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
62+
import org.springframework.integration.handler.GenericHandler;
63+
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
6164
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
65+
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
6266
import org.springframework.integration.scheduling.PollerMetadata;
6367
import org.springframework.integration.store.MessageStore;
6468
import org.springframework.integration.store.SimpleMessageStore;
@@ -71,6 +75,7 @@
7175
import org.springframework.messaging.MessageDeliveryException;
7276
import org.springframework.messaging.MessageHandler;
7377
import org.springframework.messaging.MessageHeaders;
78+
import org.springframework.messaging.MessagingException;
7479
import org.springframework.messaging.PollableChannel;
7580
import org.springframework.messaging.SubscribableChannel;
7681
import org.springframework.messaging.support.ErrorMessage;
@@ -80,7 +85,6 @@
8085
import org.springframework.stereotype.Component;
8186
import org.springframework.stereotype.Service;
8287
import org.springframework.test.annotation.DirtiesContext;
83-
import org.springframework.test.context.ContextConfiguration;
8488
import org.springframework.test.context.junit4.SpringRunner;
8589

8690
/**
@@ -415,6 +419,13 @@ public void testSubscribersSubFlows() {
415419
assertEquals(6, receive3.getPayload());
416420
}
417421

422+
@Autowired
423+
private ErrorRecovererFlowGateway errorRecovererFlowGateway;
424+
425+
@Test
426+
public void testReplyChannelFromReplyMessage() {
427+
assertEquals("foo", this.errorRecovererFlowGateway.testIt("foo"));
428+
}
418429

419430
@MessagingGateway
420431
public interface ControlBusGateway {
@@ -681,6 +692,42 @@ public MessageChannel gatewayError() {
681692
return MessageChannels.queue().get();
682693
}
683694

695+
@Bean
696+
public IntegrationFlow errorRecovererFlow() {
697+
return IntegrationFlows.from(ErrorRecovererFlowGateway.class)
698+
.handle((GenericHandler<?>) (p, h) -> {
699+
throw new RuntimeException("intentional");
700+
}, e -> e.advice(retryAdvice()))
701+
.get();
702+
}
703+
704+
@Bean
705+
public RequestHandlerRetryAdvice retryAdvice() {
706+
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
707+
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
708+
return requestHandlerRetryAdvice;
709+
}
710+
711+
@Bean
712+
public MessageChannel recoveryChannel() {
713+
return new DirectChannel();
714+
}
715+
716+
@Bean
717+
public IntegrationFlow recoveryFlow() {
718+
return IntegrationFlows.from(recoveryChannel())
719+
.<MessagingException, Message>transform(MessagingException::getFailedMessage)
720+
.get();
721+
722+
}
723+
724+
}
725+
726+
@MessagingGateway
727+
private interface ErrorRecovererFlowGateway {
728+
729+
String testIt(String payload);
730+
684731
}
685732

686733
@Service

src/reference/asciidoc/service-activator.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ To determine the reply channel, it will first check if an "output-channel" was p
3434
ref="somePojo" method="someMethod"/>
3535
----
3636

37-
If the method returns a result and no "output-channel" is defined, the framework will then check the Message's `replyChannel` header value.
37+
If the method returns a result and no "output-channel" is defined, the framework will then check the request Message's `replyChannel` header value.
3838
If that value is available, it will then check its type.
3939
If it is a`MessageChannel`, the reply message will be sent to that channel.
4040
If it is a `String`, then the endpoint will attempt to resolve the channel name to a channel instance.
4141
If the channel cannot be resolved, then a `DestinationResolutionException` will be thrown.
4242
It it can be resolved, the Message will be sent there.
43+
If the request Message doesn't have `replyChannel` header and and the `reply` object is a `Message`, its `replyChannel` header is consulted for a target destination.
4344
This is the technique used for Request Reply messaging in Spring Integration, and it is also an example of the Return Address pattern.
4445

4546
If your method returns a result, and you want to discard it and end the flow, you should configure the `output-channel` to send to a `NullChannel`.

0 commit comments

Comments
 (0)