Skip to content

Commit a8d6ba9

Browse files
committed
Add support for Coroutines Flow
Flow is a Kotlin Coroutines related cold asynchronous stream of the data, that emits from zero to N (where N can be unbounded) values and completes normally or with an exception. It is conceptually the Coroutines equivalent of Flux with an extension oriented API design, easy custom operator capabilities and some suspending methods. This commit leverages Flow <-> Flux interoperability to support Flow on controller handler method parameters or return values, and also adds Flow based extensions to WebFlux.fn. It allows to reach a point when we can consider Spring Framework officially supports Coroutines even if some additional work remains to be done like adding interoperability between Reactor and Coroutines contexts. Flow is currently an experimental API that is expected to become final before Spring Framework 5.2 GA. Close gh-19975
1 parent a5e297a commit a8d6ba9

File tree

11 files changed

+210
-3
lines changed

11 files changed

+210
-3
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ ext {
2929
}
3030

3131
aspectjVersion = "1.9.2"
32-
coroutinesVersion = "1.2.0-alpha"
32+
coroutinesVersion = "1.2.0-alpha-2"
3333
freemarkerVersion = "2.3.28"
3434
groovyVersion = "2.5.6"
3535
hsqldbVersion = "2.4.1"

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import io.reactivex.Flowable;
2828
import kotlinx.coroutines.CompletableDeferredKt;
2929
import kotlinx.coroutines.Deferred;
30+
import kotlinx.coroutines.flow.FlowKt;
31+
import kotlinx.coroutines.reactive.flow.FlowAsPublisherKt;
32+
import kotlinx.coroutines.reactive.flow.PublisherAsFlowKt;
3033
import org.reactivestreams.Publisher;
3134
import reactor.core.publisher.Flux;
3235
import reactor.core.publisher.Mono;
@@ -97,6 +100,10 @@ public ReactiveAdapterRegistry() {
97100
if (ClassUtils.isPresent("kotlinx.coroutines.Deferred", classLoader)) {
98101
new CoroutinesRegistrar().registerAdapters(this);
99102
}
103+
// TODO Use a single CoroutinesRegistrar when Flow will be not experimental anymore
104+
if (ClassUtils.isPresent("kotlinx.coroutines.flow.Flow", classLoader)) {
105+
new CoroutinesFlowRegistrar().registerAdapters(this);
106+
}
100107
}
101108

102109

@@ -335,7 +342,17 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
335342
source -> CoroutinesUtils.deferredToMono((Deferred<?>) source),
336343
source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));
337344
}
345+
}
338346

347+
private static class CoroutinesFlowRegistrar {
348+
349+
void registerAdapters(ReactiveAdapterRegistry registry) {
350+
registry.registerReactiveType(
351+
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, FlowKt::emptyFlow),
352+
source -> FlowAsPublisherKt.from((kotlinx.coroutines.flow.Flow<?>) source),
353+
PublisherAsFlowKt::from
354+
);
355+
}
339356
}
340357

341358
}

spring-core/src/test/kotlin/org/springframework/core/KotlinReactiveAdapterRegistryTests.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717
package org.springframework.core
1818

1919
import kotlinx.coroutines.Deferred
20+
import kotlinx.coroutines.FlowPreview
2021
import kotlinx.coroutines.GlobalScope
2122
import kotlinx.coroutines.async
23+
import kotlinx.coroutines.flow.Flow
24+
import kotlinx.coroutines.flow.flow
25+
import kotlinx.coroutines.flow.toList
2226
import kotlinx.coroutines.runBlocking
2327
import org.junit.Assert.assertEquals
2428
import org.junit.Assert.assertTrue
29+
import org.junit.Assert.fail
2530
import org.junit.Test
2631
import org.reactivestreams.Publisher
32+
import reactor.core.publisher.Flux
2733
import reactor.core.publisher.Mono
34+
import reactor.test.StepVerifier
2835
import java.time.Duration
2936
import kotlin.reflect.KClass
3037

