Skip to content

Commit 46f9ccc

Browse files
authored
Do not report exceptions raised in CoroutineDispatcher.dispatch as internal errors (Kotlin#4181)
Co-authored-by: Dmitry Khalanskiy <[email protected]> Co-authored-by: Vsevolod Tolstopyatov <[email protected]> Fixes Kotlin#4091
1 parent 1500c83 commit 46f9ccc

File tree

11 files changed

+158
-24
lines changed

11 files changed

+158
-24
lines changed

integration-testing/src/jvmCoreTest/kotlin/ListAllCoroutineThrowableSubclassesTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class ListAllCoroutineThrowableSubclassesTest {
2626
"kotlinx.coroutines.internal.DiagnosticCoroutineContextException",
2727
"kotlinx.coroutines.internal.ExceptionSuccessfullyProcessed",
2828
"kotlinx.coroutines.CoroutinesInternalError",
29+
"kotlinx.coroutines.DispatchException",
2930
"kotlinx.coroutines.channels.ClosedSendChannelException",
3031
"kotlinx.coroutines.channels.ClosedReceiveChannelException",
3132
"kotlinx.coroutines.flow.internal.ChildCancelledException",

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public abstract class CoroutineDispatcher :
225225
* @suppress **This an internal API and should not be used from general code.**
226226
*/
227227
@InternalCoroutinesApi
228-
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
228+
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = safeDispatch(context, block)
229229

230230
/**
231231
* Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.

kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,19 @@ import kotlin.coroutines.*
1616
*/
1717
@InternalCoroutinesApi
1818
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
19+
val reportException = if (exception is DispatchException) exception.cause else exception
1920
// Invoke an exception handler from the context if present
2021
try {
2122
context[CoroutineExceptionHandler]?.let {
22-
it.handleException(context, exception)
23+
it.handleException(context, reportException)
2324
return
2425
}
2526
} catch (t: Throwable) {
26-
handleUncaughtCoroutineException(context, handlerException(exception, t))
27+
handleUncaughtCoroutineException(context, handlerException(reportException, t))
2728
return
2829
}
2930
// If a handler is not present in the context or an exception was thrown, fallback to the global handler
30-
handleUncaughtCoroutineException(context, exception)
31+
handleUncaughtCoroutineException(context, reportException)
3132
}
3233

3334
internal fun handlerException(originalException: Throwable, thrownException: Throwable): Throwable {

kotlinx-coroutines-core/common/src/Yield.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u
2626
val context = uCont.context
2727
context.ensureActive()
2828
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
29-
if (cont.dispatcher.isDispatchNeeded(context)) {
29+
if (cont.dispatcher.safeIsDispatchNeeded(context)) {
3030
// this is a regular dispatcher -- do simple dispatchYield
3131
cont.dispatchYield(context, Unit)
3232
} else {

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+20-4
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,10 @@ internal class DispatchedContinuation<in T>(
187187

188188
override fun resumeWith(result: Result<T>) {
189189
val state = result.toState()
190-
if (dispatcher.isDispatchNeeded(context)) {
190+
if (dispatcher.safeIsDispatchNeeded(context)) {
191191
_state = state
192192
resumeMode = MODE_ATOMIC
193-
dispatcher.dispatch(context, this)
193+
dispatcher.safeDispatch(context, this)
194194
} else {
195195
executeUnconfined(state, MODE_ATOMIC) {
196196
withCoroutineContext(context, countOrElement) {
@@ -205,10 +205,10 @@ internal class DispatchedContinuation<in T>(
205205
@Suppress("NOTHING_TO_INLINE")
206206
internal inline fun resumeCancellableWith(result: Result<T>) {
207207
val state = result.toState()
208-
if (dispatcher.isDispatchNeeded(context)) {
208+
if (dispatcher.safeIsDispatchNeeded(context)) {
209209
_state = state
210210
resumeMode = MODE_CANCELLABLE
211-
dispatcher.dispatch(context, this)
211+
dispatcher.safeDispatch(context, this)
212212
} else {
213213
executeUnconfined(state, MODE_CANCELLABLE) {
214214
if (!resumeCancelled(state)) {
@@ -249,6 +249,22 @@ internal class DispatchedContinuation<in T>(
249249
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
250250
}
251251

252+
internal fun CoroutineDispatcher.safeDispatch(context: CoroutineContext, runnable: Runnable) {
253+
try {
254+
dispatch(context, runnable)
255+
} catch (e: Throwable) {
256+
throw DispatchException(e, this, context)
257+
}
258+
}
259+
260+
internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext): Boolean {
261+
try {
262+
return isDispatchNeeded(context)
263+
} catch (e: Throwable) {
264+
throw DispatchException(e, this, context)
265+
}
266+
}
267+
252268
/**
253269
* It is not inline to save bytecode (it is pretty big and used in many places)
254270
* and we leave it public so that its name is not mangled in use stack traces if it shows there.

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+19-7
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ internal abstract class DispatchedTask<in T> internal constructor(
7676

7777
final override fun run() {
7878
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
79-
var fatalException: Throwable? = null
8079
try {
8180
val delegate = delegate as DispatchedContinuation<T>
8281
val continuation = delegate.continuation
@@ -102,11 +101,10 @@ internal abstract class DispatchedTask<in T> internal constructor(
102101
}
103102
}
104103
}
104+
} catch (e: DispatchException) {
105+
handleCoroutineException(delegate.context, e.cause)
105106
} catch (e: Throwable) {
106-
// This instead of runCatching to have nicer stacktrace and debug experience
107-
fatalException = e
108-
} finally {
109-
fatalException?.let { handleFatalException(it) }
107+
handleFatalException(e)
110108
}
111109
}
112110

@@ -143,8 +141,8 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
143141
// dispatch directly using this instance's Runnable implementation
144142
val dispatcher = delegate.dispatcher
145143
val context = delegate.context
146-
if (dispatcher.isDispatchNeeded(context)) {
147-
dispatcher.dispatch(context, this)
144+
if (dispatcher.safeIsDispatchNeeded(context)) {
145+
dispatcher.safeDispatch(context, this)
148146
} else {
149147
resumeUnconfined()
150148
}
@@ -205,3 +203,17 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
205203
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
206204
resumeWith(Result.failure(recoverStackTrace(exception, this)))
207205
}
206+
207+
/**
208+
* This exception holds an exception raised in [CoroutineDispatcher.dispatch] method.
209+
* When dispatcher methods fail unexpectedly, it is likely a user-induced programmatic bug,
210+
* such as calling `executor.close()` prematurely. To avoid reporting such exceptions as fatal errors,
211+
* we handle them with a separate code path. See also #4091.
212+
*
213+
* @see safeDispatch
214+
*/
215+
internal class DispatchException(
216+
override val cause: Throwable,
217+
dispatcher: CoroutineDispatcher,
218+
context: CoroutineContext,
219+
) : Exception("Coroutine dispatcher $dispatcher threw an exception, context = $context", cause)

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ internal class LimitedDispatcher(
4242

4343
override fun dispatch(context: CoroutineContext, block: Runnable) {
4444
dispatchInternal(block) { worker ->
45-
dispatcher.dispatch(this, worker)
45+
dispatcher.safeDispatch(this, worker)
4646
}
4747
}
4848

@@ -116,10 +116,10 @@ internal class LimitedDispatcher(
116116
}
117117
currentTask = obtainTaskOrDeallocateWorker() ?: return
118118
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
119-
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
119+
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
120120
// Do "yield" to let other views execute their runnable as well
121121
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
122-
dispatcher.dispatch(this@LimitedDispatcher, this)
122+
dispatcher.safeDispatch(this@LimitedDispatcher, this)
123123
return
124124
}
125125
}

kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private fun dispatcherFailure(completion: Continuation<*>, e: Throwable) {
5858
* 2) Rethrow the exception immediately, so it will crash the caller (e.g. when the coroutine had
5959
* no parent or it was async/produce over MainScope).
6060
*/
61-
completion.resumeWith(Result.failure(e))
62-
throw e
61+
val reportException = if (e is DispatchException) e.cause else e
62+
completion.resumeWith(Result.failure(reportException))
63+
throw reportException
6364
}

kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c
2020
startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
2121
}
2222
} catch (e: Throwable) {
23-
actualCompletion.resumeWithException(e)
23+
val reportException = if (e is DispatchException) e.cause else e
24+
actualCompletion.resumeWithException(reportException)
2425
return
2526
}
2627
if (value !== COROUTINE_SUSPENDED) {

kotlinx-coroutines-core/jvm/src/Executors.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public fun CoroutineDispatcher.asExecutor(): Executor =
105105

106106
private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
107107
override fun execute(block: Runnable) {
108-
if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
109-
dispatcher.dispatch(EmptyCoroutineContext, block)
108+
if (dispatcher.safeIsDispatchNeeded(EmptyCoroutineContext)) {
109+
dispatcher.safeDispatch(EmptyCoroutineContext, block)
110110
} else {
111111
block.run()
112112
}

kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt

+102
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,106 @@ class ExecutorsTest : TestBase() {
119119
dispatcher.close()
120120
check(executorService.isShutdown)
121121
}
122+
123+
@Test
124+
fun testEarlyExecutorShutdown() {
125+
runTestExceptionInDispatch(6, { it is RejectedExecutionException }) {
126+
expect(1)
127+
val dispatcher = newSingleThreadContext("Ctx")
128+
launch(dispatcher) {
129+
withContext(Dispatchers.Default) {
130+
expect(2)
131+
delay(100)
132+
expect(4)
133+
}
134+
}
135+
136+
delay(50)
137+
expect(3)
138+
139+
dispatcher.close()
140+
}
141+
}
142+
143+
@Test
144+
fun testExceptionInDispatch() {
145+
runTestExceptionInDispatch(5, { it is TestException }) {
146+
val dispatcher = object : CoroutineDispatcher() {
147+
private var closed = false
148+
override fun dispatch(context: CoroutineContext, block: Runnable) {
149+
if (closed) throw TestException()
150+
Dispatchers.Default.dispatch(context, block)
151+
}
152+
153+
fun close() {
154+
closed = true
155+
}
156+
}
157+
launch(dispatcher) {
158+
withContext(Dispatchers.Default) {
159+
expect(1)
160+
delay(100)
161+
expect(3)
162+
}
163+
}
164+
165+
delay(50)
166+
expect(2)
167+
dispatcher.close()
168+
}
169+
}
170+
171+
@Test
172+
fun testExceptionInIsDispatchNeeded() {
173+
val dispatcher = object : CoroutineDispatcher() {
174+
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
175+
expect(2)
176+
throw TestException()
177+
}
178+
override fun dispatch(context: CoroutineContext, block: Runnable) = expectUnreached()
179+
}
180+
try {
181+
runBlocking {
182+
expect(1)
183+
try {
184+
launch(dispatcher) {
185+
expectUnreached()
186+
}
187+
expectUnreached()
188+
} catch (_: TestException) {
189+
expect(3)
190+
}
191+
192+
}
193+
} catch (_: TestException) {
194+
finish(4)
195+
}
196+
}
197+
198+
private fun runTestExceptionInDispatch(
199+
totalSteps: Int,
200+
isExpectedException: (Throwable) -> Boolean,
201+
block: suspend CoroutineScope.() -> Unit,
202+
) {
203+
var mainThread: Thread? = null
204+
val exceptionHandler = CoroutineExceptionHandler { _, e ->
205+
if (isExpectedException(e)) {
206+
expect(totalSteps - 1)
207+
mainThread!!.run {
208+
interrupt()
209+
unpark(this)
210+
}
211+
} else {
212+
expectUnreached()
213+
}
214+
}
215+
try {
216+
runBlocking(exceptionHandler) {
217+
block()
218+
mainThread = Thread.currentThread()
219+
}
220+
} catch (_: InterruptedException) {
221+
finish(totalSteps)
222+
}
223+
}
122224
}

0 commit comments

Comments
 (0)