Skip to content

Commit 32399f6

Browse files
sdeleuzemp911de
authored andcommitted
DATAMONGO-2255 - Add Coroutines Flow extensions.
Original pull request: #736.
1 parent 0f3c90b commit 32399f6

10 files changed

+275
-0
lines changed

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensions.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package org.springframework.data.mongodb.core
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.flow.asFlow
1821
import kotlin.reflect.KClass
1922

2023
/**
@@ -35,3 +38,16 @@ fun <T : Any> ReactiveAggregationOperation.aggregateAndReturn(entityClass: KClas
3538
*/
3639
inline fun <reified T : Any> ReactiveAggregationOperation.aggregateAndReturn(): ReactiveAggregationOperation.ReactiveAggregation<T> =
3740
aggregateAndReturn(T::class.java)
41+
42+
/**
43+
* Coroutines [Flow] variant of [ReactiveAggregationOperation.TerminatingAggregationOperation.all].
44+
*
45+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
46+
* and [org.reactivestreams.Subscription.request] size.
47+
*
48+
* @author Sebastien Deleuze
49+
* @since 2.2
50+
*/
51+
@FlowPreview
52+
fun <T : Any> ReactiveAggregationOperation.TerminatingAggregationOperation<T>.allAsFlow(batchSize: Int = 1): Flow<T> =
53+
all().asFlow(batchSize)

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
*/
1616
package org.springframework.data.mongodb.core
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
1820
import kotlinx.coroutines.reactive.awaitFirstOrNull
1921
import kotlinx.coroutines.reactive.awaitSingle
22+
import kotlinx.coroutines.reactive.flow.asFlow
23+
import org.springframework.data.geo.GeoResult
2024
import kotlin.reflect.KClass
2125

2226
/**
@@ -129,3 +133,52 @@ suspend fun <T : Any> ReactiveFindOperation.TerminatingFind<T>.awaitCount(): Lon
129133
*/
130134
suspend fun <T : Any> ReactiveFindOperation.TerminatingFind<T>.awaitExists(): Boolean =
131135
exists().awaitSingle()
136+
137+
/**
138+
* Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFind.all].
139+
*
140+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
141+
* and [org.reactivestreams.Subscription.request] size.
142+
*
143+
* @author Sebastien Deleuze
144+
*/
145+
@FlowPreview
146+
fun <T : Any> ReactiveFindOperation.TerminatingFind<T>.allAsFlow(batchSize: Int = 1): Flow<T> =
147+
all().asFlow(batchSize)
148+
149+
/**
150+
* Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFind.tail].
151+
*
152+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
153+
* and [org.reactivestreams.Subscription.request] size.
154+
*
155+
* @author Sebastien Deleuze
156+
*/
157+
@FlowPreview
158+
fun <T : Any> ReactiveFindOperation.TerminatingFind<T>.tailAsFlow(batchSize: Int = 1): Flow<T> =
159+
tail().asFlow(batchSize)
160+
161+
/**
162+
* Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFindNear.all].
163+
*
164+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
165+
* and [org.reactivestreams.Subscription.request] size.
166+
*
167+
* @author Sebastien Deleuze
168+
*/
169+
@FlowPreview
170+
fun <T : Any> ReactiveFindOperation.TerminatingFindNear<T>.allAsFlow(batchSize: Int = 1): Flow<GeoResult<T>> =
171+
all().asFlow(batchSize)
172+
173+
/**
174+
* Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingDistinct.all].
175+
*
176+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
177+
* and [org.reactivestreams.Subscription.request] size.
178+
*
179+
* @author Sebastien Deleuze
180+
* @since 2.2
181+
*/
182+
@FlowPreview
183+
fun <T : Any> ReactiveFindOperation.TerminatingDistinct<T>.allAsFlow(batchSize: Int = 1): Flow<T> =
184+
all().asFlow(batchSize)

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package org.springframework.data.mongodb.core
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
1820
import kotlinx.coroutines.reactive.awaitSingle
21+
import kotlinx.coroutines.reactive.flow.asFlow
1922
import kotlin.reflect.KClass
2023

