diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 8f589912a0..b133b7935d 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -104,8 +104,10 @@ public interface CancellableContinuation : Continuation { public fun completeResume(token: Any) /** - * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. - * This function does nothing and is left only for binary compatibility with old compiled code. + * Internal function that setups cancellation behavior in [suspendCancellableCoroutine]. + * It's illegal to call this function in any non-`kotlinx.coroutines` code and + * such calls lead to undefined behaviour. + * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** */ @@ -332,7 +334,7 @@ internal suspend inline fun suspendCancellableCoroutineReusable( internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { // If used outside of our dispatcher if (delegate !is DispatchedContinuation) { - return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE) + return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } /* * Attempt to claim reusable instance. diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1a8f35663a..c310623c5d 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl( */ private val _state = atomic(Active) - private val _parentHandle = atomic(null) - private var parentHandle: DisposableHandle? - get() = _parentHandle.value - set(value) { _parentHandle.value = value } + private var parentHandle: DisposableHandle? = null internal val state: Any? get() = _state.value @@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl( } public override fun initCancellability() { - setupCancellation() + /* + * Invariant: at the moment of invocation, `this` has not yet + * leaked to user code and no one is able to invoke `resume` or `cancel` + * on it yet. Also, this function is not invoked for reusable continuations. + */ + val handle = installParentHandle() + ?: return // fast path -- don't do anything without parent + // now check our state _after_ registering, could have completed while we were registering, + // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, + // so we are helping it and cleaning the node ourselves + if (isCompleted) { + // Can be invoked concurrently in 'parentCancelled', no problems here + handle.dispose() + parentHandle = NonDisposableHandle + } } private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) @@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl( return true } - /** - * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function for reusable continuations - * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. - */ - private fun setupCancellation() { - if (checkCompleted()) return - if (parentHandle !== null) return // fast path 2 -- was already initialized - val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent - val handle = parent.invokeOnCompletion( - onCancelling = true, - handler = ChildContinuation(this).asHandler - ) - parentHandle = handle - // now check our state _after_ registering (could have completed while we were registering) - // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us - if (isCompleted && !isReusable()) { - handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle - parentHandle = NonDisposableHandle // release it just in case, to aid GC - } - } - - private fun checkCompleted(): Boolean { - val completed = isCompleted - if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations - val dispatched = delegate as? DispatchedContinuation<*> ?: return completed - val cause = dispatched.checkPostponedCancellation(this) ?: return completed - if (!completed) { - // Note: this cancel may fail if one more concurrent cancel is currently being invoked - cancel(cause) - } - return true - } - public override val callerFrame: CoroutineStackFrame? get() = delegate as? CoroutineStackFrame @@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl( */ private fun cancelLater(cause: Throwable): Boolean { if (!resumeMode.isReusableMode) return false - val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false + // Ensure that we are postponing cancellation to the right instance + if (!isReusable()) return false + val dispatched = delegate as DispatchedContinuation<*> return dispatched.postponeCancellation(cause) } @@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl( private inline fun callCancelHandlerSafely(block: () -> Unit) { try { - block() + block() } catch (ex: Throwable) { // Handler should never fail, if it does -- it is an unhandled exception handleCoroutineException( @@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl( @PublishedApi internal fun getResult(): Any? { - setupCancellation() - if (trySuspend()) return COROUTINE_SUSPENDED + val isReusable = isReusable() + // trySuspend may fail either if 'block' has resumed/cancelled a continuation + // or we got async cancellation from parent. + if (trySuspend()) { + /* + * We were neither resumed nor cancelled, time to suspend. + * But first we have to install parent cancellation handle (if we didn't yet), + * so CC could be properly resumed on parent cancellation. + */ + if (parentHandle == null) { + installParentHandle() + } + /* + * Release the continuation after installing the handle (if needed). + * If we were successful, then do nothing, it's ok to reuse the instance now. + * Otherwise, dispose the handle by ourselves. + */ + if (isReusable) { + releaseClaimedReusableContinuation() + } + return COROUTINE_SUSPENDED + } // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state + if (isReusable) { + // release claimed reusable continuation for the future reuse + releaseClaimedReusableContinuation() + } val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) // if the parent job was already cancelled, then throw the corresponding cancellation exception @@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl( return getSuccessfulResult(state) } + private fun installParentHandle(): DisposableHandle? { + val parent = context[Job] ?: return null // don't do anything without a parent + // Install the handle + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(this).asHandler + ) + parentHandle = handle + return handle + } + + /** + * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, + * in which case it detaches from the parent and cancels this continuation. + */ + private fun releaseClaimedReusableContinuation() { + // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return + detachChild() + cancel(cancellationCause) + } + override fun resumeWith(result: Result) = resumeImpl(result.toState(this), resumeMode) @@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl( /** * Detaches from the parent. - * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` */ internal fun detachChild() { - val handle = parentHandle - handle?.dispose() + val handle = parentHandle ?: return + handle.dispose() parentHandle = NonDisposableHandle } diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index b2b887988d..10be4a3d9e 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher : @InternalCoroutinesApi public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { - (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() + /* + * Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`, + * any ClassCastException can only indicate compiler bug + */ + val dispatched = continuation as DispatchedContinuation<*> + dispatched.release() } /** diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5b516ae27f..f011dcf898 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) + // we are mimicking suspendCancellableCoroutine here and call initCancellability, too. + cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) cont.getResult() } diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 2874e7d592..45b9699c84 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -46,17 +46,15 @@ internal class DispatchedContinuation( * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * - * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. - * AbstractChannel.receive method relies on the fact that the following pattern + * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation. + * In the `getResult`, we have the following code: * ``` - * suspendCancellableCoroutineReusable { cont -> - * val result = pollFastPath() - * if (result != null) cont.resume(result) + * if (trySuspend()) { + * // <- at this moment current continuation can be redispatched and claimed again. + * attachChildToParent() + * releaseClaimedContinuation() * } * ``` - * always succeeds. - * To make it always successful, we actually postpone "reusable" cancellation - * to this phase and set cancellation only at the moment of instantiation. */ private val _reusableCancellableContinuation = atomic(null) @@ -66,9 +64,9 @@ internal class DispatchedContinuation( public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { /* * Reusability control: - * `null` -> no reusability at all, false + * `null` -> no reusability at all, `false` * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester. + * Else, if result is CCI === requester, then it's our reusable continuation * Identity check my fail for the following pattern: * ``` * loop: @@ -82,6 +80,27 @@ internal class DispatchedContinuation( return true } + + /** + * Awaits until previous call to `suspendCancellableCoroutineReusable` will + * stop mutating cached instance + */ + public fun awaitReusability() { + _reusableCancellableContinuation.loop { it -> + if (it !== REUSABLE_CLAIMED) return + } + } + + public fun release() { + /* + * Called from `releaseInterceptedContinuation`, can be concurrent with + * the code in `getResult` right after `trySuspend` returned `true`, so we have + * to wait for a release here. + */ + awaitReusability() + reusableCancellableContinuation?.detachChild() + } + /** * Claims the continuation for [suspendCancellableCoroutineReusable] block, * so all cancellations will be postponed. @@ -103,11 +122,20 @@ internal class DispatchedContinuation( _reusableCancellableContinuation.value = REUSABLE_CLAIMED return null } + // potentially competing with cancel state is CancellableContinuationImpl<*> -> { if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { return state as CancellableContinuationImpl } } + state === REUSABLE_CLAIMED -> { + // Do nothing, wait until reusable instance will be returned from + // getResult() of a previous `suspendCancellableCoroutineReusable` + } + state is Throwable -> { + // Also do nothing, Throwable can only indicate that the CC + // is in REUSABLE_CLAIMED state, but with postponed cancellation + } else -> error("Inconsistent state $state") } } @@ -127,14 +155,13 @@ internal class DispatchedContinuation( * * See [CancellableContinuationImpl.getResult]. */ - fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? { _reusableCancellableContinuation.loop { state -> // not when(state) to avoid Intrinsics.equals call when { state === REUSABLE_CLAIMED -> { if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null } - state === null -> return null state is Throwable -> { require(_reusableCancellableContinuation.compareAndSet(state, null)) return state diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55b0..c7c2c04e59 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebdfa..c4232d6e60 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -56,7 +56,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap { val visited = IdentityHashMap() if (root == null) return visited visited[root] = Ref.RootRef diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 0000000000..8a20e0843f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun ReceiveChannel.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +}