-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathChannelProducerConsumerBenchmark.kt
150 lines (130 loc) · 4.77 KB
/
ChannelProducerConsumerBenchmark.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import java.lang.Integer.max
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Phaser
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
/**
 * Benchmark that measures channel performance as the average time of one batch of `send`/`receive` pairs.
 *
 * Every benchmark invocation pushes roughly [APPROX_BATCH_SIZE] elements through a single shared channel,
 * splitting the work evenly between the configured number of producer and consumer coroutines.
 * The channel kind is selected by [ChannelCreator] (rendezvous, buffered, unlimited) and the dispatcher by
 * [DispatcherCreator]. If the [_3_withSelect] property is set, `send` and `receive` are invoked via [select]
 * while simultaneously waiting on a coroutine-local dummy channel, which simulates a "cancellation" channel.
 *
 * NOTE(review): warmup and measurement iterations are configured with a nominal duration of 500 microseconds;
 * presumably this is meant to make every iteration execute a single batch — confirm before changing.
 *
 * Please, be patient, this benchmark takes quite a lot of time to complete.
 */
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 3)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ChannelProducerConsumerBenchmark {
    /** Factory of the dispatcher on which all producer and consumer coroutines are launched. */
    @Param
    private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN

    /** Factory of the channel under test; also used to create the dummy channels in `select` mode. */
    @Param
    private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS

    /**
     * Total number of coroutines used by [mpmc]; `0` means "derive the number from [_4_parallelism]".
     * [spmc] only runs when this value is `0`.
     */
    @Param("0", "1000")
    private var _2_coroutines: Int = 0

    /** Whether `send`/`receive` go through [select] together with a dummy channel clause. */
    @Param("false", "true")
    private var _3_withSelect: Boolean = false

    /** Parallelism level handed to the dispatcher factory. */
    @Param("1", "2", "4") // local machine
    // @Param("1", "2", "4", "8", "12") // local machine
    // @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
    // @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
    private var _4_parallelism: Int = 0

    private lateinit var dispatcher: CoroutineDispatcher
    private lateinit var channel: Channel<Int>

    /** Creates the dispatcher and the shared channel for the current parameter combination. */
    @InternalCoroutinesApi
    @Setup
    fun setup() {
        dispatcher = _0_dispatcher.create(_4_parallelism)
        channel = _1_channel.create()
    }

    /**
     * Releases the dispatcher created in [setup].
     *
     * Executor-backed dispatchers own a thread pool; without closing them the pool outlives the trial.
     */
    @TearDown
    fun tearDown() {
        // Setup may have failed before the dispatcher was assigned.
        if (!::dispatcher.isInitialized) return
        (dispatcher as? ExecutorCoroutineDispatcher)?.close()
    }

    /**
     * Runs `max(1, parallelism - 1)` producers against exactly one consumer.
     *
     * NOTE(review): despite the `spmc` name, the visible configuration is "many producers, single consumer".
     * The name and the behaviour are kept as they are so that existing results stay comparable.
     *
     * The benchmark body is skipped (returns immediately) when [_2_coroutines] is non-zero.
     */
    @Benchmark
    fun spmc() {
        if (_2_coroutines != 0) return
        val producers = max(1, _4_parallelism - 1)
        val consumers = 1
        run(producers, consumers)
    }

    /**
     * Runs an equal number of producers and consumers: half of the parallelism (rounded up) when
     * [_2_coroutines] is `0`, otherwise half of [_2_coroutines].
     */
    @Benchmark
    fun mpmc() {
        val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
        val consumers = producers
        run(producers, consumers)
    }

    /**
     * Transfers one batch of elements through [channel] using the given number of coroutines and blocks
     * the calling thread until all of them are done.
     *
     * @param producers number of sending coroutines; must be positive.
     * @param consumers number of receiving coroutines; must be positive and divide the batch size evenly.
     * @throws IllegalArgumentException if the split would leave senders or receivers without a counterpart.
     */
    private fun run(producers: Int, consumers: Int) {
        require(producers > 0 && consumers > 0) {
            "At least one producer and one consumer are required, got producers=$producers, consumers=$consumers"
        }
        // Round the batch down so that every producer sends the same number of elements.
        val n = APPROX_BATCH_SIZE / producers * producers
        // An uneven split would make the number of sends differ from the number of receives,
        // leaving some coroutines suspended forever and hanging the phaser below. Fail fast instead.
        require(n % consumers == 0) {
            "Batch of $n elements cannot be split evenly between $consumers consumers"
        }
        val perProducer = n / producers
        val perConsumer = n / consumers
        // One party per coroutine plus one for the calling thread.
        val phaser = Phaser(producers + consumers + 1)
        // Run producers
        repeat(producers) {
            GlobalScope.launch(dispatcher) {
                val dummy = if (_3_withSelect) _1_channel.create() else null
                repeat(perProducer) { element ->
                    produce(element, dummy)
                }
                phaser.arrive()
            }
        }
        // Run consumers
        repeat(consumers) {
            GlobalScope.launch(dispatcher) {
                val dummy = if (_3_withSelect) _1_channel.create() else null
                repeat(perConsumer) {
                    consume(dummy)
                }
                phaser.arrive()
            }
        }
        // Wait until work is done
        phaser.arriveAndAwaitAdvance()
    }

    /**
     * Sends [element] to the shared channel, either directly or through [select], then performs
     * the simulated work.
     *
     * @param dummy coroutine-local channel used as the extra `select` clause; non-null in `select` mode.
     */
    private suspend fun produce(element: Int, dummy: Channel<Int>?) {
        if (_3_withSelect) {
            select<Unit> {
                channel.onSend(element) {}
                // `dummy` is always created by the caller when `_3_withSelect` is set.
                dummy!!.onReceive {}
            }
        } else {
            channel.send(element)
        }
        doWork()
    }

    /**
     * Receives one element from the shared channel, either directly or through [select], then performs
     * the simulated work.
     *
     * @param dummy coroutine-local channel used as the extra `select` clause; non-null in `select` mode.
     */
    private suspend fun consume(dummy: Channel<Int>?) {
        if (_3_withSelect) {
            select<Unit> {
                channel.onReceive {}
                // `dummy` is always created by the caller when `_3_withSelect` is set.
                dummy!!.onReceive {}
            }
        } else {
            channel.receive()
        }
        doWork()
    }
}
/**
 * Dispatcher flavours the benchmark can be run on.
 *
 * @property create factory that builds a dispatcher for the requested parallelism level.
 */
enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
    /** Dispatcher backed by a new [ForkJoinPool] constructed with the requested parallelism. */
    FORK_JOIN({ ForkJoinPool(it).asCoroutineDispatcher() })
}
/**
 * Channel flavours exercised by the benchmark; each entry carries the capacity
 * that is passed to the [Channel] factory function.
 */
enum class ChannelCreator(private val capacity: Int) {
    RENDEZVOUS(Channel.RENDEZVOUS),
    // BUFFERED_1(1),
    BUFFERED_2(2),
    // BUFFERED_4(4),
    BUFFERED_32(32),
    BUFFERED_128(128),
    BUFFERED_UNLIMITED(Channel.UNLIMITED);

    /** Builds a new `Int` channel with this entry's capacity. */
    fun create(): Channel<Int> {
        return Channel<Int>(capacity)
    }
}
/**
 * Simulates a small amount of CPU work: consumes a random number of
 * [Blackhole.consumeCPU] tokens drawn from `[WORK_MIN, WORK_MAX)`.
 */
private fun doWork() {
    val tokens = ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX)
    Blackhole.consumeCPU(tokens)
}
/** Lower bound (inclusive) of the random `Blackhole.consumeCPU` token count used by `doWork`. */
private const val WORK_MIN = 50L
/** Upper bound (exclusive) of the random `Blackhole.consumeCPU` token count used by `doWork`. */
private const val WORK_MAX = 100L
/** Target number of elements per benchmark batch; the benchmark rounds it down to a multiple of the producer count. */
private const val APPROX_BATCH_SIZE = 100000