2124
/**
@@ -45,3 +48,17 @@ inline fun <reified T : Any> ReactiveInsertOperation.insert(): ReactiveInsertOpe
4548
*/
4649
suspend inline fun <reified T: Any> ReactiveInsertOperation.TerminatingInsert<T>.oneAndAwait(o: T): T =
4750
one(o).awaitSingle()
51+
52+
53+
/**
54+
* Coroutines [Flow] variant of [ReactiveInsertOperation.TerminatingInsert.all].
55+
*
56+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
57+
* and [org.reactivestreams.Subscription.request] size.
58+
*
59+
* @author Sebastien Deleuze
60+
* @since 2.2
61+
*/
62+
@FlowPreview
63+
fun <T : Any> ReactiveInsertOperation.TerminatingInsert<T>.allAsFlow(objects: Collection<T>, batchSize: Int = 1): Flow<T> =
64+
all(objects).asFlow(batchSize)

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensions.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package org.springframework.data.mongodb.core
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.flow.asFlow
1821
import kotlin.reflect.KClass
1922

2023
/**
@@ -54,3 +57,17 @@ fun <T : Any> ReactiveMapReduceOperation.MapReduceWithProjection<*>.asType(resul
5457
*/
5558
inline fun <reified T : Any> ReactiveMapReduceOperation.MapReduceWithProjection<*>.asType(): ReactiveMapReduceOperation.MapReduceWithQuery<T> =
5659
`as`(T::class.java)
60+
61+
62+
/**
63+
* Coroutines [Flow] variant of [ReactiveMapReduceOperation.TerminatingMapReduce.all].
64+
*
65+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
66+
* and [org.reactivestreams.Subscription.request] size.
67+
*
68+
* @author Sebastien Deleuze
69+
* @since 2.2
70+
*/
71+
@FlowPreview
72+
fun <T : Any> ReactiveMapReduceOperation.TerminatingMapReduce<T>.allAsFlow(batchSize: Int = 1): Flow<T> =
73+
all().asFlow(batchSize)

spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package org.springframework.data.mongodb.core
1717

1818
import com.mongodb.client.result.DeleteResult
19+
import kotlinx.coroutines.FlowPreview
20+
import kotlinx.coroutines.flow.Flow
1921
import kotlinx.coroutines.reactive.awaitSingle
22+
import kotlinx.coroutines.reactive.flow.asFlow
2023
import kotlin.reflect.KClass
2124

2225
/**
@@ -46,3 +49,17 @@ inline fun <reified T : Any> ReactiveRemoveOperation.remove(): ReactiveRemoveOpe
4649
*/
4750
suspend fun <T : Any> ReactiveRemoveOperation.TerminatingRemove<T>.allAndAwait(): DeleteResult =
4851
all().awaitSingle()
52+
53+
54+
/**
55+
* Coroutines [Flow] variant of [ReactiveRemoveOperation.TerminatingRemove.findAndRemove].
56+
*
57+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
58+
* and [org.reactivestreams.Subscription.request] size.
59+
*
60+
* @author Sebastien Deleuze
61+
* @since 2.2
62+
*/
63+
@FlowPreview
64+
fun <T : Any> ReactiveRemoveOperation.TerminatingRemove<T>.findAndRemoveAsFlow(batchSize: Int = 1): Flow<T> =
65+
findAndRemove().asFlow(batchSize)

spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensionsTests.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@
1616
package org.springframework.data.mongodb.core
1717

1818
import example.first.First
19+
import io.mockk.every
1920
import io.mockk.mockk
2021
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
23+
import kotlinx.coroutines.flow.toList
24+
import kotlinx.coroutines.runBlocking
25+
import org.assertj.core.api.Assertions
2126
import org.junit.Test
27+
import reactor.core.publisher.Flux
2228

