Skip to content

Stacktrace recovery #792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/CancellableContinuation {
public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public fun initCancellability ()V
protected fun nameString ()Ljava/lang/String;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -133,7 +134,7 @@ internal abstract class AbstractContinuation<in T>(
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw state.cause
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
return getSuccessfulResult(state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,15 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
internal open class CancellableContinuationImpl<in T>(
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable, CoroutineStackFrame {

public override val context: CoroutineContext = delegate.context

override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

override fun getStackTraceElement(): StackTraceElement? = null

override fun initCancellability() {
initParentJobInternal(delegate.context[Job])
}
Expand Down
20 changes: 14 additions & 6 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ internal object UndispatchedEventLoop {
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), Continuation<T> by continuation {
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
@JvmField
@Suppress("PropertyName")
internal var _state: Any? = UNDEFINED
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? = null
@JvmField // pre-cached value to avoid ctx.fold on every resumption
internal val countOrElement = threadContextElements(context)

Expand Down Expand Up @@ -167,7 +169,7 @@ internal class DispatchedContinuation<in T>(
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWithException(exception: Throwable) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWithException(exception)
continuation.resumeWithStackTrace(exception)
}
}

Expand All @@ -190,7 +192,7 @@ internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {

internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
is DispatchedContinuation -> resumeCancellableWithException(exception)
else -> resumeWithException(exception)
else -> resumeWithStackTrace(exception)
}

internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
Expand All @@ -199,8 +201,8 @@ internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
}

internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
is DispatchedContinuation -> continuation.resumeWithException(exception)
else -> resumeWithException(exception)
is DispatchedContinuation -> continuation.resumeWithStackTrace(exception)
else -> resumeWithStackTrace(exception)
}

internal abstract class DispatchedTask<in T>(
Expand Down Expand Up @@ -231,7 +233,7 @@ internal abstract class DispatchedTask<in T>(
else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithException(exception)
continuation.resumeWithStackTrace(exception)
else
continuation.resume(getSuccessfulResult(state))
}
Expand Down Expand Up @@ -275,3 +277,9 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
delegate.resumeMode(getSuccessfulResult(state), useMode)
}
}


@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}
9 changes: 6 additions & 3 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val seenExceptions = identitySet<Throwable>(exceptions.size)
var suppressed = false
for (exception in exceptions) {
if (exception !== rootCause && exception !is CancellationException && seenExceptions.add(exception)) {
rootCause.addSuppressedThrowable(exception)
val unwrapped = unwrap(exception)
if (unwrapped !== rootCause && unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
rootCause.addSuppressedThrowable(unwrapped)
suppressed = true
}
}
Expand Down Expand Up @@ -1078,7 +1079,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.cause
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state

}
Expand Down
6 changes: 4 additions & 2 deletions common/kotlinx-coroutines-core-common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -80,9 +81,10 @@ private fun <U, T: U> setupTimeout(
private open class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
@JvmField val uCont: Continuation<U> // unintercepted continuation
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T> {
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T>, CoroutineStackFrame {
override val defaultResumeMode: Int get() = MODE_DIRECT

override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)?.callerFrame
override fun getStackTraceElement(): StackTraceElement? = (uCont as? CoroutineStackFrame)?.getStackTraceElement()
@Suppress("LeakingThis", "Deprecation")
override fun run() {
cancel(TimeoutCancellationException(time, this))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
result === OFFER_SUCCESS -> true
// We should check for closed token on offer as well, otherwise offer won't be linearizable
// in the face of concurrent close()
result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false
result is Closed<*> -> throw result.sendException
result === OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ?: return false
result is Closed<*> -> throw recoverStackTrace(result.sendException)
else -> error("offerInternal returned $result")
}
}
Expand Down Expand Up @@ -408,7 +408,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
enqueueResult is Closed<*> -> throw enqueueResult.sendException
enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
}
} else {
Expand All @@ -420,7 +420,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw offerResult.sendException
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
else -> error("offerSelectInternal returned $offerResult")
}
}
Expand Down Expand Up @@ -574,7 +574,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private fun receiveResult(result: Any?): E {
if (result is Closed<*>) throw result.receiveException
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
return result as E
}

