1
+ package benchmarks
2
+
3
+ import kotlinx.coroutines.*
4
+ import kotlinx.coroutines.channels.Channel
5
+ import kotlinx.coroutines.selects.select
6
+ import org.openjdk.jmh.annotations.*
7
+ import org.openjdk.jmh.infra.Blackhole
8
+ import java.lang.Integer.max
9
+ import java.util.concurrent.ForkJoinPool
10
+ import java.util.concurrent.Phaser
11
+ import java.util.concurrent.ThreadLocalRandom
12
+ import java.util.concurrent.TimeUnit
13
+
14
+
15
+ /* *
16
+ * Benchmark to measure channel algorithm performance in terms of average time per `send-receive` pair;
17
+ * actually, it measures the time for a batch of such operations separated into the specified number of consumers/producers.
18
+ * It uses different channels (rendezvous, buffered, unlimited; see [ChannelCreator]) and different dispatchers
19
+ * (see [DispatcherCreator]). If the [_3_withSelect] property is set, it invokes `send` and
20
+ * `receive` via [select], waiting on a local dummy channel simultaneously, simulating a "cancellation" channel.
21
+ *
22
+ * Please, be patient, this benchmark takes quite a lot of time to complete.
23
+ */
24
+ @Warmup(iterations = 3 , time = 500 , timeUnit = TimeUnit .MICROSECONDS )
25
+ @Measurement(iterations = 10 , time = 500 , timeUnit = TimeUnit .MICROSECONDS )
26
+ @Fork(value = 3 )
27
+ @BenchmarkMode(Mode .AverageTime )
28
+ @OutputTimeUnit(TimeUnit .MILLISECONDS )
29
+ @State(Scope .Benchmark )
30
+ open class ChannelProducerConsumerBenchmark {
31
+ @Param
32
+ private var _0_dispatcher : DispatcherCreator = DispatcherCreator .FORK_JOIN
33
+
34
+ @Param
35
+ private var _1_channel : ChannelCreator = ChannelCreator .RENDEZVOUS
36
+
37
+ @Param(" 0" , " 1000" )
38
+ private var _2_coroutines : Int = 0
39
+
40
+ @Param(" false" , " true" )
41
+ private var _3_withSelect : Boolean = false
42
+
43
+ @Param(" 1" , " 2" , " 4" ) // local machine
44
+ // @Param("1", "2", "4", "8", "12") // local machine
45
+ // @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
46
+ // @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
47
+ private var _4_parallelism : Int = 0
48
+
49
+ private lateinit var dispatcher: CoroutineDispatcher
50
+ private lateinit var channel: Channel <Int >
51
+
52
+ @InternalCoroutinesApi
53
+ @Setup
54
+ fun setup () {
55
+ dispatcher = _0_dispatcher .create(_4_parallelism )
56
+ channel = _1_channel .create()
57
+ }
58
+
59
+ @Benchmark
60
+ fun spmc () {
61
+ if (_2_coroutines != 0 ) return
62
+ val producers = max(1 , _4_parallelism - 1 )
63
+ val consumers = 1
64
+ run (producers, consumers)
65
+ }
66
+
67
+ @Benchmark
68
+ fun mpmc () {
69
+ val producers = if (_2_coroutines == 0 ) (_4_parallelism + 1 ) / 2 else _2_coroutines / 2
70
+ val consumers = producers
71
+ run (producers, consumers)
72
+ }
73
+
74
+ private fun run (producers : Int , consumers : Int ) {
75
+ val n = APPROX_BATCH_SIZE / producers * producers
76
+ val phaser = Phaser (producers + consumers + 1 )
77
+ // Run producers
78
+ repeat(producers) {
79
+ GlobalScope .launch(dispatcher) {
80
+ val dummy = if (_3_withSelect ) _1_channel .create() else null
81
+ repeat(n / producers) {
82
+ produce(it, dummy)
83
+ }
84
+ phaser.arrive()
85
+ }
86
+ }
87
+ // Run consumers
88
+ repeat(consumers) {
89
+ GlobalScope .launch(dispatcher) {
90
+ val dummy = if (_3_withSelect ) _1_channel .create() else null
91
+ repeat(n / consumers) {
92
+ consume(dummy)
93
+ }
94
+ phaser.arrive()
95
+ }
96
+ }
97
+ // Wait until work is done
98
+ phaser.arriveAndAwaitAdvance()
99
+ }
100
+
101
+ private suspend fun produce (element : Int , dummy : Channel <Int >? ) {
102
+ if (_3_withSelect ) {
103
+ select<Unit > {
104
+ channel.onSend(element) {}
105
+ dummy!! .onReceive {}
106
+ }
107
+ } else {
108
+ channel.send(element)
109
+ }
110
+ doWork()
111
+ }
112
+
113
+ private suspend fun consume (dummy : Channel <Int >? ) {
114
+ if (_3_withSelect ) {
115
+ select<Unit > {
116
+ channel.onReceive {}
117
+ dummy!! .onReceive {}
118
+ }
119
+ } else {
120
+ channel.receive()
121
+ }
122
+ doWork()
123
+ }
124
+ }
125
+
126
+ enum class DispatcherCreator (val create : (parallelism: Int ) -> CoroutineDispatcher ) {
127
+ FORK_JOIN ({ parallelism -> ForkJoinPool (parallelism).asCoroutineDispatcher() })
128
+ }
129
+
130
+ enum class ChannelCreator (private val capacity : Int ) {
131
+ RENDEZVOUS (Channel .RENDEZVOUS ),
132
+ // BUFFERED_1(1),
133
+ BUFFERED_2 (2 ),
134
+ // BUFFERED_4(4),
135
+ BUFFERED_32 (32 ),
136
+ BUFFERED_128 (128 ),
137
+ BUFFERED_UNLIMITED (Channel .UNLIMITED );
138
+
139
+ fun create (): Channel <Int > = Channel (capacity)
140
+ }
141
+
142
+ private fun doWork (): Unit = Blackhole .consumeCPU(ThreadLocalRandom .current().nextLong(WORK_MIN , WORK_MAX ))
143
+
144
+ private const val WORK_MIN = 50L
145
+ private const val WORK_MAX = 100L
146
+ private const val APPROX_BATCH_SIZE = 100000
0 commit comments