Skip to content

Commit 50c04a1

Browse files
committed
spring-projectsGH-3635: Add Future<Void> & Mono<Void> to gateway
Fixes spring-projects#3635 When `Future<Void>` & `Mono<Void>` is used as a messaging gateway return type, the application hangs out on this barrier which may lead to the out of memory eventually * Add support for the `Future<Void>` & `Mono<Void>` messaging gateway return type and ensure an asynchronous call for the `gateway.send(Message)` operation and its exception handling. In case of successful call, the `Future` is fulfilled with `null` and `Mono` is completed as empty
1 parent 6ce61ed commit 50c04a1

File tree

4 files changed

+81
-28
lines changed

4 files changed

+81
-28
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO
564564
}
565565
boolean shouldReturnMessage =
566566
Message.class.isAssignableFrom(gateway.returnType) || (!runningOnCallerThread && gateway.expectMessage);
567-
boolean shouldReply = gateway.returnType != void.class;
567+
boolean shouldReply = !gateway.isVoidReturn;
568568
int paramCount = method.getParameterTypes().length;
569569
Object response;
570570
boolean hasPayloadExpression = findPayloadExpression(method);
@@ -640,7 +640,12 @@ private Object sendOrSendAndReceive(MethodInvocation invocation, MethodInvocatio
640640
}
641641
}
642642
else {
643-
gateway.send(args);
643+
if (gateway.isMonoReturn) {
644+
return Mono.fromRunnable(() -> gateway.send(args));
645+
}
646+
else {
647+
gateway.send(args);
648+
}
644649
}
645650
return null;
646651
}

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030

3131
import org.junit.jupiter.api.Test;
3232

33+
import org.springframework.beans.DirectFieldAccessor;
3334
import org.springframework.beans.factory.BeanFactory;
35+
import org.springframework.integration.MessageDispatchingException;
3436
import org.springframework.integration.annotation.Gateway;
3537
import org.springframework.integration.annotation.GatewayHeader;
3638
import org.springframework.integration.channel.DirectChannel;
39+
import org.springframework.integration.channel.NullChannel;
3740
import org.springframework.integration.channel.QueueChannel;
3841
import org.springframework.messaging.Message;
3942
import org.springframework.messaging.MessageChannel;
@@ -42,6 +45,7 @@
4245
import org.springframework.messaging.support.MessageBuilder;
4346

4447
import reactor.core.publisher.Mono;
48+
import reactor.test.StepVerifier;
4549

