Skip to content

Commit 83b5474

Browse files
committed
Basic exception stacktrace recovery mechanism in JobSupport, ScopeCoroutine and AbstractChannel
* Implement CoroutineStackFrame in CancellableContinuationImpl, DispatchedContinuation and ScopeCoroutine * On coroutine resumption try to reflectively instantiate exception instance of the same type, but with augmented stacktrace * Recover stacktrace by walking over CoroutineStackFrame * Recover stacktrace on fast-path exceptions without CoroutineStackFrame walking to provide more context to an exception * Unwrap exceptions when doing aggregation in JobSupport Fixes #493
1 parent 0b886a3 commit 83b5474

File tree

14 files changed

+564
-77
lines changed

14 files changed

+564
-77
lines changed

common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,15 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
216216
internal open class CancellableContinuationImpl<in T>(
217217
delegate: Continuation<T>,
218218
resumeMode: Int
219-
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
219+
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable, CoroutineStackFrame {
220220

221221
public override val context: CoroutineContext = delegate.context
222222

223+
override val callerFrame: CoroutineStackFrame?
224+
get() = delegate as? CoroutineStackFrame
225+
226+
override fun getStackTraceElement(): StackTraceElement? = null
227+
223228
override fun initCancellability() {
224229
initParentJobInternal(delegate.context[Job])
225230
}

common/kotlinx-coroutines-core-common/src/Dispatched.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,13 @@ internal object UndispatchedEventLoop {
8181
internal class DispatchedContinuation<in T>(
8282
@JvmField val dispatcher: CoroutineDispatcher,
8383
@JvmField val continuation: Continuation<T>
84-
) : Continuation<T> by continuation, DispatchedTask<T> {
84+
) : Continuation<T> by continuation, DispatchedTask<T>, CoroutineStackFrame {
8585
@JvmField
8686
@Suppress("PropertyName")
8787
internal var _state: Any? = UNDEFINED
8888
public override var resumeMode: Int = 0
89+
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
90+
override fun getStackTraceElement(): StackTraceElement? = null
8991
@JvmField // pre-cached value to avoid ctx.fold on every resumption
9092
internal val countOrElement = threadContextElements(context)
9193

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
242242
val seenExceptions = identitySet<Throwable>(exceptions.size)
243243
var suppressed = false
244244
for (exception in exceptions) {
245-
if (exception !== rootCause && exception !is CancellationException && seenExceptions.add(exception)) {
246-
rootCause.addSuppressedThrowable(exception)
245+
// Unwrap original exception from one with recovered stacktrace
246+
val unwrapped = unwrap(exception)
247+
if (unwrapped !== rootCause && unwrapped !is CancellationException && seenExceptions.add(exception)) {
248+
rootCause.addSuppressedThrowable(unwrapped)
247249
suppressed = true
248250
}
249251
}
@@ -1077,7 +1079,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10771079
val state = this.state
10781080
if (state !is Incomplete) {
10791081
// already complete -- just return result
1080-
if (state is CompletedExceptionally) throw state.cause
1082+
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause)
10811083
return state
10821084

10831085
}
@@ -1233,7 +1235,7 @@ private class ResumeAwaitOnCompletion<T>(
12331235
check(state !is Incomplete)
12341236
if (state is CompletedExceptionally) {
12351237
// Resume with exception in atomic way to preserve exception
1236-
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
1238+
continuation.resumeWithExceptionMode(recoverStackTrace(state.cause, continuation), MODE_ATOMIC_DEFAULT)
12371239
} else {
12381240
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
12391241
@Suppress("UNCHECKED_CAST")

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+23-21
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
176176
result === OFFER_SUCCESS -> true
177177
// We should check for closed token on offer as well, otherwise offer won't be linearizable
178178
// in the face of concurrent close()
179-
result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false
180-
result is Closed<*> -> throw result.sendException
179+
result === OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ?: return false
180+
result is Closed<*> -> throw recoverStackTrace(result.sendException)
181181
else -> error("offerInternal returned $result")
182182
}
183183
}
@@ -194,7 +194,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
194194
}
195195
is Closed<*> -> {
196196
helpClose(enqueueResult)
197-
cont.resumeWithException(enqueueResult.sendException)
197+
cont.resumeWithStackTrace(enqueueResult.sendException)
198198
return@sc
199199
}
200200
}
@@ -208,7 +208,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
208208
offerResult === OFFER_FAILED -> continue@loop
209209
offerResult is Closed<*> -> {
210210
helpClose(offerResult)
211-
cont.resumeWithException(offerResult.sendException)
211+
cont.resumeWithStackTrace(offerResult.sendException)
212212
return@sc
213213
}
214214
else -> error("offerInternal returned $offerResult")
@@ -408,7 +408,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
408408
when {
409409
enqueueResult === ALREADY_SELECTED -> return
410410
enqueueResult === ENQUEUE_FAILED -> {} // retry
411-
enqueueResult is Closed<*> -> throw enqueueResult.sendException
411+
enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
412412
else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
413413
}
414414
} else {
@@ -420,7 +420,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
420420
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
421421
return
422422
}
423-
offerResult is Closed<*> -> throw offerResult.sendException
423+
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
424424
else -> error("offerSelectInternal returned $offerResult")
425425
}
426426
}
@@ -574,7 +574,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
574574

