Skip to content

Commit 8d8b6eb

Browse files
authored
Declarative flow operators (#1291)
* Flow.onCompletion operator * Flow.launchIn operator Fixes #1263
1 parent 502610e commit 8d8b6eb

File tree

8 files changed

+297
-35
lines changed

8 files changed

+297
-35
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+4-2
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
860860
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
861861
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
862862
public static final fun getDEFAULT_CONCURRENCY ()I
863+
public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
863864
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
864865
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
866+
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
865867
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
866868
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
867869
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -910,8 +912,8 @@ public final class kotlinx/coroutines/flow/MigrationKt {
910912
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
911913
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
912914
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
913-
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
914-
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
915+
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
916+
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
915917
public static final fun subscribeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
916918
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
917919
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import kotlin.coroutines.*
1818
* They only set up a chain of operations for future execution and quickly return.
1919
* This is known as a _cold flow_ property.
2020
*
21-
* _Terminal operators_ on the flow are suspending functions such as [collect], [single], [reduce], [toList], etc.
21+
* _Terminal operators_ on the flow are either suspending functions such as [collect], [single], [reduce], [toList], etc.
22+
* or [launchIn] operator that starts collection of the flow in the given scope.
2223
* They are applied to the upstream flow and trigger execution of all operations.
2324
* Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
2425
* without actual blocking. Terminal operator complete normally or exceptionally depending on successful or failed
@@ -142,6 +143,7 @@ import kotlin.coroutines.*
142143
* .map { computeTwo(it) }
143144
* .collect { process(it) } // throws exceptions from process and computeTwo
144145
* ```
146+
* The same reasoning can be applied to [onCompletion] operator that is a declarative replacement for `finally` block.
145147
*
146148
* Failure to adhere to the exception transparency requirement would result in strange behaviours that would make
147149
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"

kotlinx-coroutines-core/common/src/flow/Migration.kt

+23-17
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block:
127127

128128
/**
129129
* `subscribe` is Rx-specific API that has no direct match in flows.
130-
* One can use `launch` instead, for example the following:
130+
* One can use [launchIn] instead, for example the following:
131131
* ```
132132
* flowable
133133
* .observeOn(Schedulers.io())
@@ -136,30 +136,36 @@ public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block:
136136
*
137137
* has the following Flow equivalent:
138138
* ```
139-
* launch(Dispatchers.IO) {
140-
* try {
141-
* flow.collect { value ->
142-
* println("Received $value")
143-
* }
144-
* println("Flow is completed successfully")
145-
* } catch (e: Throwable) {
146-
* println("Exception $e happened")
147-
* }
148-
* }
139+
* flow
140+
* .onEach { value -> println("Received $value") }
141+
* .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") }
142+
* .catch { cause -> println("Exception $cause happened") }
143+
* .flowOn(Dispatchers.IO)
144+
* .launchIn(myScope)
149145
* ```
150-
* But most of the time it is better to use terminal operators like [single] instead of [collect].
146+
*
147+
* Note that resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
148+
*
149+
* Or terminal operators like [single] can be used from suspend functions.
151150
* @suppress
152151
*/
153-
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
152+
@Deprecated(
153+
message = "Use launchIn with onEach, onCompletion and catch operators instead",
154+
level = DeprecationLevel.ERROR
155+
)
154156
public fun <T> Flow<T>.subscribe(): Unit = error("Should not be called")
155157

156158
/** @suppress **/
157-
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
158-
public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit): Unit = error("Should not be called")
159+
@Deprecated(
160+
message = "Use launchIn with onEach, onCompletion and catch operators instead",
161+
level = DeprecationLevel.ERROR
162+
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = error("Should not be called")
159163

160164
/** @suppress **/
161-
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
162-
public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
165+
@Deprecated(
166+
message = "Use launchIn with onEach, onCompletion and catch operators instead",
167+
level = DeprecationLevel.ERROR
168+
)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = error("Should not be called")
163169

164170
/**
165171
* Note that this replacement is sequential (`concat`) by default.

kotlinx-coroutines-core/common/src/flow/operators/Errors.kt

+14-13
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,8 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
5757
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
5858
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
5959
flow {
60-
catchImpl(this) { e ->
61-
action(e)
62-
}
60+
val exception = catchImpl(this)
61+
if (exception != null) action(exception)
6362
}
6463

6564
/**
@@ -177,22 +176,22 @@ public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Thr
177176
var shallRetry: Boolean
178177
do {
179178
shallRetry = false
180-
catchImpl(this) { e ->
181-
if (predicate(e, attempt)) {
179+
val cause = catchImpl(this)
180+
if (cause != null) {
181+
if (predicate(cause, attempt)) {
182182
shallRetry = true
183183
attempt++
184184
} else {
185-
throw e
185+
throw cause
186186
}
187187
}
188188
} while (shallRetry)
189189
}
190190

191-
// Note that exception may come from the downstream operators, we should not catch them
192-
private suspend inline fun <T> Flow<T>.catchImpl(
193-
collector: FlowCollector<T>,
194-
action: FlowCollector<T>.(Throwable) -> Unit
195-
) {
191+
// Return exception from upstream or null
192+
internal suspend fun <T> Flow<T>.catchImpl(
193+
collector: FlowCollector<T>
194+
): Throwable? {
196195
var fromDownstream: Throwable? = null
197196
try {
198197
collect {
@@ -209,10 +208,12 @@ private suspend inline fun <T> Flow<T>.catchImpl(
209208
* Seconds check ignores cancellation causes, they cannot be caught.
210209
*/
211210
if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
212-
throw e
211+
throw e // Rethrow exceptions from downstream and cancellation causes
212+
} else {
213+
return e // not from downstream
213214
}
214-
collector.action(e)
215215
}
216+
return null
216217
}
217218

