Skip to content

Commit acf9dac

Browse files
committed
~update benchmarks
1 parent a17eaec commit acf9dac

File tree

2 files changed

+93
-2
lines changed

2 files changed

+93
-2
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import org.openjdk.jmh.annotations.*
1010
import java.util.concurrent.*
1111
import kotlin.coroutines.*
1212

13-
@Warmup(iterations = 5, time = 1)
13+
@Warmup(iterations = 7, time = 1)
1414
@Measurement(iterations = 5, time = 1)
1515
@BenchmarkMode(Mode.AverageTime)
1616
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -41,7 +41,7 @@ open class ChannelSinkBenchmark {
4141

4242
private suspend inline fun run(context: CoroutineContext): Int {
4343
return Channel
44-
.range(1, 1_000_000, context)
44+
.range(1, 10_000, context)
4545
.filter(context) { it % 4 == 0 }
4646
.fold(0) { a, b -> a + b }
4747
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.*
11+
import kotlin.coroutines.*
12+
13+
@Warmup(iterations = 7, time = 1)
14+
@Measurement(iterations = 5, time = 1)
15+
@BenchmarkMode(Mode.AverageTime)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
@Fork(2)
19+
open class ChannelSinkDepthBenchmark {
20+
private val tl = ThreadLocal.withInitial({ 42 })
21+
22+
private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
23+
24+
@Benchmark
25+
fun depth1(): Int = runBlocking {
26+
run(1, unconfinedOneElement)
27+
}
28+
29+
@Benchmark
30+
fun depth10(): Int = runBlocking {
31+
run(10, unconfinedOneElement)
32+
}
33+
34+
@Benchmark
35+
fun depth100(): Int = runBlocking {
36+
run(100, unconfinedOneElement)
37+
}
38+
39+
@Benchmark
40+
fun depth1000(): Int = runBlocking {
41+
run(1000, unconfinedOneElement)
42+
}
43+
44+
private suspend inline fun run(callTraceDepth: Int, context: CoroutineContext): Int {
45+
return Channel
46+
.range(1, 10_000, context)
47+
.filter(callTraceDepth, context) { it % 4 == 0 }
48+
.fold(0) { a, b -> a + b }
49+
}
50+
51+
private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) =
52+
GlobalScope.produce(context) {
53+
for (i in start until (start + count))
54+
send(i)
55+
}
56+
57+
// Migrated from deprecated operators, are good only for stressing channels
58+
59+
private fun ReceiveChannel<Int>.filter(
60+
callTraceDepth: Int,
61+
context: CoroutineContext = Dispatchers.Unconfined,
62+
predicate: suspend (Int) -> Boolean
63+
): ReceiveChannel<Int> =
64+
GlobalScope.produce(context, onCompletion = { cancel() }) {
65+
deeplyNestedFilter(this, callTraceDepth, predicate)
66+
}
67+
68+
private suspend fun ReceiveChannel<Int>.deeplyNestedFilter(
69+
sink: ProducerScope<Int>,
70+
depth: Int,
71+
predicate: suspend (Int) -> Boolean
72+
) {
73+
if (depth <= 1) {
74+
for (e in this) {
75+
if (predicate(e)) sink.send(e)
76+
}
77+
} else {
78+
deeplyNestedFilter(sink, depth - 1, predicate)
79+
require(true) // tail-call
80+
}
81+
}
82+
83+
private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
84+
var accumulator = initial
85+
consumeEach {
86+
accumulator = operation(accumulator, it)
87+
}
88+
return accumulator
89+
}
90+
}
91+

0 commit comments

Comments
 (0)