Skip to content

Commit ac466ec

Browse files
elizarovqwwdfsad
authored andcommitted
Coroutines guide and IU guide update for stuctured concurrency
* Updated binary compatibility data * CommonPool references removed * Deprecated builders removed * Marked channels and select expressions as experimental in text
1 parent 833229f commit ac466ec

File tree

127 files changed

+1698
-1158
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+1698
-1158
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ForkJoinBenchmark.kt

+6-11
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,9 @@
44

55
package benchmarks
66

7-
import benchmarks.ForkJoinBenchmark.Companion.BATCH_SIZE
8-
import kotlinx.coroutines.experimental.CommonPool
9-
import kotlinx.coroutines.experimental.Deferred
10-
import kotlinx.coroutines.experimental.async
11-
import kotlinx.coroutines.experimental.runBlocking
7+
import kotlinx.coroutines.experimental.*
128
import org.openjdk.jmh.annotations.*
139
import java.util.concurrent.*
14-
import kotlin.coroutines.experimental.CoroutineContext
1510

1611
/*
1712
* Comparison of fork-join tasks using specific FJP API and classic [async] jobs.
@@ -59,12 +54,12 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
5954

6055
@Benchmark
6156
fun asyncFjp() = runBlocking {
62-
startAsync(coefficients, 0, coefficients.size, CommonPool).await()
57+
CoroutineScope(CommonPool).startAsync(coefficients, 0, coefficients.size).await()
6358
}
6459

6560
@Benchmark
6661
fun asyncExperimental() = runBlocking {
67-
startAsync(coefficients, 0, coefficients.size, benchmarkContext).await()
62+
startAsync(coefficients, 0, coefficients.size).await()
6863
}
6964

7065
@Benchmark
@@ -79,12 +74,12 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
7974
return ForkJoinPool.commonPool().submit(task).join()
8075
}
8176

82-
suspend fun startAsync(coefficients: LongArray, start: Int, end: Int, dispatcher: CoroutineContext): Deferred<Double> = async(dispatcher) {
77+
suspend fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
8378
if (end - start <= BATCH_SIZE) {
8479
compute(coefficients, start, end)
8580
} else {
86-
val first = startAsync(coefficients, start, start + (end - start) / 2, dispatcher)
87-
val second = startAsync(coefficients, start + (end - start) / 2, end, dispatcher)
81+
val first = startAsync(coefficients, start, start + (end - start) / 2)
82+
val second = startAsync(coefficients, start + (end - start) / 2, end)
8883
first.await() + second.await()
8984
}
9085
}

benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44

55
package benchmarks
66

7-
import kotlinx.coroutines.experimental.launch
7+
import kotlinx.coroutines.experimental.*
88
import org.openjdk.jmh.annotations.*
9-
import java.util.concurrent.CyclicBarrier
10-
import java.util.concurrent.TimeUnit
9+
import java.util.concurrent.*
1110

1211
/*
1312
* Benchmark to measure scheduling overhead in comparison with FJP.
@@ -34,13 +33,13 @@ open class LaunchBenchmark : ParametrizedDispatcherBase() {
3433
@Benchmark
3534
fun massiveLaunch() {
3635
repeat(submitters) {
37-
launch(benchmarkContext) {
36+
launch {
3837
// Wait until all cores are occupied
3938
allLaunched.await()
4039
allLaunched.reset()
4140

4241
(1..jobsToLaunch).map {
43-
launch(coroutineContext) {
42+
launch {
4443
// do nothing
4544
}
4645
}.map { it.join() }

benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
package benchmarks
66

77
import benchmarks.actors.CORES_COUNT
8-
import kotlinx.coroutines.experimental.CommonPool
9-
import kotlinx.coroutines.experimental.ThreadPoolDispatcher
10-
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
8+
import kotlinx.coroutines.experimental.*
119
import kotlinx.coroutines.experimental.scheduling.*
1210
import org.openjdk.jmh.annotations.Param
1311
import org.openjdk.jmh.annotations.Setup
@@ -19,15 +17,15 @@ import kotlin.coroutines.experimental.CoroutineContext
1917
* Base class to use different [CoroutineContext] in benchmarks via [Param] in inheritors.
2018
* Currently allowed values are "fjp" for [CommonPool] and ftp_n for [ThreadPoolDispatcher] with n threads.
2119
*/
22-
abstract class ParametrizedDispatcherBase {
20+
abstract class ParametrizedDispatcherBase : CoroutineScope {
2321

2422
abstract var dispatcher: String
25-
lateinit var benchmarkContext: CoroutineContext // coroutineContext clashes with scope parameter
23+
override lateinit var coroutineContext: CoroutineContext
2624
var closeable: Closeable? = null
2725

2826
@Setup
2927
open fun setup() {
30-
benchmarkContext = when {
28+
coroutineContext = when {
3129
dispatcher == "fjp" -> CommonPool
3230
dispatcher == "experimental" -> {
3331
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }

benchmarks/src/jmh/kotlin/benchmarks/StatefulAwaitsBenchmark.kt

+10-13
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55
package benchmarks
66

77
import kotlinx.coroutines.experimental.*
8-
import kotlinx.coroutines.experimental.channels.BroadcastChannel
9-
import kotlinx.coroutines.experimental.channels.Channel
8+
import kotlinx.coroutines.experimental.channels.*
109
import org.openjdk.jmh.annotations.*
11-
import java.util.concurrent.ThreadLocalRandom
12-
import java.util.concurrent.TimeUnit
13-
import kotlin.coroutines.experimental.CoroutineContext
10+
import java.util.concurrent.*
1411

1512
/*
1613
* Benchmark which launches multiple async jobs each with either own private or global shared state,
@@ -71,7 +68,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
7168
fun independentStateAsync() = runBlocking {
7269
val broadcastChannel = BroadcastChannel<Int>(1)
7370
val subscriptionChannel = Channel<Int>(jobsCount)
74-
val jobs= (0 until jobsCount).map { launchJob(it, benchmarkContext, broadcastChannel, subscriptionChannel) }.toList()
71+
val jobs= (0 until jobsCount).map { launchJob(it, broadcastChannel, subscriptionChannel) }.toList()
7572

7673
repeat(jobsCount) {
7774
subscriptionChannel.receive() // await all jobs to start
@@ -86,7 +83,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
8683
fun dependentStateAsync() = runBlocking {
8784
val broadcastChannel = BroadcastChannel<Int>(1)
8885
val subscriptionChannel = Channel<Int>(jobsCount)
89-
val jobs= (0 until jobsCount).map { launchJob(0, benchmarkContext, broadcastChannel, subscriptionChannel) }.toList()
86+
val jobs= (0 until jobsCount).map { launchJob(0, broadcastChannel, subscriptionChannel) }.toList()
9087

9188
repeat(jobsCount) {
9289
subscriptionChannel.receive() // await all jobs to start
@@ -97,10 +94,12 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
9794
jobs.forEach { it.await() }
9895
}
9996

100-
private fun launchJob(stateNum: Int, dispatcher: CoroutineContext,
101-
channel: BroadcastChannel<Int>,
102-
subscriptionChannel: Channel<Int>): Deferred<Long> {
103-
return async(dispatcher) {
97+
private fun launchJob(
98+
stateNum: Int,
99+
channel: BroadcastChannel<Int>,
100+
subscriptionChannel: Channel<Int>
101+
): Deferred<Long> =
102+
async {
104103
val subscription = channel.openSubscription()
105104
subscriptionChannel.send(1)
106105
subscription.receive()
@@ -115,8 +114,6 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
115114

116115
yield()
117116
}
118-
119117
sum
120118
}
121-
}
122119
}

benchmarks/src/jmh/kotlin/benchmarks/actors/ConcurrentStatefulActorBenchmark.kt

+53-50
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@
44

55
package benchmarks.actors
66

7-
import benchmarks.ParametrizedDispatcherBase
8-
import benchmarks.actors.StatefulActorBenchmark.Letter
9-
import kotlinx.coroutines.experimental.channels.Channel
10-
import kotlinx.coroutines.experimental.channels.SendChannel
11-
import kotlinx.coroutines.experimental.channels.actor
12-
import kotlinx.coroutines.experimental.runBlocking
7+
import benchmarks.*
8+
import benchmarks.actors.StatefulActorBenchmark.*
9+
import kotlinx.coroutines.experimental.*
10+
import kotlinx.coroutines.experimental.channels.*
1311
import org.openjdk.jmh.annotations.*
14-
import java.util.concurrent.ThreadLocalRandom
15-
import java.util.concurrent.TimeUnit
16-
import kotlin.coroutines.experimental.CoroutineContext
12+
import java.util.concurrent.*
1713

1814
/*
1915
* Noisy benchmarks useful to measure scheduling fairness and migration of affinity-sensitive tasks.
@@ -70,67 +66,74 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
7066
@Benchmark
7167
fun multipleComputationsUnfair() = runBlocking {
7268
val resultChannel: Channel<Unit> = Channel(1)
73-
val computations = (0 until CORES_COUNT).map { computationActor(benchmarkContext, stateSize) }
74-
val requestor = requestorActorUnfair(benchmarkContext, computations, resultChannel)
69+
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
70+
val requestor = requestorActorUnfair(computations, resultChannel)
7571
requestor.send(Letter(Start(), Channel(0)))
7672
resultChannel.receive()
7773
}
7874

7975
@Benchmark
8076
fun multipleComputationsFair() = runBlocking {
8177
val resultChannel: Channel<Unit> = Channel(1)
82-
val computations = (0 until CORES_COUNT).map { computationActor(benchmarkContext, stateSize) }
83-
val requestor = requestorActorFair(benchmarkContext, computations, resultChannel)
78+
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
79+
val requestor = requestorActorFair(computations, resultChannel)
8480
requestor.send(Letter(Start(), Channel(0)))
8581
resultChannel.receive()
8682
}
8783

88-
fun requestorActorUnfair(context: CoroutineContext, computations: List<SendChannel<Letter>>,
89-
stopChannel: Channel<Unit>) = actor<Letter>(context, 1024) {
90-
var received = 0
91-
for (letter in channel) with(letter) {
92-
when (message) {
93-
is Start -> {
94-
computations.shuffled().forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
95-
}
96-
is Long -> {
97-
if (++received >= ROUNDS * 8) {
98-
stopChannel.send(Unit)
99-
return@actor
100-
} else {
101-
sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
84+
fun requestorActorUnfair(
85+
computations: List<SendChannel<Letter>>,
86+
stopChannel: Channel<Unit>
87+
) =
88+
actor<Letter>(capacity = 1024) {
89+
var received = 0
90+
for (letter in channel) with(letter) {
91+
when (message) {
92+
is Start -> {
93+
computations.shuffled()
94+
.forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
10295
}
96+
is Long -> {
97+
if (++received >= ROUNDS * 8) {
98+
stopChannel.send(Unit)
99+
return@actor
100+
} else {
101+
sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
102+
}
103+
}
104+
else -> error("Cannot happen: $letter")
103105
}
104-
else -> error("Cannot happen: $letter")
105106
}
106107
}
107-
}
108108

109+
fun requestorActorFair(
110+
computations: List<SendChannel<Letter>>,
111+
stopChannel: Channel<Unit>
112+
) =
113+
actor<Letter>(capacity = 1024) {
114+
val received = hashMapOf(*computations.map { it to 0 }.toTypedArray())
115+
var receivedTotal = 0
109116

110-
fun requestorActorFair(context: CoroutineContext, computations: List<SendChannel<Letter>>,
111-
stopChannel: Channel<Unit>) = actor<Letter>(context, 1024) {
112-
val received = hashMapOf(*computations.map { it to 0 }.toTypedArray())
113-
var receivedTotal = 0
114-
115-
for (letter in channel) with(letter) {
116-
when (message) {
117-
is Start -> {
118-
computations.shuffled().forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
119-
}
120-
is Long -> {
121-
if (++receivedTotal >= ROUNDS * computations.size) {
122-
stopChannel.send(Unit)
123-
return@actor
124-
} else {
125-
val receivedFromSender = received[sender]!!
126-
if (receivedFromSender <= ROUNDS) {
127-
received[sender] = receivedFromSender + 1
128-
sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
117+
for (letter in channel) with(letter) {
118+
when (message) {
119+
is Start -> {
120+
computations.shuffled()
121+
.forEach { it.send(Letter(ThreadLocalRandom.current().nextLong(), channel)) }
122+
}
123+
is Long -> {
124+
if (++receivedTotal >= ROUNDS * computations.size) {
125+
stopChannel.send(Unit)
126+
return@actor
127+
} else {
128+
val receivedFromSender = received[sender]!!
129+
if (receivedFromSender <= ROUNDS) {
130+
received[sender] = receivedFromSender + 1
131+
sender.send(Letter(ThreadLocalRandom.current().nextLong(), channel))
132+
}
129133
}
130134
}
135+
else -> error("Cannot happen: $letter")
131136
}
132-
else -> error("Cannot happen: $letter")
133137
}
134138
}
135-
}
136139
}

benchmarks/src/jmh/kotlin/benchmarks/actors/CycledActorsBenchmark.kt

+7-10
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44

55
package benchmarks.actors
66

7-
import benchmarks.ParametrizedDispatcherBase
8-
import benchmarks.actors.PingPongActorBenchmark.Letter
9-
import kotlinx.coroutines.experimental.channels.Channel
10-
import kotlinx.coroutines.experimental.channels.SendChannel
11-
import kotlinx.coroutines.experimental.channels.actor
12-
import kotlinx.coroutines.experimental.runBlocking
7+
import benchmarks.*
8+
import benchmarks.actors.PingPongActorBenchmark.*
9+
import kotlinx.coroutines.experimental.*
10+
import kotlinx.coroutines.experimental.channels.*
1311
import org.openjdk.jmh.annotations.*
14-
import java.util.concurrent.ThreadLocalRandom
15-
import java.util.concurrent.TimeUnit
12+
import java.util.concurrent.*
1613

1714
/*
1815
* Cores count actors chained into single cycle pass message and process it using its private state.
@@ -68,7 +65,7 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() {
6865
trailingActor.send(Letter(Start(), previous))
6966
}
7067

71-
private fun lastActor(stopChannel: Channel<Unit>) = actor<Letter>(benchmarkContext, capacity = 1024) {
68+
private fun lastActor(stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
7269
var nextChannel: SendChannel<Letter>? = null
7370
val state = LongArray(actorStateSize) { ThreadLocalRandom.current().nextLong(1024) }
7471

@@ -90,7 +87,7 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() {
9087
}
9188
}
9289

93-
private fun createActor(nextActor: SendChannel<Letter>, stopChannel: Channel<Unit>) = actor<Letter>(benchmarkContext, capacity = 1024) {
90+
private fun createActor(nextActor: SendChannel<Letter>, stopChannel: Channel<Unit>) = actor<Letter>(capacity = 1024) {
9491
var received = 0
9592
val state = LongArray(actorStateSize) { ThreadLocalRandom.current().nextLong(1024) }
9693

0 commit comments

Comments
 (0)