Skip to content

Stacktrace recovery #792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ GlobalScope.launch {
* [core](core/README.md) — Kotlin/JVM implementation of common coroutines with additional features:
* `Dispatchers.IO` dispatcher for blocking coroutines;
* `Executor.asCoroutineDispatcher()` extension, custom thread pools, and more.
* [debug](core/README.md) — debug utilities for coroutines.
* `DebugProbes` API to probe, keep track of, print and dump active coroutines.
* [js](js/README.md) — Kotlin/JS implementation of common coroutines with `Promise` support.
* [native](native/README.md) — Kotlin/Native implementation of common coroutines with `runBlocking` single-threaded event loop.
* [reactive](reactive/README.md) — modules that provide builders and iteration support for various reactive streams libraries:
Expand Down
2 changes: 1 addition & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ To release new `<version>` of `kotlinx-coroutines`:
`git merge origin/master`

4. Search & replace `<old-version>` with `<version>` across the project files. Should replace in:
* [`README.md`](README.md)
* [`README.md`](README.md) (native, core, test, debug, modules)
* [`coroutines-guide.md`](docs/coroutines-guide.md)
* [`gradle.properties`](gradle.properties)
* [`ui/kotlinx-coroutines-android/example-app/gradle.properties`](ui/kotlinx-coroutines-android/example-app/gradle.properties)
Expand Down
1 change: 1 addition & 0 deletions binary-compatibility-validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"

testArtifacts project(':kotlinx-coroutines-core')
testArtifacts project(':kotlinx-coroutines-debug')

