Skip to content

Commit 1092fca

Browse files
committed
Optimize CancellableContinuationImpl.invokeOnCancellation(..) for Segments
1 parent 27d1463 commit 1092fca

File tree

5 files changed

+118
-35
lines changed

5 files changed

+118
-35
lines changed

benchmarks/src/jmh/kotlin/benchmarks/SequentialSemaphoreBenchmark.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import org.openjdk.jmh.annotations.*
1010
import java.util.concurrent.TimeUnit
1111
import kotlin.test.*
1212

13-
@Warmup(iterations = 5, time = 100)
14-
@Measurement(iterations = 10, time = 100)
13+
@Warmup(iterations = 5, time = 1)
14+
@Measurement(iterations = 10, time = 1)
1515
@BenchmarkMode(Mode.AverageTime)
1616
@OutputTimeUnit(TimeUnit.MILLISECONDS)
1717
@State(Scope.Benchmark)

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+69-19
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@ import kotlin.coroutines.*
1010
import kotlin.coroutines.intrinsics.*
1111
import kotlin.jvm.*
1212

13+
private const val DECISION_SHIFT = 29
14+
private const val INDEX_MASK = (1 shl DECISION_SHIFT) - 1
15+
private const val NO_INDEX = INDEX_MASK
1316
private const val UNDECIDED = 0
1417
private const val SUSPENDED = 1
1518
private const val RESUMED = 2
1619

20+
private inline val Int.decision get() = this shr DECISION_SHIFT
21+
private inline val Int.index get() = this and INDEX_MASK
22+
@Suppress("NOTHING_TO_INLINE")
23+
private inline fun construct(decision: Int, index: Int) = (decision shl DECISION_SHIFT) + index
24+
1725
@JvmField
1826
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
1927

