From 9306f8c69ca184ac1c884bbff0551a4c1bcda080 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sat, 13 Mar 2021 18:07:19 +0300 Subject: [PATCH 1/6] Rework reusability control in cancellable contination * Update initCancellability documentation and implementation to be aligned with current invariants * Make parentHandle non-volatile and ensure there are no races around it * Establish new reusability invariants - Reusable continuation can be used _only_ if it's state is not REUSABLE_CLAIMED - If it is, spin-loop and wait for release - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and noone will be able to release intercepted continuation (-> detach from parent) - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play Fixes #2564 --- .../common/src/CancellableContinuation.kt | 6 +- .../common/src/CancellableContinuationImpl.kt | 143 ++++++++++++------ .../common/src/CoroutineDispatcher.kt | 7 +- .../src/internal/DispatchedContinuation.kt | 47 ++++-- .../jvm/test/FieldWalker.kt | 4 +- ...leCancellableContinuationLeakStressTest.kt | 41 +++++ 6 files changed, 186 insertions(+), 62 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 8f589912a0..168e15b603 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -104,8 +104,10 @@ public interface CancellableContinuation : Continuation { public fun completeResume(token: Any) /** - * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. - * This function does nothing and is left only for binary compatibility with old compiled code. + * Internal function that setups cancellation behavior in [suspendCancellableCoroutine]. + * It's illegal to call this function in any non-`kotlinx.coroutines` code and + * such calls lead to undefined behaviour. + * Exposes in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** */ diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1a8f35663a..0d2f29c5df 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl( */ private val _state = atomic(Active) - private val _parentHandle = atomic(null) - private var parentHandle: DisposableHandle? - get() = _parentHandle.value - set(value) { _parentHandle.value = value } + private var parentHandle: DisposableHandle? = null internal val state: Any? get() = _state.value @@ -93,7 +90,25 @@ internal open class CancellableContinuationImpl( } public override fun initCancellability() { - setupCancellation() + /* + * Invariant: at the moment of invocation, `this` has not yet + * leaked to user code and no one is able to invoke `resume` or `cancel` + * on it yet. Also, this function is not invoked for reusable continuations. + */ + val parent = context[Job] ?: return // fast path -- don't do anything without parent + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(this).asHandler + ) + parentHandle = handle + // now check our state _after_ registering, could have completed while we were registering, + // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, + // so we are helping him and cleaning the node ourselves + if (isCompleted) { + // Can be invoked concurrently in 'parentCancelled', no problems here + handle.dispose() + parentHandle = NonDisposableHandle + } } private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) @@ -118,40 +133,6 @@ internal open class CancellableContinuationImpl( return true } - /** - * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function for reusable continuations - * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. - */ - private fun setupCancellation() { - if (checkCompleted()) return - if (parentHandle !== null) return // fast path 2 -- was already initialized - val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent - val handle = parent.invokeOnCompletion( - onCancelling = true, - handler = ChildContinuation(this).asHandler - ) - parentHandle = handle - // now check our state _after_ registering (could have completed while we were registering) - // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us - if (isCompleted && !isReusable()) { - handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle - parentHandle = NonDisposableHandle // release it just in case, to aid GC - } - } - - private fun checkCompleted(): Boolean { - val completed = isCompleted - if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations - val dispatched = delegate as? DispatchedContinuation<*> ?: return completed - val cause = dispatched.checkPostponedCancellation(this) ?: return completed - if (!completed) { - // Note: this cancel may fail if one more concurrent cancel is currently being invoked - cancel(cause) - } - return true - } - public override val callerFrame: CoroutineStackFrame? get() = delegate as? CoroutineStackFrame @@ -216,7 +197,7 @@ internal open class CancellableContinuationImpl( private inline fun callCancelHandlerSafely(block: () -> Unit) { try { - block() + block() } catch (ex: Throwable) { // Handler should never fail, if it does -- it is an unhandled exception handleCoroutineException( @@ -274,10 +255,54 @@ internal open class CancellableContinuationImpl( } } + private fun checkCancellation(): Job? { + // Don't need to check for non-reusable continuations, handle is already installed + if (!resumeMode.isReusableMode) return null + val parent: Job? + if (parentHandle == null) { + // No parent -- no postponed and no async cancellations + parent = context[Job] ?: return null + /* + * Rare slow-path: parent handle is not yet installed in reusable CC, + * but parent is cancelled. Just let already existing machinery to figure everything out + * and advance state machine for us. + */ + if (parent.isCancelled) { + installParentHandleReusable(parent) + return parent + } + } else { + // Parent handle is not null, no need to lookup it + parent = null + } + return parent + } + @PublishedApi internal fun getResult(): Any? { - setupCancellation() - if (trySuspend()) return COROUTINE_SUSPENDED + val isReusable = isReusable() + /* + * Check postponed or async cancellation for reusable continuations. + * Returns job to avoid looking it up twice + */ + val parentJob = checkCancellation() + // trySuspend may fail either if 'block' has resumed/cancelled a continuation + // or we got async cancellation from parent. + if (trySuspend()) { + /* + * We were neither resumed nor cancelled, time to suspend. + * But first we have to install parent cancellation handle (if we didn't yet), + * so CC could be properly resumed on parent cancellation. + */ + if (parentHandle == null) { + installParentHandleReusable(parentJob) + } else if (isReusable) { + releaseClaimedReusableContinuation() + } + return COROUTINE_SUSPENDED + } else if (isReusable) { + releaseClaimedReusableContinuation() + } // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) @@ -296,6 +321,31 @@ internal open class CancellableContinuationImpl( return getSuccessfulResult(state) } + private fun installParentHandleReusable(parent: Job?) { + if (parent == null) return // don't do anything without parent or if completed + // Install the handle + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(this).asHandler + ) + parentHandle = handle + /* + * Finally release the continuation after installing the handle. If we were successful, then + * do nothing, it's ok to reuse the instance now. + * Otherwise, dispose the handle by ourselves. + */ + releaseClaimedReusableContinuation() + } + + private fun releaseClaimedReusableContinuation() { + val cancellationCause = (delegate as DispatchedContinuation<*>).tryReleaseClaimedContinuation(this) ?: return + parentHandle?.let { + it.dispose() + parentHandle = NonDisposableHandle + } + cancel(cancellationCause) + } + override fun resumeWith(result: Result) = resumeImpl(result.toState(this), resumeMode) @@ -462,12 +512,15 @@ internal open class CancellableContinuationImpl( /** * Detaches from the parent. - * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` + * * Used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` + * * Used from [parentCancelled] iff [isReusable] is `false` */ internal fun detachChild() { val handle = parentHandle - handle?.dispose() - parentHandle = NonDisposableHandle + if (handle != null) { + handle.dispose() + parentHandle = NonDisposableHandle + } } // Note: Always returns RESUME_TOKEN | null diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index b2b887988d..10be4a3d9e 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher : @InternalCoroutinesApi public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { - (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() + /* + * Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`, + * any ClassCastException can only indicate compiler bug + */ + val dispatched = continuation as DispatchedContinuation<*> + dispatched.release() } /** diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 2874e7d592..c566f4c186 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -46,17 +46,15 @@ internal class DispatchedContinuation( * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * - * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. - * AbstractChannel.receive method relies on the fact that the following pattern + * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation. + * In the `getResult`, we have the following code: * ``` - * suspendCancellableCoroutineReusable { cont -> - * val result = pollFastPath() - * if (result != null) cont.resume(result) + * if (trySuspend()) { + * // <- at this moment current continuation can be redispatched and claimed again. + * attachChildToParent() + * releaseClaimedContinuation() * } * ``` - * always succeeds. - * To make it always successful, we actually postpone "reusable" cancellation - * to this phase and set cancellation only at the moment of instantiation. */ private val _reusableCancellableContinuation = atomic(null) @@ -66,9 +64,9 @@ internal class DispatchedContinuation( public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { /* * Reusability control: - * `null` -> no reusability at all, false + * `null` -> no reusability at all, `false` * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester. + * Else, if result is CCI === requester, then it's our reusable continuation * Identity check my fail for the following pattern: * ``` * loop: @@ -82,6 +80,27 @@ internal class DispatchedContinuation( return true } + + /** + * Awaits until previous call to `suspendCancellableCoroutineReusable` will + * stop mutating cached instance + */ + public fun awaitReusability() { + _reusableCancellableContinuation.loop { it -> + if (it !== REUSABLE_CLAIMED) return + } + } + + public fun release() { + /* + * Called from `releaseInterceptedContinuation`, can be concurrent with + * the code in `getResult` right after `trySuspend` returned `true`, so we have + * to wait for a release here. + */ + awaitReusability() + reusableCancellableContinuation?.detachChild() + } + /** * Claims the continuation for [suspendCancellableCoroutineReusable] block, * so all cancellations will be postponed. @@ -103,11 +122,16 @@ internal class DispatchedContinuation( _reusableCancellableContinuation.value = REUSABLE_CLAIMED return null } + // potentially competing with cancel state is CancellableContinuationImpl<*> -> { if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { return state as CancellableContinuationImpl } } + state === REUSABLE_CLAIMED -> { + // Do nothing, wait until reusable instance will be returned from + // getResult() of a previous `suspendCancellableCoroutineReusable` + } else -> error("Inconsistent state $state") } } @@ -127,14 +151,13 @@ internal class DispatchedContinuation( * * See [CancellableContinuationImpl.getResult]. */ - fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? { _reusableCancellableContinuation.loop { state -> // not when(state) to avoid Intrinsics.equals call when { state === REUSABLE_CLAIMED -> { if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null } - state === null -> return null state is Throwable -> { require(_reusableCancellableContinuation.compareAndSet(state, null)) return state diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebdfa..c4232d6e60 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -56,7 +56,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap { val visited = IdentityHashMap() if (root == null) return visited visited[root] = Ref.RootRef diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 0000000000..8a20e0843f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun ReceiveChannel.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +} From 8bc7bb678efafbf9d759ac75b1209f12509be40d Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sat, 13 Mar 2021 18:37:46 +0300 Subject: [PATCH 2/6] Simplify reusable continuations ever further * Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases * Fix AwaitContinuation that has parentHandle == null but is not reusable --- .../common/src/CancellableContinuation.kt | 2 +- .../common/src/CancellableContinuationImpl.kt | 40 +++---------------- .../common/src/JobSupport.kt | 5 +++ .../jvm/test/CancelledAwaitStressTest.kt | 4 +- 4 files changed, 14 insertions(+), 37 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 168e15b603..cd1e87afad 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -107,7 +107,7 @@ public interface CancellableContinuation : Continuation { * Internal function that setups cancellation behavior in [suspendCancellableCoroutine]. * It's illegal to call this function in any non-`kotlinx.coroutines` code and * such calls lead to undefined behaviour. - * Exposes in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. + * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** */ diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 0d2f29c5df..da00901d5c 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -255,37 +255,9 @@ internal open class CancellableContinuationImpl( } } - private fun checkCancellation(): Job? { - // Don't need to check for non-reusable continuations, handle is already installed - if (!resumeMode.isReusableMode) return null - val parent: Job? - if (parentHandle == null) { - // No parent -- no postponed and no async cancellations - parent = context[Job] ?: return null - /* - * Rare slow-path: parent handle is not yet installed in reusable CC, - * but parent is cancelled. Just let already existing machinery to figure everything out - * and advance state machine for us. - */ - if (parent.isCancelled) { - installParentHandleReusable(parent) - return parent - } - } else { - // Parent handle is not null, no need to lookup it - parent = null - } - return parent - } - @PublishedApi internal fun getResult(): Any? { val isReusable = isReusable() - /* - * Check postponed or async cancellation for reusable continuations. - * Returns job to avoid looking it up twice - */ - val parentJob = checkCancellation() // trySuspend may fail either if 'block' has resumed/cancelled a continuation // or we got async cancellation from parent. if (trySuspend()) { @@ -295,7 +267,7 @@ internal open class CancellableContinuationImpl( * so CC could be properly resumed on parent cancellation. */ if (parentHandle == null) { - installParentHandleReusable(parentJob) + installParentHandleReusable() } else if (isReusable) { releaseClaimedReusableContinuation() } @@ -321,14 +293,13 @@ internal open class CancellableContinuationImpl( return getSuccessfulResult(state) } - private fun installParentHandleReusable(parent: Job?) { - if (parent == null) return // don't do anything without parent or if completed + private fun installParentHandleReusable() { + val parent = context[Job] ?: return // don't do anything without parent or if completed // Install the handle - val handle = parent.invokeOnCompletion( + parentHandle = parent.invokeOnCompletion( onCancelling = true, handler = ChildContinuation(this).asHandler ) - parentHandle = handle /* * Finally release the continuation after installing the handle. If we were successful, then * do nothing, it's ok to reuse the instance now. @@ -338,7 +309,8 @@ internal open class CancellableContinuationImpl( } private fun releaseClaimedReusableContinuation() { - val cancellationCause = (delegate as DispatchedContinuation<*>).tryReleaseClaimedContinuation(this) ?: return + // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return parentHandle?.let { it.dispose() parentHandle = NonDisposableHandle diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5b516ae27f..866ae867ba 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1160,6 +1160,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren delegate: Continuation, private val job: JobSupport ) : CancellableContinuationImpl(delegate, MODE_CANCELLABLE) { + + init { + initCancellability() + } + override fun getContinuationCancellationCause(parent: Job): Throwable { val state = job.state /* diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55b0..c7c2c04e59 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -} \ No newline at end of file +} From f9307e5455c685d4808189ba0022f4919741511f Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 19 Mar 2021 00:07:33 +0300 Subject: [PATCH 3/6] ~ Minor refactoring and some more comments --- .../common/src/CancellableContinuationImpl.kt | 57 +++++++++---------- .../common/src/JobSupport.kt | 7 +-- 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index da00901d5c..42bc26d9d3 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -95,12 +95,8 @@ internal open class CancellableContinuationImpl( * leaked to user code and no one is able to invoke `resume` or `cancel` * on it yet. Also, this function is not invoked for reusable continuations. */ - val parent = context[Job] ?: return // fast path -- don't do anything without parent - val handle = parent.invokeOnCompletion( - onCancelling = true, - handler = ChildContinuation(this).asHandler - ) - parentHandle = handle + val handle = installParentHandle() + ?: return // fast path -- don't do anything without parent // now check our state _after_ registering, could have completed while we were registering, // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, // so we are helping him and cleaning the node ourselves @@ -267,15 +263,23 @@ internal open class CancellableContinuationImpl( * so CC could be properly resumed on parent cancellation. */ if (parentHandle == null) { - installParentHandleReusable() - } else if (isReusable) { + installParentHandle() + } + /* + * Release the continuation after installing the handle (if needed). + * If we were successful, then do nothing, it's ok to reuse the instance now. + * Otherwise, dispose the handle by ourselves. + */ + if (isReusable) { releaseClaimedReusableContinuation() } return COROUTINE_SUSPENDED - } else if (isReusable) { - releaseClaimedReusableContinuation() } // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state + if (isReusable) { + // release claimed reusable continuation for the future reuse + releaseClaimedReusableContinuation() + } val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) // if the parent job was already cancelled, then throw the corresponding cancellation exception @@ -293,28 +297,25 @@ internal open class CancellableContinuationImpl( return getSuccessfulResult(state) } - private fun installParentHandleReusable() { - val parent = context[Job] ?: return // don't do anything without parent or if completed + private fun installParentHandle(): DisposableHandle? { + val parent = context[Job] ?: return null // don't do anything without a parent // Install the handle - parentHandle = parent.invokeOnCompletion( + val handle = parent.invokeOnCompletion( onCancelling = true, handler = ChildContinuation(this).asHandler ) - /* - * Finally release the continuation after installing the handle. If we were successful, then - * do nothing, it's ok to reuse the instance now. - * Otherwise, dispose the handle by ourselves. - */ - releaseClaimedReusableContinuation() + parentHandle = handle + return handle } + /** + * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, + * in which case it detaches from the parent and cancels this continuation. + */ private fun releaseClaimedReusableContinuation() { // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return - parentHandle?.let { - it.dispose() - parentHandle = NonDisposableHandle - } + detachChild() cancel(cancellationCause) } @@ -484,15 +485,11 @@ internal open class CancellableContinuationImpl( /** * Detaches from the parent. - * * Used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` - * * Used from [parentCancelled] iff [isReusable] is `false` */ internal fun detachChild() { - val handle = parentHandle - if (handle != null) { - handle.dispose() - parentHandle = NonDisposableHandle - } + val handle = parentHandle ?: return + handle.dispose() + parentHandle = NonDisposableHandle } // Note: Always returns RESUME_TOKEN | null diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 866ae867ba..f011dcf898 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1160,11 +1160,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren delegate: Continuation, private val job: JobSupport ) : CancellableContinuationImpl(delegate, MODE_CANCELLABLE) { - - init { - initCancellability() - } - override fun getContinuationCancellationCause(parent: Job): Throwable { val state = job.state /* @@ -1233,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) + // we are mimicking suspendCancellableCoroutine here and call initCancellability, too. + cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) cont.getResult() } From 2f13e5c14f7762a1d9651ac3c00d7bd8b9e9592c Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 19 Mar 2021 00:12:49 +0300 Subject: [PATCH 4/6] ~ Comment fixed --- .../common/src/CancellableContinuationImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 42bc26d9d3..b24f09a953 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -99,7 +99,7 @@ internal open class CancellableContinuationImpl( ?: return // fast path -- don't do anything without parent // now check our state _after_ registering, could have completed while we were registering, // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, - // so we are helping him and cleaning the node ourselves + // so we are helping it and cleaning the node ourselves if (isCompleted) { // Can be invoked concurrently in 'parentCancelled', no problems here handle.dispose() From 6e51bdb118bb4e0cdd7a4108d7ddac95f942dbbf Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 23 Mar 2021 16:57:13 +0300 Subject: [PATCH 5/6] ~do not postpone cancellation for reusable continuations that do not belong to the current instance --- kotlinx-coroutines-core/common/src/CancellableContinuation.kt | 2 +- .../common/src/CancellableContinuationImpl.kt | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index cd1e87afad..b133b7935d 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -334,7 +334,7 @@ internal suspend inline fun suspendCancellableCoroutineReusable( internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { // If used outside of our dispatcher if (delegate !is DispatchedContinuation) { - return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE) + return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } /* * Attempt to claim reusable instance. diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index b24f09a953..c310623c5d 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -165,7 +165,9 @@ internal open class CancellableContinuationImpl( */ private fun cancelLater(cause: Throwable): Boolean { if (!resumeMode.isReusableMode) return false - val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false + // Ensure that we are postponing cancellation to the right instance + if (!isReusable()) return false + val dispatched = delegate as DispatchedContinuation<*> return dispatched.postponeCancellation(cause) } From efd160e7a8514785f77ec0a8460754b2ed43f177 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 23 Mar 2021 17:28:05 +0300 Subject: [PATCH 6/6] ~properly await for cancellable continuation to be released --- .../common/src/internal/DispatchedContinuation.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index c566f4c186..45b9699c84 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -132,6 +132,10 @@ internal class DispatchedContinuation( // Do nothing, wait until reusable instance will be returned from // getResult() of a previous `suspendCancellableCoroutineReusable` } + state is Throwable -> { + // Also do nothing, Throwable can only indicate that the CC + // is in REUSABLE_CLAIMED state, but with postponed cancellation + } else -> error("Inconsistent state $state") } }