Skip to content

Commit 08439cc

Browse files
committed
Do not report exceptions raised in CoroutineDispatcher.dispatch as internal errors
1 parent 5e03ca2 commit 08439cc

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+10-2
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ internal class DispatchedContinuation<in T>(
191191
if (dispatcher.isDispatchNeeded(context)) {
192192
_state = state
193193
resumeMode = MODE_ATOMIC
194-
dispatcher.dispatch(context, this)
194+
dispatchWithExceptionHandling(context)
195195
} else {
196196
executeUnconfined(state, MODE_ATOMIC) {
197197
withCoroutineContext(this.context, countOrElement) {
@@ -212,7 +212,7 @@ internal class DispatchedContinuation<in T>(
212212
if (dispatcher.isDispatchNeeded(context)) {
213213
_state = state
214214
resumeMode = MODE_CANCELLABLE
215-
dispatcher.dispatch(context, this)
215+
dispatchWithExceptionHandling(context)
216216
} else {
217217
executeUnconfined(state, MODE_CANCELLABLE) {
218218
if (!resumeCancelled(state)) {
@@ -260,6 +260,14 @@ internal class DispatchedContinuation<in T>(
260260

261261
override fun toString(): String =
262262
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
263+
264+
private fun dispatchWithExceptionHandling(context: CoroutineContext) {
265+
try {
266+
dispatcher.dispatch(context, this)
267+
} catch (e: Throwable) {
268+
throw DispatchException(e)
269+
}
270+
}
263271
}
264272

265273
/**

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+13
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ internal abstract class DispatchedTask<in T> internal constructor(
7878
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
7979
val taskContext = this.taskContext
8080
var fatalException: Throwable? = null
81+
var dispatchException: DispatchException? = null
8182
try {
8283
val delegate = delegate as DispatchedContinuation<T>
8384
val continuation = delegate.continuation
@@ -103,12 +104,17 @@ internal abstract class DispatchedTask<in T> internal constructor(
103104
}
104105
}
105106
}
107+
} catch (e: DispatchException) {
108+
dispatchException = e
106109
} catch (e: Throwable) {
107110
// This instead of runCatching to have nicer stacktrace and debug experience
108111
fatalException = e
109112
} finally {
110113
val result = runCatching { taskContext.afterTask() }
111114
handleFatalException(fatalException, result.exceptionOrNull())
115+
if (dispatchException != null) {
116+
handleCoroutineException(delegate.context, dispatchException.cause!!)
117+
}
112118
}
113119
}
114120

@@ -213,3 +219,10 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
213219
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
214220
resumeWith(Result.failure(recoverStackTrace(exception, this)))
215221
}
222+
223+
/**
224+
* This exception holds an exception raised in [CoroutineDispatcher.dispatch] method
225+
*
226+
* @see DispatchedContinuation.dispatchWithExceptionHandling
227+
*/
228+
internal class DispatchException(cause: Throwable) : Exception(cause)

kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt

+75
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,79 @@ class ExecutorsTest : TestBase() {
119119
dispatcher.close()
120120
check(executorService.isShutdown)
121121
}
122+
123+
@Test
124+
fun testEarlyExecutorShutdown() {
125+
runTestExceptionInDispatch(6, { it is RejectedExecutionException }) {
126+
expect(1)
127+
val dispatcher = newSingleThreadContext("Ctx")
128+
launch(dispatcher) {
129+
withContext(Dispatchers.Default) {
130+
expect(2)
131+
delay(100)
132+
expect(4)
133+
}
134+
}
135+
136+
delay(50)
137+
expect(3)
138+
139+
dispatcher.close()
140+
}
141+
}
142+
143+
@Test
144+
fun testExceptionInDispatch() {
145+
runTestExceptionInDispatch(5, { it is TestException }) {
146+
val dispatcher = object : CoroutineDispatcher() {
147+
private var closed = false
148+
override fun dispatch(context: CoroutineContext, block: Runnable) {
149+
if (closed) throw TestException()
150+
Dispatchers.Default.dispatch(context, block)
151+
}
152+
153+
fun close() {
154+
closed = true
155+
}
156+
}
157+
launch(dispatcher) {
158+
withContext(Dispatchers.Default) {
159+
expect(1)
160+
delay(100)
161+
expect(3)
162+
}
163+
}
164+
165+
delay(50)
166+
expect(2)
167+
dispatcher.close()
168+
}
169+
}
170+
171+
private fun runTestExceptionInDispatch(
172+
totalSteps: Int,
173+
isExpectedException: (Throwable) -> Boolean,
174+
block: suspend CoroutineScope.() -> Unit,
175+
) {
176+
var mainThread: Thread? = null
177+
val exceptionHandler = CoroutineExceptionHandler { _, e ->
178+
if (isExpectedException(e)) {
179+
expect(totalSteps - 1)
180+
mainThread!!.run {
181+
interrupt()
182+
unpark(this)
183+
}
184+
} else {
185+
expectUnreached()
186+
}
187+
}
188+
try {
189+
runBlocking(exceptionHandler) {
190+
block()
191+
mainThread = Thread.currentThread()
192+
}
193+
} catch (_: InterruptedException) {
194+
finish(totalSteps)
195+
}
196+
}
122197
}

0 commit comments

Comments
 (0)