@@ -55,9 +63,9 @@ internal open class CancellableContinuationImpl<in T>(
5563
| RESUMED |
5664
+-----------+
5765
58-
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
66+
Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
5967
*/
60-
private val _decision = atomic(UNDECIDED)
68+
private val _decisionAndIndex = atomic(construct(UNDECIDED, NO_INDEX))
6169

6270
/*
6371
=== Internal states ===
@@ -122,7 +130,7 @@ internal open class CancellableContinuationImpl<in T>(
122130
detachChild()
123131
return false
124132
}
125-
_decision.value = UNDECIDED
133+
_decisionAndIndex.value = construct(UNDECIDED, NO_INDEX)
126134
_state.value = Active
127135
return true
128136
}
@@ -172,10 +180,11 @@ internal open class CancellableContinuationImpl<in T>(
172180
_state.loop { state ->
173181
if (state !is NotCompleted) return false // false if already complete or cancelling
174182
// Active -- update to final state
175-
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
183+
val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
176184
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
177185
// Invoke cancel handler if it was present
178186
(state as? CancelHandler)?.let { callCancelHandler(it, cause) }
187+
(state as? Segment<*>)?.let { callSegmentOnCancellation(it, cause) }
179188
// Complete state update
180189
detachChildIfNonResuable()
181190
dispatchResume(resumeMode) // no need for additional cancellation checks
@@ -212,6 +221,13 @@ internal open class CancellableContinuationImpl<in T>(
212221
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
213222
callCancelHandlerSafely { handler.invoke(cause) }
214223

224+
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
225+
val index = _decisionAndIndex.value.index
226+
check(index != NO_INDEX) { "The index for segment.invokeOnCancellation(..) is broken" }
227+
callCancelHandlerSafely { segment.invokeOnCancellation(index, cause) }
228+
}
229+
230+
215231
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
216232
try {
217233
onCancellation.invoke(cause)
@@ -231,19 +247,19 @@ internal open class CancellableContinuationImpl<in T>(
231247
parent.getCancellationException()
232248

233249
private fun trySuspend(): Boolean {
234-
_decision.loop { decision ->
235-
when (decision) {
236-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
250+
_decisionAndIndex.loop { cur ->
251+
when (cur.decision) {
252+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, construct(SUSPENDED, cur.index))) return true
237253
RESUMED -> return false
238254
else -> error("Already suspended")
239255
}
240256
}
241257
}
242258

243259
private fun tryResume(): Boolean {
244-
_decision.loop { decision ->
245-
when (decision) {
246-
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
260+
_decisionAndIndex.loop { cur ->
261+
when (cur.decision) {
262+
UNDECIDED -> if (this._decisionAndIndex.compareAndSet(cur, construct(RESUMED, cur.index))) return true
247263
SUSPENDED -> return false
248264
else -> error("Already resumed")
249265
}
@@ -328,14 +344,39 @@ internal open class CancellableContinuationImpl<in T>(
328344
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
329345
resumeImpl(value, resumeMode, onCancellation)
330346

347+
/**
348+
* An optimized version for the code below that does not allocate
349+
* a cancellation handler object and efficiently stores the specified
350+
* [segment] and [index] in this [CancellableContinuationImpl].
351+
* ```
352+
* invokeOnCancellation { cause ->
353+
* segment.invokeOnCancellation(index, cause)
354+
* }
355+
* ```
356+
*/
357+
internal fun invokeOnCancellation(segment: Segment<*>, index: Int) {
358+
_decisionAndIndex.update {
359+
check(it.index == NO_INDEX) {
360+
"invokeOnCancellation should be invoked at most once"
361+
}
362+
construct(it.decision, index)
363+
}
364+
invokeOnCancellationImpl(segment)
365+
}
366+
331367
public override fun invokeOnCancellation(handler: CompletionHandler) {
332368
val cancelHandler = makeCancelHandler(handler)
369+
invokeOnCancellationImpl(cancelHandler)
370+
}
371+
372+
private fun invokeOnCancellationImpl(handler: Any) {
373+
assert { handler is CancelHandler || handler is Segment<*> }
333374
_state.loop { state ->
334375
when (state) {
335376
is Active -> {
336-
if (_state.compareAndSet(state, cancelHandler)) return // quit on cas success
377+
if (_state.compareAndSet(state, handler)) return // quit on cas success
337378
}
338-
is CancelHandler -> multipleHandlersError(handler, state)
379+
is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
339380
is CompletedExceptionally -> {
340381
/*
341382
* Continuation was already cancelled or completed exceptionally.
@@ -349,7 +390,13 @@ internal open class CancellableContinuationImpl<in T>(
349390
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
350391
*/
351392
if (state is CancelledContinuation) {
352-
callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
393+
val cause: Throwable? = (state as? CompletedExceptionally)?.cause
394+
if (handler is CancelHandler) {
395+
callCancelHandler(handler, cause)
396+
} else {
397+
val segment = handler as Segment<*>
398+
callSegmentOnCancellation(segment, cause)
399+
}
353400
}
354401
return
355402
}
@@ -358,14 +405,16 @@ internal open class CancellableContinuationImpl<in T>(
358405
* Continuation was already completed, and might already have cancel handler.
359406
*/
360407
if (state.cancelHandler != null) multipleHandlersError(handler, state)
361-
// BeforeResumeCancelHandler does not need to be called on a completed continuation
362-
if (cancelHandler is BeforeResumeCancelHandler) return
408+
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
409+
// do NOT need to be called on completed continuation.
410+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
411+
handler as CancelHandler
363412
if (state.cancelled) {
364413
// Was already cancelled while being dispatched -- invoke the handler directly
365414
callCancelHandler(handler, state.cancelCause)
366415
return
367416
}
368-
val update = state.copy(cancelHandler = cancelHandler)
417+
val update = state.copy(cancelHandler = handler)
369418
if (_state.compareAndSet(state, update)) return // quit on cas success
370419
}
371420
else -> {
@@ -374,15 +423,16 @@ internal open class CancellableContinuationImpl<in T>(
374423
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
375424
* does not need to be called in this case.
376425
*/
377-
if (cancelHandler is BeforeResumeCancelHandler) return
378-
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
426+
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
427+
handler as CancelHandler
428+
val update = CompletedContinuation(state, cancelHandler = handler)
379429
if (_state.compareAndSet(state, update)) return // quit on cas success
380430
}
381431
}
382432
}
383433
}
384434

385-
private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
435+
private fun multipleHandlersError(handler: Any, state: Any?) {
386436
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
387437
}
388438

kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt

+16-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,15 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
186186
* Essentially, this is a node in the Michael-Scott queue algorithm,
187187
* but with maintaining [prev] pointer for efficient [remove] implementation.
188188
*/
189-
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int): ConcurrentLinkedListNode<S>(prev) {
189+
internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers: Int) :
190+
ConcurrentLinkedListNode<S>(prev),
191+
// Segments typically store waiting continuations. Thus, on cancellation, the corresponding
192+
// slot should be cleaned and the segment should be removed if it becomes full of cancelled cells.
193+
// To install such a handler efficiently, without creating an extra object, we allow storing
194+
// segments as cancellation handlers in [CancellableContinuationImpl] state, putting the slot
195+
// index in another field. The details are here: https://github.com/Kotlin/kotlinx.coroutines/pull/3084.
196+
NotCompleted
197+
{
190198
/**
191199
* This property should return the maximal number of slots in this segment,
192200
* it is used to define whether the segment is logically removed.
@@ -211,6 +219,13 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
211219
// returns `true` if this segment is logically removed after the decrement.
212220
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
213221

222+
/**
223+
* This function is invoked on continuation cancellation when this segment
224+
* with the specified [index] are installed as cancellation handler via
225+
* `CancellableContinuationImpl.invokeOnCancellation(Segment, Int)`.
226+
*/
227+
internal open fun invokeOnCancellation(index: Int, cause: Throwable?) {}
228+
214229
/**
215230
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
216231
*/

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+3-13
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
198198
val i = (enqIdx % SEGMENT_SIZE).toInt()
199199
// the regular (fast) path -- if the cell is empty, try to install continuation
200200
if (segment.cas(i, null, cont)) { // installed continuation successfully
201-
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
201+
cont as CancellableContinuationImpl<*>
202+
cont.invokeOnCancellation(segment, i)
202203
return true
203204
}
204205
// On CAS failure -- the cell must be either PERMIT or BROKEN
@@ -245,17 +246,6 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
245246
}
246247
}
247248

