Skip to content

Commit abced79

Browse files
committed
Rethrow downstream exception during "onCompletion" emissions
* We cannot allow emitting elements when downstream exception occurred, otherwise it may lead to a weird side-effects when "collect" block (or any other terminal operator) has thrown an exception, but keeps receiving new values * Another solution may be to silently ignore emitted values, but it may lead to a postponed cancellation and surprising behaviour for users Fixes #1654
1 parent 1da7311 commit abced79

File tree

2 files changed

+101
-12
lines changed

2 files changed

+101
-12
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

+20-9
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,26 @@ public fun <T> Flow<T>.onStart(
128128
public fun <T> Flow<T>.onCompletion(
129129
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
130130
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
131-
var exception: Throwable? = null
132-
try {
133-
exception = catchImpl(this)
134-
} finally {
135-
// Separate method because of KT-32220
136-
SafeCollector<T>(this, coroutineContext).invokeSafely(action, exception)
137-
exception?.let { throw it }
131+
val exception = try {
132+
catchImpl(this)
133+
} catch (e: Throwable) {
134+
/*
135+
* Exception from the downstream.
136+
* Use throwing collector to prevent any emissions from the
137+
* completion sequence when downstream has failed, otherwise it may
138+
* lead to a non-sequential behaviour impossible with `finally`
139+
*/
140+
ThrowingCollector(e).invokeSafely(action, null)
141+
throw e
142+
}
143+
// Exception from the upstream or normal completion
144+
SafeCollector(this, coroutineContext).invokeSafely(action, exception)
145+
exception?.let { throw it }
146+
}
147+
148+
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
149+
override suspend fun emit(value: Any?) {
150+
throw e
138151
}
139152
}
140153

@@ -155,5 +168,3 @@ private suspend fun <T> FlowCollector<T>.invokeSafely(
155168
throw e
156169
}
157170
}
158-
159-

kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt

+81-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.internal.*
89
import kotlin.test.*
910

1011
class OnCompletionTest : TestBase() {
@@ -171,14 +172,91 @@ class OnCompletionTest : TestBase() {
171172
.onCompletion { e ->
172173
expect(8)
173174
assertNull(e)
174-
emit(TestData.Done(e))
175+
try {
176+
emit(TestData.Done(e))
177+
expectUnreached()
178+
} finally {
179+
expect(9)
180+
}
175181
}.collect {
176182
collected += it
177183
}
178184
}
179185
}
180-
val expected = (1..5).map { TestData.Value(it) } + TestData.Done(null)
186+
val expected = (1..5).map<Int, TestData> { TestData.Value(it) }
181187
assertEquals(expected, collected)
182-
finish(9)
188+
finish(10)
189+
}
190+
191+
@Test
192+
fun testFailedEmit() = runTest {
193+
val cause = TestException()
194+
assertFailsWith<TestException> {
195+
flow<TestData> {
196+
expect(1)
197+
emit(TestData.Value(2))
198+
expectUnreached()
199+
}.onCompletion {
200+
assertNull(it)
201+
expect(3)
202+
try {
203+
emit(TestData.Done(it))
204+
expectUnreached()
205+
} catch (e: TestException) {
206+
assertSame(cause, e)
207+
finish(4)
208+
}
209+
}.collect {
210+
expect((it as TestData.Value).i)
211+
throw cause
212+
}
213+
}
214+
}
215+
216+
@Test
217+
fun testFirst() = runTest {
218+
val value = flowOf(239).onCompletion {
219+
assertNull(it)
220+
expect(1)
221+
try {
222+
emit(42)
223+
expectUnreached()
224+
} catch (e: Throwable) {
225+
assertTrue { e is AbortFlowException }
226+
}
227+
}.first()
228+
assertEquals(239, value)
229+
finish(2)
230+
}
231+
232+
@Test
233+
fun testSingle() = runTest {
234+
assertFailsWith<IllegalStateException> {
235+
flowOf(239).onCompletion {
236+
assertNull(it)
237+
expect(1)
238+
try {
239+
emit(42)
240+
expectUnreached()
241+
} catch (e: Throwable) {
242+
// Second emit -- failure
243+
assertTrue { e is IllegalStateException }
244+
throw e
245+
}
246+
}.single()
247+
expectUnreached()
248+
}
249+
finish(2)
250+
}
251+
252+
@Test
253+
fun testEmptySingleInterference() = runTest {
254+
val value = emptyFlow<Int>().onCompletion {
255+
assertNull(it)
256+
expect(1)
257+
emit(42)
258+
}.single()
259+
assertEquals(42, value)
260+
finish(2)
183261
}
184262
}

0 commit comments

Comments
 (0)