Skip to content

Commit 9ebec2c

Browse files
committed
* Allow Future<Void> as a reply type of the gateway request-reply operation
1 parent 00082b3 commit 9ebec2c

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,16 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO
564564
}
565565
boolean shouldReturnMessage =
566566
Message.class.isAssignableFrom(gateway.returnType) || (!runningOnCallerThread && gateway.expectMessage);
567-
boolean shouldReply = !gateway.isVoidReturn;
567+
boolean oneWay =
568+
void.class.isAssignableFrom(gateway.returnType) || (gateway.isVoidReturn && !runningOnCallerThread);
568569
int paramCount = method.getParameterTypes().length;
569570
Object response;
570571
boolean hasPayloadExpression = findPayloadExpression(method);
571572
if (paramCount == 0 && !hasPayloadExpression) {
572-
response = receive(gateway, method, shouldReply, shouldReturnMessage);
573+
response = receive(gateway, method, !oneWay, shouldReturnMessage);
573574
}
574575
else {
575-
response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, shouldReply);
576+
response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, !oneWay);
576577
}
577578
return response(gateway.returnType, shouldReturnMessage, response);
578579
}
@@ -1099,7 +1100,7 @@ private boolean isVoidReturnType(ResolvableType resolvableType) {
10991100
if (Future.class.isAssignableFrom(this.returnType) || Mono.class.isAssignableFrom(this.returnType)) {
11001101
returnTypeToCheck = resolvableType.getGeneric(0).resolve(Object.class);
11011102
}
1102-
return Void.class.isAssignableFrom(returnTypeToCheck) || void.class.isAssignableFrom(returnTypeToCheck);
1103+
return Void.class.isAssignableFrom(returnTypeToCheck);
11031104
}
11041105

11051106
private void setPollable() {

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,18 @@
3333
import org.springframework.beans.DirectFieldAccessor;
3434
import org.springframework.beans.factory.BeanFactory;
3535
import org.springframework.integration.MessageDispatchingException;
36+
import org.springframework.integration.annotation.AnnotationConstants;
3637
import org.springframework.integration.annotation.Gateway;
3738
import org.springframework.integration.annotation.GatewayHeader;
39+
import org.springframework.integration.annotation.MessagingGateway;
3840
import org.springframework.integration.channel.DirectChannel;
3941
import org.springframework.integration.channel.NullChannel;
4042
import org.springframework.integration.channel.QueueChannel;
4143
import org.springframework.messaging.Message;
4244
import org.springframework.messaging.MessageChannel;
4345
import org.springframework.messaging.PollableChannel;
4446
import org.springframework.messaging.support.ChannelInterceptor;
47+
import org.springframework.messaging.support.GenericMessage;
4548
import org.springframework.messaging.support.MessageBuilder;
4649

4750
import reactor.core.publisher.Mono;
@@ -233,6 +236,35 @@ public void futureVoid() throws Exception {
233236
.withMessageContaining("intentional dispatcher error");
234237
}
235238

239+
@Test
240+
public void futureVoidReply() throws Exception {
241+
QueueChannel requestChannel = new QueueChannel();
242+
CountDownLatch readyForReplyLatch = new CountDownLatch(1);
243+
new Thread(() -> {
244+
try {
245+
Message<?> input = requestChannel.receive();
246+
CompletableFuture<Void> reply = new CompletableFuture<>();
247+
((MessageChannel) input.getHeaders().getReplyChannel()).send(new GenericMessage<>(reply));
248+
readyForReplyLatch.await(10, TimeUnit.SECONDS);
249+
reply.complete(null);
250+
}
251+
catch (InterruptedException e) {
252+
System.err.println(e);
253+
}
254+
}).start();
255+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
256+
proxyFactory.setDefaultRequestChannel(requestChannel);
257+
proxyFactory.setBeanName("testGateway");
258+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
259+
proxyFactory.setAsyncExecutor(null);
260+
proxyFactory.afterPropertiesSet();
261+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
262+
Future<Void> f = service.sendAndReceiveFutureVoid("test");
263+
readyForReplyLatch.countDown();
264+
Object result = f.get(10, TimeUnit.SECONDS);
265+
assertThat(result).isNull();
266+
}
267+
236268
@Test
237269
public void monoWithMessageReturned() {
238270
QueueChannel requestChannel = new QueueChannel();
@@ -375,6 +407,7 @@ private interface TestEchoService {
375407

376408
Future<Void> asyncSendAndForget(String s);
377409

410+
Future<Void> sendAndReceiveFutureVoid(String s);
378411
Mono<Void> monoVoid(String s);
379412

380413
}

src/reference/asciidoc/gateway.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,9 +801,15 @@ public interface MyGateway {
801801
====
802802

803803
But downstream exceptions are not going to be propagated back to the caller.
804-
To ensure asynchronous behaviour for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the `Future<Void>` and `Mono<Void>` return types.
804+
To ensure asynchronous behavior for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the `Future<Void>` and `Mono<Void>` return types.
805805
The use-case is similar to send-and-forget behavior described before for plain `void` return type, but with a difference that flow execution happens asynchronously and returned `Future` (or `Mono`) is complete with a `null` or exceptionally according to the `send` operation result.
806806

807+
NOTE: If the `Future<Void>` is exact downstream flow reply, then an `asyncExecutor` option of the gateway must be set to null (`AnnotationConstants.NULL` for a `@MessagingGateway` configuration) and the `send` part is performed on a producer thread.
808+
The reply one depends on the downstream flow configuration.
809+
This way it is up target application to produce a `Future<Void>` reply correctly.
810+
The `Mono` use-case is already out of the framework threading control, so setting `asyncExecutor` to null won't make sense.
811+
There `Mono<Void>` as a result of the request-reply gateway operation must be configured as a `Mono<?>` return type of the gateway method.
812+
807813
[[gateway-no-response]]
808814
==== Gateway Behavior When No response Arrives
809815

0 commit comments

Comments
 (0)