Skip to content

Commit 6af7a19

Browse files
committed
Provide suspendAtomicCancellableCoroutineReusable primitive, high-performance intrinsic which allows to reuse cancellation machinery and opens the way to garbage-free channels:
* Make AbstractContinuation reusable * Add additional field to DispatchedContinuation to cache cancellable continuation * Use suspendAtomicCancellableCoroutineReusable in channels (+15% of performance on ping-pong) * Detect calls to tryResume and forbid cancellation reusing in order to make channels work with selects * Fix benchmarks on 1.3 Fixes #534
1 parent 19f3032 commit 6af7a19

14 files changed

+430
-95
lines changed

benchmarks/build.gradle

-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,5 @@ dependencies {
2222
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
2323
compile project(':kotlinx-coroutines-core-common')
2424
compile project(':kotlinx-coroutines-core')
25-
compile project(':kotlinx-coroutines-core').sourceSets.test.output
2625
compile "org.openjdk.jmh:jmh-core:1.21"
2726
}

benchmarks/src/jmh/kotlin/benchmarks/CancellableContinuationBenchmark.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@ open class CancellableContinuationBenchmark {
4747
override val context: CoroutineContext
4848
get() = EmptyCoroutineContext
4949

50-
override fun resume(value: Int) {
51-
}
50+
override fun resumeWith(result: Result<Int>) {
5251

53-
override fun resumeWithException(exception: Throwable) {
5452
}
5553
}
5654
}

benchmarks/src/jmh/kotlin/benchmarks/GuideSyncBenchmark.kt

-80
This file was deleted.

benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongWithBlockingContext.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ open class PingPongWithBlockingContext {
5454

5555
private suspend fun runPingPongs(pingContext: CoroutineContext, pongContext: CoroutineContext) {
5656
val me = Channel<PingPongActorBenchmark.Letter>()
57-
val pong = pongActorCoroutine(pongContext)
58-
val ping = pingActorCoroutine(pingContext, pong)
57+
val pong = CoroutineScope(pongContext).pongActorCoroutine()
58+
val ping = CoroutineScope(pingContext).pingActorCoroutine(pong)
5959
ping.send(PingPongActorBenchmark.Letter(Start(), me))
6060

6161
me.receive()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.channels
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.coroutines.*
9+
import kotlin.coroutines.intrinsics.*
10+
11+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
12+
public class CancellableChannel {
13+
companion object {
14+
const val NULL_SURROGATE: Int = -1
15+
}
16+
17+
private var producer: Continuation<Unit>? = null
18+
private var enqueuedValue: Int = NULL_SURROGATE
19+
private var consumer: Continuation<Int>? = null
20+
21+
suspend fun send(element: Int) {
22+
require(element != NULL_SURROGATE)
23+
if (offer(element)) {
24+
return
25+
}
26+
27+
return suspendCancellableCoroutine {
28+
enqueuedValue = element
29+
producer = it.intercepted()
30+
COROUTINE_SUSPENDED
31+
}
32+
}
33+
34+
private fun offer(element: Int): Boolean {
35+
if (consumer == null) {
36+
return false
37+
}
38+
39+
consumer!!.resume(element)
40+
consumer = null
41+
return true
42+
}
43+
44+
suspend fun receive(): Int {
45+
// Cached value
46+
if (enqueuedValue != NULL_SURROGATE) {
47+
val result = enqueuedValue
48+
enqueuedValue = NULL_SURROGATE
49+
producer!!.resume(Unit)
50+
return result
51+
}
52+
53+
return suspendCancellableCoroutine {
54+
consumer = it.intercepted()
55+
COROUTINE_SUSPENDED
56+
}
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.channels
6+
7+
import kotlin.coroutines.*
8+
import kotlin.coroutines.intrinsics.*
9+
10+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
11+
public class NonCancellableChannel {
12+
companion object {
13+
const val NULL_SURROGATE: Int = -1
14+
}
15+
16+
private var producer: Continuation<Unit>? = null
17+
private var enqueuedValue: Int = NULL_SURROGATE
18+
private var consumer: Continuation<Int>? = null
19+
20+
suspend fun send(element: Int) {
21+
require(element != NULL_SURROGATE)
22+
if (offer(element)) {
23+
return
24+
}
25+
26+
return suspendCoroutineUninterceptedOrReturn {
27+
enqueuedValue = element
28+
producer = it.intercepted()
29+
COROUTINE_SUSPENDED
30+
}
31+
}
32+
33+
private fun offer(element: Int): Boolean {
34+
if (consumer == null) {
35+
return false
36+
}
37+
38+
consumer!!.resume(element)
39+
consumer = null
40+
return true
41+
}
42+
43+
suspend fun receive(): Int {
44+
// Cached value
45+
if (enqueuedValue != NULL_SURROGATE) {
46+
val result = enqueuedValue
47+
enqueuedValue = NULL_SURROGATE
48+
producer!!.resume(Unit)
49+
return result
50+
}
51+
52+
return suspendCoroutineUninterceptedOrReturn {
53+
consumer = it.intercepted()
54+
COROUTINE_SUSPENDED
55+
}
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.channels
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.coroutines.*
9+
import kotlin.coroutines.intrinsics.*
10+
11+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
12+
public class ReusableCancellabilityChannel {
13+
companion object {
14+
const val NULL_SURROGATE: Int = -1
15+
}
16+
17+
private var producer: Continuation<Unit>? = null
18+
private var enqueuedValue: Int = NULL_SURROGATE
19+
20+
private var consumer: Continuation<Int>? = null
21+
22+
23+
suspend fun send(element: Int) {
24+
require(element != NULL_SURROGATE)
25+
if (offer(element)) {
26+
return
27+
}
28+
29+
return suspendAtomicCancellableCoroutineReusable {
30+
enqueuedValue = element
31+
producer = it.intercepted()
32+
}
33+
}
34+
35+
private fun offer(element: Int): Boolean {
36+
if (consumer == null) {
37+
return false
38+
}
39+
40+
consumer!!.resume(element)
41+
consumer = null
42+
return true
43+
}
44+
45+
suspend fun receive(): Int {
46+
// Cached value
47+
if (enqueuedValue != NULL_SURROGATE) {
48+
val result = enqueuedValue
49+
enqueuedValue = NULL_SURROGATE
50+
producer!!.resume(Unit)
51+
return result
52+
}
53+
54+
return suspendAtomicCancellableCoroutineReusable {
55+
consumer = it.intercepted()
56+
}
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.channels
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.timeunit.*
9+
import org.openjdk.jmh.annotations.*
10+
11+
/*
12+
* Benchmark Mode Cnt Score Error Units
13+
* ReusableCancellableContinuationBenchmark.runCancellable avgt 5 2464.846 ± 24.565 us/op
14+
* ReusableCancellableContinuationBenchmark.runNonCancellable avgt 5 1112.962 ± 9.709 us/op
15+
* ReusableCancellableContinuationBenchmark.runReusable avgt 5 1615.746 ± 30.324 us/op
16+
*/
17+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
18+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
19+
@Fork(value = 1)
20+
@BenchmarkMode(Mode.AverageTime)
21+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
22+
@State(Scope.Benchmark)
23+
open class ReusableCancellableContinuationBenchmark {
24+
25+
private val iterations = 10_000
26+
27+
@Volatile
28+
private var sink: Int = 0
29+
30+
@Benchmark
31+
fun runCancellable() = runBlocking {
32+
val ch = CancellableChannel()
33+
async {
34+
repeat(iterations) { ch.send(it) }
35+
}
36+
37+
async {
38+
repeat(iterations) {
39+
sink = ch.receive()
40+
}
41+
}
42+
}
43+
44+
@Benchmark
45+
fun runNonCancellable() = runBlocking {
46+
val ch = NonCancellableChannel()
47+
async {
48+
repeat(iterations) { ch.send(it) }
49+
}
50+
51+
async {
52+
repeat(iterations) {
53+
sink = ch.receive()
54+
}
55+
}
56+
}
57+
58+
@Benchmark
59+
fun runReusable() = runBlocking {
60+
val ch = ReusableCancellabilityChannel()
61+
async {
62+
repeat(iterations) { ch.send(it) }
63+
}
64+
65+
async {
66+
repeat(iterations) {
67+
sink = ch.receive()
68+
}
69+
}
70+
71+
}
72+
}
73+

0 commit comments

Comments
 (0)