diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensions.kt index 24b74e4eb8..f23f9ee2d4 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensions.kt @@ -15,6 +15,9 @@ */ package org.springframework.data.mongodb.core +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.flow.asFlow import kotlin.reflect.KClass /** @@ -35,3 +38,16 @@ fun ReactiveAggregationOperation.aggregateAndReturn(entityClass: KClas */ inline fun ReactiveAggregationOperation.aggregateAndReturn(): ReactiveAggregationOperation.ReactiveAggregation = aggregateAndReturn(T::class.java) + +/** + * Coroutines [Flow] variant of [ReactiveAggregationOperation.TerminatingAggregationOperation.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + * @since 2.2 + */ +@FlowPreview +fun ReactiveAggregationOperation.TerminatingAggregationOperation.allAsFlow(batchSize: Int = 1): Flow = + all().asFlow(batchSize) diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt index 0a70ae675e..4c3cfdc2b1 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt @@ -15,8 +15,12 @@ */ package org.springframework.data.mongodb.core +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.flow.asFlow +import org.springframework.data.geo.GeoResult import kotlin.reflect.KClass /** @@ -129,3 +133,52 @@ suspend fun ReactiveFindOperation.TerminatingFind.awaitCount(): Lon */ suspend fun ReactiveFindOperation.TerminatingFind.awaitExists(): Boolean = exists().awaitSingle() + +/** + * Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFind.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + */ +@FlowPreview +fun ReactiveFindOperation.TerminatingFind.allAsFlow(batchSize: Int = 1): Flow = + all().asFlow(batchSize) + +/** + * Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFind.tail]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + */ +@FlowPreview +fun ReactiveFindOperation.TerminatingFind.tailAsFlow(batchSize: Int = 1): Flow = + tail().asFlow(batchSize) + +/** + * Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingFindNear.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + */ +@FlowPreview +fun ReactiveFindOperation.TerminatingFindNear.allAsFlow(batchSize: Int = 1): Flow> = + all().asFlow(batchSize) + +/** + * Coroutines [Flow] variant of [ReactiveFindOperation.TerminatingDistinct.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + * @since 2.2 + */ +@FlowPreview +fun ReactiveFindOperation.TerminatingDistinct.allAsFlow(batchSize: Int = 1): Flow = + all().asFlow(batchSize) diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt index dd877bcf42..6324b84014 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt @@ -15,7 +15,10 @@ */ package org.springframework.data.mongodb.core +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.flow.asFlow import kotlin.reflect.KClass /** @@ -45,3 +48,17 @@ inline fun ReactiveInsertOperation.insert(): ReactiveInsertOpe */ suspend inline fun ReactiveInsertOperation.TerminatingInsert.oneAndAwait(o: T): T = one(o).awaitSingle() + + +/** + * Coroutines [Flow] variant of [ReactiveInsertOperation.TerminatingInsert.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + * @since 2.2 + */ +@FlowPreview +fun ReactiveInsertOperation.TerminatingInsert.allAsFlow(objects: Collection, batchSize: Int = 1): Flow = + all(objects).asFlow(batchSize) diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensions.kt index 156ef23771..16ca003074 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensions.kt @@ -15,6 +15,9 @@ */ package org.springframework.data.mongodb.core +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.flow.asFlow import kotlin.reflect.KClass /** @@ -54,3 +57,17 @@ fun ReactiveMapReduceOperation.MapReduceWithProjection<*>.asType(resul */ inline fun ReactiveMapReduceOperation.MapReduceWithProjection<*>.asType(): ReactiveMapReduceOperation.MapReduceWithQuery = `as`(T::class.java) + + +/** + * Coroutines [Flow] variant of [ReactiveMapReduceOperation.TerminatingMapReduce.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + * @since 2.2 + */ +@FlowPreview +fun ReactiveMapReduceOperation.TerminatingMapReduce.allAsFlow(batchSize: Int = 1): Flow = + all().asFlow(batchSize) diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt index 94c9e510ab..262a07296e 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt @@ -16,7 +16,10 @@ package org.springframework.data.mongodb.core import com.mongodb.client.result.DeleteResult +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.flow.asFlow import kotlin.reflect.KClass /** @@ -46,3 +49,17 @@ inline fun ReactiveRemoveOperation.remove(): ReactiveRemoveOpe */ suspend fun ReactiveRemoveOperation.TerminatingRemove.allAndAwait(): DeleteResult = all().awaitSingle() + + +/** + * Coroutines [Flow] variant of [ReactiveRemoveOperation.TerminatingRemove.findAndRemove]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + * @since 2.2 + */ +@FlowPreview +fun ReactiveRemoveOperation.TerminatingRemove.findAndRemoveAsFlow(batchSize: Int = 1): Flow = + findAndRemove().asFlow(batchSize) diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensionsTests.kt index d04749d37e..2c59ffcc2d 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveAggregationOperationExtensionsTests.kt @@ -16,9 +16,15 @@ package org.springframework.data.mongodb.core import example.first.First +import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions import org.junit.Test +import reactor.core.publisher.Flux /** * @author Mark Paluch @@ -42,4 +48,20 @@ class ReactiveAggregationOperationExtensionsTests { verify { operation.aggregateAndReturn(First::class.java) } } + @Test + @FlowPreview + fun terminatingAggregationOperationAllAsFlow() { + + val spec = mockk>() + every { spec.all() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + Assertions.assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.all() + } + } + } diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt index 0eb1b006ad..a2848a087b 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt @@ -19,10 +19,15 @@ import example.first.First import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test +import org.springframework.data.geo.Distance +import org.springframework.data.geo.GeoResult +import reactor.core.publisher.Flux import reactor.core.publisher.Mono /** @@ -228,4 +233,71 @@ class ReactiveFindOperationExtensionsTests { find.exists() } } + + @Test + @FlowPreview + fun terminatingFindAllAsFlow() { + + val spec = mockk>() + every { spec.all() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.all() + } + } + + @Test + @FlowPreview + fun terminatingFindTailAsFlow() { + + val spec = mockk>() + every { spec.tail() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(spec.tailAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.tail() + } + } + + @Test + @FlowPreview + fun terminatingFindNearAllAsFlow() { + + val spec = mockk>() + val foo = GeoResult("foo", Distance(0.0)) + val bar = GeoResult("bar", Distance(0.0)) + val baz = GeoResult("baz", Distance(0.0)) + every { spec.all() } returns Flux.just(foo, bar, baz) + + runBlocking { + assertThat(spec.allAsFlow().toList()).contains(foo, bar, baz) + } + + verify { + spec.all() + } + } + + @Test + @FlowPreview + fun terminatingDistinctAllAsFlow() { + + val spec = mockk>() + every { spec.all() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.all() + } + } } diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt index 2e1258ba76..227e4c92a7 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt @@ -19,9 +19,12 @@ import example.first.First import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.junit.Test +import reactor.core.publisher.Flux import reactor.core.publisher.Mono /** @@ -47,17 +50,34 @@ class ReactiveInsertOperationExtensionsTests { } @Test // DATAMONGO-2209 - fun terminatingFindAwaitOne() { + fun terminatingInsertOneAndAwait() { - val find = mockk>() - every { find.one("foo") } returns Mono.just("foo") + val insert = mockk>() + every { insert.one("foo") } returns Mono.just("foo") runBlocking { - assertThat(find.oneAndAwait("foo")).isEqualTo("foo") + assertThat(insert.oneAndAwait("foo")).isEqualTo("foo") } verify { - find.one("foo") + insert.one("foo") + } + } + + @Test + @FlowPreview + fun terminatingInsertAllAsFlow() { + + val insert = mockk>() + val list = listOf("foo", "bar") + every { insert.all(any()) } returns Flux.fromIterable(list) + + runBlocking { + assertThat(insert.allAsFlow(list).toList()).containsAll(list) + } + + verify { + insert.all(list) } } } diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensionsTests.kt index d4e1f4294c..e63cc675b6 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveMapReduceOperationExtensionsTests.kt @@ -16,9 +16,15 @@ package org.springframework.data.mongodb.core import example.first.First +import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions import org.junit.Test +import reactor.core.publisher.Flux /** * @author Christoph Strobl @@ -57,4 +63,20 @@ class ReactiveMapReduceOperationExtensionsTests { operationWithProjection.asType() verify { operationWithProjection.`as`(User::class.java) } } + + @Test + @FlowPreview + fun terminatingMapReduceAllAsFlow() { + + val spec = mockk>() + every { spec.all() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + Assertions.assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.all() + } + } } diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt index ac8866b102..4e4ecf2f55 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt @@ -20,9 +20,12 @@ import example.first.First import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.junit.Test +import reactor.core.publisher.Flux import reactor.core.publisher.Mono /** @@ -62,4 +65,20 @@ class ReactiveRemoveOperationExtensionsTests { remove.all() } } + + @Test + @FlowPreview + fun terminatingRemoveFindAndRemoveAsFlow() { + + val spec = mockk>() + every { spec.findAndRemove() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(spec.findAndRemoveAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.findAndRemove() + } + } }