@@ -49,6 +56,36 @@ class KotlinReactiveAdapterRegistryTests {
4956

5057
}
5158

59+
@Test
60+
@FlowPreview
61+
fun flowToPublisher() {
62+
val source = flow {
63+
emit(1)
64+
emit(2)
65+
emit(3)
66+
}
67+
val target: Publisher<Int> = getAdapter(Flow::class).toPublisher(source)
68+
assertTrue("Expected Flux Publisher: " + target.javaClass.name, target is Flux<*>)
69+
StepVerifier.create(target)
70+
.expectNext(1)
71+
.expectNext(2)
72+
.expectNext(3)
73+
.verifyComplete()
74+
}
75+
76+
@Test
77+
@FlowPreview
78+
fun publisherToFlow() {
79+
val source = Flux.just(1, 2, 3)
80+
val target = getAdapter(Flow::class).fromPublisher(source)
81+
if (target is Flow<*>) {
82+
assertEquals(listOf(1, 2, 3), runBlocking { target.toList() })
83+
}
84+
else {
85+
fail()
86+
}
87+
}
88+
5289
private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
5390
return this.registry.getAdapter(reactiveType.java)!!
5491
}

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensions.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.web.reactive.function.client
1818

19+
import kotlinx.coroutines.FlowPreview
20+
import kotlinx.coroutines.flow.Flow
1921
import kotlinx.coroutines.reactive.awaitFirstOrNull
2022
import kotlinx.coroutines.reactive.awaitSingle
23+
import kotlinx.coroutines.reactive.flow.asFlow
2124
import org.springframework.core.ParameterizedTypeReference
2225
import org.springframework.http.ResponseEntity
2326
import reactor.core.publisher.Flux
@@ -45,6 +48,19 @@ inline fun <reified T : Any> ClientResponse.bodyToMono(): Mono<T> =
4548
inline fun <reified T : Any> ClientResponse.bodyToFlux(): Flux<T> =
4649
bodyToFlux(object : ParameterizedTypeReference<T>() {})
4750

