Skip to content

Commit d5478b6

Browse files
authored
More operators (#1236)
* Scan and emitAll operators * Flow.first operators family (without firstOrNull and firstOrDefault support) * More migrations Fixes #1094 Fixes #1078 Fixes #1244
1 parent d15d8d6 commit d5478b6

File tree

9 files changed

+315
-13
lines changed

9 files changed

+315
-13
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+9
Original file line numberDiff line numberDiff line change
@@ -822,10 +822,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
822822
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
823823
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
824824
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
825+
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
825826
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
826827
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
827828
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
828829
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
830+
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
831+
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
829832
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
830833
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
831834
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -853,6 +856,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
853856
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
854857
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
855858
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
859+
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
860+
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
856861
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
857862
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
858863
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
@@ -872,13 +877,17 @@ public final class kotlinx/coroutines/flow/MigrationKt {
872877
public static final fun BehaviourSubject ()Ljava/lang/Object;
873878
public static final fun PublishSubject ()Ljava/lang/Object;
874879
public static final fun ReplaySubject ()Ljava/lang/Object;
880+
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
875881
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
876882
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
877883
public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
884+
public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
878885
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
879886
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
880887
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
881888
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
889+
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
890+
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
882891
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
883892
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
884893
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V

kotlinx-coroutines-core/common/src/flow/Migration.kt

+53
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,56 @@ public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
193193
replaceWith = ReplaceWith("flattenConcat()")
194194
)
195195
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
196+
197+
/**
198+
* Kotlin has a built-in generic mechanism for making chained calls.
199+
* If you wish to write something like
200+
* ```
201+
* myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
202+
* ```
203+
* you can replace it with
204+
*
205+
* ```
206+
* myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
207+
* ```
208+
*
209+
* @suppress
210+
*/
211+
@Deprecated(
212+
level = DeprecationLevel.ERROR,
213+
message = "Kotlin analogue of compose is 'let'",
214+
replaceWith = ReplaceWith("let(transformer)")
215+
)
216+
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")
217+
218+
/**
219+
* @suppress
220+
*/
221+
@Deprecated(
222+
level = DeprecationLevel.ERROR,
223+
message = "Kotlin analogue of 'skip' is 'drop'",
224+
replaceWith = ReplaceWith("drop(count)")
225+
)
226+
public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
227+
228+
/**
229+
* Flow extension to iterate over elements is [collect].
230+
* Foreach wasn't introduced deliberately to avoid confusion.
231+
* Flow is not a collection, iteration over it may be not idempotent
232+
* and can *launch* computations with side-effects.
233+
* This behaviour is not reflected in [forEach] name.
234+
* @suppress
235+
*/
236+
@Deprecated(
237+
level = DeprecationLevel.ERROR,
238+
message = "Flow analogue of 'forEach' is 'collect'",
239+
replaceWith = ReplaceWith("collect(block)")
240+
)
241+
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called")
242+
243+
@Deprecated(
244+
level = DeprecationLevel.ERROR,
245+
message = "Flow has less verbose 'scan' shortcut",
246+
replaceWith = ReplaceWith("scan(initial, operation)")
247+
)
248+
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")

kotlinx-coroutines-core/common/src/flow/operators/Errors.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ public fun <T> Flow<T>.onErrorCollect(
2727
predicate: ExceptionPredicate = ALWAYS_TRUE
2828
): Flow<T> = collectSafely { e ->
2929
if (!predicate(e)) throw e
30-
fallback.collect { value ->
31-
emit(value)
32-
}
30+
emitAll(fallback)
3331
}
3432

3533
/**

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+2-8
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,7 @@ public fun <T, R> Flow<T>.flatMapMerge(
8080
*/
8181
@FlowPreview
8282
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
83-
collect { value ->
84-
value.collect { innerValue ->
85-
emit(innerValue)
86-
}
87-
}
83+
collect { value -> emitAll(value) }
8884
}
8985

