Skip to content

Commit 8fd7ce8

Browse files
committed
~: [squash] do not postpone explicit cancellation, store Throwable instead of CancelledContinuation in reusability SM, style fixes
1 parent b493aab commit 8fd7ce8

File tree

4 files changed

+81
-42
lines changed

4 files changed

+81
-42
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
253253
* thus leaking CC instance for indefinite time.
254254
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
255255
*/
256-
return delegate.claimReusableCancellableContinuation()?.apply { resetState() }
256+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
257257
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
258258
}
259259

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+10-15
Original file line numberDiff line numberDiff line change
@@ -78,22 +78,23 @@ internal open class CancellableContinuationImpl<in T>(
7878
// This method does nothing. Leftover for binary compatibility with old compiled code
7979
}
8080

81-
private fun invalidateReusability() {
82-
if (resumeMode != MODE_ATOMIC_DEFAULT) return
83-
(delegate as? DispatchedContinuation<*>)?.makeCancellationNonReusable()
84-
}
85-
8681
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable
8782

8883
/**
8984
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
9085
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
9186
*/
92-
internal fun resetState() {
87+
internal fun resetState(): Boolean {
9388
assert { parentHandle !== NonDisposableHandle }
94-
assert { _state.value !is NotCompleted }
89+
val state = _state.value
90+
assert { state !is NotCompleted }
91+
if (state is CompletedIdempotentResult) {
92+
detachChild()
93+
return false
94+
}
9595
_decision.value = UNDECIDED
9696
_state.value = Active
97+
return true
9798
}
9899

99100
/**
@@ -170,12 +171,10 @@ internal open class CancellableContinuationImpl<in T>(
170171
}
171172

172173
internal fun parentCancelled(cause: Throwable) {
173-
/*
174-
* Here we can't reliably say whether postponed cancellation will be successful, but as it's internal API
175-
* and we do not rely on return value, we are free to return `true`.
176-
*/
177174
if (cancelLater(cause)) return
178175
cancel(cause)
176+
// Even if cancellation has failed, we should detach child to avoid potential leak
177+
detachChildIfNonResuable()
179178
}
180179