51+
/**
52+
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [ClientResponse.bodyToFlux].
53+
*
54+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
55+
* and [org.reactivestreams.Subscription.request] size.
56+
*
57+
* @author Sebastien Deleuze
58+
* @since 5.2
59+
*/
60+
@FlowPreview
61+
inline fun <reified T : Any> ClientResponse.bodyToFlow(batchSize: Int = 1): Flow<T> =
62+
bodyToFlux<T>().asFlow(batchSize)
63+
4864
/**
4965
* Extension for [ClientResponse.toEntity] providing a `toEntity<Foo>()` variant
5066
* leveraging Kotlin reified type parameters. This extension is not subject to type

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
package org.springframework.web.reactive.function.client
1818

1919
import kotlinx.coroutines.Dispatchers
20+
import kotlinx.coroutines.FlowPreview
2021
import kotlinx.coroutines.GlobalScope
22+
import kotlinx.coroutines.flow.Flow
2123
import kotlinx.coroutines.reactive.awaitSingle
24+
import kotlinx.coroutines.reactive.flow.asFlow
25+
import kotlinx.coroutines.reactive.flow.asPublisher
2226
import kotlinx.coroutines.reactor.mono
2327
import org.reactivestreams.Publisher
2428
import org.springframework.core.ParameterizedTypeReference
@@ -28,7 +32,7 @@ import reactor.core.publisher.Flux
2832
import reactor.core.publisher.Mono
2933

3034
/**
31-
* Extension for [WebClient.RequestBodySpec.body] providing a `body<Foo>()` variant
35+
* Extension for [WebClient.RequestBodySpec.body] providing a `body(Publisher<T>)` variant
3236
* leveraging Kotlin reified type parameters. This extension is not subject to type
3337
* erasure and retains actual generic type arguments.
3438
*
@@ -39,6 +43,18 @@ import reactor.core.publisher.Mono
3943
inline fun <reified T : Any, S : Publisher<T>> RequestBodySpec.body(publisher: S): RequestHeadersSpec<*> =
4044
body(publisher, object : ParameterizedTypeReference<T>() {})
4145

46+
/**
47+
* Coroutines [Flow] based extension for [WebClient.RequestBodySpec.body] providing a
48+
* body(Flow<T>)` variant leveraging Kotlin reified type parameters. This extension is
49+
* not subject to type erasure and retains actual generic type arguments.
50+
*
51+
* @author Sebastien Deleuze
52+
* @since 5.2
53+
*/
54+
@FlowPreview
55+
inline fun <reified T : Any, S : Flow<T>> RequestBodySpec.body(flow: S): RequestHeadersSpec<*> =
56+
body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {})
57+
4258
/**
4359
* Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono<Foo>()` variant
4460
* leveraging Kotlin reified type parameters. This extension is not subject to type
@@ -62,6 +78,20 @@ inline fun <reified T : Any> WebClient.ResponseSpec.bodyToMono(): Mono<T> =
6278
inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlux(): Flux<T> =
6379
bodyToFlux(object : ParameterizedTypeReference<T>() {})
6480

81+
/**
82+
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [WebClient.ResponseSpec.bodyToFlux].
83+
*
84+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
85+
* and [org.reactivestreams.Subscription.request] size.
86+
*
87+
* @author Sebastien Deleuze
88+
* @since 5.2
89+
*/
90+
@FlowPreview
91+
inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlow(batchSize: Int = 1): Flow<T> =
92+
bodyToFlux<T>().asFlow(batchSize)
93+
94+
6595
/**
6696
* Coroutines variant of [WebClient.RequestHeadersSpec.exchange].
6797
*

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensions.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616

1717
package org.springframework.web.reactive.function.server
1818

19+
import kotlinx.coroutines.FlowPreview
20+
import kotlinx.coroutines.flow.Flow
1921
import kotlinx.coroutines.reactive.awaitFirstOrNull
2022
import kotlinx.coroutines.reactive.awaitSingle
23+
import kotlinx.coroutines.reactive.flow.asFlow
2124
import org.springframework.core.ParameterizedTypeReference
2225
import org.springframework.http.codec.multipart.Part
2326
import org.springframework.util.MultiValueMap
@@ -48,6 +51,19 @@ inline fun <reified T : Any> ServerRequest.bodyToMono(): Mono<T> =
4851
inline fun <reified T : Any> ServerRequest.bodyToFlux(): Flux<T> =
4952
bodyToFlux(object : ParameterizedTypeReference<T>() {})
5053

54+
/**
55+
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [ServerRequest.bodyToFlux].
56+
*
57+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
58+
* and [org.reactivestreams.Subscription.request] size.
59+
*
60+
* @author Sebastien Deleuze
61+
* @since 5.2
62+
*/
63+
@FlowPreview
64+
inline fun <reified T : Any> ServerRequest.bodyToFlow(batchSize: Int = 1): Flow<T> =
65+
bodyToFlux<T>().asFlow(batchSize)
66+
5167
/**
5268
* Non-nullable Coroutines variant of [ServerRequest.bodyToMono].
5369
*

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/ServerResponseExtensions.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package org.springframework.web.reactive.function.server
1818

19+
import kotlinx.coroutines.FlowPreview
20+
import kotlinx.coroutines.flow.Flow
1921
import kotlinx.coroutines.reactive.awaitSingle
22+
import kotlinx.coroutines.reactive.flow.asPublisher
2023
import org.reactivestreams.Publisher
2124
import org.springframework.core.ParameterizedTypeReference
2225
import org.springframework.http.MediaType
@@ -74,6 +77,19 @@ fun ServerResponse.BodyBuilder.html() = contentType(MediaType.TEXT_HTML)
7477
suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.buildAndAwait(): ServerResponse =
7578
build().awaitSingle()
7679

80+
81+
/**
82+
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
83+
* `body(Flow<T>)` variant. This extension is not subject to type erasure and retains
84+
* actual generic type arguments.
85+
*
86+
* @author Sebastien Deleuze
87+
* @since 5.0
88+
*/
89+
@FlowPreview
90+
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyAndAwait(flow: Flow<T>): ServerResponse =
91+
body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()
92+
7793
/**
7894
* Coroutines variant of [ServerResponse.BodyBuilder.syncBody].
7995
*
@@ -83,6 +99,18 @@ suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.
8399
suspend fun ServerResponse.BodyBuilder.bodyAndAwait(body: Any): ServerResponse =
84100
syncBody(body).awaitSingle()
85101

102+
/**
103+
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
104+
* `bodyToServerSentEvents(Flow<T>)` variant. This extension is not subject to type
105+
* erasure and retains actual generic type arguments.
106+
*
107+
* @author Sebastien Deleuze
108+
* @since 5.0
109+
*/
110+
@FlowPreview
111+
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyToServerSentEventsAndAwait(flow: Flow<T>): ServerResponse =
112+
contentType(MediaType.TEXT_EVENT_STREAM).body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()
113+
86114

