Skip to content

CoroutineScheduler rework #1652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 12, 2019
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks

import kotlinx.coroutines.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package benchmarks

import benchmarks.actors.CORES_COUNT
import benchmarks.akka.CORES_COUNT
import kotlinx.coroutines.*
import kotlinx.coroutines.scheduling.*
import org.openjdk.jmh.annotations.Param
Expand All @@ -22,14 +22,14 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {

abstract var dispatcher: String
override lateinit var coroutineContext: CoroutineContext
var closeable: Closeable? = null
private var closeable: Closeable? = null

@UseExperimental(InternalCoroutinesApi::class)
@Setup
@UseExperimental(InternalCoroutinesApi::class)
open fun setup() {
coroutineContext = when {
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
dispatcher == "experimental" -> {
dispatcher == "scheduler" -> {
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
}
dispatcher.startsWith("ftp") -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
Expand All @@ -13,7 +13,6 @@ import org.openjdk.jmh.annotations.*
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

const val N_MESSAGES = 100_000

Expand All @@ -29,12 +28,12 @@ class Stop
* PingPongAkkaBenchmark.singlePingPong default-dispatcher avgt 10 173.742 ± 41.984 ms/op
* PingPongAkkaBenchmark.singlePingPong single-thread-dispatcher avgt 10 24.181 ± 0.730 ms/op
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
//@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Fork(value = 2)
//@BenchmarkMode(Mode.AverageTime)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@State(Scope.Benchmark)
open class PingPongAkkaBenchmark {

lateinit var system: ActorSystem
Expand Down Expand Up @@ -62,12 +61,12 @@ open class PingPongAkkaBenchmark {
Await.ready(system.terminate(), Duration.Inf())
}

@Benchmark
// @Benchmark
fun singlePingPong() {
runPingPongs(1)
}

@Benchmark
// @Benchmark
fun coresCountPingPongs() {
runPingPongs(Runtime.getRuntime().availableProcessors())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
Expand All @@ -14,7 +14,6 @@ import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

const val ROUNDS = 10_000
const val STATE_SIZE = 1024
Expand All @@ -38,12 +37,12 @@ val CORES_COUNT = Runtime.getRuntime().availableProcessors()
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor default-dispatcher avgt 14 39.964 ± 2.343 ms/op
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor single-thread-dispatcher avgt 14 10.214 ± 2.152 ms/op
*/
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
//@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
//@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
//@Fork(value = 2)
//@BenchmarkMode(Mode.AverageTime)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@State(Scope.Benchmark)
open class StatefulActorAkkaBenchmark {

lateinit var system: ActorSystem
Expand Down Expand Up @@ -72,22 +71,22 @@ open class StatefulActorAkkaBenchmark {
Await.ready(system.terminate(), Duration.Inf())
}

@Benchmark
// @Benchmark
fun singleComputationSingleRequestor() {
run(1, 1)
}

@Benchmark
// @Benchmark
fun singleComputationMultipleRequestors() {
run(1, CORES_COUNT)
}

@Benchmark
// @Benchmark
fun multipleComputationsSingleRequestor() {
run(CORES_COUNT, 1)
}

@Benchmark
// @Benchmark
fun multipleComputationsMultipleRequestors() {
run(CORES_COUNT, CORES_COUNT)
}
Expand Down Expand Up @@ -120,7 +119,8 @@ open class StatefulActorAkkaBenchmark {

private fun createComputationActors(initLatch: CountDownLatch, count: Int): List<ActorRef> {
return (0 until count).map {
system.actorOf(Props.create(ComputationActor::class.java,
system.actorOf(Props.create(
ComputationActor::class.java,
LongArray(STATE_SIZE) { ThreadLocalRandom.current().nextLong(0, 100) }, initLatch)
.withDispatcher("akka.actor.$dispatcher"))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.scheduler

import benchmarks.akka.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.annotations.State
import java.lang.Thread.*
import java.util.concurrent.*
import kotlin.concurrent.*
import kotlin.coroutines.*

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
open class DispatchersContextSwitchBenchmark {
private val nCoroutines = 10000
private val delayTimeMs = 1L
private val nRepeatDelay = 10

private val fjp = ForkJoinPool.commonPool().asCoroutineDispatcher()
private val ftp = Executors.newFixedThreadPool(CORES_COUNT - 1).asCoroutineDispatcher()

@TearDown
fun teardown() {
ftp.close()
(ftp.executor as ExecutorService).awaitTermination(1, TimeUnit.SECONDS)
}

@Benchmark
fun coroutinesIoDispatcher() = runBenchmark(Dispatchers.IO)

@Benchmark
fun coroutinesDefaultDispatcher() = runBenchmark(Dispatchers.Default)

@Benchmark
fun coroutinesFjpDispatcher() = runBenchmark(fjp)

@Benchmark
fun coroutinesFtpDispatcher() = runBenchmark(ftp)

@Benchmark
fun coroutinesBlockingDispatcher() = runBenchmark(EmptyCoroutineContext)

@Benchmark
fun threads() {
val threads = List(nCoroutines) {
thread(start = true) {
repeat(nRepeatDelay) {
sleep(delayTimeMs)
}
}
}
threads.forEach { it.join() }
}

private fun runBenchmark(dispatcher: CoroutineContext) = runBlocking {
repeat(nCoroutines) {
launch(dispatcher) {
repeat(nRepeatDelay) {
delay(delayTimeMs)
}
}
}
}
}

Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
Expand Down Expand Up @@ -44,7 +45,7 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
}

lateinit var coefficients: LongArray
override var dispatcher: String = "experimental"
override var dispatcher: String = "scheduler"

@Setup
override fun setup() {
Expand Down Expand Up @@ -129,8 +130,18 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
} else {
pendingCount = 2
// One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
first = RecursiveAction(coefficients, start, start + (end - start) / 2, parent = this).fork()
second = RecursiveAction(coefficients, start + (end - start) / 2, end, parent = this).fork()
first = RecursiveAction(
coefficients,
start,
start + (end - start) / 2,
parent = this
).fork()
second = RecursiveAction(
coefficients,
start + (end - start) / 2,
end,
parent = this
).fork()
}

tryComplete()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
Expand All @@ -21,7 +22,7 @@ import java.util.concurrent.*
@State(Scope.Benchmark)
open class LaunchBenchmark : ParametrizedDispatcherBase() {

@Param("experimental", "fjp")
@Param("scheduler", "fjp")
override var dispatcher: String = "fjp"

private val jobsToLaunch = 100
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
Expand Down Expand Up @@ -52,7 +53,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
@Param("1", "8", "16")
var jobsCount = 1

@Param("fjp", "ftp_1", "ftp_8")
@Param("fjp", "ftp_1", "dispatcher")
override var dispatcher: String = "fjp"

@Volatile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.scheduler.actors

import benchmarks.*
import benchmarks.actors.StatefulActorBenchmark.*
import benchmarks.akka.*
import benchmarks.scheduler.actors.StatefulActorBenchmark.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
Expand Down Expand Up @@ -57,18 +58,18 @@ import java.util.concurrent.*
@State(Scope.Benchmark)
open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {

@Param("1024", "8192", "262144")
@Param("1024", "8192")
var stateSize: Int = -1

@Param("fjp", "ftp_1", "ftp_8", "experimental")
@Param("fjp", "scheduler")
override var dispatcher: String = "fjp"

@Benchmark
fun multipleComputationsUnfair() = runBlocking {
val resultChannel: Channel<Unit> = Channel(1)
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
val requestor = requestorActorUnfair(computations, resultChannel)
requestor.send(Letter(Start(), Channel(0)))
requestor.send(Letter(Start(), requestor))
resultChannel.receive()
}

Expand All @@ -77,7 +78,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
val resultChannel: Channel<Unit> = Channel(1)
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
val requestor = requestorActorFair(computations, resultChannel)
requestor.send(Letter(Start(), Channel(0)))
requestor.send(Letter(Start(), requestor))
resultChannel.receive()
}

Expand All @@ -95,6 +96,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
is Long -> {
if (++received >= ROUNDS * 8) {
computations.forEach { it.close() }
stopChannel.send(Unit)
return@actor
} else {
Expand Down Expand Up @@ -122,6 +124,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
is Long -> {
if (++receivedTotal >= ROUNDS * computations.size) {
computations.forEach { it.close() }
stopChannel.send(Unit)
return@actor
} else {
Expand All @@ -136,4 +139,4 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
}
}
}
}
Loading