Skip to content

Commit 44a302e

Browse files
committed
Add a sequential semaphore benchmark and upgrade ChannelSinkBenchmark so that it supports buffered channels and pre-allocates elements.
Signed-off-by: Nikita Koval <[email protected]>
1 parent 9040230 commit 44a302e

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ open class ChannelSinkBenchmark {
2424
private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
2525
private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement()
2626

27+
private val elements = (0 until N).toList()
28+
29+
@Param("0", "1", "8", "32")
30+
var channelCapacity = 0
31+
2732
@Benchmark
2833
fun channelPipeline(): Int = runBlocking {
2934
run(unconfined)
@@ -41,14 +46,14 @@ open class ChannelSinkBenchmark {
4146

4247
private suspend inline fun run(context: CoroutineContext): Int {
4348
return Channel
44-
.range(1, 10_000, context)
45-
.filter(context) { it % 4 == 0 }
46-
.fold(0) { a, b -> a + b }
49+
.range(context) // should not allocate `Int`s!
50+
.filter(context) { it % 4 == 0 } // should not allocate `Int`s!
51+
.fold(0) { a, b -> if (a % 8 == 0) a else b } // should not allocate `Int`s!
4752
}
4853

49-
private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context) {
50-
for (i in start until (start + count))
51-
send(i)
54+
private fun Channel.Factory.range(context: CoroutineContext) = GlobalScope.produce(context, capacity = channelCapacity) {
55+
for (i in 0 until N)
56+
send(elements[i]) // should not allocate `Int`s!
5257
}
5358

5459
// Migrated from deprecated operators, are good only for stressing channels
@@ -69,3 +74,4 @@ open class ChannelSinkBenchmark {
6974
}
7075
}
7176

77+
private const val N = 10_000
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.sync.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.TimeUnit
11+
import kotlin.test.*
12+
13+
@Warmup(iterations = 5, time = 1)
14+
@Measurement(iterations = 10, time = 1)
15+
@BenchmarkMode(Mode.AverageTime)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
@Fork(1)
19+
open class SequentialSemaphoreAsMutexBenchmark {
20+
val s = Semaphore(1)
21+
22+
@Benchmark
23+
fun benchmark() : Unit = runBlocking {
24+
val s = Semaphore(permits = 1, acquiredPermits = 1)
25+
var step = 0
26+
launch(Dispatchers.Unconfined) {
27+
repeat(N) {
28+
assertEquals(it * 2, step)
29+
step++
30+
s.acquire()
31+
}
32+
}
33+
repeat(N) {
34+
assertEquals(it * 2 + 1, step)
35+
step++
36+
s.release()
37+
}
38+
}
39+
}
40+
41+
fun main() = SequentialSemaphoreAsMutexBenchmark().benchmark()
42+
43+
private val N = 1_000_000

0 commit comments

Comments
 (0)