Skip to content

Commit dd532e2

Browse files
committed
* Verify sync and async Flow processing
* Mention default sync behavior in the docs
1 parent ba3adc3 commit dd532e2

File tree

3 files changed

+40
-3
lines changed

3 files changed

+40
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/CoroutinesUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public static boolean isContinuationType(Class<?> candidate) {
6666
return KOTLIN_CONTINUATION_CLASS != null && KOTLIN_CONTINUATION_CLASS.isAssignableFrom(candidate);
6767
}
6868

69+
@Nullable
6970
@SuppressWarnings("unchecked")
7071
public static <T> T monoAwaitSingleOrNull(Mono<? extends T> source, Object continuation) {
7172
Assert.notNull(KOTLIN_CONTINUATION_CLASS, "Kotlin Coroutines library is not present in classpath");

spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.springframework.integration.function
1818

1919
import assertk.assertThat
2020
import assertk.assertions.*
21-
import kotlinx.coroutines.flow.flow
21+
import kotlinx.coroutines.flow.*
2222
import kotlinx.coroutines.runBlocking
2323
import org.junit.jupiter.api.Test
2424
import org.springframework.beans.factory.annotation.Autowired
@@ -190,6 +190,34 @@ class FunctionsTests {
190190
stepVerifier.verify(Duration.ofSeconds(10))
191191
}
192192

193+
@Autowired
194+
private lateinit var syncFlowServiceChannel: MessageChannel
195+
196+
@Test
197+
fun `verify sync flow function reply`() {
198+
val replyChannel = QueueChannel()
199+
val testPayload = "test flow"
200+
201+
syncFlowServiceChannel.send(
202+
MessageBuilder.withPayload(testPayload)
203+
.setReplyChannel(replyChannel)
204+
.build()
205+
)
206+
207+
val receive = replyChannel.receive(10_000)
208+
209+
val payload = receive?.payload
210+
211+
assertThat(payload)
212+
.isNotNull()
213+
.isInstanceOf(Flow::class)
214+
215+
runBlocking {
216+
val strings = (payload as Flow<String>).toList()
217+
assertThat(strings).containsExactly("Sync $testPayload #1", "Sync $testPayload #2", "Sync $testPayload #3")
218+
}
219+
}
220+
193221
@Autowired
194222
private lateinit var suspendRequestChannel: DirectChannel
195223

@@ -256,14 +284,19 @@ class FunctionsTests {
256284
@ServiceActivator(inputChannel = "suspendServiceChannel")
257285
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
258286

259-
@ServiceActivator(inputChannel = "flowServiceChannel")
287+
@ServiceActivator(inputChannel = "flowServiceChannel", async = "true")
260288
fun flowServiceFunction(payload: String) =
261289
flow {
262290
for (i in 1..3) {
263291
emit("$payload #$i")
264292
}
265293
}
266294

295+
@ServiceActivator(inputChannel = "syncFlowServiceChannel")
296+
fun syncFlowServiceFunction(payload: String) =
297+
(1..3).asFlow()
298+
.map { "Sync $payload #$it" }
299+
267300
@Bean
268301
fun suspendRequestChannel() = DirectChannel()
269302

src/reference/asciidoc/kotlin-functions.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Now the `suspend` functions and `kotlinx.coroutines.Deferred` & `kotlinx.corouti
3939
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
4040
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
4141
42-
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel")
42+
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
4343
fun flowServiceFunction(payload: String) =
4444
flow {
4545
for (i in 1..3) {
@@ -52,6 +52,9 @@ fun flowServiceFunction(payload: String) =
5252
The framework treats them as Reactive Streams interactions and uses `ReactiveAdapterRegistry` to convert to respective `Mono` and `Flux` reactor types.
5353
Such a function reply is processed then in the reply channel, if it is a `ReactiveStreamsSubscribableChannel`, or as a result of `CompletableFuture` in the respective callback.
5454

55+
NOTE: The functions with `Flow` result are not `async` by default on the `@ServiceActivator`, so `Flow` instance is produced as a reply message payload.
56+
It is already target application to process this object as a coroutine or convert it to `Flux`, respectively.
57+
5558
The `@MessagingGateway` interface methods also can be marked with a `suspend` modifier when declared in Kotlin.
5659
The framework utilizes a `Mono` logic internally to perform request-reply for downstream flow.
5760
Such a `Mono` result is processed by the `MonoKt.awaitSingleOrNull()` API internally to fulfil a `kotlin.coroutines.Continuation` argument fo the called `suspend` function of the gateway:

0 commit comments

Comments
 (0)