Skip to content

Commit 2da6817

Browse files
ndkovalqwwdfsad
andauthored
Optimize CancellableContinuationImpl.invokeOnCancellation(..) for Segments (#3084)
* This optimization enables allocation-free channel operations, as we no longer have to allocate a cancellation handler for suspending channel operations * Add a sequential semaphore benchmark and a generalized version of `ChannelSinkBenchmark` that supports buffered channels and pre-allocates elements to isolate the effect Signed-off-by: Nikita Koval <[email protected]> Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
1 parent cb0ef71 commit 2da6817

13 files changed

+363
-164
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.*
11+
import kotlin.coroutines.*
12+
13+
@Warmup(iterations = 3, time = 1)
14+
@Measurement(iterations = 5, time = 1)
15+
@BenchmarkMode(Mode.AverageTime)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
@Fork(1)
19+
open class ChannelSinkNoAllocationsBenchmark {
20+
private val unconfined = Dispatchers.Unconfined
21+
22+
@Benchmark
23+
fun channelPipeline(): Int = runBlocking {
24+
run(unconfined)
25+
}
26+
27+
private suspend inline fun run(context: CoroutineContext): Int {
28+
var size = 0
29+
Channel.range(context).consumeEach { size++ }
30+
return size
31+
}
32+
33+
private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context) {
34+
for (i in 0 until 100_000)
35+
send(Unit) // no allocations
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.sync.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.TimeUnit
11+
import kotlin.test.*
12+
13+
@Warmup(iterations = 5, time = 1)
14+
@Measurement(iterations = 10, time = 1)
15+
@BenchmarkMode(Mode.AverageTime)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
@Fork(1)
19+
open class SequentialSemaphoreAsMutexBenchmark {
20+
val s = Semaphore(1)
21+
22+
@Benchmark
23+
fun benchmark() : Unit = runBlocking {
24+
val s = Semaphore(permits = 1, acquiredPermits = 1)
25+
var step = 0
26+
launch(Dispatchers.Unconfined) {
27+
repeat(N) {
28+
assertEquals(it * 2, step)
29+
step++
30+
s.acquire()
31+
}
32+
}
33+
repeat(N) {
34+
assertEquals(it * 2 + 1, step)
35+
step++
36+
s.release()
37+
}
38+
}
39+
}
40+
41+
fun main() = SequentialSemaphoreAsMutexBenchmark().benchmark()
42+
43+
private val N = 1_000_000

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5151
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
5252
}
5353

54-
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/channels/Waiter {
54+
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
5555
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5656
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
5757
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
@@ -64,6 +64,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
6464
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
6565
public fun initCancellability ()V
6666
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
67+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
6768
public fun isActive ()Z
6869
public fun isCancelled ()Z
6970
public fun isCompleted ()Z
@@ -1258,6 +1259,7 @@ public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutine
12581259
public fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
12591260
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
12601261
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
1262+
public fun invokeOnCancellation (Lkotlinx/coroutines/internal/Segment;I)V
12611263
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
12621264
public fun selectInRegistrationPhase (Ljava/lang/Object;)V
12631265
public fun trySelect (Ljava/lang/Object;Ljava/lang/Object;)Z

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public suspend inline fun <T> suspendCancellableCoroutine(
328328
* [CancellableContinuationImpl] is reused.
329329
*/
330330
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
331-
crossinline block: (CancellableContinuation<T>) -> Unit
331+
crossinline block: (CancellableContinuationImpl<T>) -> Unit
332332
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
333333
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
334334
block(cancellable)

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+82-23
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.channels.Waiter
98
import kotlinx.coroutines.internal.*
109
import kotlin.coroutines.*
1110
import kotlin.coroutines.intrinsics.*
@@ -15,6 +14,15 @@ private const val UNDECIDED = 0
1514
private const val SUSPENDED = 1
1615
private const val RESUMED = 2
1716

17+
private const val DECISION_SHIFT = 29
18+
private const val INDEX_MASK = (1 shl DECISION_SHIFT) - 1
19+
private const val NO_INDEX = INDEX_MASK
20+
21+
private inline val Int.decision get() = this shr DECISION_SHIFT
22+
private inline val Int.index get() = this and INDEX_MASK
23+
@Suppress("NOTHING_TO_INLINE")
24+
private inline fun decisionAndIndex(decision: Int, index: Int) = (decision shl DECISION_SHIFT) + index
25+
1826
@JvmField
1927
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
2028

@@ -44,7 +52,7 @@ internal open class CancellableContinuationImpl<in T>(
4452
* less dependencies.
4553
*/
4654

47-
/* decision state machine
55+
/** decision state machine
4856
4957
+-----------+ trySuspend +-----------+
5058
| UNDECIDED | -------------> | SUSPENDED |
@@ -56,9 +64,12 @@ internal open class CancellableContinuationImpl<in T>(
5664
| RESUMED |
5765
+-----------+
5866
59-
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
67+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
68+
If the cancellation handler is specified via a [Segment] instance and the index in it
69+
(so [Segment.onCancellation] should be called), the [_decisionAndIndex] field may store
70+
this index additionally to the "decision" value.
6071
*/
61-
private val _decision = atomic(UNDECIDED)
72+
private val _decisionAndIndex = atomic(decisionAndIndex(UNDECIDED, NO_INDEX))
6273

6374
/*
6475
=== Internal states ===
@@ -144,7 +155,7 @@ internal open class CancellableContinuationImpl<in T>(
144155
detachChild()
145156
return false
146157
}
147-
_decision.value = UNDECIDED
158+
_decisionAndIndex.value = decisionAndIndex(UNDECIDED, NO_INDEX)
148159
_state.value = Active
149160
return true
150161
}
@@ -194,10 +205,13 @@ internal open class CancellableContinuationImpl<in T>(
194205
_state.loop { state ->
195206
if (state !is NotCompleted) return false // false if already complete or cancelling
196207
// Active -- update to final state
197-
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
208+
val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
198209
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
199210
// Invoke cancel handler if it was present
200-
(state as? CancelHandler)?.let { callCancelHandler(it, cause) }
211+
when (state) {
212+
is CancelHandler -> callCancelHandler(state, cause)
213+
is Segment<*> -> callSegmentOnCancellation(state, cause)
214+
}
201215
// Complete state update
202216
detachChildIfNonResuable()
203217
dispatchResume(resumeMode) // no need for additional cancellation checks
@@ -234,6 +248,12 @@ internal open class CancellableContinuationImpl<in T>(
234248
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
235249
callCancelHandlerSafely { handler.invoke(cause) }
236250

251+
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
252+
val index = _decisionAndIndex.value.index
253+
check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
254+
callCancelHandlerSafely { segment.onCancellation(index, cause) }
255+
}
256+
237257
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
238258
try {
239259
onCancellation.invoke(cause)
@@ -253,19 +273,19 @@ internal open class CancellableContinuationImpl<in T>(
253273
parent.getCancellationException()
254274

255275
private fun trySuspend(): Boolean {
256-
_decision.loop { decision ->
257-
when (decision) {
258-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
276+
_decisionAndIndex.loop { cur ->
277+
when (cur.decision) {
278+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(SUSPENDED, cur.index))) return true
259279
RESUMED -> return false
260280
else -> error("Already suspended")
261281
}
262282
}
263283
}
264284

265285
private fun tryResume(): Boolean {
266-
_decision.loop { decision ->
267-
when (decision) {
268-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
286+
_decisionAndIndex.loop { cur ->
287+
when (cur.decision) {
288+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, decisionAndIndex(RESUMED, cur.index))) return true
269289
SUSPENDED -> return false
270290
else -> error("Already resumed")
271291
}
@@ -275,7 +295,7 @@ internal open class CancellableContinuationImpl<in T>(
275295
@PublishedApi
276296
internal fun getResult(): Any? {
277297
val isReusable = isReusable()
278-
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
298+
// trySuspend may fail either if 'block' has resumed/cancelled a continuation,
279299
// or we got async cancellation from parent.
280300
if (trySuspend()) {
281301
/*
@@ -350,14 +370,44 @@ internal open class CancellableContinuationImpl<in T>(
350370
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
351371
resumeImpl(value, resumeMode, onCancellation)
352372

373+
/**
374+
* An optimized version for the code below that does not allocate
375+
* a cancellation handler object and efficiently stores the specified
376+
* [segment] and [index] in this [CancellableContinuationImpl].
377+
*
378+
* The only difference is that `segment.onCancellation(..)` is never
379+
* called if this continuation is already completed; thus,
380+
* the semantics is similar to [BeforeResumeCancelHandler].
381+
*
382+
* ```
383+
* invokeOnCancellation { cause ->
384+
* segment.onCancellation(index, cause)
385+
* }
386+
* ```
387+
*/
388+
override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
389+
_decisionAndIndex.update {
390+
check(it.index == NO_INDEX) {
391+
"invokeOnCancellation should be called at most once"
392+
}
393+
decisionAndIndex(it.decision, index)
394+
}
395+
invokeOnCancellationImpl(segment)
396+
}
397+
353398
public override fun invokeOnCancellation(handler: CompletionHandler) {
354399
val cancelHandler = makeCancelHandler(handler)
400+
invokeOnCancellationImpl(cancelHandler)
401+
}
402+
403+
private fun invokeOnCancellationImpl(handler: Any) {
404+
assert { handler is CancelHandler || handler is Segment<*> }
355405
_state.loop { state ->
356406
when (state) {
357407
is Active -> {
358-
if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
408+
if (_state.compareAndSet(state, handler)) return // quit on cas success
359409
}
360-
is CancelHandler -> multipleHandlersError(handler, state)
410+
is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
361411
is CompletedExceptionally -> {
362412
/*
363413
* Continuation was already cancelled or completed exceptionally.
@@ -371,7 +421,13 @@ internal open class CancellableContinuationImpl<in T>(
371421
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
372422
*/
373423
if (state is CancelledContinuation) {
374-
callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
424+
val cause: Throwable? = (state as? CompletedExceptionally)?.cause
425+
if (handler is CancelHandler) {
426+
callCancelHandler(handler, cause)
427+
} else {
428+
val segment = handler as Segment<*>
429+
callSegmentOnCancellation(segment, cause)
430+
}
375431
}
376432
return
377433
}
@@ -380,14 +436,16 @@ internal open class CancellableContinuationImpl<in T>(
380436
* Continuation was already completed, and might already have cancel handler.
381437
*/
382438
if (state.cancelHandler != null) multipleHandlersError(handler, state)
383-
// BeforeResumeCancelHandler does not need to be called on a completed continuation
384-
if (cancelHandler is BeforeResumeCancelHandler) return
439+
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
440+
// do NOT need to be called on completed continuation.
441+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
442+
handler as CancelHandler
385443
if (state.cancelled) {
386444
// Was already cancelled while being dispatched -- invoke the handler directly
387445
callCancelHandler(handler, state.cancelCause)
388446
return
389447
}
390-
val update = state.copy(cancelHandler = cancelHandler)
448+
val update = state.copy(cancelHandler = handler)
391449
if (_state.compareAndSet(state, update)) return // quit on cas success
392450
}
393451
else -> {
@@ -396,15 +454,16 @@ internal open class CancellableContinuationImpl<in T>(
396454
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
397455
* does not need to be called in this case.
398456
*/
399-
if (cancelHandler is BeforeResumeCancelHandler) return
400-
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
457+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
458+
handler as CancelHandler
459+
val update = CompletedContinuation(state, cancelHandler = handler)
401460
if (_state.compareAndSet(state, update)) return // quit on cas success
402461
}
403462
}
404463
}
405464
}
406465

407-
private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
466+
private fun multipleHandlersError(handler: Any, state: Any?) {
408467
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
409468
}
410469

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.Segment
8+
import kotlinx.coroutines.selects.*
9+
10+
/**
11+
* All waiters (such as [CancellableContinuationImpl] and [SelectInstance]) in synchronization and
12+
* communication primitives, should implement this interface to make the code faster and easier to read.
13+
*/
14+
internal interface Waiter {
15+
/**
16+
* When this waiter is cancelled, [Segment.onCancellation] with
17+
* the specified [segment] and [index] should be called.
18+
* This function installs the corresponding cancellation handler.
19+
*/
20+
fun invokeOnCancellation(segment: Segment<*>, index: Int)
21+
}

0 commit comments

Comments
 (0)