Skip to content

Ignore exception on cancel/fail race in CancellableContinuation #896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class kotlinx/coroutines/BuildersKt {
public abstract interface class kotlinx/coroutines/CancellableContinuation : kotlin/coroutines/Continuation {
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun completeResume (Ljava/lang/Object;)V
public abstract fun initCancellability ()V
public abstract synthetic fun initCancellability ()V
public abstract fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public abstract fun isActive ()Z
public abstract fun isCancelled ()Z
Expand All @@ -50,17 +50,28 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
public final fun getDelegate ()Lkotlin/coroutines/Continuation;
public final fun getResult ()Ljava/lang/Object;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public fun initCancellability ()V
public synthetic fun initCancellability ()V
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public fun isActive ()Z
public fun isCancelled ()Z
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun takeState ()Ljava/lang/Object;
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}
Expand Down
109 changes: 27 additions & 82 deletions common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*

// --------------- cancellable continuations ---------------

Expand Down Expand Up @@ -98,9 +97,17 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun completeResume(token: Any)

/**
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
* [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
* This function does nothing and is left only for binary compatibility with old compiled code.
*
* @suppress **Deprecated**: This function is no longer used.
* It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0.
*/
@Deprecated(
level = DeprecationLevel.HIDDEN,
message = "This function is no longer used. " +
"It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. "
)
@InternalCoroutinesApi
public fun initCancellability()

Expand Down Expand Up @@ -155,7 +162,9 @@ public suspend inline fun <T> suspendCancellableCoroutine(
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
// NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
// method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
// cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
Expand All @@ -172,16 +181,28 @@ public suspend inline fun <T> suspendCancellableCoroutine(
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
if (!holdCancellability) cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes a given node on cancellation.
*/
Expand Down Expand Up @@ -213,79 +234,3 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
override fun invoke(cause: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCancel[$handle]"
}

@PublishedApi
internal open class CancellableContinuationImpl<in T>(
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable, CoroutineStackFrame {

public override val context: CoroutineContext = delegate.context

override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame

override fun getStackTraceElement(): StackTraceElement? = null

override fun initCancellability() {
initParentJobInternal(delegate.context[Job])
}

override fun tryResume(value: T, idempotent: Any?): Any? {
loopOnState { state ->
when (state) {
is NotCompleted -> {
val update: Any? = if (idempotent == null) value else
CompletedIdempotentResult(idempotent, value, state)
if (tryUpdateStateToFinal(state, update)) return state
}
is CompletedIdempotentResult -> {
if (state.idempotentResume === idempotent) {
check(state.result === value) { "Non-idempotent resume" }
return state.token
} else
return null
}
else -> return null // cannot resume -- not active anymore
}
}
}

override fun tryResumeWithException(exception: Throwable): Any? {
loopOnState { state ->
when (state) {
is NotCompleted -> {
if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state
}
else -> return null // cannot resume -- not active anymore
}
}
}

override fun completeResume(token: Any) = completeStateUpdate(token as NotCompleted, state, resumeMode)

override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}

override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
val dc = delegate as? DispatchedContinuation
resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}

@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
if (state is CompletedIdempotentResult) state.result as T else state as T

protected override fun nameString(): String =
"CancellableContinuation(${delegate.toDebugString()})"
}

private class CompletedIdempotentResult(
@JvmField val idempotentResume: Any?,
@JvmField val result: Any?,
@JvmField val token: NotCompleted
) {
override fun toString(): String = "CompletedIdempotentResult[$result]"
}
Loading