Skip to content

Commit bc1af1f

Browse files
committed
Flow.transformWhile operator
Also, all flow-truncating operators are refactored via a common internal collectWhile operator that properly uses AbortFlowException and checks for its ownership, so that we don't have to look for bugs in interactions between all those operators (and zip, too, which is also flow-truncating). Fixes #2065
1 parent ad542c4 commit bc1af1f

File tree

5 files changed

+123
-49
lines changed

5 files changed

+123
-49
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
995995
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
996996
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
997997
public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
998+
public static final fun transformWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
998999
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
9991000
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
10001001
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package kotlinx.coroutines.flow
1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.flow.internal.*
1313
import kotlin.jvm.*
14+
import kotlinx.coroutines.flow.flow as safeFlow
1415

1516
// ------------------ WARNING ------------------
1617
// These emitting operators must use safe flow builder, because they allow
@@ -19,10 +20,11 @@ import kotlin.jvm.*
1920
/**
2021
* Applies [transform] function to each value of the given flow.
2122
*
22-
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
23-
* generic function that may transform emitted element, skip it or emit it multiple times.
23+
* The receiver of the `transform` is [FlowCollector] and thus `transform` is a
24+
* flexible function that may transform emitted element, skip it or emit it multiple times.
2425
*
25-
* This operator can be used as a building block for other operators, for example:
26+
* This operator generalizes [filter] and [map] operators and
27+
* can be used as a building block for other operators, for example:
2628
*
2729
* ```
2830
* fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
@@ -35,7 +37,7 @@ import kotlin.jvm.*
3537
*/
3638
public inline fun <T, R> Flow<T>.transform(
3739
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
38-
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
40+
): Flow<R> = safeFlow { // Note: safe flow is used here, because collector is exposed to transform on each operation
3941
collect { value ->
4042
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
4143
return@collect transform(value)

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

+57-20
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77

88
package kotlinx.coroutines.flow
99

10+
import kotlinx.coroutines.*
1011
import kotlinx.coroutines.flow.internal.*
1112
import kotlin.jvm.*
13+
import kotlinx.coroutines.flow.flow as safeFlow
1214
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1315

1416
/**
@@ -49,35 +51,70 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
4951
require(count > 0) { "Requested element count $count should be positive" }
5052
return flow {
5153
var consumed = 0
52-
try {
53-
collect { value ->
54-
if (++consumed < count) {
55-
return@collect emit(value)
56-
} else {
57-
return@collect emitAbort(value)
58-
}
59-
}
60-
} catch (e: AbortFlowException) {
61-
e.checkOwnership(owner = this)
54+
collectWhile { value ->
55+
emit(value)
56+
++consumed < count
6257
}
6358
}
6459
}
6560

66-
private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
67-
emit(value)
68-
throw AbortFlowException(this)
69-
}
70-
7161
/**
7262
* Returns a flow that contains first elements satisfying the given [predicate].
63+
*
64+
* Note, that the resulting flow does not contain the element on which the [predicate] returned `true`.
65+
* See [transformWhile] for a more flexible operator.
7366
*/
7467
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
75-
try {
76-
collect { value ->
77-
if (predicate(value)) emit(value)
78-
else throw AbortFlowException(this)
68+
collectWhile { value ->
69+
if (predicate(value)) {
70+
emit(value)
71+
true
72+
} else {
73+
false
7974
}
75+
}
76+
}
77+
78+
/**
79+
* Applies [transform] function to each value of the given flow while this
80+
* function returns `true`.
81+
*
82+
* The receiver of the `transformWhile` is [FlowCollector] and thus `transformWhile` is a
83+
* flexible function that may transform emitted element, skip it or emit it multiple times.
84+
*
85+
* This operator generalizes [takeWhile] and can be used as a building block for other operators.
86+
* For example, a flow of download progress messages can be completed when the
87+
* download is done but emit this last message (unlike `takeWhile`):
88+
*
89+
* ```
90+
* fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =
91+
* transformWhile { progress ->
92+
* emit(progress) // always emit progress
93+
* !progress.isDone() // continue while download is not done
94+
* }
95+
* }
96+
* ```
97+
*/
98+
@ExperimentalCoroutinesApi
99+
public fun <T, R> Flow<T>.transformWhile(
100+
@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Boolean
101+
): Flow<R> =
102+
safeFlow { // Note: safe flow is used here, because collector is exposed to transform on each operation
103+
collectWhile { value ->
104+
transform(value)
105+
}
106+
}
107+
108+
// Internal building block for all flow-truncating operators
109+
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
110+
val collector = object : FlowCollector<T> {
111+
override suspend fun emit(value: T) {
112+
if (!predicate(value)) throw AbortFlowException(this)
113+
}
114+
}
115+
try {
116+
collect(collector)
80117
} catch (e: AbortFlowException) {
81-
e.checkOwnership(owner = this)
118+
e.checkOwnership(collector)
82119
}
83120
}

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+10-25
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
8282
*/
8383
public suspend fun <T> Flow<T>.first(): T {
8484
var result: Any? = NULL
85-
collectUntil {
85+
collectWhile {
8686
result = it
87-
true
87+
false
8888
}
8989
if (result === NULL) throw NoSuchElementException("Expected at least one element")
9090
return result as T
@@ -96,12 +96,12 @@ public suspend fun <T> Flow<T>.first(): T {
9696
*/
9797
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
9898
var result: Any? = NULL
99-
collectUntil {
99+
collectWhile {
100100
if (predicate(it)) {
101101
result = it
102-
true
103-
} else {
104102
false
103+
} else {
104+
true
105105
}
106106
}
107107
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
@@ -114,9 +114,9 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
114114
*/
115115
public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
116116
var result: T? = null
117-
collectUntil {
117+
collectWhile {
118118
result = it
119-
true
119+
false
120120
}
121121
return result
122122
}
@@ -127,28 +127,13 @@ public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
127127
*/
128128
public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
129129
var result: T? = null
130-
collectUntil {
130+
collectWhile {
131131
if (predicate(it)) {
132132
result = it
133-
true
134-
} else {
135133
false
134+
} else {
135+
true
136136
}
137137
}
138138
return result
139139
}
140-
141-
internal suspend inline fun <T> Flow<T>.collectUntil(crossinline block: suspend (value: T) -> Boolean) {
142-
val collector = object : FlowCollector<T> {
143-
override suspend fun emit(value: T) {
144-
if (block(value)) {
145-
throw AbortFlowException(this)
146-
}
147-
}
148-
}
149-
try {
150-
collect(collector)
151-
} catch (e: AbortFlowException) {
152-
e.checkOwnership(collector)
153-
}
154-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class TransformWhileTest : TestBase() {
11+
@Test
12+
fun testSimple() = runTest {
13+
val flow = (0..10).asFlow()
14+
val expected = listOf("A", "B", "C", "D")
15+
val actual = flow.transformWhile { value ->
16+
when(value) {
17+
0 -> { emit("A"); true }
18+
1 -> true
19+
2 -> { emit("B"); emit("C"); true }
20+
3 -> { emit("D"); false }
21+
else -> { expectUnreached(); false }
22+
}
23+
}.toList()
24+
assertEquals(expected, actual)
25+
}
26+
27+
@Test
28+
fun testExample() = runTest {
29+
val source = listOf(
30+
DownloadProgress(0),
31+
DownloadProgress(50),
32+
DownloadProgress(100),
33+
DownloadProgress(147)
34+
)
35+
val expected = source.subList(0, 3)
36+
val actual = source.asFlow().completeWhenDone().toList()
37+
assertEquals(expected, actual)
38+
}
39+
40+
private fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =
41+
transformWhile { progress ->
42+
emit(progress) // always emit progress
43+
!progress.isDone() // continue while download is not done
44+
}
45+
46+
private data class DownloadProgress(val percent: Int) {
47+
fun isDone() = percent >= 100
48+
}
49+
}

0 commit comments

Comments
 (0)