Skip to content

Commit 5fadaf5

Browse files
authored
GH-3635: Add Future<Void> & Mono<Void> to gateway (#3899)
* GH-3635: Add Future<Void> & Mono<Void> to gateway Fixes #3635 When `Future<Void>` & `Mono<Void>` is used as a messaging gateway return type, the application hangs out on this barrier which may lead to the out of memory eventually * Add support for the `Future<Void>` & `Mono<Void>` messaging gateway return type and ensure an asynchronous call for the `gateway.send(Message)` operation and its exception handling. In case of successful call, the `Future` is fulfilled with `null` and `Mono` is completed as empty * * Check for `void.class` as well in the `GatewayProxyFactoryBean.isVoidReturnType` * * Allow `Future<Void>` as a reply type of the gateway request-reply operation * * Fix Checkstyle violations * * Resolve `System.err.println()` in the test code * Add `Thread.currentThread.interrupt()` to the `InterruptedException` block in the test
1 parent 0eb6ae1 commit 5fadaf5

File tree

4 files changed

+123
-29
lines changed

4 files changed

+123
-29
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,16 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO
564564
}
565565
boolean shouldReturnMessage =
566566
Message.class.isAssignableFrom(gateway.returnType) || (!runningOnCallerThread && gateway.expectMessage);
567-
boolean shouldReply = gateway.returnType != void.class;
567+
boolean oneWay =
568+
void.class.isAssignableFrom(gateway.returnType) || (gateway.isVoidReturn && !runningOnCallerThread);
568569
int paramCount = method.getParameterTypes().length;
569570
Object response;
570571
boolean hasPayloadExpression = findPayloadExpression(method);
571572
if (paramCount == 0 && !hasPayloadExpression) {
572-
response = receive(gateway, method, shouldReply, shouldReturnMessage);
573+
response = receive(gateway, method, !oneWay, shouldReturnMessage);
573574
}
574575
else {
575-
response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, shouldReply);
576+
response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, !oneWay);
576577
}
577578
return response(gateway.returnType, shouldReturnMessage, response);
578579
}
@@ -640,7 +641,12 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio
640641
}
641642
}
642643
else {
643-
gateway.send(args);
644+
if (gateway.isMonoReturn) {
645+
return Mono.fromRunnable(() -> gateway.send(args));
646+
}
647+
else {
648+
gateway.send(args);
649+
}
644650
}
645651
return null;
646652
}

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

+97-18
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,24 @@
3030

3131
import org.junit.jupiter.api.Test;
3232

33+
import org.springframework.beans.DirectFieldAccessor;
3334
import org.springframework.beans.factory.BeanFactory;
35+
import org.springframework.integration.MessageDispatchingException;
3436
import org.springframework.integration.annotation.Gateway;
3537
import org.springframework.integration.annotation.GatewayHeader;
3638
import org.springframework.integration.channel.DirectChannel;
39+
import org.springframework.integration.channel.NullChannel;
3740
import org.springframework.integration.channel.QueueChannel;
3841
import org.springframework.messaging.Message;
3942
import org.springframework.messaging.MessageChannel;
4043
import org.springframework.messaging.PollableChannel;
4144
import org.springframework.messaging.support.ChannelInterceptor;
45+
import org.springframework.messaging.support.GenericMessage;
4246
import org.springframework.messaging.support.MessageBuilder;
47+
import org.springframework.util.ReflectionUtils;
4348

4449
import reactor.core.publisher.Mono;
50+
import reactor.test.StepVerifier;
4551

