Skip to content

Commit 7317fe5

Browse files
committed
Optimize CancellableContinuationImpl.invokeOnCancellation(..) for Segments
Signed-off-by: Nikita Koval <[email protected]>
1 parent 7a70cff commit 7317fe5

10 files changed

+281
-166
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5151
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
5252
}
5353

54-
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/channels/Waiter {
54+
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
5555
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5656
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
5757
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
@@ -64,6 +64,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
6464
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
6565
public fun initCancellability ()V
6666
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
67+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
6768
public fun isActive ()Z
6869
public fun isCancelled ()Z
6970
public fun isCompleted ()Z
@@ -1257,6 +1258,7 @@ public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutine
12571258
public fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
12581259
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
12591260
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1261+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
12601262
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
12611263
public fun selectInRegistrationPhase (Ljava/lang/Object;)V
12621264
public fun trySelect (Ljava/lang/Object;Ljava/lang/Object;)Z

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+82-23
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.channels.Waiter
98
import kotlinx.coroutines.internal.*
109
import kotlin.coroutines.*
1110
import kotlin.coroutines.intrinsics.*
@@ -15,6 +14,15 @@ private const val UNDECIDED = 0
1514
private const val SUSPENDED = 1
1615
private const val RESUMED = 2
1716

17+
private const val DECISION_SHIFT = 29
18+
private const val INDEX_MASK = (1 shl DECISION_SHIFT) - 1
19+
private const val NO_INDEX = INDEX_MASK
20+
21+
private inline val Int.decision get() = this shr DECISION_SHIFT
22+
private inline val Int.index get() = this and INDEX_MASK
23+
@Suppress("NOTHING_TO_INLINE")
24+
private inline fun decisionAndIndex(decision: Int, index: Int) = (decision shl DECISION_SHIFT) + index
25+
1826
@JvmField
1927
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
2028

@@ -44,7 +52,7 @@ internal open class CancellableContinuationImpl<in T>(
4452
* less dependencies.
4553
*/
4654

47-
/* decision state machine
55+
/** decision state machine
4856
4957
+-----------+ trySuspend +-----------+
5058
| UNDECIDED | -------------> | SUSPENDED |
@@ -56,9 +64,12 @@ internal open class CancellableContinuationImpl<in T>(
5664
| RESUMED |
5765
+-----------+
5866
59-
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
67+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
68+
If the cancellation handler is specified via a [Segment] instance and the index in it
69+
(so [Segment.onCancellation] should be called), the [_decisionAndIndex] field may store
70+
this index additionally to the "decision" value.
6071
*/
61-
private val _decision = atomic(UNDECIDED)
72+
private val _decisionAndIndex = atomic(decisionAndIndex(UNDECIDED, NO_INDEX))
6273

6374
/*
6475
=== Internal states ===
@@ -144,7 +155,7 @@ internal open class CancellableContinuationImpl<in T>(
144155
detachChild()
145156
return false
146157
}
147-
_decision.value = UNDECIDED
158+
_decisionAndIndex.value = decisionAndIndex(UNDECIDED, NO_INDEX)
148159
_state.value = Active
149160
return true
150161
}
@@ -194,10 +205,13 @@ internal open class CancellableContinuationImpl<in T>(
194205
_state.loop { state ->
195206
if (state !is NotCompleted) return false // false if already complete or cancelling
196207
// Active -- update to final state
197-
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
208+
val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
198209
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
199210
// Invoke cancel handler if it was present
200-
(state as? CancelHandler)?.let { callCancelHandler(it, cause) }
211+
when (state) {
212+
is CancelHandler -> callCancelHandler(state, cause)
213+
is Segment<*> -> callSegmentOnCancellation(state, cause)
214+
}
201215
// Complete state update
202216
detachChildIfNonResuable()
203217
dispatchResume(resumeMode) // no need for additional cancellation checks
@@ -234,6 +248,12 @@ internal open class CancellableContinuationImpl<in T>(
234248
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
235249
callCancelHandlerSafely { handler.invoke(cause) }
236250

251+
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
252+
val index = _decisionAndIndex.value.index
253+
check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
254+
callCancelHandlerSafely { segment.onCancellation(index, cause) }
255+
}
256+
237257
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
238258
try {
239259
onCancellation.invoke(cause)
@@ -253,19 +273,19 @@ internal open class CancellableContinuationImpl<in T>(
253273
parent.getCancellationException()
254274

255275
private fun trySuspend(): Boolean {
256-
_decision.loop { decision ->
257-
when (decision) {
258-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
276+
_decisionAndIndex.loop { cur ->
277+
when (cur.decision) {
278+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(SUSPENDED, cur.index))) return true
259279
RESUMED -> return false
260280
else -> error("Already suspended")
261281
}
262282
}
263283
}
264284

265285
private fun tryResume(): Boolean {
266-
_decision.loop { decision ->
267-
when (decision) {
268-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
286+
_decisionAndIndex.loop { cur ->
287+
when (cur.decision) {
288+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(RESUMED, cur.index))) return true
269289
SUSPENDED -> return false
270290
else -> error("Already resumed")
271291
}
@@ -275,7 +295,7 @@ internal open class CancellableContinuationImpl<in T>(
275295
@PublishedApi
276296
internal fun getResult(): Any? {
277297
val isReusable = isReusable()
278-
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
298+
// trySuspend may fail either if 'block' has resumed/cancelled a continuation,
279299
// or we got async cancellation from parent.
280300
if (trySuspend()) {
281301
/*
@@ -350,14 +370,44 @@ internal open class CancellableContinuationImpl<in T>(
350370
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
351371
resumeImpl(value, resumeMode, onCancellation)
352372

373+
/**
374+
* An optimized version for the code below that does not allocate
375+
* a cancellation handler object and efficiently stores the specified
376+
* [segment] and [index] in this [CancellableContinuationImpl].
377+
*
378+
* The only difference is that `segment.onCancellation(..)` is never
379+
* called if this continuation is already completed; thus,
380+
* the semantics is similar to [BeforeResumeCancelHandler].
381+
*
382+
* ```
383+
* invokeOnCancellation { cause ->
384+
* segment.onCancellation(index, cause)
385+
* }
386+
* ```
387+
*/
388+
override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
389+
_decisionAndIndex.update {
390+
check(it.index == NO_INDEX) {
391+
"invokeOnCancellation should be called at most once"
392+
}
393+
decisionAndIndex(it.decision, index)
394+
}
395+
invokeOnCancellationImpl(segment)
396+
}
397+
353398
public override fun invokeOnCancellation(handler: CompletionHandler) {
354399
val cancelHandler = makeCancelHandler(handler)
400+
invokeOnCancellationImpl(cancelHandler)
401+
}
402+
403+
private fun invokeOnCancellationImpl(handler: Any) {
404+
assert { handler is CancelHandler || handler is Segment<*> }
355405
_state.loop { state ->
356406
when (state) {
357407
is Active -> {
358-
if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
408+
if (_state.compareAndSet(state, handler)) return // quit on cas success
359409
}
360-
is CancelHandler -> multipleHandlersError(handler, state)
410+
is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
361411
is CompletedExceptionally -> {
362412
/*
363413
* Continuation was already cancelled or completed exceptionally.
@@ -371,7 +421,13 @@ internal open class CancellableContinuationImpl<in T>(
371421
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
372422
*/
373423
if (state is CancelledContinuation) {
374-
callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
424+
val cause: Throwable? = (state as? CompletedExceptionally)?.cause
425+
if (handler is CancelHandler) {
426+
callCancelHandler(handler, cause)
427+
} else {
428+
val segment = handler as Segment<*>
429+
callSegmentOnCancellation(segment, cause)
430+
}
375431
}
376432
return
377433
}
@@ -380,14 +436,16 @@ internal open class CancellableContinuationImpl<in T>(
380436
* Continuation was already completed, and might already have cancel handler.
381437
*/
382438
if (state.cancelHandler != null) multipleHandlersError(handler, state)
383-
// BeforeResumeCancelHandler does not need to be called on a completed continuation
384-
if (cancelHandler is BeforeResumeCancelHandler) return
439+
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
440+
// do NOT need to be called on completed continuation.
441+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
442+
handler as CancelHandler
385443
if (state.cancelled) {
386444
// Was already cancelled while being dispatched -- invoke the handler directly
387445
callCancelHandler(handler, state.cancelCause)
388446
return
389447
}
390-
val update = state.copy(cancelHandler = cancelHandler)
448+
val update = state.copy(cancelHandler = handler)
391449
if (_state.compareAndSet(state, update)) return // quit on cas success
392450
}
393451
else -> {
@@ -396,15 +454,16 @@ internal open class CancellableContinuationImpl<in T>(
396454
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
397455
* does not need to be called in this case.
398456
*/
399-
if (cancelHandler is BeforeResumeCancelHandler) return
400-
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
457+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
458+
handler as CancelHandler
459+
val update = CompletedContinuation(state, cancelHandler = handler)
401460
if (_state.compareAndSet(state, update)) return // quit on cas success
402461
}
403462
}
404463
}
405464
}
406465

407-
private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
466+
private fun multipleHandlersError(handler: Any, state: Any?) {
408467
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
409468
}
410469

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.Segment
8+
import kotlinx.coroutines.selects.*
9+
10+
/**
11+
* All waiters (such as [CancellableContinuationImpl] and [SelectInstance]) in synchronization and
12+
* communication primitives, should implement this interface to make the code faster and easier to read.
13+
*/
14+
internal interface Waiter {
15+
/**
16+
* When this waiter is cancelled, [Segment.onCancellation] with
17+
* the specified [segment] and [index] should be called.
18+
* This function installs the corresponding cancellation handler.
19+
*/
20+
fun invokeOnCancellation(segment: Segment<*>, index: Int)
21+
}

0 commit comments

Comments
 (0)