Skip to content

Commit 03b0bfe

Browse files
committed
* Add kotlinx.coroutines.flow.Flow support
The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler`
1 parent a86fc32 commit 03b0bfe

File tree

7 files changed

+67
-40
lines changed

7 files changed

+67
-40
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ public final void setAsync(boolean async) {
120120

121121
/**
122122
* @return true if this handler supports async replies.
123-
* @see #setAsync(boolean)
124123
* @since 4.3
124+
* @see #setAsync(boolean)
125125
*/
126126
protected boolean isAsync() {
127127
return this.async;

src/reference/asciidoc/functions-support.adoc

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -110,32 +110,3 @@ public IntegrationFlow supplierFlow() {
110110
====
111111

112112
This function support is useful when used together with the https://cloud.spring.io/spring-cloud-function/[Spring Cloud Function] framework, where we have a function catalog and can refer to its member functions from an integration flow definition.
113-
114-
[[kotlin-functions-support]]
115-
==== Kotlin Lambdas
116-
117-
The Framework also has been improved to support Kotlin lambdas for functions, so now you can use a combination of the Kotlin language and Spring Integration flow definitions:
118-
119-
====
120-
[source, java]
121-
----
122-
@Bean
123-
@Transformer(inputChannel = "functionServiceChannel")
124-
fun kotlinFunction(): (String) -> String {
125-
return { it.toUpperCase() }
126-
}
127-
128-
@Bean
129-
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
130-
fun kotlinConsumer(): (Message<Any>) -> Unit {
131-
return { print(it) }
132-
}
133-
134-
@Bean
135-
@InboundChannelAdapter(value = "counterChannel",
136-
poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
137-
fun kotlinSupplier(): () -> String {
138-
return { "baz" }
139-
}
140-
----
141-
====

src/reference/asciidoc/index-single.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ include::./kotlin-dsl.adoc[]
3737

3838
include::./system-management.adoc[]
3939

40+
include::./reactive-streams.adoc[]
41+
4042
include::./endpoint-summary.adoc[]
4143

4244
include::./amqp.adoc[]
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
[[kotlin-functions-support]]
2+
=== Kotlin Support
3+
4+
The Framework also has been improved to support Kotlin lambdas for functions, so now you can use a combination of the Kotlin language and Spring Integration flow definitions:
5+
6+
====
7+
[source, kotlin]
8+
----
9+
@Bean
10+
@Transformer(inputChannel = "functionServiceChannel")
11+
fun kotlinFunction(): (String) -> String {
12+
return { it.toUpperCase() }
13+
}
14+
15+
@Bean
16+
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
17+
fun kotlinConsumer(): (Message<Any>) -> Unit {
18+
return { print(it) }
19+
}
20+
21+
@Bean
22+
@InboundChannelAdapter(value = "counterChannel",
23+
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
24+
fun kotlinSupplier(): () -> String {
25+
return { "baz" }
26+
}
27+
----
28+
====
29+
30+
[[kotlin-coroutines]]
31+
==== Kotlin Coroutines
32+
33+
Starting with version 6.0, Spring Integration provides support for https://kotlinlang.org/docs/coroutines-guide.html[Kotlin Coroutines].
34+
Now the `suspend` functions and `kotlinx.coroutines.Deferred` & `kotlinx.coroutines.flow.Flow` return types can be used for service methods:
35+
36+
====
37+
[source, kotlin]
38+
----
39+
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
40+
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
41+
42+
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel")
43+
fun flowServiceFunction(payload: String) =
44+
flow {
45+
for (i in 1..3) {
46+
emit("$payload #$i")
47+
}
48+
}
49+
----
50+
====
51+
52+
The framework treats them as Reactive Streams interactions and uses `ReactiveAdapterRegistry` to convert to respective `Mono` and `Flux` reactor types.
53+
Such a function reply is processed then in the reply channel, if it is a `ReactiveStreamsSubscribableChannel`, or as a result of `CompletableFuture` in the respective callback.
Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,13 @@
11
[[messaging-endpoints-chapter]]
22
== Messaging Endpoints
33

4-
// BE SURE TO PRECEDE ALL include:: with a blank line - see https://asciidoctor.org/docs/user-manual/#include-partitioning
54
include::./endpoint.adoc[]
6-
75
include::./gateway.adoc[]
8-
96
include::./service-activator.adoc[]
10-
117
include::./delayer.adoc[]
12-
138
include::./scripting.adoc[]
14-
159
include::./groovy.adoc[]
16-
1710
include::./handler-advice.adoc[]
18-
1911
include::./logging-adapter.adoc[]
20-
2112
include::./functions-support.adoc[]
22-
// BE SURE TO PRECEDE ALL include:: with a blank line - see https://asciidoctor.org/docs/user-manual/#include-partitioning
13+
include::./kotlin-functions.adoc[]

src/reference/asciidoc/reactive-streams.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ With a `ReactiveStreamsSubscribableChannel` for the `outputChannel`, there is no
4949

5050
See <<./service-activator.adoc#async-service-activator,Asynchronous Service Activator>> for more information.
5151

52+
Also see <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information.
53+
5254
=== `FluxMessageChannel` and `ReactiveStreamsConsumer`
5355

5456
The `FluxMessageChannel` is a combined implementation of `MessageChannel` and `Publisher<Message<?>>`.

src/reference/asciidoc/whats-new.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ See <<./amqp.adoc#rmq-streams,RabbitMQ Stream Queue Support>> for more informati
5656
The SFTP modules has been fully reworked from outdated JCraft JSch library to more robust and modern `org.apache.sshd:sshd-sftp` module of the Apache MINA project.
5757

5858
See <<./sftp.adoc#sftp,SFTP Adapters>> for more information.
59+
60+
[[x6.0-kotlin-coroutines]]
61+
==== Kotlin Coroutines
62+
63+
Kotlin Coroutines support has been introduced to the framework.
64+
65+
See <<./kotlin-functions.adoc#kotlin-coroutines,Kotlin Coroutines>> for more information.
66+
5967
[[x6.0-general]]
6068
=== General Changes
6169

0 commit comments

Comments
 (0)