Skip to content

Commit 935b83a

Browse files
committed
Change the exception suppression invariant to work backwards:
All the benifits of the approach stay the same, but additionally, for an arbitrary flow pipeline, adding a catch operator that is not triggered will no longer change the type of resulting exception
1 parent 08a3ed3 commit 935b83a

File tree

5 files changed

+59
-10
lines changed

5 files changed

+59
-10
lines changed

kotlinx-coroutines-core/common/src/flow/Flow.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ import kotlin.coroutines.*
133133
*
134134
* When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception.
135135
* For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation,
136-
* but then it will be suppressed, as discussed below.
136+
* suppressing the original exception as discussed below.
137137
* If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator.
138138
*
139139
* The [catch][Flow.catch] operator only catches upstream exceptions, but passes
@@ -152,7 +152,9 @@ import kotlin.coroutines.*
152152
* All exception-handling Flow operators follow the principle of exception suppression:
153153
*
154154
* If the upstream flow throws an exception during its completion when the downstream exception has been thrown,
155-
* the upstream exception becomes superseded and suppressed by the downstream exception.
155+
* the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic
156+
* equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception,
157+
* still following the downstream failure.
156158
*
157159
* Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
158160
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"

kotlinx-coroutines-core/common/src/flow/operators/Errors.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ internal suspend fun <T> Flow<T>.catchImpl(
214214
* The exception came from the upstream [semi-] independently.
215215
* For pure failures, when the downstream functions normally, we handle the exception as intended.
216216
* But if the downstream has failed prior to or concurrently
217-
* with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring
218-
* that it's not lost.
217+
* with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost.
219218
*/
220219
if (fromDownstream == null) {
221220
return e
@@ -238,12 +237,12 @@ internal suspend fun <T> Flow<T>.catchImpl(
238237
* ```
239238
* when *the downstream* throws.
240239
*/
241-
if (fromDownstream is CancellationException) {
242-
e.addSuppressed(fromDownstream)
243-
throw e
244-
} else {
240+
if (e is CancellationException) {
245241
fromDownstream.addSuppressed(e)
246242
throw fromDownstream
243+
} else {
244+
e.addSuppressed(fromDownstream)
245+
throw e
247246
}
248247
}
249248
}

kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class CatchTest : TestBase() {
160160
throw TestException2()
161161
}
162162

163-
assertFailsWith<TestException2>(flow)
163+
assertFailsWith<TestException>(flow)
164164
finish(4)
165165
}
166166

kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class RetryTest : TestBase() {
120120
throw TestException2()
121121
}
122122

123-
assertFailsWith<TestException2>(flow)
123+
assertFailsWith<TestException>(flow)
124124
finish(4)
125125
}
126126

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class FlowSuppressionTest : TestBase() {
12+
@Test
13+
fun testSuppressionForPrimaryException() = runTest {
14+
val flow = flow {
15+
try {
16+
emit(1)
17+
} finally {
18+
throw TestException()
19+
}
20+
}.catch { expectUnreached() }.onEach { throw TestException2() }
21+
22+
try {
23+
flow.collect()
24+
} catch (e: Throwable) {
25+
assertIs<TestException>(e)
26+
assertIs<TestException2>(e.suppressed[0])
27+
}
28+
}
29+
30+
@Test
31+
fun testSuppressionForPrimaryExceptionRetry() = runTest {
32+
val flow = flow {
33+
try {
34+
emit(1)
35+
} finally {
36+
throw TestException()
37+
}
38+
}.retry { expectUnreached(); true }.onEach { throw TestException2() }
39+
40+
try {
41+
flow.collect()
42+
} catch (e: Throwable) {
43+
assertIs<TestException>(e)
44+
assertIs<TestException2>(e.suppressed[0])
45+
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)