Skip to content

Commit 9306f8c

Browse files
committed
Rework reusability control in cancellable contination
* Update initCancellability documentation and implementation to be aligned with current invariants * Make parentHandle non-volatile and ensure there are no races around it * Establish new reusability invariants - Reusable continuation can be used _only_ if it's state is not REUSABLE_CLAIMED - If it is, spin-loop and wait for release - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and noone will be able to release intercepted continuation (-> detach from parent) - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play Fixes #2564
1 parent a8d55d6 commit 9306f8c

File tree

6 files changed

+186
-62
lines changed

6 files changed

+186
-62
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
104104
public fun completeResume(token: Any)
105105

106106
/**
107-
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
108-
* This function does nothing and is left only for binary compatibility with old compiled code.
107+
* Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
108+
* It's illegal to call this function in any non-`kotlinx.coroutines` code and
109+
* such calls lead to undefined behaviour.
110+
* Exposes in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
109111
*
110112
* @suppress **This is unstable API and it is subject to change.**
111113
*/

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

Lines changed: 98 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
7272
*/
7373
private val _state = atomic<Any?>(Active)
7474

75-
private val _parentHandle = atomic<DisposableHandle?>(null)
76-
private var parentHandle: DisposableHandle?
77-
get() = _parentHandle.value
78-
set(value) { _parentHandle.value = value }
75+
private var parentHandle: DisposableHandle? = null
7976

8077
internal val state: Any? get() = _state.value
8178

@@ -93,7 +90,25 @@ internal open class CancellableContinuationImpl<in T>(
9390
}
9491

9592
public override fun initCancellability() {
96-
setupCancellation()
93+
/*
94+
* Invariant: at the moment of invocation, `this` has not yet
95+
* leaked to user code and no one is able to invoke `resume` or `cancel`
96+
* on it yet. Also, this function is not invoked for reusable continuations.
97+
*/
98+
val parent = context[Job] ?: return // fast path -- don't do anything without parent
99+
val handle = parent.invokeOnCompletion(
100+
onCancelling = true,
101+
handler = ChildContinuation(this).asHandler
102+
)
103+
parentHandle = handle
104+
// now check our state _after_ registering, could have completed while we were registering,
105+
// but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
106+
// so we are helping him and cleaning the node ourselves
107+
if (isCompleted) {
108+
// Can be invoked concurrently in 'parentCancelled', no problems here
109+
handle.dispose()
110+
parentHandle = NonDisposableHandle
111+
}
97112
}
98113

99114
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
@@ -118,40 +133,6 @@ internal open class CancellableContinuationImpl<in T>(
118133
return true
119134
}
120135

121-
/**
122-
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
123-
* It is only invoked from an internal [getResult] function for reusable continuations
124-
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
125-
*/
126-
private fun setupCancellation() {
127-
if (checkCompleted()) return
128-
if (parentHandle !== null) return // fast path 2 -- was already initialized
129-
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
130-
val handle = parent.invokeOnCompletion(
131-
onCancelling = true,
132-
handler = ChildContinuation(this).asHandler
133-
)
134-
parentHandle = handle
135-
// now check our state _after_ registering (could have completed while we were registering)
136-
// Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
137-
if (isCompleted && !isReusable()) {
138-
handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
139-
parentHandle = NonDisposableHandle // release it just in case, to aid GC
140-
}
141-
}
142-
143-
private fun checkCompleted(): Boolean {
144-
val completed = isCompleted
145-
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
146-
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
147-
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
148-
if (!completed) {
149-
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
150-
cancel(cause)
151-
}
152-
return true
153-
}
154-
155136
public override val callerFrame: CoroutineStackFrame?
156137
get() = delegate as? CoroutineStackFrame
157138

@@ -216,7 +197,7 @@ internal open class CancellableContinuationImpl<in T>(
216197

217198
private inline fun callCancelHandlerSafely(block: () -> Unit) {
218199
try {
219-
block()
200+
block()
220201
} catch (ex: Throwable) {
221202
// Handler should never fail, if it does -- it is an unhandled exception
222203
handleCoroutineException(
@@ -274,10 +255,54 @@ internal open class CancellableContinuationImpl<in T>(
274255
}
275256
}
276257

258+
private fun checkCancellation(): Job? {
259+
// Don't need to check for non-reusable continuations, handle is already installed
260+
if (!resumeMode.isReusableMode) return null
261+
val parent: Job?
262+
if (parentHandle == null) {
263+
// No parent -- no postponed and no async cancellations
264+
parent = context[Job] ?: return null
265+
/*
266+
* Rare slow-path: parent handle is not yet installed in reusable CC,
267+
* but parent is cancelled. Just let already existing machinery to figure everything out
268+
* and advance state machine for us.
269+
*/
270+
if (parent.isCancelled) {
271+
installParentHandleReusable(parent)
272+
return parent
273+
}
274+
} else {
275+
// Parent handle is not null, no need to lookup it
276+
parent = null
277+
}
278+
return parent
279+
}
280+
277281
@PublishedApi
278282
internal fun getResult(): Any? {
279-
setupCancellation()
280-
if (trySuspend()) return COROUTINE_SUSPENDED
283+
val isReusable = isReusable()
284+
/*
285+
* Check postponed or async cancellation for reusable continuations.
286+
* Returns job to avoid looking it up twice
287+
*/
288+
val parentJob = checkCancellation()
289+
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
290+
// or we got async cancellation from parent.
291+
if (trySuspend()) {
292+
/*
293+
* We were neither resumed nor cancelled, time to suspend.
294+
* But first we have to install parent cancellation handle (if we didn't yet),
295+
* so CC could be properly resumed on parent cancellation.
296+
*/
297+
if (parentHandle == null) {
298+
installParentHandleReusable(parentJob)
299+
} else if (isReusable) {
300+
releaseClaimedReusableContinuation()
301+
}
302+
return COROUTINE_SUSPENDED
303+
} else if (isReusable) {
304+
releaseClaimedReusableContinuation()
305+
}
281306
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
282307
val state = this.state
283308
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
@@ -296,6 +321,31 @@ internal open class CancellableContinuationImpl<in T>(
296321
return getSuccessfulResult(state)
297322
}
298323

324+
private fun installParentHandleReusable(parent: Job?) {
325+
if (parent == null) return // don't do anything without parent or if completed
326+
// Install the handle
327+
val handle = parent.invokeOnCompletion(
328+
onCancelling = true,
329+
handler = ChildContinuation(this).asHandler
330+
)
331+
parentHandle = handle
332+
/*
333+
* Finally release the continuation after installing the handle. If we were successful, then
334+
* do nothing, it's ok to reuse the instance now.
335+
* Otherwise, dispose the handle by ourselves.
336+
*/
337+
releaseClaimedReusableContinuation()
338+
}
339+
340+
private fun releaseClaimedReusableContinuation() {
341+
val cancellationCause = (delegate as DispatchedContinuation<*>).tryReleaseClaimedContinuation(this) ?: return
342+
parentHandle?.let {
343+
it.dispose()
344+
parentHandle = NonDisposableHandle
345+
}
346+
cancel(cancellationCause)
347+
}
348+
299349
override fun resumeWith(result: Result<T>) =
300350
resumeImpl(result.toState(this), resumeMode)
301351

@@ -462,12 +512,15 @@ internal open class CancellableContinuationImpl<in T>(
462512

463513
/**
464514
* Detaches from the parent.
465-
* Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
515+
* * Used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
516+
* * Used from [parentCancelled] iff [isReusable] is `false`
466517
*/
467518
internal fun detachChild() {
468519
val handle = parentHandle
469-
handle?.dispose()
470-
parentHandle = NonDisposableHandle
520+
if (handle != null) {
521+
handle.dispose()
522+
parentHandle = NonDisposableHandle
523+
}
471524
}
472525

473526
// Note: Always returns RESUME_TOKEN | null

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :
101101

102102
@InternalCoroutinesApi
103103
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
104-
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
104+
/*
105+
* Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
106+
* any ClassCastException can only indicate compiler bug
107+
*/
108+
val dispatched = continuation as DispatchedContinuation<*>
109+
dispatched.release()
105110
}
106111

107112
/**

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
4646
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
4747
* [CancellableContinuationImpl.getResult] will check for cancellation later.
4848
*
49-
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
50-
* AbstractChannel.receive method relies on the fact that the following pattern
49+
* [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
50+
* In the `getResult`, we have the following code:
5151
* ```
52-
* suspendCancellableCoroutineReusable { cont ->
53-
* val result = pollFastPath()
54-
* if (result != null) cont.resume(result)
52+
* if (trySuspend()) {
53+
* // <- at this moment current continuation can be redispatched and claimed again.
54+
* attachChildToParent()
55+
* releaseClaimedContinuation()
5556
* }
5657
* ```
57-
* always succeeds.
58-
* To make it always successful, we actually postpone "reusable" cancellation
59-
* to this phase and set cancellation only at the moment of instantiation.
6058
*/
6159
private val _reusableCancellableContinuation = atomic<Any?>(null)
6260

@@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
6664
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
6765
/*
6866
* Reusability control:
69-
* `null` -> no reusability at all, false
67+
* `null` -> no reusability at all, `false`
7068
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
71-
* Else, if result is CCI === requester.
69+
* Else, if result is CCI === requester, then it's our reusable continuation
7270
* Identity check my fail for the following pattern:
7371
* ```
7472
* loop:
@@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
8280
return true
8381
}
8482

83+
84+
/**
85+
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
86+
* stop mutating cached instance
87+
*/
88+
public fun awaitReusability() {
89+
_reusableCancellableContinuation.loop { it ->
90+
if (it !== REUSABLE_CLAIMED) return
91+
}
92+
}
93+
94+
public fun release() {
95+
/*
96+
* Called from `releaseInterceptedContinuation`, can be concurrent with
97+
* the code in `getResult` right after `trySuspend` returned `true`, so we have
98+
* to wait for a release here.
99+
*/
100+
awaitReusability()
101+
reusableCancellableContinuation?.detachChild()
102+
}
103+
85104
/**
86105
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
87106
* so all cancellations will be postponed.
@@ -103,11 +122,16 @@ internal class DispatchedContinuation<in T>(
103122
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
104123
return null
105124
}
125+
// potentially competing with cancel
106126
state is CancellableContinuationImpl<*> -> {
107127
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
108128
return state as CancellableContinuationImpl<T>
109129
}
110130
}
131+
state === REUSABLE_CLAIMED -> {
132+
// Do nothing, wait until reusable instance will be returned from
133+
// getResult() of a previous `suspendCancellableCoroutineReusable`
134+
}
111135
else -> error("Inconsistent state $state")
112136
}
113137
}
@@ -127,14 +151,13 @@ internal class DispatchedContinuation<in T>(
127151
*
128152
* See [CancellableContinuationImpl.getResult].
129153
*/
130-
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
154+
fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
131155
_reusableCancellableContinuation.loop { state ->
132156
// not when(state) to avoid Intrinsics.equals call
133157
when {
134158
state === REUSABLE_CLAIMED -> {
135159
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
136160
}
137-
state === null -> return null
138161
state is Throwable -> {
139162
require(_reusableCancellableContinuation.compareAndSet(state, null))
140163
return state

kotlinx-coroutines-core/jvm/test/FieldWalker.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -56,7 +56,7 @@ object FieldWalker {
5656
* Reflectively starts to walk through object graph and map to all the reached object to their path
5757
* in from root. Use [showPath] do display a path if needed.
5858
*/
59-
private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
59+
private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
6060
val visited = IdentityHashMap<Any, Ref>()
6161
if (root == null) return visited
6262
visited[root] = Ref.RootRef
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.channels.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class ReusableCancellableContinuationLeakStressTest : TestBase() {
12+
13+
@Suppress("UnnecessaryVariable")
14+
private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
15+
val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
16+
return r
17+
}
18+
19+
private val iterations = 100_000 * stressTestMultiplier
20+
21+
class Leak(val i: Int)
22+
23+
@Test // Simplified version of #2564
24+
fun testReusableContinuationLeak() = runTest {
25+
val channel = produce(capacity = 1) { // from the main thread
26+
(0 until iterations).forEach {
27+
send(Leak(it))
28+
}
29+
}
30+
31+
launch(Dispatchers.Default) {
32+
repeat (iterations) {
33+
val value = channel.receiveBatch()
34+
assertEquals(it, value.i)
35+
}
36+
(channel as Job).join()
37+
38+
FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)