4650
/**
4751
* @author Mark Fisher
@@ -70,7 +74,7 @@ public void futureWithMessageReturned() throws Exception {
7074
}
7175

7276
@Test
73-
public void futureWithError() throws Exception {
77+
public void futureWithError() {
7478
final Error error = new Error("error");
7579
DirectChannel channel = new DirectChannel() {
7680

@@ -140,7 +144,7 @@ public void customFutureReturned() {
140144
}
141145

142146
@Test
143-
public void nonAsyncFutureReturned() throws Exception {
147+
public void nonAsyncFutureReturned() {
144148
QueueChannel requestChannel = new QueueChannel();
145149
addThreadEnricher(requestChannel);
146150
startResponder(requestChannel);
@@ -204,6 +208,30 @@ public void futureWithWildcardReturned() throws Exception {
204208
assertThat(result).isEqualTo("foobar");
205209
}
206210

211+
@Test
212+
public void futureVoid() throws Exception {
213+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
214+
proxyFactory.setDefaultRequestChannel(new NullChannel());
215+
proxyFactory.setBeanName("testGateway");
216+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
217+
proxyFactory.afterPropertiesSet();
218+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
219+
Future<Void> f = service.asyncSendAndForget("test1");
220+
Object result = f.get(10, TimeUnit.SECONDS);
221+
assertThat(result).isNull();
222+
223+
new DirectFieldAccessor(proxyFactory).setPropertyValue("initialized", false);
224+
proxyFactory.setDefaultRequestChannel((message, timeout) -> {
225+
throw new MessageDispatchingException(message, "intentional dispatcher error");
226+
});
227+
proxyFactory.afterPropertiesSet();
228+
229+
Future<Void> futureError = service.asyncSendAndForget("test2");
230+
assertThatExceptionOfType(ExecutionException.class)
231+
.isThrownBy(() -> futureError.get(10, TimeUnit.SECONDS))
232+
.withCauseInstanceOf(MessageDispatchingException.class)
233+
.withMessageContaining("intentional dispatcher error");
234+
}
207235

208236
@Test
209237
public void monoWithMessageReturned() {
@@ -252,7 +280,7 @@ public void monoWithWildcardReturned() {
252280
}
253281

254282
@Test
255-
public void monoWithConsumer() throws Exception {
283+
public void monoWithConsumer() {
256284
QueueChannel requestChannel = new QueueChannel();
257285
startResponder(requestChannel);
258286
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
@@ -263,16 +291,38 @@ public void monoWithConsumer() throws Exception {
263291
TestEchoService service = (TestEchoService) proxyFactory.getObject();
264292
Mono<String> mono = service.returnStringPromise("foo");
265293

266-
final AtomicReference<String> result = new AtomicReference<>();
267-
final CountDownLatch latch = new CountDownLatch(1);
294+
StepVerifier.create(mono)
295+
.expectNext("foobar")
296+
.verifyComplete();
297+
}
268298

269-
mono.subscribe(s -> {
270-
result.set(s);
271-
latch.countDown();
299+
@Test
300+
public void monoVoid() throws InterruptedException {
301+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestEchoService.class);
302+
proxyFactory.setDefaultRequestChannel(new NullChannel());
303+
proxyFactory.setBeanName("testGateway");
304+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
305+
proxyFactory.afterPropertiesSet();
306+
TestEchoService service = (TestEchoService) proxyFactory.getObject();
307+
Mono<Void> mono = service.monoVoid("test1");
308+
309+
CountDownLatch emptyMonoLatch = new CountDownLatch(1);
310+
mono.switchIfEmpty(Mono.empty().doOnSuccess(v -> emptyMonoLatch.countDown()).then()).subscribe();
311+
312+
assertThat(emptyMonoLatch.await(10, TimeUnit.SECONDS)).isTrue();
313+
314+
new DirectFieldAccessor(proxyFactory).setPropertyValue("initialized", false);
315+
proxyFactory.setDefaultRequestChannel((message, timeout) -> {
316+
throw new MessageDispatchingException(message, "intentional dispatcher error");
272317
});
318+
proxyFactory.afterPropertiesSet();
273319

274-
latch.await(10, TimeUnit.SECONDS);
275-
assertThat(result.get()).isEqualTo("foobar");
320+
Mono<Void> monoError = service.monoVoid("test2");
321+
322+
StepVerifier.create(monoError)
323+
.expectSubscription()
324+
.expectError(MessageDispatchingException.class)
325+
.verify(Duration.ofSeconds(10));
276326
}
277327

278328
private static void startResponder(final PollableChannel requestChannel) {
@@ -323,18 +373,13 @@ private interface TestEchoService {
323373

324374
Mono<?> returnSomethingPromise(String s);
325375

326-
}
376+
Future<Void> asyncSendAndForget(String s);
327377

328-
private static class CustomFuture implements Future<String> {
378+
Mono<Void> monoVoid(String s);
329379

330-
private final String result;
331-
332-
private final Thread thread;
380+
}
333381

334-
private CustomFuture(String result, Thread thread) {
335-
this.result = result;
336-
this.thread = thread;
337-
}
382+
private record CustomFuture(String result, Thread thread) implements Future<String> {
338383

339384
@Override
340385
public boolean cancel(boolean mayInterruptIfRunning) {

src/reference/asciidoc/gateway.adoc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ If you provide a one-way flow, nothing would be sent back to the caller.
460460
If you want to completely suppress exceptions, you can provide a reference to the global `nullChannel` (essentially a `/dev/null` approach).
461461
Finally, as mentioned above, if no `error-channel` is defined, then the exceptions propagate as usual.
462462

463-
When you use the `@MessagingGateway` annotation (see `<<messaging-gateway-annotation>>`), you can use use the `errorChannel` attribute.
463+
When you use the `@MessagingGateway` annotation (see `<<messaging-gateway-annotation>>`), you can use an `errorChannel` attribute.
464464

465465
Starting with version 5.0, when you use a gateway method with a `void` return type (one-way flow), the `error-channel` reference (if provided) is populated in the standard `errorChannel` header of each sent message.
466466
This feature allows a downstream asynchronous flow, based on the standard `ExecutorChannel` configuration (or a `QueueChannel`), to override a default global `errorChannel` exceptions sending behavior.
@@ -469,7 +469,7 @@ The `error-channel` property was ignored for `void` methods with an asynchronous
469469
Instead, error messages were sent to the default `errorChannel`.
470470

471471

472-
IMPORTANT: Exposing the messaging system through simple POJI Gateways provides benefits, but "`hiding`" the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
472+
IMPORTANT: Exposing the messaging system through simple POJI Gateways provides benefits, but "`hiding`" the reality of the underlying messaging system does come at a price, so there are certain things you should consider.
473473
We want our Java method to return as quickly as possible and not hang for an indefinite amount of time while the caller is waiting on it to return (whether void, a return value, or a thrown Exception).
474474
When regular methods are used as a proxies in front of the messaging system, we have to take into account the potentially asynchronous nature of the underlying messaging.
475475
This means that there might be a chance that a message that was initiated by a gateway could be dropped by a filter and never reach a component that is responsible for producing a reply.
@@ -782,10 +782,9 @@ The calling thread continues, with `handleInvoice()` being called when the flow
782782
As mentioned in the <<gateway-asynctaskexecutor>> section above, if you wish some downstream component to return a message with an async payload (`Future`, `Mono`, and others), you must explicitly set the async executor to `null` (or `""` when using XML configuration).
783783
The flow is then invoked on the caller thread and the result can be retrieved later.
784784

785-
===== `void` Return Type
785+
===== Asynchronous `void` Return Type
786786

787-
Unlike the return types mentioned earlier, when the method return type is `void`, the framework cannot implicitly determine that you wish the downstream flow to run asynchronously, with the caller thread returning immediately.
788-
In this case, you must annotate the interface method with `@Async`, as the following example shows:
787+
The messaging gateway method can be declared like this:
789788

790789
====
791790
[source, java]
@@ -801,7 +800,9 @@ public interface MyGateway {
801800
----
802801
====
803802

804-
Unlike the `Future<?>` return types, there is no way to inform the caller if some exception is thrown by the flow, unless some custom `TaskExecutor` (such as an `ErrorHandlingTaskExecutor`) is associated with the `@Async` annotation.
803+
But downstream exceptions are not going to be propagated back to the caller.
804+
To ensure asynchronous behaviour for downstream flow invocation and exception propagation to the caller, starting with version 6.0, the framework provides support for the `Future<Void>` and `Mono<Void>` return types.
805+
The use-case is similar to send-and-forget behavior described before for plain `void` return type, but with a difference that flow execution happens asynchronously and returned `Future` (or `Mono`) is complete with a `null` or exceptionally according to the `send` operation result.
805806

806807
[[gateway-no-response]]
807808
==== Gateway Behavior When No response Arrives

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ See <<./dsl.adoc#java-dsl,Java DSL>> for more information.
8080
The `org.springframework.util.concurrent.ListenableFuture` has been deprecated starting with Spring Framework `6.0`.
8181
All Spring Integration async API has been migrated to the `CompletableFuture`.
8282

83-
See <<./gateway.adoc#gw-completable-future, CompletableFuture support>> for more information.
83+
Also Messaging Gateway interface method can now return `Future<Void>` and `Mono<Void>` with a proper asynchronous execution of the downstream flow.
84+
85+
See <<./gateway.adoc#async-gateway, Asynchronous Gateway>> for more information.
8486

8587
The `integrationGlobalProperties` bean is now declared by the framework as an instance of `org.springframework.integration.context.IntegrationProperties` instead of the previously deprecated `java.util.Properties`.
8688

0 commit comments

Comments
 (0)