181180
private inline fun invokeHandlerSafely(block: () -> Unit) {
@@ -347,10 +346,6 @@ internal open class CancellableContinuationImpl<in T>(
347346
}
348347

349348
override fun tryResume(value: T, idempotent: Any?): Any? {
350-
// Important invariant: if idempotent tryResume is used, then regular resume cannot be used, since the
351-
// thread that invokes this tryResume install its descriptor into the pointer to the corresponding node first,
352-
// so no other thread can find it and invoke resume.
353-
if (idempotent != null) invalidateReusability()
354349
_state.loop { state ->
355350
when (state) {
356351
is NotCompleted -> {

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+6-23
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,6 @@ import kotlin.jvm.*
1111

1212
@SharedImmutable
1313
private val UNDEFINED = Symbol("UNDEFINED")
14-
15-
// Internal to avoid synthetic accessors
16-
@SharedImmutable
17-
@JvmField
18-
internal val NON_REUSABLE = Symbol("NON_REUSABLE")
19-
2014
@SharedImmutable
2115
@JvmField
2216
internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
@@ -36,7 +30,8 @@ internal class DispatchedContinuation<in T>(
3630
/**
3731
* Possible states of reusability:
3832
*
39-
* 1) `null`. Cancellable continuation wasn't yet attempted to be reused.
33+
* 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
34+
* way used and then invalidated (e.g. because of the cancellation).
4035
* 2) [CancellableContinuation]. Continuation to be/that is being reused.
4136
* 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
4237
* ```
@@ -49,7 +44,6 @@ internal class DispatchedContinuation<in T>(
4944
* ```
5045
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendAtomicCancellableCoroutineReusable],
5146
* [CancellableContinuationImpl.getResult] will check for cancellation later.
52-
* 5) [NON_REUSABLE]. CC was cancelled at least once, thus cannot be longer reused.
5347
*
5448
* [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
5549
* AbstractChannel.receive method relies on the fact that the following pattern
@@ -69,7 +63,7 @@ internal class DispatchedContinuation<in T>(
6963
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
7064

7165
public val isReusable: Boolean
72-
get() = _reusableCancellableContinuation.value.let { it != null && it != NON_REUSABLE }
66+
get() = _reusableCancellableContinuation.value != null
7367

7468
/**
7569
* Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block,
@@ -81,7 +75,6 @@ internal class DispatchedContinuation<in T>(
8175
* Transitions:
8276
* 1) `null` -> claimed, caller will instantiate CC instance
8377
* 2) `CC` -> claimed, caller will reuse CC instance
84-
* 3) `NON_REUSABLE` -> nothing, proceed, caller will instantiate CC instance
8578
*/
8679
_reusableCancellableContinuation.loop { state ->
8780
when {
@@ -98,7 +91,6 @@ internal class DispatchedContinuation<in T>(
9891
return state as CancellableContinuationImpl<T>
9992
}
10093
}
101-
state === NON_REUSABLE -> return null
10294
else -> error("Inconsistent state $state")
10395
}
10496
}
@@ -125,24 +117,14 @@ internal class DispatchedContinuation<in T>(
125117
state === REUSABLE_CLAIMED -> if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
126118
state === null -> return null
127119
state is Throwable -> {
128-
require(_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE))
120+
require(_reusableCancellableContinuation.compareAndSet(state, null))
129121
return state
130122
}
131123
else -> return null // Is not reusable
132124
}
133125
}
134126
}
135127

136-
fun makeCancellationNonReusable() {
137-
_reusableCancellableContinuation.loop { state ->
138-
when (state) {
139-
// Do not overwrite cancellation (cancelled CC can't be reused anyway)
140-
is Throwable -> return
141-
else -> if (_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE)) return
142-
}
143-
}
144-
}
145-
146128
/**
147129
* Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
148130
* Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
@@ -156,7 +138,8 @@ internal class DispatchedContinuation<in T>(
156138
}
157139
is Throwable -> return true
158140
else -> {
159-
if (_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE))
141+
// Invalidate
142+
if (_reusableCancellableContinuation.compareAndSet(state, null))
160143
return false
161144
}
162145
}

kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt

+64-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import kotlinx.coroutines.channels.*
8+
import kotlinx.coroutines.selects.*
89
import org.junit.Test
910
import kotlin.coroutines.*
1011
import kotlin.test.*
@@ -67,12 +68,11 @@ class ReusableCancellableContinuationTest : TestBase() {
6768
suspendAtomicCancellableCoroutineReusable<Unit> {
6869
expect(2)
6970
continuation = it
70-
launch {
71+
launch { // Attach to the parent, avoid fast path
7172
expect(3)
7273
it.resume(Unit)
7374
}
7475
}
75-
println("1")
7676
expect(4)
7777
ensureActive()
7878
// Verify child was bound
@@ -105,7 +105,7 @@ class ReusableCancellableContinuationTest : TestBase() {
105105
fun testResumeRegularDoesntPreservesReference() = runTest {
106106
expect(1)
107107
var cont: Continuation<Unit>? = null
108-
launch {
108+
launch { // Attach to the parent, avoid fast path
109109
cont!!.resumeWith(Result.success(Unit))
110110
}
111111
suspendAtomicCancellableCoroutine<Unit> {
@@ -131,4 +131,65 @@ class ReusableCancellableContinuationTest : TestBase() {
131131
finish(2)
132132
}
133133
}
134+
135+
@Test
136+
fun testPropagatedCancel() = runTest({it is CancellationException}) {
137+
val currentJob = coroutineContext[Job]!!
138+
expect(1)
139+
// Bind child at first
140+
suspendAtomicCancellableCoroutineReusable<Unit> {
141+
expect(2)
142+
// Attach to the parent, avoid fast path
143+
launch {
144+
expect(3)
145+
it.resume(Unit)
146+
}
147+
}
148+
expect(4)
149+
ensureActive()
150+
// Verify child was bound
151+
assertEquals(1, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> })
152+
currentJob.cancel()
153+
assertFalse(isActive)
154+
// Child detached
155+
assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> })
156+
suspendAtomicCancellableCoroutineReusable<Unit> { it.resume(Unit) }
157+
suspendAtomicCancellableCoroutineReusable<Unit> { it.resume(Unit) }
158+
assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> })
159+
160+
try {
161+
suspendAtomicCancellableCoroutineReusable<Unit> {}
162+
} catch (e: CancellationException) {
163+
assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> })
164+
finish(5)
165+
}
166+
}
167+
168+
@Test
169+
fun testChannelMemoryLeak() = runTest {
170+
val iterations = 100
171+
val channel = Channel<Unit>()
172+
launch {
173+
repeat(iterations) {
174+
select {
175+
channel.onSend(Unit) {}
176+
}
177+
}
178+
}
179+
180+
val receiver = launch {
181+
repeat(iterations) {
182+
channel.receive()
183+
}
184+
expect(2)
185+
val job = coroutineContext[Job]!!
186+
// 1 for reusable CC, another one for outer joiner
187+
assertEquals(2, FieldWalker.walk(job).count { it is CancellableContinuation<*> })
188+
}
189+
expect(1)
190+
receiver.join()
191+
// Reference should be claimed at this point
192+
assertEquals(0, FieldWalker.walk(receiver).count { it is CancellableContinuation<*> })
193+
finish(3)
194+
}
134195
}

0 commit comments

Comments
 (0)