Skip to content

Commit c7042e2

Browse files
committed
In progress
Signed-off-by: Nikita Koval <[email protected]>
1 parent 44a302e commit c7042e2

10 files changed

+276
-166
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5151
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
5252
}
5353

54-
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/channels/Waiter {
54+
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
5555
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5656
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
5757
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
@@ -64,6 +64,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
6464
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
6565
public fun initCancellability ()V
6666
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
67+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
6768
public fun isActive ()Z
6869
public fun isCancelled ()Z
6970
public fun isCompleted ()Z
@@ -1257,6 +1258,7 @@ public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutine
12571258
public fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
12581259
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
12591260
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1261+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
12601262
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
12611263
public fun selectInRegistrationPhase (Ljava/lang/Object;)V
12621264
public fun trySelect (Ljava/lang/Object;Ljava/lang/Object;)Z

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+77-23
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.channels.Waiter
98
import kotlinx.coroutines.internal.*
109
import kotlin.coroutines.*
1110
import kotlin.coroutines.intrinsics.*
@@ -15,6 +14,15 @@ private const val UNDECIDED = 0
1514
private const val SUSPENDED = 1
1615
private const val RESUMED = 2
1716

17+
private const val DECISION_SHIFT = 29
18+
private const val INDEX_MASK = (1 shl DECISION_SHIFT) - 1
19+
private const val NO_INDEX = INDEX_MASK
20+
21+
private inline val Int.decision get() = this shr DECISION_SHIFT
22+
private inline val Int.index get() = this and INDEX_MASK
23+
@Suppress("NOTHING_TO_INLINE")
24+
private inline fun decisionAndIndex(decision: Int, index: Int) = (decision shl DECISION_SHIFT) + index
25+
1826
@JvmField
1927
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
2028

@@ -44,7 +52,7 @@ internal open class CancellableContinuationImpl<in T>(
4452
* less dependencies.
4553
*/
4654

47-
/* decision state machine
55+
/** decision state machine
4856
4957
+-----------+ trySuspend +-----------+
5058
| UNDECIDED | -------------> | SUSPENDED |
@@ -56,9 +64,11 @@ internal open class CancellableContinuationImpl<in T>(
5664
| RESUMED |
5765
+-----------+
5866
59-
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
67+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
68+
In addition to the "decision" value, the [_decisionAndIndex] field may store the index in
69+
the [Segment], if the cancellation handler stores
6070
*/
61-
private val _decision = atomic(UNDECIDED)
71+
private val _decisionAndIndex = atomic(decisionAndIndex(UNDECIDED, NO_INDEX))
6272

6373
/*
6474
=== Internal states ===
@@ -144,7 +154,7 @@ internal open class CancellableContinuationImpl<in T>(
144154
detachChild()
145155
return false
146156
}
147-
_decision.value = UNDECIDED
157+
_decisionAndIndex.value = decisionAndIndex(UNDECIDED, NO_INDEX)
148158
_state.value = Active
149159
return true
150160
}
@@ -194,10 +204,13 @@ internal open class CancellableContinuationImpl<in T>(
194204
_state.loop { state ->
195205
if (state !is NotCompleted) return false // false if already complete or cancelling
196206
// Active -- update to final state
197-
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
207+
val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
198208
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
199209
// Invoke cancel handler if it was present
200-
(state as? CancelHandler)?.let { callCancelHandler(it, cause) }
210+
when (state) {
211+
is CancelHandler -> callCancelHandler(state, cause)
212+
is Segment<*> -> callSegmentOnCancellation(state, cause)
213+
}
201214
// Complete state update
202215
detachChildIfNonResuable()
203216
dispatchResume(resumeMode) // no need for additional cancellation checks
@@ -234,6 +247,12 @@ internal open class CancellableContinuationImpl<in T>(
234247
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
235248
callCancelHandlerSafely { handler.invoke(cause) }
236249

250+
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
251+
val index = _decisionAndIndex.value.index
252+
check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
253+
callCancelHandlerSafely { segment.onCancellation(index, cause) }
254+
}
255+
237256
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
238257
try {
239258
onCancellation.invoke(cause)
@@ -253,19 +272,19 @@ internal open class CancellableContinuationImpl<in T>(
253272
parent.getCancellationException()
254273

255274
private fun trySuspend(): Boolean {
256-
_decision.loop { decision ->
257-
when (decision) {
258-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
275+
_decisionAndIndex.loop { cur ->
276+
when (cur.decision) {
277+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(SUSPENDED, cur.index))) return true
259278
RESUMED -> return false
260279
else -> error("Already suspended")
261280
}
262281
}
263282
}
264283

265284
private fun tryResume(): Boolean {
266-
_decision.loop { decision ->
267-
when (decision) {
268-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
285+
_decisionAndIndex.loop { cur ->
286+
when (cur.decision) {
287+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(RESUMED, cur.index))) return true
269288
SUSPENDED -> return false
270289
else -> error("Already resumed")
271290
}
@@ -275,7 +294,7 @@ internal open class CancellableContinuationImpl<in T>(
275294
@PublishedApi
276295
internal fun getResult(): Any? {
277296
val isReusable = isReusable()
278-
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
297+
// trySuspend may fail either if 'block' has resumed/cancelled a continuation,
279298
// or we got async cancellation from parent.
280299
if (trySuspend()) {
281300
/*
@@ -350,14 +369,40 @@ internal open class CancellableContinuationImpl<in T>(
350369
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
351370
resumeImpl(value, resumeMode, onCancellation)
352371

372+
/**
373+
* An optimized version for the code below that does not allocate
374+
* a cancellation handler object and efficiently stores the specified
375+
* [segment] and [index] in this [CancellableContinuationImpl].
376+
*
377+
* ```
378+
* invokeOnCancellation { cause ->
379+
* segment.onCancellation(index, cause)
380+
* }
381+
* ```
382+
*/
383+
override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
384+
_decisionAndIndex.update {
385+
check(it.index == NO_INDEX) {
386+
"invokeOnCancellation should be invoked at most once"
387+
}
388+
decisionAndIndex(it.decision, index)
389+
}
390+
invokeOnCancellationImpl(segment)
391+
}
392+
353393
public override fun invokeOnCancellation(handler: CompletionHandler) {
354394
val cancelHandler = makeCancelHandler(handler)
395+
invokeOnCancellationImpl(cancelHandler)
396+
}
397+
398+
private fun invokeOnCancellationImpl(handler: Any) {
399+
assert { handler is CancelHandler || handler is Segment<*> }
355400
_state.loop { state ->
356401
when (state) {
357402
is Active -> {
358-
if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
403+
if (_state.compareAndSet(state, handler)) return // quit on cas success
359404
}
360-
is CancelHandler -> multipleHandlersError(handler, state)
405+
is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
361406
is CompletedExceptionally -> {
362407
/*
363408
* Continuation was already cancelled or completed exceptionally.
@@ -371,7 +416,13 @@ internal open class CancellableContinuationImpl<in T>(
371416
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
372417
*/
373418
if (state is CancelledContinuation) {
374-
callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
419+
val cause: Throwable? = (state as? CompletedExceptionally)?.cause
420+
if (handler is CancelHandler) {
421+
callCancelHandler(handler, cause)
422+
} else {
423+
val segment = handler as Segment<*>
424+
callSegmentOnCancellation(segment, cause)
425+
}
375426
}
376427
return
377428
}
@@ -380,14 +431,16 @@ internal open class CancellableContinuationImpl<in T>(
380431
* Continuation was already completed, and might already have cancel handler.
381432
*/
382433
if (state.cancelHandler != null) multipleHandlersError(handler, state)
383-
// BeforeResumeCancelHandler does not need to be called on a completed continuation
384-
if (cancelHandler is BeforeResumeCancelHandler) return
434+
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
435+
// do NOT need to be called on completed continuation.
436+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
437+
handler as CancelHandler
385438
if (state.cancelled) {
386439
// Was already cancelled while being dispatched -- invoke the handler directly
387440
callCancelHandler(handler, state.cancelCause)
388441
return
389442
}
390-
val update = state.copy(cancelHandler = cancelHandler)
443+
val update = state.copy(cancelHandler = handler)
391444
if (_state.compareAndSet(state, update)) return // quit on cas success
392445
}
393446
else -> {
@@ -396,15 +449,16 @@ internal open class CancellableContinuationImpl<in T>(
396449
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
397450
* does not need to be called in this case.
398451
*/
399-
if (cancelHandler is BeforeResumeCancelHandler) return
400-
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
452+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
453+
handler as CancelHandler
454+
val update = CompletedContinuation(state, cancelHandler = handler)
401455
if (_state.compareAndSet(state, update)) return // quit on cas success
402456
}
403457
}
404458
}
405459
}
406460

407-
private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
461+
private fun multipleHandlersError(handler: Any, state: Any?) {
408462
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
409463
}
410464

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.Segment
8+
import kotlinx.coroutines.selects.*
9+
10+
/**
11+
* All waiters (such as [CancellableContinuationImpl] and [SelectInstance]) in synchronization and
12+
* communication primitives, should implement this interface to make the code faster and easier to read.
13+
*/
14+
internal interface Waiter {
15+
/**
16+
* When this waiter is cancelled, [Segment.onCancellation] with
17+
* the specified [segment] and [index] should be called.
18+
* This function installs the corresponding cancellation handler.
19+
*/
20+
fun invokeOnCancellation(segment: Segment<*>, index: Int)
21+
}

0 commit comments

Comments
 (0)