Skip to content

Commit c368896

Browse files
sdeleuzemp911de
authored andcommitted
DATACASS-648 - Add a Flow extension to ReactiveSelectOperation.
Original pull request: #159.
1 parent b36ef04 commit c368896

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

spring-data-cassandra/src/main/kotlin/org/springframework/data/cassandra/core/ReactiveSelectOperationExtensions.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package org.springframework.data.cassandra.core
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
1820
import kotlinx.coroutines.reactive.awaitFirstOrNull
1921
import kotlinx.coroutines.reactive.awaitSingle
22+
import kotlinx.coroutines.reactive.flow.asFlow
2023
import kotlin.reflect.KClass
2124

2225
/**
@@ -107,3 +110,16 @@ suspend fun <T : Any> ReactiveSelectOperation.TerminatingSelect<T>.awaitCount():
107110
*/
108111
suspend fun <T : Any> ReactiveSelectOperation.TerminatingSelect<T>.awaitExists(): Boolean =
109112
exists().awaitSingle()
113+
114+
/**
115+
* Coroutines [Flow] variant of [ReactiveSelectOperation.TerminatingSelect.all].
116+
*
117+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
118+
* and [org.reactivestreams.Subscription.request] size.
119+
*
120+
* @author Sebastien Deleuze
121+
* @since 2.2
122+
*/
123+
@FlowPreview
124+
fun <T : Any> ReactiveSelectOperation.TerminatingSelect<T>.allAsFlow(batchSize: Int = 1): Flow<T> =
125+
all().asFlow(batchSize)

spring-data-cassandra/src/test/kotlin/org/springframework/data/cassandra/core/ReactiveSelectOperationExtensionsUnitTests.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ package org.springframework.data.cassandra.core
1818
import io.mockk.every
1919
import io.mockk.mockk
2020
import io.mockk.verify
21+
import kotlinx.coroutines.FlowPreview
22+
import kotlinx.coroutines.flow.toList
2123
import kotlinx.coroutines.runBlocking
2224
import org.assertj.core.api.Assertions
2325
import org.junit.Test
2426
import org.springframework.data.cassandra.domain.Person
2527
import org.springframework.data.cassandra.domain.User
28+
import reactor.core.publisher.Flux
2629
import reactor.core.publisher.Mono
2730

2831
/**
@@ -213,4 +216,20 @@ class ReactiveSelectOperationExtensionsUnitTests {
213216
find.exists()
214217
}
215218
}
219+
220+
@Test
221+
@FlowPreview
222+
fun terminatingFindAllAsFlow() {
223+
224+
val spec = mockk<ReactiveSelectOperation.TerminatingSelect<String>>()
225+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
226+
227+
runBlocking {
228+
Assertions.assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz")
229+
}
230+
231+
verify {
232+
spec.all()
233+
}
234+
}
216235
}

0 commit comments

Comments
 (0)