Expand Down Expand Up @@ -620,7 +620,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
if (result is Closed<*>) {
if (result.closeCause != null) throw result.closeCause
if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
return null
}
return result as E
Expand Down Expand Up @@ -759,7 +759,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> throw pollResult.receiveException
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
else -> {
block.startCoroutineUnintercepted(pollResult as E, select.completion)
return
Expand Down Expand Up @@ -798,8 +798,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
if (select.trySelect(null))
block.startCoroutineUnintercepted(null, select.completion)
return
} else
throw pollResult.closeCause
} else {
throw recoverStackTrace(pollResult.closeCause)
}
}
else -> {
// selected successfully
Expand Down Expand Up @@ -858,7 +859,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private fun hasNextResult(result: Any?): Boolean {
if (result is Closed<*>) {
if (result.closeCause != null) throw result.receiveException
if (result.closeCause != null) recoverStackTrace(throw result.receiveException)
return false
}
return true
Expand Down Expand Up @@ -892,7 +893,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
override suspend fun next(): E {
val result = this.result
if (result is Closed<*>) throw result.receiveException
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
if (result !== POLL_FAILED) {
this.result = POLL_FAILED
return result as E
Expand Down Expand Up @@ -944,10 +945,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

override fun resumeReceiveClosed(closed: Closed<*>) {
val token = if (closed.closeCause == null)
val token = if (closed.closeCause == null) {
cont.tryResume(false)
else
cont.tryResumeWithException(closed.receiveException)
} else {
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
}
if (token != null) {
iterator.result = closed
cont.completeResume(token)
Expand Down
17 changes: 13 additions & 4 deletions common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@ import kotlin.jvm.*
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true) {
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
override val defaultResumeMode: Int get() = MODE_DIRECT

@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally)
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
else
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}
}

internal fun AbstractCoroutine<*>.tryRecover(exception: Throwable): Throwable {
val cont = (this as? ScopeCoroutine<*>)?.uCont ?: return exception
return recoverStackTrace(exception, cont)
}

internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlin.coroutines.*

/**
* Tries to recover stacktrace for given [exception] and [continuation].
* Stacktrace recovery tries to restore [continuation] stack frames using its debug metadata with [CoroutineStackFrame] API
* and then reflectively instantiate exception of given type with original exception as a cause and
* sets new stacktrace for wrapping exception.
* Some frames may be missing due to tail-call elimination.
*
* Works only on JVM with enabled debug-mode.
*/
internal expect fun <E: Throwable> recoverStackTrace(exception: E, continuation: Continuation<*>): E

/**
* Tries to recover stacktrace for given [exception]. Used in non-suspendable points of awaiting.
* Stacktrace recovery tries to instantiate exception of given type with original exception as a cause.
* Wrapping exception will have proper stacktrace as it's instantiated in the right context.
*
* Works only on JVM with enabled debug-mode.
*/
internal expect fun <E: Throwable> recoverStackTrace(exception: E): E

// Name conflict with recoverStackTrace
@Suppress("NOTHING_TO_INLINE")
internal expect suspend inline fun recoverAndThrow(exception: Throwable): Nothing

/**
* The opposite of [recoverStackTrace].
* It is guaranteed that `unwrap(recoverStackTrace(e)) === e`
*/
internal expect fun <E: Throwable> unwrap(exception: E): E

expect class StackTraceElement

internal expect interface CoroutineStackFrame {
public val callerFrame: CoroutineStackFrame?
public fun getStackTraceElement(): StackTraceElement?
}

/**
* Marker that indicates that stacktrace of the exception should not be recovered.
* Currently internal, but may become public in the future
*/
internal interface NonRecoverableThrowable
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.intrinsics

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

Expand Down Expand Up @@ -126,8 +127,8 @@ private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
val state = state
if (state is CompletedExceptionally) {
when {
shouldThrow(state.cause) -> throw state.cause
result is CompletedExceptionally -> throw result.cause
shouldThrow(state.cause) -> throw tryRecover(state.cause)
result is CompletedExceptionally -> throw tryRecover(result.cause)
else -> result
}
} else {
Expand Down
13 changes: 7 additions & 6 deletions common/kotlinx-coroutines-core-common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import kotlin.coroutines.*
import kotlinx.coroutines.internal.*

public expect open class TestBase constructor() {
public val isStressTest: Boolean
Expand All @@ -23,13 +24,13 @@ public expect open class TestBase constructor() {
)
}

public class TestException(message: String? = null) : Throwable(message)
public class TestException1(message: String? = null) : Throwable(message)
public class TestException2(message: String? = null) : Throwable(message)
public class TestException3(message: String? = null) : Throwable(message)
public class TestRuntimeException(message: String? = null) : RuntimeException(message)
public class TestException(message: String? = null) : Throwable(message), NonRecoverableThrowable
public class TestException1(message: String? = null) : Throwable(message), NonRecoverableThrowable
public class TestException2(message: String? = null) : Throwable(message), NonRecoverableThrowable
public class TestException3(message: String? = null) : Throwable(message), NonRecoverableThrowable
public class TestRuntimeException(message: String? = null) : RuntimeException(message), NonRecoverableThrowable
public class RecoverableTestException(message: String? = null) : RuntimeException(message)

// Wrap context to avoid fast-paths on dispatcher comparison
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
return object : CoroutineDispatcher() {
Expand Down
Loading