9086
/**
@@ -137,9 +133,7 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
137133
previousFlow?.join()
138134
// Undispatched to have better user experience in case of synchronous flows
139135
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
140-
transform(value).collect { innerValue ->
141-
downstream.emit(innerValue)
142-
}
136+
downstream.emitAll(transform(value))
143137
}
144138
}
145139
}

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+45
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
@file:JvmMultifileClass
66
@file:JvmName("FlowKt")
7+
@file:Suppress("UNCHECKED_CAST")
78

89
package kotlinx.coroutines.flow
910

1011
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.flow.internal.NULL
1113
import kotlin.jvm.*
1214
import kotlinx.coroutines.flow.unsafeFlow as flow
1315

@@ -97,3 +99,46 @@ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = flow {
9799
emit(value)
98100
}
99101
}
102+
103+
/**
104+
* Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
105+
* Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
106+
* For example:
107+
* ```
108+
* flowOf(1, 2, 3).accumulate(emptyList<Int>()) { acc, value -> acc + value }.toList()
109+
* ```
110+
* will produce `[], [1], [1, 2], [1, 2, 3]]`.
111+
*/
112+
@FlowPreview
113+
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
114+
var accumulator: R = initial
115+
emit(accumulator)
116+
collect { value ->
117+
accumulator = operation(accumulator, value)
118+
emit(accumulator)
119+
}
120+
}
121+
122+
/**
123+
* Reduces the given flow with [operation], emitting every intermediate result, including initial value.
124+
* The first element is taken as initial value for operation accumulator.
125+
* This operator has a sibling with initial value -- [scan].
126+
*
127+
* For example:
128+
* ```
129+
* flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList()
130+
* ```
131+
* will produce `[1, 3, 6, 10]`
132+
*/
133+
@FlowPreview
134+
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
135+
var accumulator: Any? = NULL
136+
collect { value ->
137+
accumulator = if (accumulator === NULL) {
138+
value
139+
} else {
140+
operation(accumulator as T, value)
141+
}
142+
emit(accumulator as T)
143+
}
144+
}

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+6
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,9 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
3232
collect(object : FlowCollector<T> {
3333
override suspend fun emit(value: T) = action(value)
3434
})
35+
36+
/**
37+
* Collects all the values from the given [flow] and emits them to the collector.
38+
* Shortcut for `flow.collect { value -> emit(value) }`.
39+
*/
40+
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

+45-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
@file:JvmMultifileClass
66
@file:JvmName("FlowKt")
7+
@file:Suppress("UNCHECKED_CAST")
78

89
package kotlinx.coroutines.flow
910

@@ -50,7 +51,7 @@ public suspend inline fun <T, R> Flow<T>.fold(
5051
}
5152

5253
/**
53-
* Terminal operator, that awaits for one and only one value to be published.
54+
* The terminal operator, that awaits for one and only one value to be published.
5455
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
5556
* that contains more than one element.
5657
*/
@@ -68,7 +69,7 @@ public suspend fun <T> Flow<T>.single(): T {
6869
}
6970

7071
/**
71-
* Terminal operator, that awaits for one and only one value to be published.
72+
* The terminal operator, that awaits for one and only one value to be published.
7273
* Throws [IllegalStateException] for flow that contains more than one element.
7374
*/
7475
@FlowPreview
@@ -81,3 +82,45 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
8182

8283
return result
8384
}
85+
86+
/**
87+
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
88+
* Throws [NoSuchElementException] if the flow was empty.
89+
*/
90+
@FlowPreview
91+
public suspend fun <T> Flow<T>.first(): T {
92+
var result: Any? = NULL
93+
try {
94+
collect { value ->
95+
result = value
96+
throw AbortFlowException()
97+
}
98+
} catch (e: AbortFlowException) {
99+
// Do nothing
100+
}
101+
102+
if (result === NULL) throw NoSuchElementException("Expected at least one element")
103+
return result as T
104+
}
105+
106+
/**
107+
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
108+
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
109+
*/
110+
@FlowPreview
111+
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
112+
var result: Any? = NULL
113+
try {
114+
collect { value ->
115+
if (predicate(value)) {
116+
result = value
117+
throw AbortFlowException()
118+
}
119+
}
120+
} catch (e: AbortFlowException) {
121+
// Do nothing
122+
}
123+
124+
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
125+
return result as T
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class ScanTest : TestBase() {
12+
@Test
13+
fun testScan() = runTest {
14+
val flow = flowOf(1, 2, 3, 4, 5)
15+
val result = flow.scanReduce { acc, v -> acc + v }.toList()
16+
assertEquals(listOf(1, 3, 6, 10, 15), result)
17+
}
18+
19+
@Test
20+
fun testScanWithInitial() = runTest {
21+
val flow = flowOf(1, 2, 3)
22+
val result = flow.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
23+
assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
24+
}
25+
26+
@Test
27+
fun testNulls() = runTest {
28+
val flow = flowOf(null, 2, null, null, null, 5)
29+
val result = flow.scanReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
30+
assertEquals(listOf(null, 2, 2, 2, 2, 7), result)
31+
}
32+
33+
@Test
34+
fun testEmptyFlow() = runTest {
35+
val result = emptyFlow<Int>().scanReduce { _, _ -> 1 }.toList()
36+
assertTrue(result.isEmpty())
37+
}
38+
39+
@Test
40+
fun testErrorCancelsUpstream() = runTest {
41+
expect(1)
42+
val latch = Channel<Unit>()
43+
val flow = flow {
44+
coroutineScope {
45+
launch {
46+
latch.send(Unit)
47+
hang { expect(3) }
48+
}
49+
emit(1)
50+
emit(2)
51+
}
52+
}.scanReduce { _, value ->
53+
expect(value) // 2
54+
latch.receive()
55+
throw TestException()
56+
}.onErrorCollect(emptyFlow())
57+
58+
assertEquals(1, flow.single())
59+
finish(4)
60+
}
61+
62+
public operator fun <T> Collection<T>.plus(element: T): List<T> {
63+
val result = ArrayList<T>(size + 1)
64+
result.addAll(this)
65+
result.add(element)
66+
return result
67+
}
68+
}

0 commit comments

Comments
 (0)