Skip to content

Commit 01c4e70

Browse files
committed
Introduce reusable cancellable continuations for hot loops with channels
* Average performance improvement is around 25% * API is internal and targeted to specific usage * DispatchedTask and DispatchedContinuation are extracted to separate files for better readability and maintainability * Flow benchmark (temporary) rewritten to avoid take
1 parent bda9c79 commit 01c4e70

16 files changed

+885
-298
lines changed

benchmarks/src/jmh/kotlin/benchmarks/CancellableContinuationBenchmark.kt

-53
This file was deleted.

benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt

+6-23
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,6 @@ import kotlinx.coroutines.flow.*
1313
import org.openjdk.jmh.annotations.*
1414
import java.util.concurrent.*
1515

16-
/*
17-
* Results:
18-
*
19-
* // Throw FlowAborted overhead
20-
* Numbers.primes avgt 7 3039.185 ± 25.598 us/op
21-
* Numbers.primesRx avgt 7 2677.937 ± 17.720 us/op
22-
*
23-
* // On par
24-
* Numbers.transformations avgt 7 16.207 ± 0.133 us/op
25-
* Numbers.transformationsRx avgt 7 19.626 ± 0.135 us/op
26-
*
27-
* // Channels overhead
28-
* Numbers.zip avgt 7 434.160 ± 7.014 us/op
29-
* Numbers.zipRx avgt 7 87.898 ± 5.007 us/op
30-
*
31-
*/
3216
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
3317
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
3418
@Fork(value = 1)
@@ -39,11 +23,11 @@ open class NumbersBenchmark {
3923

4024
companion object {
4125
private const val primes = 100
42-
private const val natural = 1000
26+
private const val natural = 1000L
4327
}
4428

45-
private fun numbers() = flow {
46-
for (i in 2L..Long.MAX_VALUE) emit(i)
29+
private fun numbers(limit: Long = Long.MAX_VALUE) = flow {
30+
for (i in 2L..limit) emit(i)
4731
}
4832

4933
private fun primesFlow(): Flow<Long> = flow {
@@ -80,7 +64,7 @@ open class NumbersBenchmark {
8064

8165
@Benchmark
8266
fun zip() = runBlocking {
83-
val numbers = numbers().take(natural)
67+
val numbers = numbers(natural)
8468
val first = numbers
8569
.filter { it % 2L != 0L }
8670
.map { it * it }
@@ -105,8 +89,7 @@ open class NumbersBenchmark {
10589

10690
@Benchmark
10791
fun transformations(): Int = runBlocking {
108-
numbers()
109-
.take(natural)
92+
numbers(natural)
11093
.filter { it % 2L != 0L }
11194
.map { it * it }
11295
.filter { (it + 1) % 3 == 0L }.count()
@@ -120,4 +103,4 @@ open class NumbersBenchmark {
120103
.filter { (it + 1) % 3 == 0L }.count()
121104
.blockingGet()
122105
}
123-
}
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.tailcall
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.coroutines.*
9+
import kotlin.coroutines.intrinsics.*
10+
11+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
12+
public abstract class SimpleChannel {
13+
companion object {
14+
const val NULL_SURROGATE: Int = -1
15+
}
16+
17+
@JvmField
18+
protected var producer: Continuation<Unit>? = null
19+
@JvmField
20+
protected var enqueuedValue: Int = NULL_SURROGATE
21+
@JvmField
22+
protected var consumer: Continuation<Int>? = null
23+
24+
suspend fun send(element: Int) {
25+
require(element != NULL_SURROGATE)
26+
if (offer(element)) {
27+
return
28+
}
29+
30+
return suspendSend(element)
31+
}
32+
33+
private fun offer(element: Int): Boolean {
34+
if (consumer == null) {
35+
return false
36+
}
37+
38+
consumer!!.resume(element)
39+
consumer = null
40+
return true
41+
}
42+
43+
suspend fun receive(): Int {
44+
// Cached value
45+
if (enqueuedValue != NULL_SURROGATE) {
46+
val result = enqueuedValue
47+
enqueuedValue = NULL_SURROGATE
48+
producer!!.resume(Unit)
49+
return result
50+
}
51+
52+
return suspendReceive()
53+
}
54+
55+
abstract suspend fun suspendReceive(): Int
56+
abstract suspend fun suspendSend(element: Int)
57+
}
58+
59+
class NonCancellableChannel : SimpleChannel() {
60+
override suspend fun suspendReceive(): Int = suspendCoroutineUninterceptedOrReturn {
61+
consumer = it.intercepted()
62+
COROUTINE_SUSPENDED
63+
}
64+
65+
override suspend fun suspendSend(element: Int) = suspendCoroutineUninterceptedOrReturn<Unit> {
66+
enqueuedValue = element
67+
producer = it.intercepted()
68+
COROUTINE_SUSPENDED
69+
}
70+
}
71+
72+
class CancellableChannel : SimpleChannel() {
73+
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
74+
consumer = it.intercepted()
75+
COROUTINE_SUSPENDED
76+
}
77+
78+
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
79+
enqueuedValue = element
80+
producer = it.intercepted()
81+
COROUTINE_SUSPENDED
82+
}
83+
}
84+
85+
class CancellableReusableChannel : SimpleChannel() {
86+
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
87+
consumer = it.intercepted()
88+
COROUTINE_SUSPENDED
89+
}
90+
91+
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
92+
enqueuedValue = element
93+
producer = it.intercepted()
94+
COROUTINE_SUSPENDED
95+
}
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.tailcall
6+
7+
import kotlinx.coroutines.*
8+
import org.openjdk.jmh.annotations.*
9+
import java.util.concurrent.*
10+
11+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
12+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
13+
@Fork(value = 1)
14+
@BenchmarkMode(Mode.AverageTime)
15+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
16+
@State(Scope.Benchmark)
17+
open class SimpleChannelBenchmark {
18+
19+
private val iterations = 10_000
20+
21+
@Volatile
22+
private var sink: Int = 0
23+
24+
@Benchmark
25+
fun cancellable() = runBlocking {
26+
val ch = CancellableChannel()
27+
launch {
28+
repeat(iterations) { ch.send(it) }
29+
}
30+
31+
launch {
32+
repeat(iterations) { sink = ch.receive() }
33+
}
34+
}
35+
36+
@Benchmark
37+
fun cancellableReusable() = runBlocking {
38+
val ch = CancellableReusableChannel()
39+
launch {
40+
repeat(iterations) { ch.send(it) }
41+
}
42+
43+
launch {
44+
repeat(iterations) { sink = ch.receive() }
45+
}
46+
}
47+
48+
@Benchmark
49+
fun nonCancellable() = runBlocking {
50+
val ch = NonCancellableChannel()
51+
launch {
52+
repeat(iterations) { ch.send(it) }
53+
}
54+
55+
launch {
56+
repeat(iterations) {
57+
sink = ch.receive()
58+
}
59+
}
60+
}
61+
}

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+36
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,42 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
223223
cancellable.getResult()
224224
}
225225

226+
/**
227+
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
228+
*/
229+
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
230+
crossinline block: (CancellableContinuation<T>) -> Unit
231+
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
232+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
233+
block(cancellable)
234+
cancellable.getResult()
235+
}
236+
237+
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
238+
// If used outside of our dispatcher
239+
if (delegate !is DispatchedContinuation<T>) {
240+
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
241+
}
242+
243+
/*
244+
* Attempt to claim reusable instance.
245+
*
246+
* suspendAtomicCancellableCoroutineReusable { // <- claimed
247+
* // Any asynchronous cancellation is "postponed" while this block
248+
* // is being executed
249+
* } // postponed cancellation is checked here.
250+
*
251+
* Claim can fail for the following reasons:
252+
* 1) Someone tried to make idempotent resume.
253+
* Idempotent resume is internal (used only by us) and is used only in `select`,
254+
* thus leaking CC instance for indefinite time.
255+
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
256+
*/
257+
val claimed = delegate.claimReusableCancellableContinuation() ?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
258+
claimed.resetState()
259+
return claimed
260+
}
261+
226262
/**
227263
* @suppress **Deprecated**
228264
*/

0 commit comments

Comments
 (0)