diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a2000c7640..80ab0317b2 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -34,7 +34,7 @@ public final class kotlinx/coroutines/BuildersKt { public abstract interface class kotlinx/coroutines/CancellableContinuation : kotlin/coroutines/Continuation { public abstract fun cancel (Ljava/lang/Throwable;)Z public abstract fun completeResume (Ljava/lang/Object;)V - public abstract fun initCancellability ()V + public abstract synthetic fun initCancellability ()V public abstract fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V public abstract fun isActive ()Z public abstract fun isCancelled ()Z @@ -50,17 +50,28 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls { public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object; } -public class kotlinx/coroutines/CancellableContinuationImpl : java/lang/Runnable, kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation { +public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation { public fun (Lkotlin/coroutines/Continuation;I)V + public fun cancel (Ljava/lang/Throwable;)Z public fun completeResume (Ljava/lang/Object;)V public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame; public fun getContext ()Lkotlin/coroutines/CoroutineContext; + public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable; + public final fun getDelegate ()Lkotlin/coroutines/Continuation; + public final fun getResult ()Ljava/lang/Object; public fun getStackTraceElement ()Ljava/lang/StackTraceElement; public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object; - public fun initCancellability ()V + public synthetic fun initCancellability ()V + public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V + public fun isActive ()Z + public fun isCancelled ()Z + public fun isCompleted ()Z protected fun nameString ()Ljava/lang/String; public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V + public fun resumeWith (Ljava/lang/Object;)V + public fun takeState ()Ljava/lang/Object; + public fun toString ()Ljava/lang/String; public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object; } diff --git a/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt b/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt index 5c5d0881a6..4fffcd906f 100644 --- a/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* -import kotlin.jvm.* // --------------- cancellable continuations --------------- @@ -98,9 +97,17 @@ public interface CancellableContinuation : Continuation { public fun completeResume(token: Any) /** - * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to - * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once. + * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. + * This function does nothing and is left only for binary compatibility with old compiled code. + * + * @suppress **Deprecated**: This function is no longer used. + * It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. */ + @Deprecated( + level = DeprecationLevel.HIDDEN, + message = "This function is no longer used. " + + "It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. " + ) @InternalCoroutinesApi public fun initCancellability() @@ -155,7 +162,9 @@ public suspend inline fun suspendCancellableCoroutine( ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) - cancellable.initCancellability() + // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this + // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0 + // cancellable.initCancellability() block(cancellable) cancellable.getResult() } @@ -172,16 +181,28 @@ public suspend inline fun suspendCancellableCoroutine( */ @InternalCoroutinesApi public suspend inline fun suspendAtomicCancellableCoroutine( - holdCancellability: Boolean = false, crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT) - if (!holdCancellability) cancellable.initCancellability() block(cancellable) cancellable.getResult() } +/** + * @suppress **Deprecated** + */ +@Deprecated( + message = "holdCancellability parameter is deprecated and is no longer used", + replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)") +) +@InternalCoroutinesApi +public suspend inline fun suspendAtomicCancellableCoroutine( + holdCancellability: Boolean = false, + crossinline block: (CancellableContinuation) -> Unit +): T = + suspendAtomicCancellableCoroutine(block) + /** * Removes a given node on cancellation. */ @@ -213,79 +234,3 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand override fun invoke(cause: Throwable?) = handle.dispose() override fun toString(): String = "DisposeOnCancel[$handle]" } - -@PublishedApi -internal open class CancellableContinuationImpl( - delegate: Continuation, - resumeMode: Int -) : AbstractContinuation(delegate, resumeMode), CancellableContinuation, Runnable, CoroutineStackFrame { - - public override val context: CoroutineContext = delegate.context - - override val callerFrame: CoroutineStackFrame? - get() = delegate as? CoroutineStackFrame - - override fun getStackTraceElement(): StackTraceElement? = null - - override fun initCancellability() { - initParentJobInternal(delegate.context[Job]) - } - - override fun tryResume(value: T, idempotent: Any?): Any? { - loopOnState { state -> - when (state) { - is NotCompleted -> { - val update: Any? = if (idempotent == null) value else - CompletedIdempotentResult(idempotent, value, state) - if (tryUpdateStateToFinal(state, update)) return state - } - is CompletedIdempotentResult -> { - if (state.idempotentResume === idempotent) { - check(state.result === value) { "Non-idempotent resume" } - return state.token - } else - return null - } - else -> return null // cannot resume -- not active anymore - } - } - } - - override fun tryResumeWithException(exception: Throwable): Any? { - loopOnState { state -> - when (state) { - is NotCompleted -> { - if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state - } - else -> return null // cannot resume -- not active anymore - } - } - } - - override fun completeResume(token: Any) = completeStateUpdate(token as NotCompleted, state, resumeMode) - - override fun CoroutineDispatcher.resumeUndispatched(value: T) { - val dc = delegate as? DispatchedContinuation - resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) - } - - override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) { - val dc = delegate as? DispatchedContinuation - resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) - } - - @Suppress("UNCHECKED_CAST") - override fun getSuccessfulResult(state: Any?): T = - if (state is CompletedIdempotentResult) state.result as T else state as T - - protected override fun nameString(): String = - "CancellableContinuation(${delegate.toDebugString()})" -} - -private class CompletedIdempotentResult( - @JvmField val idempotentResume: Any?, - @JvmField val result: Any?, - @JvmField val token: NotCompleted -) { - override fun toString(): String = "CompletedIdempotentResult[$result]" -} diff --git a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/CancellableContinuationImpl.kt similarity index 51% rename from common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt rename to common/kotlinx-coroutines-core-common/src/CancellableContinuationImpl.kt index 961f15e322..c03c6121e6 100644 --- a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/CancellableContinuationImpl.kt @@ -17,10 +17,12 @@ private const val RESUMED = 2 /** * @suppress **This is unstable API and it is subject to change.** */ -internal abstract class AbstractContinuation( +@PublishedApi +internal open class CancellableContinuationImpl( public final override val delegate: Continuation, resumeMode: Int -) : DispatchedTask(resumeMode), Continuation { +) : DispatchedTask(resumeMode), CancellableContinuation, CoroutineStackFrame { + public override val context: CoroutineContext = delegate.context /* * Implementation notes @@ -56,50 +58,72 @@ internal abstract class AbstractContinuation( ------ ------------ ------------ ----------- ACTIVE Active : Active active, no listeners SINGLE_A CancelHandler : Active active, one cancellation listener - CANCELLED Cancelled : Cancelled cancelled (final state) + CANCELLED CancelledContinuation: Cancelled cancelled (final state) COMPLETED any : Completed produced some result or threw an exception (final state) */ - private val _state = atomic(ACTIVE) + private val _state = atomic(Active) @Volatile private var parentHandle: DisposableHandle? = null internal val state: Any? get() = _state.value - public val isActive: Boolean get() = state is NotCompleted + public override val isActive: Boolean get() = state is NotCompleted - public val isCompleted: Boolean get() = state !is NotCompleted + public override val isCompleted: Boolean get() = state !is NotCompleted - public val isCancelled: Boolean get() = state is CancelledContinuation + public override val isCancelled: Boolean get() = state is CancelledContinuation - internal fun initParentJobInternal(parent: Job?) { - check(parentHandle == null) - if (parent == null) { - parentHandle = NonDisposableHandle - return - } - parent.start() // make sure the parent is started - val handle = parent.invokeOnCompletion(onCancelling = true, - handler = ChildContinuation(parent, this).asHandler) + public override fun initCancellability() { + // This method does nothing. Leftover for binary compatibility with old compiled code + } + // It is only invoked from an internal getResult function, so we can be sure it is not invoked twice + private fun installParentCancellationHandler() { + if (isCompleted) return // fast path 1 -- don't need to do anything if already completed + val parent = delegate.context[Job] ?: return // fast path 2 -- don't do anything without parent + parent.start() // make sure the parent is started + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(parent, this).asHandler + ) parentHandle = handle - // now check our state _after_ registering (see updateStateToFinal order of actions) + // now check our state _after_ registering (could have completed while we were registering) if (isCompleted) { - handle.dispose() + handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle parentHandle = NonDisposableHandle // release it just in case, to aid GC } } + public override val callerFrame: CoroutineStackFrame? + get() = delegate as? CoroutineStackFrame + + public override fun getStackTraceElement(): StackTraceElement? = null + override fun takeState(): Any? = state - public fun cancel(cause: Throwable?): Boolean = - cancelImpl(cause) + public override fun cancel(cause: Throwable?): Boolean { + _state.loop { state -> + if (state !is NotCompleted) return false // false if already complete or cancelling + // Active -- update to final state + if (!_state.compareAndSet(state, CancelledContinuation(this, cause))) return@loop // retry on cas failure + // Invoke cancel handler if it was present + if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) } + // Complete state update + disposeParentHandle() + dispatchResume(mode = MODE_ATOMIC_DEFAULT) + return true + } + } - fun cancelImpl(cause: Throwable?): Boolean { - loopOnState { state -> - if (state !is NotCompleted) return false // quit if already complete - val update = CancelledContinuation(this, cause) - if (updateStateToFinal(state, update, mode = MODE_ATOMIC_DEFAULT)) return true + private inline fun invokeHandlerSafely(block: () -> Unit) { + try { + block() + } catch (ex: Throwable) { + handleCoroutineException( + context, + CompletionHandlerException("Exception in cancellation handler for $this", ex) + ) } } @@ -131,6 +155,7 @@ internal abstract class AbstractContinuation( @PublishedApi internal fun getResult(): Any? { + installParentCancellationHandler() if (trySuspend()) return COROUTINE_SUSPENDED // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state val state = this.state @@ -144,20 +169,20 @@ internal abstract class AbstractContinuation( internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) = resumeImpl(CompletedExceptionally(exception), mode) - public fun invokeOnCancellation(handler: CompletionHandler) { + public override fun invokeOnCancellation(handler: CompletionHandler) { var handleCache: CancelHandler? = null - loopOnState { state -> + _state.loop { state -> when (state) { is Active -> { val node = handleCache ?: makeHandler(handler).also { handleCache = it } - if (_state.compareAndSet(state, node)) { - return - } + if (_state.compareAndSet(state, node)) return // quit on cas success + } + is CancelHandler -> { + error("It's prohibited to register multiple handlers, tried to register $handler, already has $state") } - is CancelHandler -> error("It's prohibited to register multiple handlers, tried to register $handler, already has $state") is CancelledContinuation -> { /* - * Continuation is complete, invoke directly. + * Continuation was already cancelled, invoke directly. * NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation. * It's inconsistent with running continuation, but currently, we have no mechanism to check * whether any handler was registered during continuation lifecycle without additional overhead. @@ -166,7 +191,7 @@ internal abstract class AbstractContinuation( * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, * because we play type tricks on Kotlin/JS and handler is not necessarily a function there */ - handler.invokeIt((state as? CompletedExceptionally)?.cause) + invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) } return } else -> return @@ -183,29 +208,24 @@ internal abstract class AbstractContinuation( dispatch(mode) } - protected inline fun loopOnState(block: (Any?) -> Unit): Nothing { - while (true) { - block(state) - } - } - - protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) { - loopOnState { state -> + private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) { + _state.loop { state -> when (state) { is NotCompleted -> { - if (updateStateToFinal(state, proposedUpdate, resumeMode)) return + if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure + disposeParentHandle() + dispatchResume(resumeMode) + return } is CancelledContinuation -> { /* * If continuation was cancelled, then all further resumes must be * ignored, because cancellation is asynchronous and may race with resume. - * Racy exception are reported so no exceptions are lost + * Racy exceptions will be lost, too. There does not see to be a safe way to + * handle them without producing spurious crashes. * * :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt. */ - if (proposedUpdate is CompletedExceptionally) { - handleException(proposedUpdate.cause) - } return } else -> error("Already resumed, but proposed with update $proposedUpdate") @@ -213,72 +233,85 @@ internal abstract class AbstractContinuation( } } - /** - * Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary - */ - private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean { - if (!tryUpdateStateToFinal(expect, proposedUpdate)) { - return false + // Unregister from parent job + private fun disposeParentHandle() { + parentHandle?.let { // volatile read parentHandle (once) + it.dispose() + parentHandle = NonDisposableHandle // release it just in case, to aid GC } - completeStateUpdate(expect, proposedUpdate, mode) - return true } - protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean { - require(update !is NotCompleted) // only NotCompleted -> completed transition is allowed - if (!_state.compareAndSet(expect, update)) return false - // Unregister from parent job - parentHandle?.let { - it.dispose() // volatile read parentHandle _after_ state was updated - parentHandle = NonDisposableHandle // release it just in case, to aid GC + override fun tryResume(value: T, idempotent: Any?): Any? { + _state.loop { state -> + when (state) { + is NotCompleted -> { + val update: Any? = if (idempotent == null) value else + CompletedIdempotentResult(idempotent, value, state) + if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure + disposeParentHandle() + return state + } + is CompletedIdempotentResult -> { + return if (state.idempotentResume === idempotent) { + check(state.result === value) { "Non-idempotent resume" } + state.token + } else { + null + } + } + else -> return null // cannot resume -- not active anymore + } } - return true // continues in completeStateUpdate } - protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) { - val exceptionally = update as? CompletedExceptionally - - if (update is CancelledContinuation && expect is CancelHandler) { - try { - expect.invoke(exceptionally?.cause) - } catch (ex: Throwable) { - handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex)) + override fun tryResumeWithException(exception: Throwable): Any? { + _state.loop { state -> + when (state) { + is NotCompleted -> { + val update = CompletedExceptionally(exception) + if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure + disposeParentHandle() + return state + } + else -> return null // cannot resume -- not active anymore } } + } - // Notify all handlers before dispatching, otherwise behaviour will be timing-dependent - // and confusing with Unconfined - dispatchResume(mode) + override fun completeResume(token: Any) { + // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only + dispatchResume(resumeMode) } - private fun handleException(exception: Throwable) { - handleCoroutineException(context, exception) + override fun CoroutineDispatcher.resumeUndispatched(value: T) { + val dc = delegate as? DispatchedContinuation + resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) } + override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) { + val dc = delegate as? DispatchedContinuation + resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) + } + + @Suppress("UNCHECKED_CAST") + override fun getSuccessfulResult(state: Any?): T = + if (state is CompletedIdempotentResult) state.result as T else state as T + // For nicer debugging public override fun toString(): String = - "${nameString()}{${stateString()}}@$hexAddress" + "${nameString()}(${delegate.toDebugString()}){$state}@$hexAddress" - protected open fun nameString(): String = classSimpleName - - private fun stateString(): String { - val state = this.state - return when (state) { - is NotCompleted ->"Active" - is CancelledContinuation -> "Cancelled" - is CompletedExceptionally -> "CompletedExceptionally" - else -> "Completed" - } - } + protected open fun nameString(): String = + "CancellableContinuation" } // Marker for active continuation internal interface NotCompleted -private class Active : NotCompleted -@SharedImmutable -private val ACTIVE: Active = Active() +private object Active : NotCompleted { + override fun toString(): String = "Active" +} internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted @@ -291,3 +324,11 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation } override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]" } + +private class CompletedIdempotentResult( + @JvmField val idempotentResume: Any?, + @JvmField val result: Any?, + @JvmField val token: NotCompleted +) { + override fun toString(): String = "CompletedIdempotentResult[$result]" +} diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt index a230b3a35c..8504f84c6b 100644 --- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt +++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt @@ -1049,7 +1049,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } protected override fun nameString(): String = - "AwaitContinuation(${delegate.toDebugString()})" + "AwaitContinuation" } /* @@ -1105,7 +1105,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) - cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)) cont.getResult() } @@ -1249,7 +1248,7 @@ private class ResumeOnCompletion( private class ResumeAwaitOnCompletion( job: JobSupport, - private val continuation: AbstractContinuation + private val continuation: CancellableContinuationImpl ) : JobNode(job) { override fun invoke(cause: Throwable?) { val state = job.state @@ -1330,10 +1329,10 @@ internal class ChildHandleNode( // Same as ChildHandleNode, but for cancellable continuation internal class ChildContinuation( parent: Job, - @JvmField val child: AbstractContinuation<*> + @JvmField val child: CancellableContinuationImpl<*> ) : JobCancellingNode(parent) { override fun invoke(cause: Throwable?) { - child.cancelImpl(child.getContinuationCancellationCause(job)) + child.cancel(child.getContinuationCancellationCause(job)) } override fun toString(): String = "ChildContinuation[$child]" diff --git a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt index 5d47303351..d05ea29ecb 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt @@ -188,13 +188,12 @@ internal abstract class AbstractSendChannel : SendChannel { } } - private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont -> + private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont -> val send = SendElement(element, cont) loop@ while (true) { val enqueueResult = enqueueSend(send) when (enqueueResult) { null -> { // enqueued successfully - cont.initCancellability() // make it properly cancellable cont.removeOnCancellation(send) return@sc } @@ -580,11 +579,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel + private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine sc@ { cont -> val receive = ReceiveElement(cont as CancellableContinuation, nullOnClose = false) while (true) { if (enqueueReceive(receive)) { - cont.initCancellability() // make it properly cancellable removeReceiveOnCancel(cont, receive) return@sc } @@ -628,11 +626,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel + private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine sc@ { cont -> val receive = ReceiveElement(cont, nullOnClose = true) while (true) { if (enqueueReceive(receive)) { - cont.initCancellability() // make it properly cancellable removeReceiveOnCancel(cont, receive) return@sc } @@ -866,11 +863,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel + private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine sc@ { cont -> val receive = ReceiveHasNext(this, cont) while (true) { if (channel.enqueueReceive(receive)) { - cont.initCancellability() // make it properly cancellable channel.removeReceiveOnCancel(cont, receive) return@sc } diff --git a/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt b/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt index 4928a5f18f..0180956803 100644 --- a/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt +++ b/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt @@ -186,7 +186,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { return lockSuspend(owner) } - private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont -> + private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine sc@ { cont -> val waiter = LockCont(owner, cont) _state.loop { state -> when (state) { @@ -207,7 +207,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { check(curOwner !== owner) { "Already locked by $owner" } if (state.addLastIf(waiter, { _state.value === state })) { // added to waiter list! - cont.initCancellability() // make it properly cancellable cont.removeOnCancellation(waiter) return@sc } diff --git a/common/kotlinx-coroutines-core-common/test/CancellableContinuationTest.kt b/common/kotlinx-coroutines-core-common/test/CancellableContinuationTest.kt index 0a6e44e4ca..38fc9ff281 100644 --- a/common/kotlinx-coroutines-core-common/test/CancellableContinuationTest.kt +++ b/common/kotlinx-coroutines-core-common/test/CancellableContinuationTest.kt @@ -72,7 +72,7 @@ class CancellableContinuationTest : TestBase() { * should be ignored. Here suspended coroutine is cancelled but then resumed with exception. */ @Test - fun testCancelAndResumeWithException() = runTest(unhandled = listOf({e -> e is TestException})) { + fun testCancelAndResumeWithException() = runTest { var continuation: Continuation? = null val job = launch { try { diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index bed4c9119a..ef02aa229a 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -349,10 +349,10 @@ class FutureTest : TestBase() { private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): CompletableFuture { val latch = CountDownLatch(1) - val future = CompletableFuture.supplyAsync({ + val future = CompletableFuture.supplyAsync { latch.await() 239 - }) + } val deferred = async { expect(2) @@ -424,6 +424,30 @@ class FutureTest : TestBase() { finish(3) } + /** + * See [https://github.com/Kotlin/kotlinx.coroutines/issues/892] + */ + @Test + fun testTimeoutCancellationFailRace() { + repeat(10 * stressTestMultiplier) { + runBlocking { + withTimeoutOrNull(10) { + while (true) { + var caught = false + try { + CompletableFuture.supplyAsync { + throw TestException() + }.await() + } catch (ignored: TestException) { + caught = true + } + assertTrue(caught) // should have caught TestException or timed out + } + } + } + } + } + private inline fun CompletableFuture<*>.checkFutureException(vararg suppressed: KClass) { val e = assertFailsWith { get() } val cause = e.cause!!