575575
@Suppress("UNCHECKED_CAST")
576576
private fun receiveResult(result: Any?): E {
577-
if (result is Closed<*>) throw result.receiveException
577+
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
578578
return result as E
579579
}
580580

@@ -590,7 +590,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
590590
// hm... something is not right. try to poll
591591
val result = pollInternal()
592592
if (result is Closed<*>) {
593-
cont.resumeWithException(result.receiveException)
593+
cont.resumeWithStackTrace(result.receiveException)
594594
return@sc
595595
}
596596
if (result !== POLL_FAILED) {
@@ -620,7 +620,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
620620
@Suppress("UNCHECKED_CAST")
621621
private fun receiveOrNullResult(result: Any?): E? {
622622
if (result is Closed<*>) {
623-
if (result.closeCause != null) throw result.closeCause
623+
if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
624624
return null
625625
}
626626
return result as E
@@ -641,7 +641,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
641641
if (result.closeCause == null)
642642
cont.resume(null)
643643
else
644-
cont.resumeWithException(result.closeCause)
644+
cont.resumeWithStackTrace(result.closeCause)
645645
return@sc
646646
}
647647
if (result !== POLL_FAILED) {
@@ -758,7 +758,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
758758
when {
759759
pollResult === ALREADY_SELECTED -> return
760760
pollResult === POLL_FAILED -> {} // retry
761-
pollResult is Closed<*> -> throw pollResult.receiveException
761+
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
762762
else -> {
763763
block.startCoroutineUnintercepted(pollResult as E, select.completion)
764764
return
@@ -797,8 +797,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
797797
if (select.trySelect(null))
798798
block.startCoroutineUnintercepted(null, select.completion)
799799
return
800-
} else
801-
throw pollResult.closeCause
800+
} else {
801+
throw recoverStackTrace(pollResult.closeCause)
802+
}
802803
}
803804
else -> {
804805
// selected successfully
@@ -857,7 +858,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
857858

858859
private fun hasNextResult(result: Any?): Boolean {
859860
if (result is Closed<*>) {
860-
if (result.closeCause != null) throw result.receiveException
861+
if (result.closeCause != null) recoverStackTrace(throw result.receiveException)
861862
return false
862863
}
863864
return true
@@ -878,7 +879,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
878879
if (result.closeCause == null)
879880
cont.resume(false)
880881
else
881-
cont.resumeWithException(result.receiveException)
882+
cont.resumeWithStackTrace(result.receiveException)
882883
return@sc
883884
}
884885
if (result !== POLL_FAILED) {
@@ -891,7 +892,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
891892
@Suppress("UNCHECKED_CAST")
892893
override suspend fun next(): E {
893894
val result = this.result
894-
if (result is Closed<*>) throw result.receiveException
895+
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
895896
if (result !== POLL_FAILED) {
896897
this.result = POLL_FAILED
897898
return result as E
@@ -911,7 +912,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
911912
if (closed.closeCause == null && nullOnClose)
912913
cont.resume(null)
913914
else
914-
cont.resumeWithException(closed.receiveException)
915+
cont.resumeWithStackTrace(closed.receiveException)
915916
}
916917
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
917918
}
@@ -943,10 +944,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
943944
}
944945

945946
override fun resumeReceiveClosed(closed: Closed<*>) {
946-
val token = if (closed.closeCause == null)
947+
val token = if (closed.closeCause == null) {
947948
cont.tryResume(false)
948-
else
949-
cont.tryResumeWithException(closed.receiveException)
949+
} else {
950+
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
951+
}
950952
if (token != null) {
951953
iterator.result = closed
952954
cont.completeResume(token)
@@ -1056,7 +1058,7 @@ internal class SendElement(
10561058
) : LockFreeLinkedListNode(), Send {
10571059
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
10581060
override fun completeResumeSend(token: Any) = cont.completeResume(token)
1059-
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1061+
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithStackTrace(closed.sendException)
10601062
override fun toString(): String = "SendElement($pollResult)[$cont]"
10611063
}
10621064

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlin.coroutines.*
8+
9+
/**
10+
* Tries to recover stacktrace for given [exception] and [continuation].
11+
* Stacktrace recovery tries to restore [continuation] stack frames using its debug metadata with [CoroutineStackFrame] API
12+
* and then reflectively instantiate exception of given type with original exception as a cause and
13+
* sets new stacktrace for wrapping exception.
14+
* Some frames may be missing due to tail-call elimination.
15+
*
16+
* Works only on JVM with enabled debug-mode.
17+
*/
18+
internal expect fun <E: Throwable> recoverStackTrace(exception: E, continuation: Continuation<*>): E
19+
20+
/**
21+
* Tries to recover stacktrace for given [exception]. Used in non-suspendable points of awaiting.
22+
* Stacktrace recovery tries to instantiate exception of given type with original exception as a cause.
23+
* Wrapping exception will have proper stacktrace as it's instantiated in the right context.
24+
*
25+
* Works only on JVM with enabled debug-mode.
26+
*/
27+
internal expect fun <E: Throwable> recoverStackTrace(exception: E): E
28+
29+
/**
30+
* The opposite of [recoverStackTrace].
31+
* It is guaranteed that `unwrap(recoverStackTrace(e)) === e`
32+
*/
33+
internal expect fun <E: Throwable> unwrap(exception: E): E
34+
35+
@Suppress("NOTHING_TO_INLINE")
36+
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) = resumeWith(Result.failure(recoverStackTrace(exception, this)))
37+
38+
expect class StackTraceElement
39+
40+
internal expect interface CoroutineStackFrame {
41+
public val callerFrame: CoroutineStackFrame?
42+
public fun getStackTraceElement(): StackTraceElement?
43+
}
44+
45+
/**
46+
* Marker that indicates that stacktrace of the exception should not be recovered.
47+
* Currently internal, but may become public in the future
48+
*/
49+
internal interface NonRecoverableThrowable

common/kotlinx-coroutines-core-common/src/internal/Scopes.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import kotlin.jvm.*
1414
internal open class ScopeCoroutine<in T>(
1515
context: CoroutineContext,
1616
@JvmField val uCont: Continuation<T> // unintercepted continuation
17-
) : AbstractCoroutine<T>(context, true) {
17+
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
18+
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
19+
final override fun getStackTraceElement(): StackTraceElement? = null
1820
override val defaultResumeMode: Int get() = MODE_DIRECT
1921

2022
@Suppress("UNCHECKED_CAST")

common/kotlinx-coroutines-core-common/test/TestBase.common.kt

+8-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
8+
79
public expect open class TestBase constructor() {
810
public val isStressTest: Boolean
911
public val stressTestMultiplier: Int
@@ -21,8 +23,9 @@ public expect open class TestBase constructor() {
2123
)
2224
}
2325

24-
public class TestException(message: String? = null) : Throwable(message)
25-
public class TestException1(message: String? = null) : Throwable(message)
26-
public class TestException2(message: String? = null) : Throwable(message)
27-
public class TestException3(message: String? = null) : Throwable(message)
28-
public class TestRuntimeException(message: String? = null) : RuntimeException(message)
26+
public class TestException(message: String? = null) : Throwable(message), NonRecoverableThrowable
27+
public class TestException1(message: String? = null) : Throwable(message), NonRecoverableThrowable
28+
public class TestException2(message: String? = null) : Throwable(message), NonRecoverableThrowable
29+
public class TestException3(message: String? = null) : Throwable(message), NonRecoverableThrowable
30+
public class TestRuntimeException(message: String? = null) : RuntimeException(message), NonRecoverableThrowable
31+
public class RecoverableTestException(message: String? = null) : Throwable(message)

0 commit comments

Comments
 (0)