2329
/**
2430
* @author Mark Paluch
@@ -42,4 +48,20 @@ class ReactiveAggregationOperationExtensionsTests {
4248
verify { operation.aggregateAndReturn(First::class.java) }
4349
}
4450

51+
@Test
52+
@FlowPreview
53+
fun terminatingAggregationOperationAllAsFlow() {
54+
55+
val spec = mockk<ReactiveAggregationOperation.TerminatingAggregationOperation<String>>()
56+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
57+
58+
runBlocking {
59+
Assertions.assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz")
60+
}
61+
62+
verify {
63+
spec.all()
64+
}
65+
}
66+
4567
}

spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@ import example.first.First
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
23+
import kotlinx.coroutines.flow.toList
2224
import kotlinx.coroutines.runBlocking
2325
import org.assertj.core.api.Assertions.assertThat
2426
import org.assertj.core.api.Assertions.assertThatExceptionOfType
2527
import org.junit.Test
28+
import org.springframework.data.geo.Distance
29+
import org.springframework.data.geo.GeoResult
30+
import reactor.core.publisher.Flux
2631
import reactor.core.publisher.Mono
2732

2833
/**
@@ -228,4 +233,71 @@ class ReactiveFindOperationExtensionsTests {
228233
find.exists()
229234
}
230235
}
236+
237+
@Test
238+
@FlowPreview
239+
fun terminatingFindAllAsFlow() {
240+
241+
val spec = mockk<ReactiveFindOperation.TerminatingFind<String>>()
242+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
243+
244+
runBlocking {
245+
assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz")
246+
}
247+
248+
verify {
249+
spec.all()
250+
}
251+
}
252+
253+
@Test
254+
@FlowPreview
255+
fun terminatingFindTailAsFlow() {
256+
257+
val spec = mockk<ReactiveFindOperation.TerminatingFind<String>>()
258+
every { spec.tail() } returns Flux.just("foo", "bar", "baz")
259+
260+
runBlocking {
261+
assertThat(spec.tailAsFlow().toList()).contains("foo", "bar", "baz")
262+
}
263+
264+
verify {
265+
spec.tail()
266+
}
267+
}
268+
269+
@Test
270+
@FlowPreview
271+
fun terminatingFindNearAllAsFlow() {
272+
273+
val spec = mockk<ReactiveFindOperation.TerminatingFindNear<String>>()
274+
val foo = GeoResult("foo", Distance(0.0))
275+
val bar = GeoResult("bar", Distance(0.0))
276+
val baz = GeoResult("baz", Distance(0.0))
277+
every { spec.all() } returns Flux.just(foo, bar, baz)
278+
279+
runBlocking {
280+
assertThat(spec.allAsFlow().toList()).contains(foo, bar, baz)
281+
}
282+
283+
verify {
284+
spec.all()
285+
}
286+
}
287+
288+
@Test
289+
@FlowPreview
290+
fun terminatingDistinctAllAsFlow() {
291+
292+
val spec = mockk<ReactiveFindOperation.TerminatingDistinct<String>>()
293+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
294+
295+
runBlocking {
296+
assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz")
297+
}
298+
299+
verify {
300+
spec.all()
301+
}
302+
}
231303
}

spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ import example.first.First
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
23+
import kotlinx.coroutines.flow.toList
2224
import kotlinx.coroutines.runBlocking
2325
import org.assertj.core.api.Assertions.assertThat
2426
import org.junit.Test
27+
import reactor.core.publisher.Flux
2528
import reactor.core.publisher.Mono
2629

2730
/**
@@ -60,4 +63,21 @@ class ReactiveInsertOperationExtensionsTests {
6063
find.one("foo")
6164
}
6265
}
66+
67+
@Test
68+
@FlowPreview
69+
fun terminatingInsertAllAsFlow() {
70+
71+
val insert = mockk<ReactiveInsertOperation.TerminatingInsert<String>>()
72+
val list = listOf("foo", "bar")
73+
every { insert.all(any()) } returns Flux.fromIterable(list)
74+
75+
runBlocking {
76+
assertThat(insert.allAsFlow(list).toList()).containsAll(list)
77+
}
78+
79+
verify {
80+
insert.all(list)
81+
}
82+
}
6383
}

spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensionsTests.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@
1616
package org.springframework.data.mongodb.core
1717

1818
import example.first.First
19+
import io.mockk.every
1920
import io.mockk.mockk
2021
import io.mockk.verify
22+
import kotlinx.coroutines.FlowPreview
23+
import kotlinx.coroutines.flow.toList
24+
import kotlinx.coroutines.runBlocking
25+
import org.assertj.core.api.Assertions
2126
import org.junit.Test
27+
import reactor.core.publisher.Flux
2228

2329
/**
2430
* @author Christoph Strobl
@@ -57,4 +63,20 @@ class ReactiveMapReduceOperationExtensionsTests {
5763
operationWithProjection.asType<User>()
5864
verify { operationWithProjection.`as`(User::class.java) }
5965
}
66+
67+
@Test
68+
@FlowPreview
69+
fun terminatingMapReduceAllAsFlow() {
70+
71+
val spec = mockk<ReactiveMapReduceOperation.TerminatingMapReduce<String>>()
72+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
73+
74+
runBlocking {
75+
Assertions.assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz")
76+
}
77+
78+
verify {
79+
spec.all()
80+
}
81+
}
6082
}

0 commit comments

Comments
 (0)