Skip to content

Commit 30fb4d5

Browse files
sdeleuzemp911de
authored andcommitted
#91 - Add RowsFetchSpec<T>.flow() extension.
Original pull request: #91.
1 parent 591072f commit 30fb4d5

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

src/main/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensions.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package org.springframework.data.r2dbc.function
1717

18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.Flow
1820
import kotlinx.coroutines.reactive.awaitFirstOrNull
1921
import kotlinx.coroutines.reactive.awaitSingle
22+
import kotlinx.coroutines.reactive.flow.asFlow
2023

2124
/**
2225
* Non-nullable Coroutines variant of [RowsFetchSpec.one].
@@ -50,5 +53,13 @@ suspend fun <T> RowsFetchSpec<T>.awaitFirst(): T =
5053
suspend fun <T> RowsFetchSpec<T>.awaitFirstOrNull(): T? =
5154
first().awaitFirstOrNull()
5255

53-
// TODO Coroutines variant of [RowsFetchSpec.all], depends on [kotlinx.coroutines#254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
54-
// suspend fun <T> RowsFetchSpec<T>.awaitAll() = all()...
56+
/**
57+
* Coroutines [Flow] variant of [RowsFetchSpec.all].
58+
*
59+
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
60+
* and [org.reactivestreams.Subscription.request] size.
61+
*
62+
* @author Sebastien Deleuze
63+
*/
64+
@FlowPreview
65+
fun <T: Any> RowsFetchSpec<T>.flow(batchSize: Int = 1): Flow<T> = all().asFlow(batchSize)

src/test/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensionsTests.kt

+19
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ package org.springframework.data.r2dbc.function
1818
import io.mockk.every
1919
import io.mockk.mockk
2020
import io.mockk.verify
21+
import kotlinx.coroutines.FlowPreview
22+
import kotlinx.coroutines.flow.toList
2123
import kotlinx.coroutines.runBlocking
2224
import org.assertj.core.api.Assertions.assertThat
2325
import org.assertj.core.api.Assertions.assertThatExceptionOfType
2426
import org.junit.Test
27+
import reactor.core.publisher.Flux
2528
import reactor.core.publisher.Mono
2629

2730
/**
@@ -150,4 +153,20 @@ class RowsFetchSpecExtensionsTests {
150153
spec.first()
151154
}
152155
}
156+
157+
@Test // gh-91
158+
@FlowPreview
159+
fun allAsFlow() {
160+
161+
val spec = mockk<RowsFetchSpec<String>>()
162+
every { spec.all() } returns Flux.just("foo", "bar", "baz")
163+
164+
runBlocking {
165+
assertThat(spec.flow().toList()).contains("foo", "bar", "baz")
166+
}
167+
168+
verify {
169+
spec.all()
170+
}
171+
}
153172
}

0 commit comments

Comments
 (0)