Skip to content

Commit b493aab

Browse files
committed
~: [squash] do not postpone explicit cancellation, store Throwable instead of CancelledContinuation in reusability SM, style fixes
1 parent 01c4e70 commit b493aab

File tree

8 files changed

+65
-50
lines changed

8 files changed

+65
-50
lines changed

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

+2
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ class CancellableChannel : SimpleChannel() {
8383
}
8484

8585
class CancellableReusableChannel : SimpleChannel() {
86+
@Suppress("INVISIBLE_MEMBER")
8687
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
8788
consumer = it.intercepted()
8889
COROUTINE_SUSPENDED
8990
}
9091

92+
@Suppress("INVISIBLE_MEMBER")
9193
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
9294
enqueuedValue = element
9395
producer = it.intercepted()

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
239239
if (delegate !is DispatchedContinuation<T>) {
240240
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
241241
}
242-
243242
/*
244243
* Attempt to claim reusable instance.
245244
*
@@ -254,9 +253,8 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
254253
* thus leaking CC instance for indefinite time.
255254
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
256255
*/
257-
val claimed = delegate.claimReusableCancellableContinuation() ?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
258-
claimed.resetState()
259-
return claimed
256+
return delegate.claimReusableCancellableContinuation()?.apply { resetState() }
257+
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
260258
}
261259

