Skip to content

Commit e608dfb

Browse files
authored
Rework reusability control in cancellable continuation (#2581)
* Rework reusability control in cancellable continuation * Update initCancellability documentation and implementation to be aligned with current invariants * Make parentHandle non-volatile and ensure there are no races around it * Establish new reusability invariants - Reusable continuation can be used _only_ if it states is not REUSABLE_CLAIMED - If it is, spin-loop and wait for release - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and no one will be able to release intercepted continuation (-> detach from parent) - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play * Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases Fixes #2564
1 parent 5f9e52c commit e608dfb

8 files changed

+167
-66
lines changed

Diff for: kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
104104
public fun completeResume(token: Any)
105105

106106
/**
107-
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
108-
* This function does nothing and is left only for binary compatibility with old compiled code.
107+
* Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
108+
* It's illegal to call this function in any non-`kotlinx.coroutines` code and
109+
* such calls lead to undefined behaviour.
110+
* Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
109111
*
110112
* @suppress **This is unstable API and it is subject to change.**
111113
*/
@@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
332334
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
333335
// If used outside of our dispatcher
334336
if (delegate !is DispatchedContinuation<T>) {
335-
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
337+
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
336338
}
337339
/*
338340
* Attempt to claim reusable instance.

Diff for: kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+70-46
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
7272
*/
7373
private val _state = atomic<Any?>(Active)
7474

75-
private val _parentHandle = atomic<DisposableHandle?>(null)
76-
private var parentHandle: DisposableHandle?
77-
get() = _parentHandle.value
78-
set(value) { _parentHandle.value = value }
75+
private var parentHandle: DisposableHandle? = null
7976

8077
internal val state: Any? get() = _state.value
8178

@@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl<in T>(
9390
}
9491

9592
public override fun initCancellability() {
96-
setupCancellation()
93+
/*
94+
* Invariant: at the moment of invocation, `this` has not yet
95+
* leaked to user code and no one is able to invoke `resume` or `cancel`
96+
* on it yet. Also, this function is not invoked for reusable continuations.
97+
*/
98+
val handle = installParentHandle()
99+
?: return // fast path -- don't do anything without parent
100+
// now check our state _after_ registering, could have completed while we were registering,
101+
// but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
102+
// so we are helping it and cleaning the node ourselves
103+
if (isCompleted) {
104+
// Can be invoked concurrently in 'parentCancelled', no problems here
105+
handle.dispose()
106+
parentHandle = NonDisposableHandle
107+
}
97108
}
98109

99110
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
@@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>(
118129
return true
119130
}
120131

121-
/**
122-
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
123-
* It is only invoked from an internal [getResult] function for reusable continuations
124-
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
125-
*/
126-
private fun setupCancellation() {
127-
if (checkCompleted()) return
128-
if (parentHandle !== null) return // fast path 2 -- was already initialized
129-
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
130-
val handle = parent.invokeOnCompletion(
131-
onCancelling = true,
132-
handler = ChildContinuation(this).asHandler
133-
)
134-
parentHandle = handle
135-
// now check our state _after_ registering (could have completed while we were registering)
136-
// Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
137-
if (isCompleted && !isReusable()) {
138-
handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
139-
parentHandle = NonDisposableHandle // release it just in case, to aid GC
140-
}
141-
}
142-
143-
private fun checkCompleted(): Boolean {
144-
val completed = isCompleted
145-
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
146-
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
147-
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
148-
if (!completed) {
149-
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
150-
cancel(cause)
151-
}
152-
return true
153-
}
154-
155132
public override val callerFrame: CoroutineStackFrame?
156133
get() = delegate as? CoroutineStackFrame
157134

@@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl<in T>(
188165
*/
189166
private fun cancelLater(cause: Throwable): Boolean {
190167
if (!resumeMode.isReusableMode) return false
191-
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
168+
// Ensure that we are postponing cancellation to the right instance
169+
if (!isReusable()) return false
170+
val dispatched = delegate as DispatchedContinuation<*>
192171
return dispatched.postponeCancellation(cause)
193172
}
194173

@@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl<in T>(
216195

217196
private inline fun callCancelHandlerSafely(block: () -> Unit) {
218197
try {
219-
block()
198+
block()
220199
} catch (ex: Throwable) {
221200
// Handler should never fail, if it does -- it is an unhandled exception
222201
handleCoroutineException(
@@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl<in T>(
276255

277256
@PublishedApi
278257
internal fun getResult(): Any? {
279-
setupCancellation()
280-
if (trySuspend()) return COROUTINE_SUSPENDED
258+
val isReusable = isReusable()
259+
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
260+
// or we got async cancellation from parent.
261+
if (trySuspend()) {
262+
/*
263+
* We were neither resumed nor cancelled, time to suspend.
264+
* But first we have to install parent cancellation handle (if we didn't yet),
265+
* so CC could be properly resumed on parent cancellation.
266+
*/
267+
if (parentHandle == null) {
268+
installParentHandle()
269+
}
270+
/*
271+
* Release the continuation after installing the handle (if needed).
272+
* If we were successful, then do nothing, it's ok to reuse the instance now.
273+
* Otherwise, dispose the handle by ourselves.
274+
*/
275+
if (isReusable) {
276+
releaseClaimedReusableContinuation()
277+
}
278+
return COROUTINE_SUSPENDED
279+
}
281280
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
281+
if (isReusable) {
282+
// release claimed reusable continuation for the future reuse
283+
releaseClaimedReusableContinuation()
284+
}
282285
val state = this.state
283286
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
284287
// if the parent job was already cancelled, then throw the corresponding cancellation exception
@@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl<in T>(
296299
return getSuccessfulResult(state)
297300
}
298301

302+
private fun installParentHandle(): DisposableHandle? {
303+
val parent = context[Job] ?: return null // don't do anything without a parent
304+
// Install the handle
305+
val handle = parent.invokeOnCompletion(
306+
onCancelling = true,
307+
handler = ChildContinuation(this).asHandler
308+
)
309+
parentHandle = handle
310+
return handle
311+
}
312+
313+
/**
314+
* Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
315+
* in which case it detaches from the parent and cancels this continuation.
316+
*/
317+
private fun releaseClaimedReusableContinuation() {
318+
// Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
319+
val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
320+
detachChild()
321+
cancel(cancellationCause)
322+
}
323+
299324
override fun resumeWith(result: Result<T>) =
300325
resumeImpl(result.toState(this), resumeMode)
301326

@@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl<in T>(
462487

463488
/**
464489
* Detaches from the parent.
465-
* Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
466490
*/
467491
internal fun detachChild() {
468-
val handle = parentHandle
469-
handle?.dispose()
492+
val handle = parentHandle ?: return
493+
handle.dispose()
470494
parentHandle = NonDisposableHandle
471495
}
472496

Diff for: kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :
101101

102102
@InternalCoroutinesApi
103103
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
104-
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
104+
/*
105+
* Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
106+
* any ClassCastException can only indicate compiler bug
107+
*/
108+
val dispatched = continuation as DispatchedContinuation<*>
109+
dispatched.release()
105110
}
106111

107112
/**

Diff for: kotlinx-coroutines-core/common/src/JobSupport.kt

+2
Original file line numberDiff line numberDiff line change
@@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12281228
* thrown and not a JobCancellationException.
12291229
*/
12301230
val cont = AwaitContinuation(uCont.intercepted(), this)
1231+
// we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
1232+
cont.initCancellability()
12311233
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
12321234
cont.getResult()
12331235
}

Diff for: kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+39-12
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
4646
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
4747
* [CancellableContinuationImpl.getResult] will check for cancellation later.
4848
*
49-
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
50-
* AbstractChannel.receive method relies on the fact that the following pattern
49+
* [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
50+
* In the `getResult`, we have the following code:
5151
* ```
52-
* suspendCancellableCoroutineReusable { cont ->
53-
* val result = pollFastPath()
54-
* if (result != null) cont.resume(result)
52+
* if (trySuspend()) {
53+
* // <- at this moment current continuation can be redispatched and claimed again.
54+
* attachChildToParent()
55+
* releaseClaimedContinuation()
5556
* }
5657
* ```
57-
* always succeeds.
58-
* To make it always successful, we actually postpone "reusable" cancellation
59-
* to this phase and set cancellation only at the moment of instantiation.
6058
*/
6159
private val _reusableCancellableContinuation = atomic<Any?>(null)
6260

@@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
6664
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
6765
/*
6866
* Reusability control:
69-
* `null` -> no reusability at all, false
67+
* `null` -> no reusability at all, `false`
7068
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
71-
* Else, if result is CCI === requester.
69+
* Else, if result is CCI === requester, then it's our reusable continuation
7270
* Identity check my fail for the following pattern:
7371
* ```
7472
* loop:
@@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
8280
return true
8381
}
8482

83+
84+
/**
85+
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
86+
* stop mutating cached instance
87+
*/
88+
public fun awaitReusability() {
89+
_reusableCancellableContinuation.loop { it ->
90+
if (it !== REUSABLE_CLAIMED) return
91+
}
92+
}
93+
94+
public fun release() {
95+
/*
96+
* Called from `releaseInterceptedContinuation`, can be concurrent with
97+
* the code in `getResult` right after `trySuspend` returned `true`, so we have
98+
* to wait for a release here.
99+
*/
100+
awaitReusability()
101+
reusableCancellableContinuation?.detachChild()
102+
}
103+
85104
/**
86105
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
87106
* so all cancellations will be postponed.
@@ -103,11 +122,20 @@ internal class DispatchedContinuation<in T>(
103122
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
104123
return null
105124
}
125+
// potentially competing with cancel
106126
state is CancellableContinuationImpl<*> -> {
107127
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
108128
return state as CancellableContinuationImpl<T>
109129
}
110130
}
131+
state === REUSABLE_CLAIMED -> {
132+
// Do nothing, wait until reusable instance will be returned from
133+
// getResult() of a previous `suspendCancellableCoroutineReusable`
134+
}
135+
state is Throwable -> {
136+
// Also do nothing, Throwable can only indicate that the CC
137+
// is in REUSABLE_CLAIMED state, but with postponed cancellation
138+
}
111139
else -> error("Inconsistent state $state")
112140
}
113141
}
@@ -127,14 +155,13 @@ internal class DispatchedContinuation<in T>(
127155
*
128156
* See [CancellableContinuationImpl.getResult].
129157
*/
130-
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
158+
fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
131159
_reusableCancellableContinuation.loop { state ->
132160
// not when(state) to avoid Intrinsics.equals call
133161
when {
134162
state === REUSABLE_CLAIMED -> {
135163
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
136164
}
137-
state === null -> return null
138165
state is Throwable -> {
139166
require(_reusableCancellableContinuation.compareAndSet(state, null))
140167
return state

Diff for: kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
5252
private fun keepMe(a: ByteArray) {
5353
// does nothing, makes sure the variable is kept in state-machine
5454
}
55-
}
55+
}

Diff for: kotlinx-coroutines-core/jvm/test/FieldWalker.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -56,7 +56,7 @@ object FieldWalker {
5656
* Reflectively starts to walk through object graph and map to all the reached object to their path
5757
* in from root. Use [showPath] do display a path if needed.
5858
*/
59-
private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
59+
private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
6060
val visited = IdentityHashMap<Any, Ref>()
6161
if (root == null) return visited
6262
visited[root] = Ref.RootRef
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.channels.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class ReusableCancellableContinuationLeakStressTest : TestBase() {
12+
13+
@Suppress("UnnecessaryVariable")
14+
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
15+
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
16+
return r
17+
}
18+
19+
private val iterations = 100_000 * stressTestMultiplier
20+
21+
class Leak(val i: Int)
22+
23+
@Test // Simplified version of #2564
24+
fun testReusableContinuationLeak() = runTest {
25+
val channel = produce(capacity = 1) { // from the main thread
26+
(0 until iterations).forEach {
27+
send(Leak(it))
28+
}
29+
}
30+
31+
launch(Dispatchers.Default) {
32+
repeat (iterations) {
33+
val value = channel.receiveBatch()
34+
assertEquals(it, value.i)
35+
}
36+
(channel as Job).join()
37+
38+
FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)