Skip to content

Commit 4e380ed

Browse files
committed
Handle fatal exceptions in continuation machinery using handleViaException instead of rethrowing it to provide faster feedback loop
Fixes #808 Fixes #773
1 parent 338b231 commit 4e380ed

File tree

3 files changed

+176
-8
lines changed

3 files changed

+176
-8
lines changed

kotlinx-coroutines-core/common/src/Dispatched.kt

+39-6
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private fun DispatchedTask<*>.resumeUnconfined() {
5151
}
5252
}
5353

54-
private inline fun runUnconfinedEventLoop(
54+
private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
5555
eventLoop: EventLoop,
5656
block: () -> Unit
5757
) {
@@ -64,10 +64,10 @@ private inline fun runUnconfinedEventLoop(
6464
}
6565
} catch (e: Throwable) {
6666
/*
67-
* This exception doesn't happen normally, only if user either submitted throwing runnable
68-
* or if we have a bug in implementation. Throw an exception that better explains the problem.
67+
* This exception doesn't happen normally, only if we have a bug in implementation.
68+
* Report it as a fatal exception.
6969
*/
70-
throw DispatchException("Unexpected exception in unconfined event loop", e)
70+
handleFatalException(e, null)
7171
} finally {
7272
eventLoop.decrementUseCount(unconfined = true)
7373
}
@@ -216,6 +216,7 @@ internal abstract class DispatchedTask<in T>(
216216

217217
public final override fun run() {
218218
val taskContext = this.taskContext
219+
var exception: Throwable? = null
219220
try {
220221
val delegate = delegate as DispatchedContinuation<T>
221222
val continuation = delegate.continuation
@@ -234,11 +235,43 @@ internal abstract class DispatchedTask<in T>(
234235
}
235236
}
236237
} catch (e: Throwable) {
237-
throw DispatchException("Unexpected exception running $this", e)
238+
// This instead of runCatching to have nicer stacktrace and debug experience
239+
exception = e
238240
} finally {
239-
taskContext.afterTask()
241+
val result = runCatching { taskContext.afterTask() }
242+
handleFatalException(exception, result.exceptionOrNull())
240243
}
241244
}
245+
246+
/**
247+
* Machinery that handles fatal exceptions in kotlinx.coroutines.
248+
* There are two kinds of fatal exceptions:
249+
*
250+
* 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
251+
* the library or the compiler has a bug that breaks internal invariants.
252+
* They usually have specific workarounds, but require careful study of the cause and should
253+
* be reported to the maintainers and fixed on the library's side anyway.
254+
*
255+
* 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
256+
* While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
257+
* we can't ignore it because it may leave coroutine in the inconsistent state.
258+
* If you encounter such exception, you can either disable this context element or wrap it into
259+
* another context element that catches all exceptions and handles it in the application specific manner.
260+
*
261+
* Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
262+
* a failed coroutine, but such exceptions should be reported anyway.
263+
*/
264+
internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
265+
if (exception === null && finallyException === null) return
266+
if (exception !== null && finallyException !== null) {
267+
exception.addSuppressedThrowable(finallyException)
268+
}
269+
270+
val cause = exception ?: finallyException
271+
val reason = DispatchException("Fatal exception in coroutines machinery for $this. " +
272+
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
273+
handleExceptionViaHandler(this.delegate.context, reason)
274+
}
242275
}
243276

244277
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =

kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public interface ThreadContextElement<S> : CoroutineContext.Element {
5656
* when the context of the coroutine this element.
5757
* The result of this function is the old value of the thread-local state that will be passed to [restoreThreadContext].
5858
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
59-
* context is updated in an undefined state.
59+
* context is updated in an undefined state and may crash an application.
6060
*
6161
* @param context the coroutine context.
6262
*/
@@ -69,7 +69,7 @@ public interface ThreadContextElement<S> : CoroutineContext.Element {
6969
* The value of [oldState] is the result of the previous invocation of [updateThreadContext] and it should
7070
* be restored in the thread-local state by this function.
7171
* This method should handle its own exceptions and do not rethrow it. Thrown exceptions will leave coroutine which
72-
* context is updated in an undefined state.
72+
* context is updated in an undefined state and may crash an application.
7373
*
7474
* @param context the coroutine context.
7575
* @param oldState the value returned by the previous invocation of [updateThreadContext].
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import org.junit.*
8+
import org.junit.Test
9+
import org.junit.runner.*
10+
import org.junit.runners.*
11+
import java.util.concurrent.*
12+
import kotlin.coroutines.*
13+
import kotlin.test.*
14+
15+
@RunWith(Parameterized::class)
16+
class FailingCoroutinesMachineryTest(
17+
private val element: CoroutineContext.Element,
18+
private val dispatcher: CoroutineDispatcher
19+
) : TestBase() {
20+
21+
private var caught: Throwable? = null
22+
private val latch = CountDownLatch(1)
23+
private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() }
24+
private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") }
25+
26+
private object FailingUpdate : ThreadContextElement<Unit> {
27+
private object Key : CoroutineContext.Key<MyElement>
28+
29+
override val key: CoroutineContext.Key<*> get() = Key
30+
31+
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
32+
}
33+
34+
override fun updateThreadContext(context: CoroutineContext) {
35+
throw TestException("Prevent a coroutine from starting right here for some reason")
36+
}
37+
38+
override fun toString() = "FailingUpdate"
39+
}
40+
41+
private object FailingRestore : ThreadContextElement<Unit> {
42+
private object Key : CoroutineContext.Key<MyElement>
43+
44+
override val key: CoroutineContext.Key<*> get() = Key
45+
46+
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
47+
throw TestException("Prevent a coroutine from starting right here for some reason")
48+
}
49+
50+
override fun updateThreadContext(context: CoroutineContext) {
51+
}
52+
53+
override fun toString() = "FailingRestore"
54+
}
55+
56+
private object ThrowingDispatcher : CoroutineDispatcher() {
57+
override fun dispatch(context: CoroutineContext, block: Runnable) {
58+
throw TestException()
59+
}
60+
61+
override fun toString() = "ThrowingDispatcher"
62+
}
63+
64+
private object ThrowingDispatcher2 : CoroutineDispatcher() {
65+
override fun dispatch(context: CoroutineContext, block: Runnable) {
66+
block.run()
67+
}
68+
69+
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
70+
throw TestException()
71+
}
72+
73+
override fun toString() = "ThrowingDispatcher2"
74+
}
75+
76+
@After
77+
fun tearDown() {
78+
runCatching { (dispatcher as? ExecutorCoroutineDispatcher)?.close() }
79+
if (lazyOuterDispatcher.isInitialized()) lazyOuterDispatcher.value.close()
80+
}
81+
82+
companion object {
83+
@JvmStatic
84+
@Parameterized.Parameters(name = "Element: {0}, dispatcher: {1}")
85+
fun dispatchers(): List<Array<Any>> {
86+
val elements = listOf<Any>(FailingRestore, FailingUpdate)
87+
val dispatchers = listOf<Any>(
88+
Dispatchers.Unconfined,
89+
Dispatchers.Default,
90+
Executors.newFixedThreadPool(1).asCoroutineDispatcher(),
91+
Executors.newScheduledThreadPool(1).asCoroutineDispatcher(),
92+
ThrowingDispatcher, ThrowingDispatcher2
93+
)
94+
95+
return elements.flatMap { element ->
96+
dispatchers.map { dispatcher ->
97+
arrayOf(element, dispatcher)
98+
}
99+
}
100+
}
101+
}
102+
103+
@Test
104+
fun testElement() = runTest {
105+
launch(NonCancellable + dispatcher + exceptionHandler + element) {}
106+
checkException()
107+
}
108+
109+
@Test
110+
fun testNestedElement() = runTest {
111+
launch(NonCancellable + dispatcher + exceptionHandler) {
112+
launch(element) { }
113+
}
114+
checkException()
115+
}
116+
117+
@Test
118+
fun testNestedDispatcherAndElement() = runTest {
119+
launch(lazyOuterDispatcher.value + NonCancellable + exceptionHandler) {
120+
launch(element + dispatcher) { }
121+
}
122+
checkException()
123+
}
124+
125+
private fun checkException() {
126+
latch.await(2, TimeUnit.SECONDS)
127+
val e = caught
128+
assertNotNull(e)
129+
// First condition -- failure in context element
130+
val firstCondition = e is DispatchException && e.cause is TestException
131+
// Second condition -- failure from isDispatchNeeded (#880)
132+
val secondCondition = e is TestException
133+
assertTrue(firstCondition xor secondCondition)
134+
}
135+
}

0 commit comments

Comments
 (0)