Skip to content

Commit 9eac6dc

Browse files
committed
* Fix KafkaDslKotlinTests according new Kotlin expectations
* Document `suspend fun` for `@MessagingGateway`
1 parent fd653b4 commit 9eac6dc

File tree

5 files changed

+61
-16
lines changed

5 files changed

+61
-16
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayMethodInboundMessageMapper.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
3838
import org.springframework.expression.EvaluationContext;
3939
import org.springframework.expression.Expression;
4040
import org.springframework.expression.spel.standard.SpelExpressionParser;
41+
import org.springframework.expression.spel.support.StandardEvaluationContext;
4142
import org.springframework.integration.expression.ExpressionUtils;
4243
import org.springframework.integration.mapping.InboundMessageMapper;
4344
import org.springframework.integration.mapping.MessageMappingException;
@@ -216,6 +217,14 @@ private Map<String, Object> evaluateHeaders(EvaluationContext methodInvocationEv
216217
return evaluatedHeaders;
217218
}
218219

220+
// TODO Remove in the future release. The MethodArgsHolder as a root object covers this use-case.
221+
private StandardEvaluationContext createMethodInvocationEvaluationContext(Object[] arguments) {
222+
StandardEvaluationContext context = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
223+
context.setVariable("args", arguments);
224+
context.setVariable("gatewayMethod", this.method);
225+
return context;
226+
}
227+
219228
@Nullable
220229
private Object evaluatePayloadExpression(String expressionString, Object argumentValue) {
221230
Expression expression =
@@ -286,24 +295,23 @@ public class DefaultMethodArgsMessageMapper implements MethodArgsMessageMapper {
286295
public Message<?> toMessage(MethodArgsHolder holder, @Nullable Map<String, Object> headersToMap) {
287296
Object messageOrPayload = null;
288297
boolean foundPayloadAnnotation = false;
289-
EvaluationContext methodInvocationEvaluationContext =
290-
ExpressionUtils.createStandardEvaluationContext(GatewayMethodInboundMessageMapper.this.beanFactory);
298+
Object[] arguments = holder.getArgs();
299+
EvaluationContext methodInvocationEvaluationContext = createMethodInvocationEvaluationContext(arguments);
300+
Map<String, Object> headersToPopulate =
301+
headersToMap != null
302+
? new HashMap<>(headersToMap)
303+
: new HashMap<>();
291304
if (GatewayMethodInboundMessageMapper.this.payloadExpression != null) {
292305
messageOrPayload =
293306
GatewayMethodInboundMessageMapper.this.payloadExpression.getValue(
294307
methodInvocationEvaluationContext, holder);
295308
}
296-
Map<String, Object> headersToPopulate =
297-
headersToMap != null
298-
? new HashMap<>(headersToMap)
299-
: new HashMap<>();
300-
Object[] arguments = holder.getArgs();
301309
for (int i = 0; i < GatewayMethodInboundMessageMapper.this.parameterList.size(); i++) {
302310
Object argumentValue = arguments[i];
303-
if (CoroutinesUtils.isContinuation(argumentValue)) {
311+
MethodParameter methodParameter = GatewayMethodInboundMessageMapper.this.parameterList.get(i);
312+
if (CoroutinesUtils.isContinuationType(methodParameter.getParameterType())) {
304313
continue;
305314
}
306-
MethodParameter methodParameter = GatewayMethodInboundMessageMapper.this.parameterList.get(i);
307315
Annotation annotation =
308316
MessagingAnnotationUtils.findMessagePartAnnotation(methodParameter.getParameterAnnotations(),
309317
false);

spring-integration-core/src/main/java/org/springframework/integration/util/CoroutinesUtils.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.util;
1818

1919
import org.springframework.core.KotlinDetector;
20+
import org.springframework.lang.Nullable;
2021
import org.springframework.util.Assert;
2122
import org.springframework.util.ClassUtils;
2223

@@ -36,6 +37,7 @@ public final class CoroutinesUtils {
3637
/**
3738
* The {@link kotlin.coroutines.Continuation} class object.
3839
*/
40+
@Nullable
3941
public static final Class<?> KOTLIN_CONTINUATION_CLASS;
4042

4143
static {
@@ -56,16 +58,17 @@ public final class CoroutinesUtils {
5658
}
5759
}
5860

59-
public static boolean isContinuationType(Class<?> candidate) {
60-
return KOTLIN_CONTINUATION_CLASS != null && KOTLIN_CONTINUATION_CLASS.isAssignableFrom(candidate);
61+
public static boolean isContinuation(Object candidate) {
62+
return isContinuationType(candidate.getClass());
6163
}
6264

63-
public static boolean isContinuation(Object candidate) {
64-
return KOTLIN_CONTINUATION_CLASS != null && KOTLIN_CONTINUATION_CLASS.isAssignableFrom(candidate.getClass());
65+
public static boolean isContinuationType(Class<?> candidate) {
66+
return KOTLIN_CONTINUATION_CLASS != null && KOTLIN_CONTINUATION_CLASS.isAssignableFrom(candidate);
6567
}
6668

6769
@SuppressWarnings("unchecked")
68-
public static <T> T monoAwaitSingleOrNull(Mono<T> source, Object continuation) {
70+
public static <T> T monoAwaitSingleOrNull(Mono<? extends T> source, Object continuation) {
71+
Assert.notNull(KOTLIN_CONTINUATION_CLASS, "Kotlin Coroutines library is not present in classpath");
6972
Assert.isAssignable(KOTLIN_CONTINUATION_CLASS, continuation.getClass());
7073
return (T) kotlinx.coroutines.reactor.MonoKt.awaitSingleOrNull(
7174
source, (kotlin.coroutines.Continuation<T>) continuation);

spring-integration-kafka/src/test/kotlin/org/springframework/integration/kafka/dsl/kotlin/KafkaDslKotlinTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ class KafkaDslKotlinTests {
292292

293293
private fun kafkaMessageHandler(producerFactory: ProducerFactory<Int, String>, topic: String) =
294294
Kafka.outboundChannelAdapter(producerFactory)
295-
.messageKey { it.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
295+
.messageKey<Any> { it.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
296296
.headerMapper(mapper())
297297
.sync(true)
298298
.partitionId<Any> { 0 }

src/reference/asciidoc/gateway.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,8 @@ mono.subscribe(invoice -> handleInvoice(invoice));
777777

778778
The calling thread continues, with `handleInvoice()` being called when the flow completes.
779779

780+
Also see <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information.
781+
780782
===== Downstream Flows Returning an Asynchronous Type
781783

782784
As mentioned in the <<gateway-asynctaskexecutor>> section above, if you wish some downstream component to return a message with an async payload (`Future`, `Mono`, and others), you must explicitly set the async executor to `null` (or `""` when using XML configuration).

src/reference/asciidoc/kotlin-functions.adoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,35 @@ fun flowServiceFunction(payload: String) =
5151

5252
The framework treats them as Reactive Streams interactions and uses `ReactiveAdapterRegistry` to convert to respective `Mono` and `Flux` reactor types.
5353
Such a function reply is processed then in the reply channel, if it is a `ReactiveStreamsSubscribableChannel`, or as a result of `CompletableFuture` in the respective callback.
54+
55+
The `@MessagingGateway` interface methods also can be marked with a `suspend` modifier when declared in Kotlin.
56+
The framework utilizes a `Mono` logic internally to perform request-reply for downstream flow.
57+
Such a `Mono` result is processed by the `MonoKt.awaitSingleOrNull()` API internally to fulfil a `kotlin.coroutines.Continuation` argument fo the called `suspend` function of the gateway:
58+
59+
====
60+
[source, kotlin]
61+
----
62+
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
63+
interface SuspendFunGateway {
64+
65+
suspend fun suspendGateway(payload: String): String
66+
67+
}
68+
----
69+
====
70+
71+
This method has to be called as a coroutine according to Kotlin language requirements:
72+
73+
====
74+
[source, kotlin]
75+
----
76+
@Autowired
77+
private lateinit var suspendFunGateway: SuspendFunGateway
78+
79+
fun someServiceMethod() {
80+
runBlocking {
81+
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
82+
}
83+
}
84+
----
85+
====

0 commit comments

Comments
 (0)