262260
/**

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+20-24
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ internal open class CancellableContinuationImpl<in T>(
122122
val completed = isCompleted
123123
if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
124124
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
125-
val cancelled = dispatched.checkPostponedCancellation(this) ?: return completed
125+
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
126126
if (!completed) {
127127
// Note: this cancel may fail if one more concurrent cancel is currently being invoked
128-
cancel(cancelled.cause)
128+
cancel(cause)
129129
}
130130
return true
131131
}
@@ -148,21 +148,13 @@ internal open class CancellableContinuationImpl<in T>(
148148
/*
149149
* Attempt to postpone cancellation for reusable cancellable continuation
150150
*/
151-
private fun cancelLater(cancelled: CancelledContinuation): Boolean {
151+
private fun cancelLater(cause: Throwable): Boolean {
152152
if (resumeMode != MODE_ATOMIC_DEFAULT) return false
153153
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
154-
return dispatched.postponeCancellation(cancelled)
154+
return dispatched.postponeCancellation(cause)
155155
}
156156

157157
public override fun cancel(cause: Throwable?): Boolean {
158-
if (cancelLater(CancelledContinuation(this, cause, handled = state is CancelHandler))) {
159-
/*
160-
* Here we can't reliably say whether postponed cancellation will be successful, but as it's internal API
161-
* and we do not rely on return value, we are free to return `true`.
162-
*/
163-
return true
164-
}
165-
166158
_state.loop { state ->
167159
if (state !is NotCompleted) return false // false if already complete or cancelling
168160
// Active -- update to final state
@@ -171,12 +163,21 @@ internal open class CancellableContinuationImpl<in T>(
171163
// Invoke cancel handler if it was present
172164
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
173165
// Complete state update
174-
disposeParentHandle()
166+
detachChildIfNonResuable()
175167
dispatchResume(mode = MODE_ATOMIC_DEFAULT)
176168
return true
177169
}
178170
}
179171

172+
internal fun parentCancelled(cause: Throwable) {
173+
/*
174+
* Here we can't reliably say whether postponed cancellation will be successful, but as it's internal API
175+
* and we do not rely on return value, we are free to return `true`.
176+
*/
177+
if (cancelLater(cause)) return
178+
cancel(cause)
179+
}
180+
180181
private inline fun invokeHandlerSafely(block: () -> Unit) {
181182
try {
182183
block()
@@ -308,7 +309,7 @@ internal open class CancellableContinuationImpl<in T>(
308309
when (state) {
309310
is NotCompleted -> {
310311
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
311-
disposeParentHandle()
312+
detachChildIfNonResuable()
312313
dispatchResume(resumeMode)
313314
return null
314315
}
@@ -330,14 +331,9 @@ internal open class CancellableContinuationImpl<in T>(
330331
}
331332

332333
// Unregister from parent job
333-
private fun disposeParentHandle() {
334-
// If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation
335-
// will do it for us in the end
336-
if (isReusable()) return
337-
parentHandle?.let { // volatile read parentHandle (once)
338-
it.dispose()
339-
parentHandle = NonDisposableHandle // release it just in case, to aid GC
340-
}
334+
private fun detachChildIfNonResuable() {
335+
// If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
336+
if (!isReusable()) detachChild()
341337
}
342338

343339
/**
@@ -361,7 +357,7 @@ internal open class CancellableContinuationImpl<in T>(
361357
val update: Any? = if (idempotent == null) value else
362358
CompletedIdempotentResult(idempotent, value, state)
363359
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
364-
disposeParentHandle()
360+
detachChildIfNonResuable()
365361
return state
366362
}
367363
is CompletedIdempotentResult -> {
@@ -383,7 +379,7 @@ internal open class CancellableContinuationImpl<in T>(
383379
is NotCompleted -> {
384380
val update = CompletedExceptionally(exception)
385381
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
386-
disposeParentHandle()
382+
detachChildIfNonResuable()
387383
return state
388384
}
389385
else -> return null // cannot resume -- not active anymore

kotlinx-coroutines-core/common/src/JobSupport.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1415,7 +1415,7 @@ internal class ChildContinuation(
14151415
@JvmField val child: CancellableContinuationImpl<*>
14161416
) : JobCancellingNode<Job>(parent) {
14171417
override fun invoke(cause: Throwable?) {
1418-
child.cancel(child.getContinuationCancellationCause(job))
1418+
child.parentCancelled(child.getContinuationCancellationCause(job))
14191419
}
14201420
override fun toString(): String =
14211421
"ChildContinuation[$child]"

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+18-15
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ internal class DispatchedContinuation<in T>(
4747
* }
4848
* // state == CC
4949
* ```
50-
* 4) [CancelledContinuation] continuation was cancelled while being in [suspendAtomicCancellableCoroutineReusable],
50+
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendAtomicCancellableCoroutineReusable],
5151
* [CancellableContinuationImpl.getResult] will check for cancellation later.
5252
* 5) [NON_REUSABLE]. CC was cancelled at least once, thus cannot be longer reused.
5353
*
@@ -84,26 +84,29 @@ internal class DispatchedContinuation<in T>(
8484
* 3) `NON_REUSABLE` -> nothing, proceed, caller will instantiate CC instance
8585
*/
8686
_reusableCancellableContinuation.loop { state ->
87-
when (state) {
88-
null -> {
89-
if (_reusableCancellableContinuation.compareAndSet(null, REUSABLE_CLAIMED)) {
90-
return null
91-
}
87+
when {
88+
state === null -> {
89+
/*
90+
* null -> CC was not yet published -> we do not compete with cancel
91+
* -> can use plain store instead of CAS
92+
*/
93+
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
94+
return null
9295
}
93-
is CancellableContinuationImpl<*> -> {
96+
state is CancellableContinuationImpl<*> -> {
9497
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
9598
return state as CancellableContinuationImpl<T>
9699
}
97100
}
98-
NON_REUSABLE -> return null
101+
state === NON_REUSABLE -> return null
99102
else -> error("Inconsistent state $state")
100103
}
101104
}
102105
}
103106

104107
/**
105108
* Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
106-
* and returns [CancelledContinuation] if so, `null` otherwise.
109+
* and returns cancellation cause if so, `null` otherwise.
107110
* If continuation was cancelled, it becomes non-reusable.
108111
*
109112
* ```
@@ -115,13 +118,13 @@ internal class DispatchedContinuation<in T>(
115118
*
116119
* See [CancellableContinuationImpl.getResult].
117120
*/
118-
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): CancelledContinuation? {
121+
fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
119122
_reusableCancellableContinuation.loop { state ->
120123
// not when(state) to avoid Intrinsics.equals call
121124
when {
122125
state === REUSABLE_CLAIMED -> if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
123126
state === null -> return null
124-
state is CancelledContinuation -> {
127+
state is Throwable -> {
125128
require(_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE))
126129
return state
127130
}
@@ -134,7 +137,7 @@ internal class DispatchedContinuation<in T>(
134137
_reusableCancellableContinuation.loop { state ->
135138
when (state) {
136139
// Do not overwrite cancellation (cancelled CC can't be reused anyway)
137-
is CancelledContinuation -> return
140+
is Throwable -> return
138141
else -> if (_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE)) return
139142
}
140143
}
@@ -144,14 +147,14 @@ internal class DispatchedContinuation<in T>(
144147
* Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
145148
* Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
146149
*/
147-
fun postponeCancellation(cancelled: CancelledContinuation): Boolean {
150+
fun postponeCancellation(cause: Throwable): Boolean {
148151
_reusableCancellableContinuation.loop { state ->
149152
when (state) {
150153
REUSABLE_CLAIMED -> {
151-
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cancelled))
154+
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
152155
return true
153156
}
154-
is CancelledContinuation -> return true
157+
is Throwable -> return true
155158
else -> {
156159
if (_reusableCancellableContinuation.compareAndSet(state, NON_REUSABLE))
157160
return false

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
187187
return lockSuspend(owner)
188188
}
189189

190-
private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
190+
private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
191191
val waiter = LockCont(owner, cont)
192192
_state.loop { state ->
193193
when (state) {

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private class SemaphoreImpl(
131131
cur + 1
132132
}
133133

134-
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
134+
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
135135
val last = this.tail
136136
val enqIdx = enqIdx.getAndIncrement()
137137
val segment = getSegment(last, enqIdx / SEGMENT_SIZE)

kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt

+20-4
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,30 @@ class ReusableCancellableContinuationTest : TestBase() {
6060
}
6161

6262
@Test
63-
fun testNotCancelledOnClaimedResume() = runTest {
63+
fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) {
6464
expect(1)
65+
// Bind child at first
66+
var continuation: Continuation<*>? = null
6567
suspendAtomicCancellableCoroutineReusable<Unit> {
66-
it.cancel()
67-
it.resume(Unit)
68+
expect(2)
69+
continuation = it
70+
launch {
71+
expect(3)
72+
it.resume(Unit)
73+
}
6874
}
75+
println("1")
76+
expect(4)
6977
ensureActive()
70-
finish(2)
78+
// Verify child was bound
79+
assertNotNull(FieldWalker.walk(coroutineContext[Job]!!).single { it === continuation })
80+
suspendAtomicCancellableCoroutineReusable<Unit> {
81+
expect(5)
82+
coroutineContext[Job]!!.cancel()
83+
it.resume(Unit)
84+
}
85+
assertFalse(isActive)
86+
finish(6)
7187
}
7288

7389
@Test

0 commit comments

Comments
 (0)