File tree 2 files changed +37
-0
lines changed
2 files changed +37
-0
lines changed Original file line number Diff line number Diff line change @@ -64,6 +64,10 @@ private fun <T, R> ScopeCoroutine<T>.startUndspatched(
64
64
): Any? {
65
65
val result = try {
66
66
block.startCoroutineUninterceptedOrReturn(receiver, this )
67
+ } catch (e: DispatchException ) {
68
+ // Special codepath for failing CoroutineDispatcher: rethrow an exception
69
+ // immediately without waiting for children to indicate something is wrong
70
+ dispatchException(e)
67
71
} catch (e: Throwable ) {
68
72
CompletedExceptionally (e)
69
73
}
@@ -93,3 +97,8 @@ private fun <T, R> ScopeCoroutine<T>.startUndspatched(
93
97
private fun ScopeCoroutine <* >.notOwnTimeout (cause : Throwable ): Boolean {
94
98
return cause !is TimeoutCancellationException || cause.coroutine != = this
95
99
}
100
+
101
+ private fun ScopeCoroutine <* >.dispatchException (e : DispatchException ) {
102
+ makeCompleting(CompletedExceptionally (e.cause))
103
+ throw throw recoverStackTrace(e.cause, uCont)
104
+ }
Original file line number Diff line number Diff line change @@ -4,9 +4,17 @@ package kotlinx.coroutines
4
4
5
5
import kotlinx.coroutines.testing.*
6
6
import kotlinx.coroutines.channels.*
7
+ import kotlinx.coroutines.flow.emptyFlow
8
+ import kotlinx.coroutines.flow.flow
9
+ import kotlinx.coroutines.flow.flowOn
7
10
import org.junit.*
8
11
import org.junit.Test
9
12
import org.junit.rules.*
13
+ import org.junit.runners.model.TestTimedOutException
14
+ import java.util.concurrent.TimeoutException
15
+ import kotlin.coroutines.Continuation
16
+ import kotlin.coroutines.EmptyCoroutineContext
17
+ import kotlin.coroutines.startCoroutine
10
18
import kotlin.test.*
11
19
12
20
class FailFastOnStartTest : TestBase () {
@@ -81,4 +89,24 @@ class FailFastOnStartTest : TestBase() {
81
89
fun testAsyncNonChild () = runTest(expected = ::mainException) {
82
90
async<Int >(Job () + Dispatchers .Main ) { fail() }
83
91
}
92
+
93
+ @Test
94
+ fun testFlowOn () {
95
+ // See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }`
96
+ // rethrows an exception. It does not help with the completion of such a coroutine though.
97
+ // Hack to avoid waiting for test completion
98
+ expect(1 )
99
+ val caller = suspend {
100
+ try {
101
+ emptyFlow<Int >().flowOn(Dispatchers .Main ).collect { fail() }
102
+ } catch (e: Throwable ) {
103
+ assertTrue(mainException(e))
104
+ expect(2 )
105
+ }
106
+ }
107
+
108
+ caller.startCoroutine(Continuation (EmptyCoroutineContext ) {
109
+ finish(3 )
110
+ })
111
+ }
84
112
}
You can’t perform that action at this time.
0 commit comments