4652
/**
4753
* @author Mark Fisher
@@ -70,7 +76,7 @@ public void futureWithMessageReturned() throws Exception {
7076
}
7177

7278
@Test
73-
public void futureWithError() throws Exception {
79+
public void futureWithError() {
7480
final Error error = new Error("error");
7581
DirectChannel channel = new DirectChannel() {
7682

@@ -140,7 +146,7 @@ public void customFutureReturned() {
140146
}
141147

142148
@Test
143-
public void nonAsyncFutureReturned() throws Exception {
149+
public void nonAsyncFutureReturned() {
144150
QueueChannel requestChannel = new QueueChannel();
145151
addThreadEnricher(requestChannel);
146152
startResponder(requestChannel);
@@ -204,6 +210,60 @@ public void futureWithWildcardReturned() throws Exception {
204210
assertThat(result).isEqualTo("foobar");
205211
}
206212

213+
@Test
214+
public void futureVoid() throws Exception {
215+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
216+
proxyFactory.setDefaultRequestChannel(new NullChannel());
217+
proxyFactory.setBeanName("testGateway");
218+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
219+
proxyFactory.afterPropertiesSet();
220+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
221+
Future<Void> f = service.asyncSendAndForget("test1");
222+
Object result = f.get(10, TimeUnit.SECONDS);
223+
assertThat(result).isNull();
224+
225+
new DirectFieldAccessor(proxyFactory).setPropertyValue("initialized", false);
226+
proxyFactory.setDefaultRequestChannel((message, timeout) -> {
227+
throw new MessageDispatchingException(message, "intentional dispatcher error");
228+
});
229+
proxyFactory.afterPropertiesSet();
230+
231+
Future<Void> futureError = service.asyncSendAndForget("test2");
232+
assertThatExceptionOfType(ExecutionException.class)
233+
.isThrownBy(() -> futureError.get(10, TimeUnit.SECONDS))
234+
.withCauseInstanceOf(MessageDispatchingException.class)
235+
.withMessageContaining("intentional dispatcher error");
236+
}
237+
238+
@Test
239+
public void futureVoidReply() throws Exception {
240+
QueueChannel requestChannel = new QueueChannel();
241+
CountDownLatch readyForReplyLatch = new CountDownLatch(1);
242+
new Thread(() -> {
243+
try {
244+
Message<?> input = requestChannel.receive();
245+
CompletableFuture<Void> reply = new CompletableFuture<>();
246+
((MessageChannel) input.getHeaders().getReplyChannel()).send(new GenericMessage<>(reply));
247+
readyForReplyLatch.await(10, TimeUnit.SECONDS);
248+
reply.complete(null);
249+
}
250+
catch (InterruptedException ex) {
251+
Thread.currentThread.interrupt();
252+
ReflectionUtils.rethrowRuntimeException(ex);
253+
}
254+
}).start();
255+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
256+
proxyFactory.setDefaultRequestChannel(requestChannel);
257+
proxyFactory.setBeanName("testGateway");
258+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
259+
proxyFactory.setAsyncExecutor(null);
260+
proxyFactory.afterPropertiesSet();
261+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
262+
Future<Void> f = service.sendAndReceiveFutureVoid("test");
263+
readyForReplyLatch.countDown();
264+
Object result = f.get(10, TimeUnit.SECONDS);
265+
assertThat(result).isNull();
266+
}
207267

208268
@Test
209269
public void monoWithMessageReturned() {
@@ -252,7 +312,7 @@ public void monoWithWildcardReturned() {
252312
}
253313

254314
@Test
255-
public void monoWithConsumer() throws Exception {
315+
public void monoWithConsumer() {
256316
QueueChannel requestChannel = new QueueChannel();
257317
startResponder(requestChannel);
258318
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
@@ -263,16 +323,38 @@ public void monoWithConsumer() throws Exception {
263323
TestEchoService service = (TestEchoService) proxyFactory.getObject();
264324
Mono<String> mono = service.returnStringPromise("foo");
265325

266-
final AtomicReference<String> result = new AtomicReference<>();
267-
final CountDownLatch latch = new CountDownLatch(1);
326+
StepVerifier.create(mono)
327+
.expectNext("foobar")
328+
.verifyComplete();
329+
}
268330

269-
mono.subscribe(s -> {
270-
result.set(s);
271-
latch.countDown();
331+
@Test
332+
public void monoVoid() throws InterruptedException {
333+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
334+
proxyFactory.setDefaultRequestChannel(new NullChannel());
335+
proxyFactory.setBeanName("testGateway");
336+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
337+
proxyFactory.afterPropertiesSet();
338+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
339+
Mono<Void> mono = service.monoVoid("test1");
340+
341+
CountDownLatch emptyMonoLatch = new CountDownLatch(1);
342+
mono.switchIfEmpty(Mono.empty().doOnSuccess(v -> emptyMonoLatch.countDown()).then()).subscribe();
343+
344+
assertThat(emptyMonoLatch.await(10, TimeUnit.SECONDS)).isTrue();
345+
346+
new DirectFieldAccessor(proxyFactory).setPropertyValue("initialized", false);
347+
proxyFactory.setDefaultRequestChannel((message, timeout) -> {
348+
throw new MessageDispatchingException(message, "intentional dispatcher error");
272349
});
350+
proxyFactory.afterPropertiesSet();
351+
352+
Mono<Void> monoError = service.monoVoid("test2");
273353

274-
latch.await(10, TimeUnit.SECONDS);
275-
assertThat(result.get()).isEqualTo("foobar");
354+
StepVerifier.create(monoError)
355+
.expectSubscription()
356+
.expectError(MessageDispatchingException.class)
357+
.verify(Duration.ofSeconds(10));
276358
}
277359

278360
private static void startResponder(final PollableChannel requestChannel) {
@@ -323,18 +405,15 @@ private interface TestEchoService {
323405

324406
Mono<?> returnSomethingPromise(String s);
325407

326-
}
408+
Future<Void> asyncSendAndForget(String s);
327409

328-
private static class CustomFuture implements Future<String> {
410+
Future<Void> sendAndReceiveFutureVoid(String s);
329411

330-
private final String result;
412+
Mono<Void> monoVoid(String s);
331413

332-
private final Thread thread;
414+
}
333415

334-
private CustomFuture(String result, Thread thread) {
335-
this.result = result;
336-
this.thread = thread;
337-
}
416+
private record CustomFuture(String result, Thread thread) implements Future<String> {
338417

339418
@Override
340419
public boolean cancel(boolean mayInterruptIfRunning) {

src/reference/asciidoc/gateway.adoc

+13-6
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ If you provide a one-way flow, nothing would be sent back to the caller.
460460
If you want to completely suppress exceptions, you can provide a reference to the global `nullChannel` (essentially a `/dev/null` approach).
461461
Finally, as mentioned above, if no `error-channel` is defined, then the exceptions propagate as usual.
462462

463-
When you use the `@MessagingGateway` annotation (see `<<messaging-gateway-annotation>>`), you can use use the `errorChannel` attribute.
463+
When you use the `@MessagingGateway` annotation (see `<<messaging-gateway-annotation>>`), you can use an `errorChannel` attribute.
464464

465465
Starting with version 5.0, when you use a gateway method with a `void` return type (one-way flow), the `error-channel` reference (if provided) is populated in the standard `errorChannel` header of each sent message.
466466
This feature allows a downstream asynchronous flow, based on the standard `ExecutorChannel` configuration (or a `QueueChannel`), to override a default global `errorChannel` exceptions sending behavior.
@@ -469,7 +469,7 @@ The `error-channel` property was ignored for `void` methods with an asynchronous
469469
Instead, error messages were sent to the default `errorChannel`.
470470

471471

472-
IMPORTANT: Exposing the messaging system through simple POJI Gateways provides benefits, but "`hiding`" the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
472+
IMPORTANT: Exposing the messaging system through simple POJI Gateways provides benefits, but "`hiding`" the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
473473
We want our Java method to return as quickly as possible and not hang for an indefinite amount of time while the caller is waiting on it to return (whether void, a return value, or a thrown Exception).
474474
When regular methods are used as a proxies in front of the messaging system, we have to take into account the potentially asynchronous nature of the underlying messaging.
475475
This means that there might be a chance that a message that was initiated by a gateway could be dropped by a filter and never reach a component that is responsible for producing a reply.
@@ -782,10 +782,9 @@ The calling thread continues, with `handleInvoice()` being called when the flow
782782
As mentioned in the <<gateway-asynctaskexecutor>> section above, if you wish some downstream component to return a message with an async payload (`Future`, `Mono`, and others), you must explicitly set the async executor to `null` (or `""` when using XML configuration).
783783
The flow is then invoked on the caller thread and the result can be retrieved later.
784784

785-
===== `void` Return Type
785+
===== Asynchronous `void` Return Type
786786

787-
Unlike the return types mentioned earlier, when the method return type is `void`, the framework cannot implicitly determine that you wish the downstream flow to run asynchronously, with the caller thread returning immediately.
788-
In this case, you must annotate the interface method with `@Async`, as the following example shows:
787+
The messaging gateway method can be declared like this:
789788

790789
====
791790
[source, java]
@@ -801,7 +800,15 @@ public interface MyGateway {
801800
----
802801
====
803802

804-
Unlike the `Future<?>` return types, there is no way to inform the caller if some exception is thrown by the flow, unless some custom `TaskExecutor` (such as an `ErrorHandlingTaskExecutor`) is associated with the `@Async` annotation.
803+
But downstream exceptions are not going to be propagated back to the caller.
804+
To ensure asynchronous behavior for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the `Future<Void>` and `Mono<Void>` return types.
805+
The use-case is similar to send-and-forget behavior described before for plain `void` return type, but with a difference that flow execution happens asynchronously and returned `Future` (or `Mono`) is complete with a `null` or exceptionally according to the `send` operation result.
806+
807+
NOTE: If the `Future<Void>` is exact downstream flow reply, then an `asyncExecutor` option of the gateway must be set to null (`AnnotationConstants.NULL` for a `@MessagingGateway` configuration) and the `send` part is performed on a producer thread.
808+
The reply one depends on the downstream flow configuration.
809+
This way it is up target application to produce a `Future<Void>` reply correctly.
810+
The `Mono` use-case is already out of the framework threading control, so setting `asyncExecutor` to null won't make sense.
811+
There `Mono<Void>` as a result of the request-reply gateway operation must be configured as a `Mono<?>` return type of the gateway method.
805812

806813
[[gateway-no-response]]
807814
==== Gateway Behavior When No response Arrives

src/reference/asciidoc/whats-new.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ See <<./dsl.adoc#java-dsl,Java DSL>> for more information.
8080
The `org.springframework.util.concurrent.ListenableFuture` has been deprecated starting with Spring Framework `6.0`.
8181
All Spring Integration async API has been migrated to the `CompletableFuture`.
8282

83-
See <<./gateway.adoc#gw-completable-future, CompletableFuture support>> for more information.
83+
Also Messaging Gateway interface method can now return `Future<Void>` and `Mono<Void>` with a proper asynchronous execution of the downstream flow.
84+
85+
See <<./gateway.adoc#async-gateway, Asynchronous Gateway>> for more information.
8486

8587
The `integrationGlobalProperties` bean is now declared by the framework as an instance of `org.springframework.integration.context.IntegrationProperties` instead of the previously deprecated `java.util.Properties`.
8688

0 commit comments

Comments
 (0)