Skip to content

Commit 3f3f4f7

Browse files
authored
Improve handling of DispatchException in an undispatched case (#4272)
* Properly rename test that is responsible for fast-path failing for undispatched mode * Simplify, document and reduce the binary size of Undispatched.kt * Rethrow dispatcher exceptions during the undispatched coroutines start immediately Otherwise, in the face of malfunctioning dispatcher, there might be no trace of any error, making it extremely hard to debug. The root cause should be addressed by #4209 Fixes #4142
1 parent 4862747 commit 3f3f4f7

File tree

3 files changed

+64
-26
lines changed

3 files changed

+64
-26
lines changed

Diff for: kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt

+37-25
Original file line numberDiff line numberDiff line change
@@ -38,55 +38,67 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c
3838
*
3939
* It starts the coroutine using [startCoroutineUninterceptedOrReturn].
4040
*/
41-
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
42-
return undispatchedResult({ true }) {
43-
block.startCoroutineUninterceptedOrReturn(receiver, this)
44-
}
45-
}
41+
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(
42+
receiver: R, block: suspend R.() -> T
43+
): Any? = startUndspatched(alwaysRethrow = true, receiver, block)
4644

4745
/**
4846
* Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path.
4947
*/
5048
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout(
5149
receiver: R, block: suspend R.() -> T
52-
): Any? {
53-
return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) {
54-
block.startCoroutineUninterceptedOrReturn(receiver, this)
55-
}
56-
}
50+
): Any? = startUndspatched(alwaysRethrow = false, receiver, block)
5751

58-
private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
59-
shouldThrow: (Throwable) -> Boolean,
60-
startBlock: () -> Any?
52+
/**
53+
* Starts and handles the result of an undispatched coroutine, potentially with children.
54+
* For example, it handles `coroutineScope { ...suspend of throw, maybe start children... }`
55+
* and `launch(start = UNDISPATCHED) { ... }`
56+
*
57+
* @param alwaysRethrow specifies whether an exception should be unconditioanlly rethrown.
58+
* It is a tweak for 'withTimeout' in order to successfully return values when the block was cancelled:
59+
* i.e. `withTimeout(1ms) { Thread.sleep(1000); 42 }` should not fail.
60+
*/
61+
private fun <T, R> ScopeCoroutine<T>.startUndspatched(
62+
alwaysRethrow: Boolean,
63+
receiver: R, block: suspend R.() -> T
6164
): Any? {
6265
val result = try {
63-
startBlock()
66+
block.startCoroutineUninterceptedOrReturn(receiver, this)
67+
} catch (e: DispatchException) {
68+
// Special codepath for failing CoroutineDispatcher: rethrow an exception
69+
// immediately without waiting for children to indicate something is wrong
70+
dispatchExceptionAndMakeCompleting(e)
6471
} catch (e: Throwable) {
6572
CompletedExceptionally(e)
6673
}
74+
6775
/*
68-
* We're trying to complete our undispatched block here and have three code-paths:
69-
* (1) Coroutine is suspended.
70-
* Otherwise, coroutine had returned result, so we are completing our block (and its job).
71-
* (2) If we can't complete it or started waiting for children, we suspend.
72-
* (3) If we have successfully completed the coroutine state machine here,
73-
* then we take the actual final state of the coroutine from makeCompletingOnce and return it.
74-
*
75-
* shouldThrow parameter is a special code path for timeout coroutine:
76-
* If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value,
77-
* not a timeout exception.
76+
* We are trying to complete our undispatched block with the following possible codepaths:
77+
* 1) The coroutine just suspended. I.e. `coroutineScope { .. suspend here }`.
78+
* Then just suspend
79+
* 2) The coroutine completed with something, but has active children. Wait for them, also suspend
80+
* 3) The coroutine succesfully completed. Return or rethrow its result.
7881
*/
7982
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
8083
val state = makeCompletingOnce(result)
8184
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
8285
afterCompletionUndispatched()
8386
return if (state is CompletedExceptionally) { // (3)
8487
when {
85-
shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
88+
alwaysRethrow || notOwnTimeout(state.cause) -> throw recoverStackTrace(state.cause, uCont)
8689
result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont)
8790
else -> result
8891
}
8992
} else {
9093
state.unboxState()
9194
}
9295
}
96+
97+
private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean {
98+
return cause !is TimeoutCancellationException || cause.coroutine !== this
99+
}
100+
101+
private fun ScopeCoroutine<*>.dispatchExceptionAndMakeCompleting(e: DispatchException): Nothing {
102+
makeCompleting(CompletedExceptionally(e.cause))
103+
throw recoverStackTrace(e.cause, uCont)
104+
}

Diff for: kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt renamed to kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.testing.*
44
import kotlin.test.*
55

6-
class ExperimentalDispatchModeTest : TestBase() {
6+
class UnconfinedCancellationTest : TestBase() {
77
@Test
88
fun testUnconfinedCancellation() = runTest {
99
val parent = Job()

Diff for: kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt

+26
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@ package kotlinx.coroutines
44

55
import kotlinx.coroutines.testing.*
66
import kotlinx.coroutines.channels.*
7+
import kotlinx.coroutines.flow.emptyFlow
8+
import kotlinx.coroutines.flow.flow
9+
import kotlinx.coroutines.flow.flowOn
710
import org.junit.*
811
import org.junit.Test
912
import org.junit.rules.*
13+
import kotlin.coroutines.Continuation
14+
import kotlin.coroutines.EmptyCoroutineContext
15+
import kotlin.coroutines.startCoroutine
1016
import kotlin.test.*
1117

1218
class FailFastOnStartTest : TestBase() {
@@ -81,4 +87,24 @@ class FailFastOnStartTest : TestBase() {
8187
fun testAsyncNonChild() = runTest(expected = ::mainException) {
8288
async<Int>(Job() + Dispatchers.Main) { fail() }
8389
}
90+
91+
@Test
92+
fun testFlowOn() {
93+
// See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }`
94+
// rethrows an exception. It does not help with the completion of such a coroutine though.
95+
// `suspend {}` + start coroutine with custom `completion` to avoid waiting for test completion
96+
expect(1)
97+
val caller = suspend {
98+
try {
99+
emptyFlow<Int>().flowOn(Dispatchers.Main).collect { fail() }
100+
} catch (e: Throwable) {
101+
assertTrue(mainException(e))
102+
expect(2)
103+
}
104+
}
105+
106+
caller.startCoroutine(Continuation(EmptyCoroutineContext) {
107+
finish(3)
108+
})
109+
}
84110
}

0 commit comments

Comments
 (0)