87115
/**
88116
* Coroutines variant of [ServerResponse.BodyBuilder.syncBody] without the sync prefix since it is ok to use it within

spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/ClientResponseExtensionsTests.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.springframework.web.reactive.function.client
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
2223
import kotlinx.coroutines.runBlocking
2324
import org.junit.Assert.assertEquals
2425
import org.junit.Assert.assertNull
@@ -49,6 +50,13 @@ class ClientResponseExtensionsTests {
4950
verify { response.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
5051
}
5152

53+
@Test
54+
@FlowPreview
55+
fun `bodyToFlow with reified type parameters`() {
56+
response.bodyToFlow<List<Foo>>()
57+
verify { response.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
58+
}
59+
5260
@Test
5361
fun `toEntity with reified type parameters`() {
5462
response.toEntity<List<Foo>>()

spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.springframework.web.reactive.function.client
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
23+
import kotlinx.coroutines.flow.Flow
2224
import kotlinx.coroutines.runBlocking
2325
import org.junit.Assert.assertEquals
2426
import org.junit.Test
@@ -45,6 +47,14 @@ class WebClientExtensionsTests {
4547
verify { requestBodySpec.body(body, object : ParameterizedTypeReference<List<Foo>>() {}) }
4648
}
4749

50+
@Test
51+
@FlowPreview
52+
fun `RequestBodySpec#body with Flow and reified type parameters`() {
53+
val body = mockk<Flow<List<Foo>>>()
54+
requestBodySpec.body(body)
55+
verify { requestBodySpec.body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) }
56+
}
57+
4858
@Test
4959
fun `ResponseSpec#bodyToMono with reified type parameters`() {
5060
responseSpec.bodyToMono<List<Foo>>()
@@ -57,6 +67,13 @@ class WebClientExtensionsTests {
5767
verify { responseSpec.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
5868
}
5969

70+
@Test
71+
@FlowPreview
72+
fun `bodyToFlow with reified type parameters`() {
73+
responseSpec.bodyToFlow<List<Foo>>()
74+
verify { responseSpec.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
75+
}
76+
6077
@Test
6178
fun awaitExchange() {
6279
val response = mockk<ClientResponse>()

spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/server/ServerRequestExtensionsTests.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.springframework.web.reactive.function.client
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
2223
import kotlinx.coroutines.runBlocking
2324
import org.junit.Assert.assertEquals
2425
import org.junit.Assert.assertNull
@@ -52,6 +53,13 @@ class ServerRequestExtensionsTests {
5253
verify { request.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
5354
}
5455

56+
@Test
57+
@FlowPreview
58+
fun `bodyToFlow with reified type parameters`() {
59+
request.bodyToFlow<List<Foo>>()
60+
verify { request.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
61+
}
62+
5563
@Test
5664
fun awaitBody() {
5765
every { request.bodyToMono<String>() } returns Mono.just("foo")

0 commit comments

Comments
 (0)