Skip to content

Commit 1ea4da4

Browse files
committed
Merge branch 'develop' into debugger-sm-bypass
# Conflicts: # kotlinx-coroutines-core/common/src/Timeout.kt
2 parents 2496dc2 + 74b250f commit 1ea4da4

File tree

100 files changed

+1591
-693
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+1591
-693
lines changed

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.1.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.1.1)
66

77
Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
8-
This is a companion version for Kotlin `1.3.20` release.
8+
This is a companion version for Kotlin `1.3.21` release.
99

1010
```kotlin
1111
GlobalScope.launch {
@@ -83,7 +83,7 @@ And make sure that you use the latest Kotlin version:
8383

8484
```xml
8585
<properties>
86-
<kotlin.version>1.3.20</kotlin.version>
86+
<kotlin.version>1.3.21</kotlin.version>
8787
</properties>
8888
```
8989

@@ -101,7 +101,7 @@ And make sure that you use the latest Kotlin version:
101101

102102
```groovy
103103
buildscript {
104-
ext.kotlin_version = '1.3.20'
104+
ext.kotlin_version = '1.3.21'
105105
}
106106
```
107107

@@ -127,7 +127,7 @@ And make sure that you use the latest Kotlin version:
127127

128128
```groovy
129129
plugins {
130-
kotlin("jvm") version "1.3.20"
130+
kotlin("jvm") version "1.3.21"
131131
}
132132
```
133133

benchmarks/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@ repositories {
1212

1313
jmh.jmhVersion = '1.21'
1414

15+
// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
16+
// ../gradlew --no-daemon cleanJmhJar jmh
1517
jmh {
1618
duplicateClassesStrategy DuplicatesStrategy.INCLUDE
19+
failOnError = true
20+
resultFormat = 'CSV'
21+
// include = ['.*ChannelProducerConsumer.*']
1722
}
1823

1924
jmhJar {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package benchmarks
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.channels.Channel
5+
import kotlinx.coroutines.selects.select
6+
import org.openjdk.jmh.annotations.*
7+
import org.openjdk.jmh.infra.Blackhole
8+
import java.lang.Integer.max
9+
import java.util.concurrent.ForkJoinPool
10+
import java.util.concurrent.Phaser
11+
import java.util.concurrent.ThreadLocalRandom
12+
import java.util.concurrent.TimeUnit
13+
14+
15+
/**
16+
* Benchmark to measure channel algorithm performance in terms of average time per `send-receive` pair;
17+
* actually, it measures the time for a batch of such operations separated into the specified number of consumers/producers.
18+
* It uses different channels (rendezvous, buffered, unlimited; see [ChannelCreator]) and different dispatchers
19+
* (see [DispatcherCreator]). If the [_3_withSelect] property is set, it invokes `send` and
20+
* `receive` via [select], waiting on a local dummy channel simultaneously, simulating a "cancellation" channel.
21+
*
22+
* Please, be patient, this benchmark takes quite a lot of time to complete.
23+
*/
24+
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
25+
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
26+
@Fork(value = 3)
27+
@BenchmarkMode(Mode.AverageTime)
28+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
29+
@State(Scope.Benchmark)
30+
open class ChannelProducerConsumerBenchmark {
31+
@Param
32+
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
33+
34+
@Param
35+
private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
36+
37+
@Param("0", "1000")
38+
private var _2_coroutines: Int = 0
39+
40+
@Param("false", "true")
41+
private var _3_withSelect: Boolean = false
42+
43+
@Param("1", "2", "4") // local machine
44+
// @Param("1", "2", "4", "8", "12") // local machine
45+
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
46+
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
47+
private var _4_parallelism: Int = 0
48+
49+
private lateinit var dispatcher: CoroutineDispatcher
50+
private lateinit var channel: Channel<Int>
51+
52+
@InternalCoroutinesApi
53+
@Setup
54+
fun setup() {
55+
dispatcher = _0_dispatcher.create(_4_parallelism)
56+
channel = _1_channel.create()
57+
}
58+
59+
@Benchmark
60+
fun spmc() {
61+
if (_2_coroutines != 0) return
62+
val producers = max(1, _4_parallelism - 1)
63+
val consumers = 1
64+
run(producers, consumers)
65+
}
66+
67+
@Benchmark
68+
fun mpmc() {
69+
val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
70+
val consumers = producers
71+
run(producers, consumers)
72+
}
73+
74+
private fun run(producers: Int, consumers: Int) {
75+
val n = APPROX_BATCH_SIZE / producers * producers
76+
val phaser = Phaser(producers + consumers + 1)
77+
// Run producers
78+
repeat(producers) {
79+
GlobalScope.launch(dispatcher) {
80+
val dummy = if (_3_withSelect) _1_channel.create() else null
81+
repeat(n / producers) {
82+
produce(it, dummy)
83+
}
84+
phaser.arrive()
85+
}
86+
}
87+
// Run consumers
88+
repeat(consumers) {
89+
GlobalScope.launch(dispatcher) {
90+
val dummy = if (_3_withSelect) _1_channel.create() else null
91+
repeat(n / consumers) {
92+
consume(dummy)
93+
}
94+
phaser.arrive()
95+
}
96+
}
97+
// Wait until work is done
98+
phaser.arriveAndAwaitAdvance()
99+
}
100+
101+
private suspend fun produce(element: Int, dummy: Channel<Int>?) {
102+
if (_3_withSelect) {
103+
select<Unit> {
104+
channel.onSend(element) {}
105+
dummy!!.onReceive {}
106+
}
107+
} else {
108+
channel.send(element)
109+
}
110+
doWork()
111+
}
112+
113+
private suspend fun consume(dummy: Channel<Int>?) {
114+
if (_3_withSelect) {
115+
select<Unit> {
116+
channel.onReceive {}
117+
dummy!!.onReceive {}
118+
}
119+
} else {
120+
channel.receive()
121+
}
122+
doWork()
123+
}
124+
}
125+
126+
enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
127+
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
128+
}
129+
130+
enum class ChannelCreator(private val capacity: Int) {
131+
RENDEZVOUS(Channel.RENDEZVOUS),
132+
// BUFFERED_1(1),
133+
BUFFERED_2(2),
134+
// BUFFERED_4(4),
135+
BUFFERED_32(32),
136+
BUFFERED_128(128),
137+
BUFFERED_UNLIMITED(Channel.UNLIMITED);
138+
139+
fun create(): Channel<Int> = Channel(capacity)
140+
}
141+
142+
private fun doWork(): Unit = Blackhole.consumeCPU(ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX))
143+
144+
private const val WORK_MIN = 50L
145+
private const val WORK_MAX = 100L
146+
private const val APPROX_BATCH_SIZE = 100000

0 commit comments

Comments
 (0)