testArtifacts project(':kotlinx-coroutines-reactive')
testArtifacts project(':kotlinx-coroutines-reactor')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/CancellableContinuation {
public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public fun initCancellability ()V
protected fun nameString ()Ljava/lang/String;
Expand Down Expand Up @@ -347,6 +349,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
public final fun registerSelectClause0 (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function1;)V
public final fun start ()Z
public final fun toDebugString ()Ljava/lang/String;
public fun toString ()Ljava/lang/String;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
public final class kotlinx/coroutines/debug/CoroutineState {
public final fun component1 ()Lkotlin/coroutines/Continuation;
public final fun copy (Lkotlin/coroutines/Continuation;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;J)Lkotlinx/coroutines/debug/CoroutineState;
public static synthetic fun copy$default (Lkotlinx/coroutines/debug/CoroutineState;Lkotlin/coroutines/Continuation;Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;JILjava/lang/Object;)Lkotlinx/coroutines/debug/CoroutineState;
public fun equals (Ljava/lang/Object;)Z
public final fun getContinuation ()Lkotlin/coroutines/Continuation;
public final fun getCreationStackTrace ()Ljava/util/List;
public final fun getJob ()Lkotlinx/coroutines/Job;
public final fun getJobOrNull ()Lkotlinx/coroutines/Job;
public final fun getState ()Lkotlinx/coroutines/debug/State;
public fun hashCode ()I
public final fun lastObservedStackTrace ()Ljava/util/List;
public fun toString ()Ljava/lang/String;
}

public final class kotlinx/coroutines/debug/DebugProbes {
public static final field INSTANCE Lkotlinx/coroutines/debug/DebugProbes;
public final fun dumpCoroutines (Ljava/io/PrintStream;)V
public static synthetic fun dumpCoroutines$default (Lkotlinx/coroutines/debug/DebugProbes;Ljava/io/PrintStream;ILjava/lang/Object;)V
public final fun dumpCoroutinesState ()Ljava/util/List;
public final fun getSanitizeStackTraces ()Z
public final fun hierarchyToString (Lkotlinx/coroutines/Job;)Ljava/lang/String;
public final fun install ()V
public final fun printHierarchy (Lkotlinx/coroutines/Job;Ljava/io/PrintStream;)V
public static synthetic fun printHierarchy$default (Lkotlinx/coroutines/debug/DebugProbes;Lkotlinx/coroutines/Job;Ljava/io/PrintStream;ILjava/lang/Object;)V
public final fun setSanitizeStackTraces (Z)V
public final fun uninstall ()V
public final fun withDebugProbes (Lkotlin/jvm/functions/Function0;)V
}

public final class kotlinx/coroutines/debug/State : java/lang/Enum {
public static final field CREATED Lkotlinx/coroutines/debug/State;
public static final field RUNNING Lkotlinx/coroutines/debug/State;
public static final field SUSPENDED Lkotlinx/coroutines/debug/State;
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/debug/State;
public static fun values ()[Lkotlinx/coroutines/debug/State;
}

2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ buildscript {
classpath "com.moowork.gradle:gradle-node-plugin:$gradle_node_version"

// JMH plugins
classpath "com.github.jengelman.gradle.plugins:shadow:2.0.2"
classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
classpath "me.champeau.gradle:jmh-gradle-plugin:0.4.7"
classpath "net.ltgt.gradle:gradle-apt-plugin:0.10"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ internal abstract class AbstractContinuation<in T>(
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw state.cause
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
return getSuccessfulResult(state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,15 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
internal open class CancellableContinuationImpl<in T>(
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable, CoroutineStackFrame {

public override val context: CoroutineContext = delegate.context

override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

override fun getStackTraceElement(): StackTraceElement? = null

override fun initCancellability() {
initParentJobInternal(delegate.context[Job])
}
Expand Down
20 changes: 14 additions & 6 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ internal object UndispatchedEventLoop {
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), Continuation<T> by continuation {
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
@JvmField
@Suppress("PropertyName")
internal var _state: Any? = UNDEFINED
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? = null
@JvmField // pre-cached value to avoid ctx.fold on every resumption
internal val countOrElement = threadContextElements(context)

Expand Down Expand Up @@ -168,7 +170,7 @@ internal class DispatchedContinuation<in T>(
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWithException(exception: Throwable) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWithException(exception)
continuation.resumeWithStackTrace(exception)
}
}

Expand All @@ -191,7 +193,7 @@ internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {

internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
is DispatchedContinuation -> resumeCancellableWithException(exception)
else -> resumeWithException(exception)
else -> resumeWithStackTrace(exception)
}

internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
Expand All @@ -200,8 +202,8 @@ internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
}

internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
is DispatchedContinuation -> continuation.resumeWithException(exception)
else -> resumeWithException(exception)
is DispatchedContinuation -> continuation.resumeWithStackTrace(exception)
else -> resumeWithStackTrace(exception)
}

internal abstract class DispatchedTask<in T>(
Expand Down Expand Up @@ -232,7 +234,7 @@ internal abstract class DispatchedTask<in T>(
else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithException(exception)
continuation.resumeWithStackTrace(exception)
else
continuation.resume(getSuccessfulResult(state))
}
Expand Down Expand Up @@ -276,3 +278,9 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
delegate.resumeMode(getSuccessfulResult(state), useMode)
}
}


@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}
14 changes: 10 additions & 4 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val seenExceptions = identitySet<Throwable>(exceptions.size)
var suppressed = false
for (exception in exceptions) {
if (exception !== rootCause && exception !is CancellationException && seenExceptions.add(exception)) {
rootCause.addSuppressedThrowable(exception)
val unwrapped = unwrap(exception)
if (unwrapped !== rootCause && unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
rootCause.addSuppressedThrowable(unwrapped)
suppressed = true
}
}
Expand Down Expand Up @@ -929,7 +930,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

// for nicer debugging
public override fun toString(): String =
"${nameString()}{${stateString(state)}}@$hexAddress"
"${toDebugString()}@$hexAddress"

@InternalCoroutinesApi
public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"

/**
* @suppress **This is unstable API and it is subject to change.**
Expand Down Expand Up @@ -1083,7 +1087,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.cause
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()

}
Expand Down
6 changes: 4 additions & 2 deletions common/kotlinx-coroutines-core-common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -80,9 +81,10 @@ private fun <U, T: U> setupTimeout(
private open class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
@JvmField val uCont: Continuation<U> // unintercepted continuation
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T> {
) : AbstractCoroutine<T>(uCont.context, active = true), Runnable, Continuation<T>, CoroutineStackFrame {
override val defaultResumeMode: Int get() = MODE_DIRECT

override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)?.callerFrame
override fun getStackTraceElement(): StackTraceElement? = (uCont as? CoroutineStackFrame)?.getStackTraceElement()
@Suppress("LeakingThis", "Deprecation")
override fun run() {
cancel(TimeoutCancellationException(time, this))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
result === OFFER_SUCCESS -> true
// We should check for closed token on offer as well, otherwise offer won't be linearizable
// in the face of concurrent close()
result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false
result is Closed<*> -> throw result.sendException
result === OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ?: return false
result is Closed<*> -> throw recoverStackTrace(result.sendException)
else -> error("offerInternal returned $result")
}
}
Expand Down Expand Up @@ -408,7 +408,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
enqueueResult is Closed<*> -> throw enqueueResult.sendException
enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
}
} else {
Expand All @@ -420,7 +420,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw offerResult.sendException
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
else -> error("offerSelectInternal returned $offerResult")
}
}
Expand Down Expand Up @@ -574,7 +574,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private fun receiveResult(result: Any?): E {
if (result is Closed<*>) throw result.receiveException
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
return result as E
}

Expand Down Expand Up @@ -620,7 +620,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
if (result is Closed<*>) {
if (result.closeCause != null) throw result.closeCause
if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
return null
}
return result as E
Expand Down Expand Up @@ -759,7 +759,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> throw pollResult.receiveException
pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
else -> {
block.startCoroutineUnintercepted(pollResult as E, select.completion)
return
Expand Down Expand Up @@ -798,8 +798,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
if (select.trySelect(null))
block.startCoroutineUnintercepted(null, select.completion)
return
} else
throw pollResult.closeCause
} else {
throw recoverStackTrace(pollResult.closeCause)
}
}
else -> {
// selected successfully
Expand Down Expand Up @@ -858,7 +859,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private fun hasNextResult(result: Any?): Boolean {
if (result is Closed<*>) {
if (result.closeCause != null) throw result.receiveException
if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
return false
}
return true
Expand Down Expand Up @@ -892,7 +893,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
override suspend fun next(): E {
val result = this.result
if (result is Closed<*>) throw result.receiveException
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
if (result !== POLL_FAILED) {
this.result = POLL_FAILED
return result as E
Expand Down Expand Up @@ -944,10 +945,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

override fun resumeReceiveClosed(closed: Closed<*>) {
val token = if (closed.closeCause == null)
val token = if (closed.closeCause == null) {
cont.tryResume(false)
else
cont.tryResumeWithException(closed.receiveException)
} else {
cont.tryResumeWithException(recoverStackTrace(closed.receiveException, cont))
}
if (token != null) {
iterator.result = closed
cont.completeResume(token)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlin.coroutines.*

internal expect inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T>
17 changes: 13 additions & 4 deletions common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@ import kotlin.jvm.*
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true) {
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
override val defaultResumeMode: Int get() = MODE_DIRECT

@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally)
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
else
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}
}

internal fun AbstractCoroutine<*>.tryRecover(exception: Throwable): Throwable {
val cont = (this as? ScopeCoroutine<*>)?.uCont ?: return exception
return recoverStackTrace(exception, cont)
}

internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
}
Loading