248-
private class CancelSemaphoreAcquisitionHandler(
249-
private val segment: SemaphoreSegment,
250-
private val index: Int
251-
) : CancelHandler() {
252-
override fun invoke(cause: Throwable?) {
253-
segment.cancel(index)
254-
}
255-
256-
override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
257-
}
258-
259249
private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
260250

261251
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment<SemaphoreSegment>(id, prev, pointers) {
@@ -278,7 +268,7 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
278268

279269
// Cleans the acquirer slot located by the specified index
280270
// and removes this segment physically if all slots are cleaned.
281-
fun cancel(index: Int) {
271+
override fun invokeOnCancellation(index: Int, cause: Throwable?) {
282272
// Clean the slot
283273
set(index, CANCELLED)
284274
// Remove this segment if needed

kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt

+28
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package kotlinx.coroutines
88

9+
import kotlinx.coroutines.internal.*
910
import kotlin.coroutines.*
1011
import kotlin.test.*
1112

@@ -159,4 +160,31 @@ class CancellableContinuationHandlersTest : TestBase() {
159160
}
160161
finish(3)
161162
}
163+
164+
@Test
165+
fun testSegmentAsHandler() = runTest {
166+
class MySegment : Segment<MySegment>(0, null, 0) {
167+
override val maxSlots: Int get() = 0
168+
169+
var invokeOnCancellationCalled = false
170+
override fun invokeOnCancellation(index: Int, cause: Throwable?) {
171+
invokeOnCancellationCalled = true
172+
}
173+
}
174+
val s = MySegment()
175+
expect(1)
176+
try {
177+
suspendCancellableCoroutine<Unit> { c ->
178+
expect(2)
179+
c as CancellableContinuationImpl<*>
180+
c.invokeOnCancellation(s, 0)
181+
c.cancel()
182+
}
183+
} catch (e: CancellationException) {
184+
expect(3)
185+
}
186+
expect(4)
187+
check(s.invokeOnCancellationCalled)
188+
finish(5)
189+
}
162190
}

0 commit comments

Comments
 (0)