Skip to content

Commit e86e192

Browse files
committed
Revert "Introduce reusable cancellable continuations for hot loops with channels (#1534)"
This reverts commit 946e578
1 parent d5da2e0 commit e86e192

20 files changed

+315
-1055
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks
6+
7+
import kotlinx.coroutines.*
8+
import org.openjdk.jmh.annotations.*
9+
import java.util.concurrent.*
10+
import kotlin.coroutines.*
11+
import kotlin.coroutines.intrinsics.*
12+
13+
@Warmup(iterations = 5)
14+
@Measurement(iterations = 10)
15+
@BenchmarkMode(Mode.Throughput)
16+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
17+
@State(Scope.Benchmark)
18+
@Fork(2)
19+
open class CancellableContinuationBenchmark {
20+
21+
@Benchmark
22+
fun awaitWithSuspension(): Int {
23+
val deferred = CompletableDeferred<Int>()
24+
return run(allowSuspend = true) { deferred.await() }
25+
}
26+
27+
@Benchmark
28+
fun awaitNoSuspension(): Int {
29+
val deferred = CompletableDeferred(1)
30+
return run { deferred.await() }
31+
}
32+
33+
private fun run(allowSuspend: Boolean = false, block: suspend () -> Int): Int {
34+
val value = block.startCoroutineUninterceptedOrReturn(EmptyContinuation)
35+
if (value === COROUTINE_SUSPENDED) {
36+
if (!allowSuspend) {
37+
throw IllegalStateException("Unexpected suspend")
38+
} else {
39+
return -1
40+
}
41+
}
42+
43+
return value as Int
44+
}
45+
46+
object EmptyContinuation : Continuation<Int> {
47+
override val context: CoroutineContext
48+
get() = EmptyCoroutineContext
49+
50+
override fun resumeWith(result: Result<Int>) {
51+
}
52+
}
53+
}

benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt

+23-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,22 @@ import kotlinx.coroutines.flow.*
1313
import org.openjdk.jmh.annotations.*
1414
import java.util.concurrent.*
1515

16+
/*
17+
* Results:
18+
*
19+
* // Throw FlowAborted overhead
20+
* Numbers.primes avgt 7 3039.185 ± 25.598 us/op
21+
* Numbers.primesRx avgt 7 2677.937 ± 17.720 us/op
22+
*
23+
* // On par
24+
* Numbers.transformations avgt 7 16.207 ± 0.133 us/op
25+
* Numbers.transformationsRx avgt 7 19.626 ± 0.135 us/op
26+
*
27+
* // Channels overhead
28+
* Numbers.zip avgt 7 434.160 ± 7.014 us/op
29+
* Numbers.zipRx avgt 7 87.898 ± 5.007 us/op
30+
*
31+
*/
1632
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
1733
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
1834
@Fork(value = 1)
@@ -23,11 +39,11 @@ open class NumbersBenchmark {
2339

2440
companion object {
2541
private const val primes = 100
26-
private const val natural = 1000L
42+
private const val natural = 1000
2743
}
2844

29-
private fun numbers(limit: Long = Long.MAX_VALUE) = flow {
30-
for (i in 2L..limit) emit(i)
45+
private fun numbers() = flow {
46+
for (i in 2L..Long.MAX_VALUE) emit(i)
3147
}
3248

3349
private fun primesFlow(): Flow<Long> = flow {
@@ -64,7 +80,7 @@ open class NumbersBenchmark {
6480

6581
@Benchmark
6682
fun zip() = runBlocking {
67-
val numbers = numbers(natural)
83+
val numbers = numbers().take(natural)
6884
val first = numbers
6985
.filter { it % 2L != 0L }
7086
.map { it * it }
@@ -89,7 +105,8 @@ open class NumbersBenchmark {
89105

90106
@Benchmark
91107
fun transformations(): Int = runBlocking {
92-
numbers(natural)
108+
numbers()
109+
.take(natural)
93110
.filter { it % 2L != 0L }
94111
.map { it * it }
95112
.filter { (it + 1) % 3 == 0L }.count()
@@ -103,4 +120,4 @@ open class NumbersBenchmark {
103120
.filter { (it + 1) % 3 == 0L }.count()
104121
.blockingGet()
105122
}
106-
}
123+
}

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

-98
This file was deleted.

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannelBenchmark.kt

-61
This file was deleted.

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+1-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -223,40 +223,6 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
223223
cancellable.getResult()
224224
}
225225

226-
/**
227-
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
228-
*/
229-
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
230-
crossinline block: (CancellableContinuation<T>) -> Unit
231-
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
232-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
233-
block(cancellable)
234-
cancellable.getResult()
235-
}
236-
237-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
238-
// If used outside of our dispatcher
239-
if (delegate !is DispatchedContinuation<T>) {
240-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
241-
}
242-
/*
243-
* Attempt to claim reusable instance.
244-
*
245-
* suspendAtomicCancellableCoroutineReusable { // <- claimed
246-
* // Any asynchronous cancellation is "postponed" while this block
247-
* // is being executed
248-
* } // postponed cancellation is checked here.
249-
*
250-
* Claim can fail for the following reasons:
251-
* 1) Someone tried to make idempotent resume.
252-
* Idempotent resume is internal (used only by us) and is used only in `select`,
253-
* thus leaking CC instance for indefinite time.
254-
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
255-
*/
256-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
257-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
258-
}
259-
260226
/**
261227
* @suppress **Deprecated**
262228
*/

0 commit comments

Comments
 (0)