Skip to content

Commit bd4e041

Browse files
committed
GH-10018: Implement async for Java DSL gateway()
Fixes: #10018 The `GatewayEndpointSpec` configuration of the `gateway()` operator support already an `async(true)` option. However, it is silently ignored internally since no real async contract provided for the gateway proxy. * Introduce the `AsyncRequestReplyExchanger` interface to use instead of `RequestReplyExchanger`, when `gateway()` operator is opted-in for the `async(true)` * Use this new `AsyncRequestReplyExchanger` in the `GatewayMessageHandler` when `async(true)` * Also, expose a `GatewayEndpointSpec.asyncExecutor(Executor)` option to support async behavior similar to the `@MessagingGateway`
1 parent 53cccd0 commit bd4e041

File tree

8 files changed

+158
-28
lines changed

8 files changed

+158
-28
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/GatewayEndpointSpec.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,10 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.concurrent.Executor;
20+
1921
import org.springframework.integration.gateway.GatewayMessageHandler;
22+
import org.springframework.lang.Nullable;
2023
import org.springframework.messaging.MessageChannel;
2124

2225
/**
@@ -110,4 +113,17 @@ public GatewayEndpointSpec errorOnTimeout(boolean errorOnTimeout) {
110113
return this;
111114
}
112115

116+
/**
117+
* Set an {@link Executor} for async request-reply scenarios.
118+
* @param executor the executor to use.
119+
* @return the spec.
120+
* @since 6.5
121+
* @see org.springframework.integration.gateway.GatewayProxyFactoryBean#setAsyncExecutor(Executor)
122+
*/
123+
124+
public GatewayEndpointSpec asyncExecutor(@Nullable Executor executor) {
125+
this.handler.setAsyncExecutor(executor);
126+
return this;
127+
}
128+
113129
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.gateway;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.springframework.messaging.Message;
22+
23+
/**
24+
* Messaging gateway contract for async request/reply Message exchange.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 6.5
29+
*
30+
* @see RequestReplyExchanger
31+
*/
32+
@FunctionalInterface
33+
public interface AsyncRequestReplyExchanger {
34+
35+
CompletableFuture<Message<?>> exchange(Message<?> request);
36+
37+
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMessageHandler.java

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.integration.gateway;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.Future;
1922
import java.util.concurrent.locks.Lock;
2023
import java.util.concurrent.locks.ReentrantLock;
2124

2225
import org.springframework.beans.factory.BeanCreationException;
23-
import org.springframework.beans.factory.BeanFactory;
2426
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
2527
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2628
import org.springframework.integration.support.management.ManageableLifecycle;
29+
import org.springframework.lang.Nullable;
2730
import org.springframework.messaging.Message;
2831
import org.springframework.messaging.MessageChannel;
2932

@@ -37,52 +40,79 @@
3740
*/
3841
public class GatewayMessageHandler extends AbstractReplyProducingMessageHandler implements ManageableLifecycle {
3942

40-
private final GatewayProxyFactoryBean<?> gatewayProxyFactoryBean;
43+
private final Lock lock = new ReentrantLock();
44+
45+
private volatile GatewayProxyFactoryBean<?> gatewayProxyFactoryBean;
4146

42-
private volatile RequestReplyExchanger exchanger;
47+
private volatile Object exchanger;
4348

4449
private volatile boolean running;
4550

46-
private final Lock lock = new ReentrantLock();
51+
private MessageChannel requestChannel;
4752

48-
public GatewayMessageHandler() {
49-
this.gatewayProxyFactoryBean = new GatewayProxyFactoryBean<>();
50-
}
53+
private String requestChannelName;
54+
55+
private MessageChannel replyChannel;
56+
57+
private String replyChannelName;
58+
59+
private MessageChannel errorChannel;
60+
61+
private String errorChannelName;
62+
63+
private Long requestTimeout;
64+
65+
private Long replyTimeout;
66+
67+
private boolean errorOnTimeout;
68+
69+
private Executor executor;
5170

5271
public void setRequestChannel(MessageChannel requestChannel) {
53-
this.gatewayProxyFactoryBean.setDefaultRequestChannel(requestChannel);
72+
this.requestChannel = requestChannel;
5473
}
5574

5675
public void setRequestChannelName(String requestChannel) {
57-
this.gatewayProxyFactoryBean.setDefaultRequestChannelName(requestChannel);
76+
this.requestChannelName = requestChannel;
5877
}
5978

6079
public void setReplyChannel(MessageChannel replyChannel) {
61-
this.gatewayProxyFactoryBean.setDefaultReplyChannel(replyChannel);
80+
this.replyChannel = replyChannel;
6281
}
6382

6483
public void setReplyChannelName(String replyChannel) {
65-
this.gatewayProxyFactoryBean.setDefaultReplyChannelName(replyChannel);
84+
this.replyChannelName = replyChannel;
6685
}
6786

6887
public void setErrorChannel(MessageChannel errorChannel) {
69-
this.gatewayProxyFactoryBean.setErrorChannel(errorChannel);
88+
this.errorChannel = errorChannel;
7089
}
7190

7291
public void setErrorChannelName(String errorChannel) {
73-
this.gatewayProxyFactoryBean.setErrorChannelName(errorChannel);
92+
this.errorChannelName = errorChannel;
7493
}
7594

7695
public void setRequestTimeout(Long requestTimeout) {
77-
this.gatewayProxyFactoryBean.setDefaultRequestTimeout(requestTimeout);
96+
this.requestTimeout = requestTimeout;
7897
}
7998

8099
public void setReplyTimeout(Long replyTimeout) {
81-
this.gatewayProxyFactoryBean.setDefaultReplyTimeout(replyTimeout);
100+
this.replyTimeout = replyTimeout;
82101
}
83102

84103
public void setErrorOnTimeout(boolean errorOnTimeout) {
85-
this.gatewayProxyFactoryBean.setErrorOnTimeout(errorOnTimeout);
104+
this.errorOnTimeout = errorOnTimeout;
105+
}
106+
107+
/**
108+
* Set the executor for use when the gateway method returns
109+
* {@link Future} or {@link CompletableFuture}.
110+
* Set it to null to disable the async processing, and any
111+
* {@link Future} return types must be returned by the downstream flow.
112+
* @param executor The executor.
113+
*/
114+
public void setAsyncExecutor(@Nullable Executor executor) {
115+
this.executor = executor;
86116
}
87117

88118
@Override
@@ -98,18 +128,38 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
98128
this.lock.unlock();
99129
}
100130
}
101-
return this.exchanger.exchange(requestMessage);
131+
return isAsync()
132+
? ((AsyncRequestReplyExchanger) this.exchanger).exchange(requestMessage)
133+
: ((RequestReplyExchanger) this.exchanger).exchange(requestMessage);
102134
}
103135