218219
private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+60-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
package kotlinx.coroutines.flow
1010

1111
import kotlinx.coroutines.*
12-
import kotlinx.coroutines.flow.internal.NULL
12+
import kotlinx.coroutines.flow.internal.*
13+
import kotlin.coroutines.*
1314
import kotlin.jvm.*
1415
import kotlinx.coroutines.flow.unsafeFlow as flow
1516

@@ -100,6 +101,64 @@ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = flow {
100101
}
101102
}
102103

104+
/**
105+
* Invokes the given [action] when the given flow is completed or cancelled, using
106+
* the exception from the upstream (if any) as cause parameter of [action].
107+
*
108+
* Conceptually, [onCompletion] is similar to wrapping the flow collection into a `finally` block,
109+
* for example the following imperative snippet:
110+
* ```
111+
* try {
112+
* myFlow.collect { value ->
113+
* println(value)
114+
* }
115+
* } finally {
116+
* println("Done")
117+
* }
118+
* ```
119+
*
120+
* can be replaced with a declarative one using [onCompletion]:
121+
* ```
122+
* myFlow
123+
* .onEach { println(it) }
124+
* .onCompletion { println("Done") }
125+
* .collect()
126+
* ```
127+
*
128+
* This operator is *transparent* to exceptions that occur in downstream flow
129+
* and does not observe exceptions that are thrown to cancel the flow,
130+
* while any exception from the [action] will be thrown downstream.
131+
* This behaviour can be demonstrated by the following example:
132+
* ```
133+
* flow { emitData() }
134+
* .map { computeOne(it) }
135+
* .onCompletion { println(it) } // Can print exceptions from emitData and computeOne
136+
* .map { computeTwo(it) }
137+
* .onCompletion { println(it) } // Can print exceptions from emitData, computeOne, onCompletion and computeTwo
138+
* .collect()
139+
* ```
140+
*/
141+
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
142+
public fun <T> Flow<T>.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow<T> = flow {
143+
var exception: Throwable? = null
144+
try {
145+
exception = catchImpl(this)
146+
} finally {
147+
// Separate method because of KT-32220
148+
invokeSafely(action, exception)
149+
exception?.let { throw it }
150+
}
151+
}
152+
153+
private suspend fun invokeSafely(action: suspend (cause: Throwable?) -> Unit, cause: Throwable?) {
154+
try {
155+
action(cause)
156+
} catch (e: Throwable) {
157+
if (cause !== null) e.addSuppressedThrowable(cause)
158+
throw e
159+
}
160+
}
161+
103162
/**
104163
* Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
105164
* Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

+22-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import kotlin.jvm.*
1717
*
1818
* It is a shorthand for `collect {}`.
1919
*
20-
* This operator is usually used with [onEach] and [catch] operators to process all emitted values and
20+
* This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
2121
* handle an exception that might occur in the upstream flow or during processing, for example:
2222
*
2323
* ```
@@ -30,6 +30,27 @@ import kotlin.jvm.*
3030
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
3131
public suspend fun Flow<*>.collect() = collect(NopCollector)
3232

33+
/**
34+
* Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
35+
* It is a shorthand for `scope.launch { flow.collect() }`.
36+
*
37+
* This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
38+
* handle an exception that might occur in the upstream flow or during processing, for example:
39+
* ```
40+
* flow
41+
* .onEach { value -> updateUi(value) }
42+
* .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
43+
* .catch { cause -> LOG.error("Exception: $cause") }
44+
* .launchIn(uiScope)
45+
* ```
46+
*
47+
* Note that resulting value of [launchIn] is not used the provided scope takes care of cancellation.
48+
*/
49+
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
50+
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
51+
collect() // tail-call
52+
}
53+
3354
/**
3455
* Terminal flow operator that collects the given flow with a provided [action].
3556
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class OnCompletionTest : TestBase() {
11+
12+
@Test
13+
fun testOnCompletion() = runTest {
14+
flow {
15+
expect(1)
16+
emit(2)
17+
expect(4)
18+
}.onEach {
19+
expect(2)
20+
}.onCompletion {
21+
assertNull(it)
22+
expect(5)
23+
}.onEach {
24+
expect(3)
25+
}.collect()
26+
finish(6)
27+
}
28+
29+
@Test
30+
fun testOnCompletionWithException() = runTest {
31+
flowOf(1).onEach {
32+
expect(1)
33+
throw TestException()
34+
}.onCompletion {
35+
assertTrue(it is TestException)
36+
expect(2)
37+
}.catch {
38+
assertTrue(it is TestException)
39+
expect(3)
40+
}.collect()
41+
finish(4)
42+
}
43+
44+
@Test
45+
fun testOnCompletionWithExceptionDownstream() = runTest {
46+
flow {
47+
expect(1)
48+
emit(2)
49+
}.onEach {
50+
expect(2)
51+
}.onCompletion {
52+
assertNull(it)
53+
expect(4)
54+
}.onEach {
55+
expect(3)
56+
throw TestException()
57+
}.catch {
58+
assertTrue(it is TestException)
59+
expect(5)
60+
}.collect()
61+
finish(6)
62+
}
63+
64+
@Test
65+
fun testMultipleOnCompletions() = runTest {
66+
flowOf(1).onCompletion {
67+
assertNull(it)
68+
expect(2)
69+
}.onEach {
70+
expect(1)
71+
throw TestException()
72+
}.onCompletion {
73+
assertTrue(it is TestException)
74+
expect(3)
75+
}.catch {
76+
assertTrue(it is TestException)
77+
expect(4)
78+
}.collect()
79+
finish(5)
80+
}
81+
82+
@Test
83+
fun testExceptionFromOnCompletion() = runTest {
84+
flowOf(1).onEach {
85+
expect(1)
86+
throw TestException()
87+
}.onCompletion {
88+
expect(2)
89+
throw TestException2()
90+
}.catch {
91+
assertTrue(it is TestException2)
92+
expect(3)
93+
}.collect()
94+
finish(4)
95+
}
96+
97+
@Test
98+
fun testContextPreservation() = runTest {
99+
flowOf(1).onCompletion {
100+
assertEquals("OK", NamedDispatchers.name())
101+
assertNull(it)
102+
expect(1)
103+
}.flowOn(NamedDispatchers("OK"))
104+
.onEach {
105+
expect(2)
106+
assertEquals("default", NamedDispatchers.nameOr("default"))
107+
throw TestException()
108+
}
109+
.catch {
110+
assertTrue(it is TestException)
111+
expect(3)
112+
}.collect()
113+
finish(4)
114+
}
115+
}

0 commit comments

Comments
 (0)