104136
private void initialize() {
105-
BeanFactory beanFactory = getBeanFactory();
137+
if (isAsync()) {
138+
this.gatewayProxyFactoryBean = new GatewayProxyFactoryBean<>(AsyncRequestReplyExchanger.class);
139+
}
140+
else {
141+
this.gatewayProxyFactoryBean = new GatewayProxyFactoryBean<>(RequestReplyExchanger.class);
142+
}
143+
144+
this.gatewayProxyFactoryBean.setDefaultRequestChannel(this.requestChannel);
145+
this.gatewayProxyFactoryBean.setDefaultRequestChannelName(this.requestChannelName);
146+
this.gatewayProxyFactoryBean.setDefaultReplyChannel(this.replyChannel);
147+
this.gatewayProxyFactoryBean.setDefaultReplyChannelName(this.replyChannelName);
148+
this.gatewayProxyFactoryBean.setErrorChannel(this.errorChannel);
149+
this.gatewayProxyFactoryBean.setErrorChannelName(this.errorChannelName);
150+
this.gatewayProxyFactoryBean.setAsyncExecutor(this.executor);
151+
if (this.requestTimeout != null) {
152+
this.gatewayProxyFactoryBean.setDefaultRequestTimeout(this.requestTimeout);
153+
}
154+
if (this.replyTimeout != null) {
155+
this.gatewayProxyFactoryBean.setDefaultReplyTimeout(this.replyTimeout);
156+
}
106157

107-
if (beanFactory instanceof ConfigurableListableBeanFactory) {
108-
((ConfigurableListableBeanFactory) beanFactory).initializeBean(this.gatewayProxyFactoryBean,
109-
getComponentName() + "#gpfb");
158+
if (getBeanFactory() instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
159+
configurableListableBeanFactory.initializeBean(this.gatewayProxyFactoryBean, getComponentName() + "#gpfb");
110160
}
111161
try {
112-
this.exchanger = (RequestReplyExchanger) this.gatewayProxyFactoryBean.getObject();
162+
this.exchanger = this.gatewayProxyFactoryBean.getObject();
113163
}
114164
catch (Exception e) {
115165
throw new BeanCreationException("Can't instantiate the GatewayProxyFactoryBean: " + this, e);
@@ -123,6 +173,17 @@ private void initialize() {
123173

124174
@Override
125175
public void start() {
176+
if (this.exchanger == null) {
177+
this.lock.lock();
178+
try {
179+
if (this.exchanger == null) {
180+
initialize();
181+
}
182+
}
183+
finally {
184+
this.lock.unlock();
185+
}
186+
}
126187
this.gatewayProxyFactoryBean.start();
127188
this.running = true;
128189
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/RequestReplyExchanger.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@
2828
* @author Artem Bilan
2929
*
3030
* @since 2.0
31+
*
32+
* @see AsyncRequestReplyExchanger
3133
*/
3234
@FunctionalInterface
3335
public interface RequestReplyExchanger {

spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -118,6 +118,7 @@ public void testGatewayExplicitReplyChannel() {
118118
Message<?> message = replyChannel.receive(10000);
119119
assertThat(message).isNotNull();
120120
assertThat(message.getPayload()).isEqualTo("FOO");
121+
assertThat(message.getHeaders().get("currentThread", String.class)).startsWith("SimpleAsyncTaskExecutor-");
121122
}
122123

123124
@Autowired
@@ -155,8 +156,9 @@ public static class ContextConfiguration {
155156

156157
@Bean
157158
public IntegrationFlow testGateway() {
158-
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel"))
159-
.log();
159+
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel").async(true))
160+
.enrichHeaders(headers ->
161+
headers.headerExpression("currentThread", "T (Thread).currentThread().name"));
160162
}
161163

162164
@Bean

src/reference/antora/modules/ROOT/pages/dsl/java-gateway.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ private static IntegrationFlow subFlow() {
3434
IMPORTANT: If the downstream flow does not always return a reply, you should set the `requestTimeout` to 0 to prevent hanging the calling thread indefinitely.
3535
In that case, the flow will end at that point and the thread released for further work.
3636

37+
Starting with version 6.5, this `gateway()` operator fully supports an `async(true)` behaviour.
38+
Internally, an `AsyncRequestReplyExchanger` service interface is provided for the `GatewayProxyFactoryBean`.
39+
And since `AsyncRequestReplyExchanger` contract is a `CompletableFuture<Message<?>>`, the whole request-reply is executed in asynchronous manner.

src/reference/antora/modules/ROOT/pages/gateway.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,12 @@ int finalResult = result.get(1000, TimeUnit.SECONDS);
560560

561561
For a more detailed example, see the https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/async-gateway[async-gateway] sample in the Spring Integration samples.
562562

563+
Also, starting with version 6.5, the Java DSL `gateway()` operator fully supports an `async(true)` behaviour.
564+
Internally, an `AsyncRequestReplyExchanger` service interface is provided for the `GatewayProxyFactoryBean`.
565+
And since `AsyncRequestReplyExchanger` contract is a `CompletableFuture<Message<?>>`, the whole request-reply is executed in asynchronous manner.
566+
This behavior is useful, for example, in case of splitter-aggregator scenario when another flow has to be called for each item.
567+
However, the order is not important - only their group gathering on the aggregator after all processing.
568+
563569
[[gateway-asynctaskexecutor]]
564570
=== `AsyncTaskExecutor`
565571

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ The `AbstractMessageChannel` beans now throw a special `MessageDispatchingExcept
3131
In general, it is a design error to try to produce a message from `afterPropertiesSet()`, `@PostConstruct` or bean definition methods.
3232
The `SmartLifecycle.start()` is preferred way for this kind of logic, or better to do that via inbound channel adapters.
3333

34+
The Java DSL `gateway()` operator now fully supports an `async(true)` behavior.
35+
See xref:gateway.adoc[] for more information.
36+
3437
[[x6.5-lock-request-handler-advice]]
3538
== The `LockRequestHandlerAdvice`
3639

0 commit comments

Comments
 (0)