diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt index 77b907f6e9..941e3d84ba 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt @@ -1,3 +1,7 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + package benchmarks import kotlinx.coroutines.* diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt index f8e88bf63b..fab052370e 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt @@ -4,7 +4,7 @@ package benchmarks -import benchmarks.actors.CORES_COUNT +import benchmarks.akka.CORES_COUNT import kotlinx.coroutines.* import kotlinx.coroutines.scheduling.* import org.openjdk.jmh.annotations.Param @@ -22,14 +22,14 @@ abstract class ParametrizedDispatcherBase : CoroutineScope { abstract var dispatcher: String override lateinit var coroutineContext: CoroutineContext - var closeable: Closeable? = null + private var closeable: Closeable? = null - @UseExperimental(InternalCoroutinesApi::class) @Setup + @UseExperimental(InternalCoroutinesApi::class) open fun setup() { coroutineContext = when { dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher() - dispatcher == "experimental" -> { + dispatcher == "scheduler" -> { ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it } } dispatcher.startsWith("ftp") -> { diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongAkkaBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/akka/PingPongAkkaBenchmark.kt similarity index 89% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongAkkaBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/akka/PingPongAkkaBenchmark.kt index 6b71e35f8a..1a6e9d4036 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongAkkaBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/akka/PingPongAkkaBenchmark.kt @@ -1,8 +1,8 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks.actors +package benchmarks.akka import akka.actor.ActorRef import akka.actor.ActorSystem @@ -13,7 +13,6 @@ import org.openjdk.jmh.annotations.* import scala.concurrent.Await import scala.concurrent.duration.Duration import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit const val N_MESSAGES = 100_000 @@ -29,12 +28,12 @@ class Stop * PingPongAkkaBenchmark.singlePingPong default-dispatcher avgt 10 173.742 ± 41.984 ms/op * PingPongAkkaBenchmark.singlePingPong single-thread-dispatcher avgt 10 24.181 ± 0.730 ms/op */ -@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@State(Scope.Benchmark) +//@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +//@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +//@Fork(value = 2) +//@BenchmarkMode(Mode.AverageTime) +//@OutputTimeUnit(TimeUnit.MILLISECONDS) +//@State(Scope.Benchmark) open class PingPongAkkaBenchmark { lateinit var system: ActorSystem @@ -62,12 +61,12 @@ open class PingPongAkkaBenchmark { Await.ready(system.terminate(), Duration.Inf()) } - @Benchmark +// @Benchmark fun singlePingPong() { runPingPongs(1) } - @Benchmark +// @Benchmark fun coresCountPingPongs() { runPingPongs(Runtime.getRuntime().availableProcessors()) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorAkkaBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/akka/StatefulActorAkkaBenchmark.kt similarity index 92% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorAkkaBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/akka/StatefulActorAkkaBenchmark.kt index c19c91fa81..4e3ad6ce4d 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorAkkaBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/akka/StatefulActorAkkaBenchmark.kt @@ -1,8 +1,8 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks.actors +package benchmarks.akka import akka.actor.ActorRef import akka.actor.ActorSystem @@ -14,7 +14,6 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit const val ROUNDS = 10_000 const val STATE_SIZE = 1024 @@ -38,12 +37,12 @@ val CORES_COUNT = Runtime.getRuntime().availableProcessors() * StatefulActorAkkaBenchmark.singleComputationSingleRequestor default-dispatcher avgt 14 39.964 ± 2.343 ms/op * StatefulActorAkkaBenchmark.singleComputationSingleRequestor single-thread-dispatcher avgt 14 10.214 ± 2.152 ms/op */ -@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) -@BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@State(Scope.Benchmark) +//@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +//@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) +//@Fork(value = 2) +//@BenchmarkMode(Mode.AverageTime) +//@OutputTimeUnit(TimeUnit.MILLISECONDS) +//@State(Scope.Benchmark) open class StatefulActorAkkaBenchmark { lateinit var system: ActorSystem @@ -72,22 +71,22 @@ open class StatefulActorAkkaBenchmark { Await.ready(system.terminate(), Duration.Inf()) } - @Benchmark +// @Benchmark fun singleComputationSingleRequestor() { run(1, 1) } - @Benchmark +// @Benchmark fun singleComputationMultipleRequestors() { run(1, CORES_COUNT) } - @Benchmark +// @Benchmark fun multipleComputationsSingleRequestor() { run(CORES_COUNT, 1) } - @Benchmark +// @Benchmark fun multipleComputationsMultipleRequestors() { run(CORES_COUNT, CORES_COUNT) } @@ -120,7 +119,8 @@ open class StatefulActorAkkaBenchmark { private fun createComputationActors(initLatch: CountDownLatch, count: Int): List { return (0 until count).map { - system.actorOf(Props.create(ComputationActor::class.java, + system.actorOf(Props.create( + ComputationActor::class.java, LongArray(STATE_SIZE) { ThreadLocalRandom.current().nextLong(0, 100) }, initLatch) .withDispatcher("akka.actor.$dispatcher")) } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/scheduler/DispatchersContextSwitchBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/DispatchersContextSwitchBenchmark.kt new file mode 100644 index 0000000000..e7f806760e --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/DispatchersContextSwitchBenchmark.kt @@ -0,0 +1,73 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package benchmarks.scheduler + +import benchmarks.akka.* +import kotlinx.coroutines.* +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.annotations.State +import java.lang.Thread.* +import java.util.concurrent.* +import kotlin.concurrent.* +import kotlin.coroutines.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +open class DispatchersContextSwitchBenchmark { + private val nCoroutines = 10000 + private val delayTimeMs = 1L + private val nRepeatDelay = 10 + + private val fjp = ForkJoinPool.commonPool().asCoroutineDispatcher() + private val ftp = Executors.newFixedThreadPool(CORES_COUNT - 1).asCoroutineDispatcher() + + @TearDown + fun teardown() { + ftp.close() + (ftp.executor as ExecutorService).awaitTermination(1, TimeUnit.SECONDS) + } + + @Benchmark + fun coroutinesIoDispatcher() = runBenchmark(Dispatchers.IO) + + @Benchmark + fun coroutinesDefaultDispatcher() = runBenchmark(Dispatchers.Default) + + @Benchmark + fun coroutinesFjpDispatcher() = runBenchmark(fjp) + + @Benchmark + fun coroutinesFtpDispatcher() = runBenchmark(ftp) + + @Benchmark + fun coroutinesBlockingDispatcher() = runBenchmark(EmptyCoroutineContext) + + @Benchmark + fun threads() { + val threads = List(nCoroutines) { + thread(start = true) { + repeat(nRepeatDelay) { + sleep(delayTimeMs) + } + } + } + threads.forEach { it.join() } + } + + private fun runBenchmark(dispatcher: CoroutineContext) = runBlocking { + repeat(nCoroutines) { + launch(dispatcher) { + repeat(nRepeatDelay) { + delay(delayTimeMs) + } + } + } + } +} + diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ForkJoinBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/ForkJoinBenchmark.kt similarity index 90% rename from benchmarks/src/jmh/kotlin/benchmarks/ForkJoinBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/ForkJoinBenchmark.kt index 21d0f54bf0..0c731c3ba8 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ForkJoinBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/ForkJoinBenchmark.kt @@ -1,9 +1,10 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks +package benchmarks.scheduler +import benchmarks.* import kotlinx.coroutines.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* @@ -44,7 +45,7 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() { } lateinit var coefficients: LongArray - override var dispatcher: String = "experimental" + override var dispatcher: String = "scheduler" @Setup override fun setup() { @@ -129,8 +130,18 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() { } else { pendingCount = 2 // One may fork only once here and executing second task here with looping over firstComplete to be even more efficient - first = RecursiveAction(coefficients, start, start + (end - start) / 2, parent = this).fork() - second = RecursiveAction(coefficients, start + (end - start) / 2, end, parent = this).fork() + first = RecursiveAction( + coefficients, + start, + start + (end - start) / 2, + parent = this + ).fork() + second = RecursiveAction( + coefficients, + start + (end - start) / 2, + end, + parent = this + ).fork() } tryComplete() diff --git a/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/LaunchBenchmark.kt similarity index 90% rename from benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/LaunchBenchmark.kt index 2639dbb2eb..8435ddc262 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/LaunchBenchmark.kt @@ -1,9 +1,10 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package benchmarks +package benchmarks.scheduler +import benchmarks.* import kotlinx.coroutines.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* @@ -21,7 +22,7 @@ import java.util.concurrent.* @State(Scope.Benchmark) open class LaunchBenchmark : ParametrizedDispatcherBase() { - @Param("experimental", "fjp") + @Param("scheduler", "fjp") override var dispatcher: String = "fjp" private val jobsToLaunch = 100 diff --git a/benchmarks/src/jmh/kotlin/benchmarks/StatefulAwaitsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/StatefulAwaitsBenchmark.kt similarity index 97% rename from benchmarks/src/jmh/kotlin/benchmarks/StatefulAwaitsBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/StatefulAwaitsBenchmark.kt index 8fdb146f77..93667c0c2f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/StatefulAwaitsBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/StatefulAwaitsBenchmark.kt @@ -1,9 +1,10 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks +package benchmarks.scheduler +import benchmarks.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* @@ -52,7 +53,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() { @Param("1", "8", "16") var jobsCount = 1 - @Param("fjp", "ftp_1", "ftp_8") + @Param("fjp", "ftp_1", "dispatcher") override var dispatcher: String = "fjp" @Volatile diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/ConcurrentStatefulActorBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/ConcurrentStatefulActorBenchmark.kt similarity index 94% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/ConcurrentStatefulActorBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/ConcurrentStatefulActorBenchmark.kt index db3195ff55..6998577310 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/ConcurrentStatefulActorBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/ConcurrentStatefulActorBenchmark.kt @@ -2,10 +2,11 @@ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package benchmarks.actors +package benchmarks.scheduler.actors import benchmarks.* -import benchmarks.actors.StatefulActorBenchmark.* +import benchmarks.akka.* +import benchmarks.scheduler.actors.StatefulActorBenchmark.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* @@ -57,10 +58,10 @@ import java.util.concurrent.* @State(Scope.Benchmark) open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { - @Param("1024", "8192", "262144") + @Param("1024", "8192") var stateSize: Int = -1 - @Param("fjp", "ftp_1", "ftp_8", "experimental") + @Param("fjp", "scheduler") override var dispatcher: String = "fjp" @Benchmark @@ -68,7 +69,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { val resultChannel: Channel = Channel(1) val computations = (0 until CORES_COUNT).map { computationActor(stateSize) } val requestor = requestorActorUnfair(computations, resultChannel) - requestor.send(Letter(Start(), Channel(0))) + requestor.send(Letter(Start(), requestor)) resultChannel.receive() } @@ -77,7 +78,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { val resultChannel: Channel = Channel(1) val computations = (0 until CORES_COUNT).map { computationActor(stateSize) } val requestor = requestorActorFair(computations, resultChannel) - requestor.send(Letter(Start(), Channel(0))) + requestor.send(Letter(Start(), requestor)) resultChannel.receive() } @@ -95,6 +96,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { } is Long -> { if (++received >= ROUNDS * 8) { + computations.forEach { it.close() } stopChannel.send(Unit) return@actor } else { @@ -122,6 +124,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { } is Long -> { if (++receivedTotal >= ROUNDS * computations.size) { + computations.forEach { it.close() } stopChannel.send(Unit) return@actor } else { @@ -136,4 +139,4 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() { } } } -} \ No newline at end of file +} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/CycledActorsBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/CycledActorsBenchmark.kt similarity index 94% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/CycledActorsBenchmark.kt rename to 
benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/CycledActorsBenchmark.kt index 385693e38d..67548f624e 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/CycledActorsBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/CycledActorsBenchmark.kt @@ -2,10 +2,11 @@ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package benchmarks.actors +package benchmarks.scheduler.actors import benchmarks.* -import benchmarks.actors.PingPongActorBenchmark.* +import benchmarks.akka.* +import benchmarks.scheduler.actors.PingPongActorBenchmark.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* @@ -28,8 +29,8 @@ import java.util.concurrent.* * CycledActorsBenchmark.cycledActors 262144 experimental avgt 14 1804.146 ± 57.275 ms/op */ @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 3) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) @@ -39,10 +40,10 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() { val NO_CHANNEL = Channel(0) } - @Param("fjp", "ftp_1", "experimental") + @Param("fjp", "ftp_1", "scheduler") override var dispatcher: String = "fjp" - @Param("524288") + @Param("1", "1024") var actorStateSize = 1 @Benchmark diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongActorBenchmark.kt similarity index 96% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongActorBenchmark.kt index 82e9b15222..2d547e2660 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongActorBenchmark.kt @@ -2,14 +2,14 @@ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks.actors +package benchmarks.scheduler.actors import benchmarks.* +import benchmarks.akka.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* -import kotlin.coroutines.* /* * Benchmark (dispatcher) Mode Cnt Score Error Units @@ -27,14 +27,14 @@ import kotlin.coroutines.* */ @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) +@Fork(value = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) open class PingPongActorBenchmark : ParametrizedDispatcherBase() { data class Letter(val message: Any?, val sender: SendChannel) - @Param("experimental", "fjp", "ftp_1", "ftp_8") + @Param("scheduler", "fjp", "ftp_1") override var dispatcher: String = "fjp" @Benchmark diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongWithBlockingContext.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt similarity index 93% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongWithBlockingContext.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt index c6afdced25..86a9440a58 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongWithBlockingContext.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt @@ -2,8 +2,9 @@ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package benchmarks.actors +package benchmarks.scheduler.actors +import benchmarks.akka.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.scheduling.* @@ -19,8 +20,8 @@ import kotlin.coroutines.* * PingPongWithBlockingContext.withContextPingPong avgt 20 761.669 ± 41.371 ms/op */ @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/StatefulActorBenchmark.kt similarity index 95% rename from benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorBenchmark.kt rename to benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/StatefulActorBenchmark.kt index 6968c8952d..fb342295a6 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/actors/StatefulActorBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/StatefulActorBenchmark.kt @@ -2,9 +2,10 @@ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ -package benchmarks.actors +package benchmarks.scheduler.actors import benchmarks.* +import benchmarks.akka.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.openjdk.jmh.annotations.* @@ -32,15 +33,15 @@ import java.util.concurrent.* */ @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 2) +@Fork(value = 1) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Benchmark) open class StatefulActorBenchmark : ParametrizedDispatcherBase() { - data class Letter(val message: Any, val sender: Channel) + data class Letter(val message: Any, val sender: SendChannel) - @Param("fjp", "ftp_1", "ftp_8", "experimental") + @Param("fjp", "ftp_1", "ftp_8", "scheduler") override var dispatcher: String = "fjp" @Benchmark diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt index 68723104e3..c764f51792 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt @@ -54,12 +54,9 @@ internal open class LockFreeTaskQueue( } @Suppress("UNCHECKED_CAST") - fun removeFirstOrNull(): E? = removeFirstOrNullIf { true } - - @Suppress("UNCHECKED_CAST") - inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): E? { + fun removeFirstOrNull(): E? { _cur.loop { cur -> - val result = cur.removeFirstOrNullIf(predicate) + val result = cur.removeFirstOrNull() if (result !== Core.REMOVE_FROZEN) return result as E? _cur.compareAndSet(cur, cur.next()) } @@ -164,10 +161,7 @@ internal class LockFreeTaskQueueCore( } // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS) - fun removeFirstOrNull(): Any? = removeFirstOrNullIf { true } - - // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS) - inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): Any? { + fun removeFirstOrNull(): Any? { _state.loop { state -> if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify state.withState { head, tail -> @@ -181,9 +175,6 @@ internal class LockFreeTaskQueueCore( } // element == Placeholder can only be when add has not finished yet if (element is Placeholder) return null // consider it not added yet - // now we tentative know element to remove -- check predicate - @Suppress("UNCHECKED_CAST") - if (!predicate(element as E)) return null // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster val newHead = (head + 1) and MAX_CAPACITY_MASK if (_state.compareAndSet(state, state.updateHead(newHead))) { diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 4089710e75..09e9deb838 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -8,60 +8,92 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.io.* -import java.util.* import java.util.concurrent.* +import java.util.concurrent.atomic.* import java.util.concurrent.locks.* +import kotlin.math.* +import kotlin.random.* /** - * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutines over worker threads, - * including both CPU-intensive and blocking tasks. 
+ * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines + * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner. * * Current scheduler implementation has two optimization targets: * * Efficiency in the face of communication patterns (e.g., actors communicating via channel) - * * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool + * * Dynamic resizing to support blocking calls without re-dispatching a coroutine to a separate "blocking" thread pool. * * ### Structural overview * - * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] (lazily created) threads - * to execute blocking tasks. Every worker has local queue in addition to global scheduler queue and global queue - * has priority over local queue to avoid starvation of externally-submitted (e.g., from Android UI thread) tasks and work-stealing is implemented - * on top of that queues to provide even load distribution and illusion of centralized run queue. + * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to + * [maxPoolSize] lazily created threads to execute blocking tasks. + * Every worker has a local queue in addition to a global scheduler queue, + * and the global queue has priority over the local queue to avoid starvation of externally-submitted + * (e.g. from Android UI thread) tasks. + * Work-stealing is implemented on top of those queues to provide + * even load distribution and the illusion of a centralized run queue. * - * ### Scheduling + * ### Scheduling policy * - * When a coroutine is dispatched from within scheduler worker, it's placed into the head of worker run queue. - * If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy, - * it effectively couples communicating coroutines into one and eliminates scheduling latency that arises from placing task to the end of the queue. - * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack. + * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue. + * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy, + * it effectively couples communicating coroutines into one and eliminates the scheduling latency + * that arises from placing tasks at the end of the queue. + * Placing the former head at the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates to a stack. * When a coroutine is dispatched from an external thread, it's put into the global queue. + * The original idea with a single-slot LIFO buffer comes from the Golang runtime scheduler by D. Vyukov. + * It was proven to be "fair enough", performant, and generally well accepted, and it was initially a significant + * source of inspiration for the coroutine scheduler. * * ### Work stealing and affinity * - * To provide even tasks distribution worker tries to steal tasks from other workers queues before parking when his local queue is empty. - * A non-standard solution is implemented to provide tasks affinity: task may be stolen only if it's 'stale' enough (based on the value of [WORK_STEALING_TIME_RESOLUTION_NS]). - * For this purpose monotonic global clock ([System.nanoTime]) is used and every task has associated with it submission time.
- * This approach shows outstanding results when coroutines are cooperative, but as downside scheduler now depends on high-resolution global clock - * which may limit scalability on NUMA machines. + * To provide even task distribution, a worker tries to steal tasks from other workers' queues + * before parking when its local queue is empty. + * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen + * only if it is stale enough, based on the value of [WORK_STEALING_TIME_RESOLUTION_NS]. + * For this purpose, a monotonic global clock is used, and every task has its submission time associated with it. + * This approach shows outstanding results when coroutines are cooperative, + * but as a downside, the scheduler now depends on a high-resolution global clock, + * which may limit scalability on NUMA machines. Tasks from the LIFO buffer can be stolen on a regular basis. * - * ### Dynamic resizing and support of blocking tasks + * ### Thread management + * One of the hardest parts of the scheduler is the decentralized management of threads with progress guarantees + * similar to those of regular centralized executors. + * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields. + * The former field incorporates the number of created threads, CPU tokens, and blocking tasks + * that require thread compensation, + * while the latter represents an intrusive, versioned Treiber stack of idle workers. + * When a worker cannot find any work, it first adds itself to the stack, + * then re-scans the queue to avoid missing signals, and then attempts to park, + * with an additional rendezvous against unnecessary parking. + * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state + * (to be uncounted when additional work is signalled) and parks for that duration. * - * To support possibly blocking tasks [TaskMode] and CPU quota (via [cpuPermits]) are used. - * To execute [TaskMode.NON_BLOCKING] tasks from the global queue or to steal tasks from other workers - * the worker should have CPU permit. When a worker starts executing [TaskMode.PROBABLY_BLOCKING] task, - * it releases its CPU permit, giving a hint to a scheduler that additional thread should be created (or awaken) - * if new [TaskMode.NON_BLOCKING] task will arrive. When a worker finishes executing blocking task, it executes - * all tasks from its local queue (including [TaskMode.NON_BLOCKING]) and then parks as retired without polling - * global queue or trying to steal new tasks. Such approach may slightly limit scalability (allowing more than [corePoolSize] threads - * to execute CPU-bound tasks at once), but in practice, it is not, significantly reducing context switches and tasks re-dispatching. + * When a new task arrives in the scheduler (whether to a local queue or the global queue), + * either an idle worker is signalled, or the scheduler attempts to create a new worker. + * Only [corePoolSize] workers can be created for regular CPU tasks. * - * @suppress **This is unstable API and it is subject to change.** + * ### Support for blocking tasks + * The scheduler also supports the notion of [blocking][TaskMode.PROBABLY_BLOCKING] tasks. + * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in + * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created) + * to serve CPU-bound tasks.
To properly guarantee liveness, the scheduler maintains + * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks. + * When worker encounters blocking tasks, it basically hands off its permit to another thread (not directly though) to + * keep invariant "scheduler always has at least min(pending CPU tasks, core pool size) + * and at most core pool size threads to execute CPU tasks". + * To avoid overprovision, workers without CPU permit are allowed to scan [globalBlockingQueue] + * and steal **only** blocking tasks from other workers. + * + * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads. + * End users do not have access to the scheduler directly and can dispatch blocking tasks only with + * [LimitingDispatcher] that does control concurrency level by its own mechanism. */ @Suppress("NOTHING_TO_INLINE") internal class CoroutineScheduler( - private val corePoolSize: Int, - private val maxPoolSize: Int, - private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, - private val schedulerName: String = DEFAULT_SCHEDULER_NAME + @JvmField val corePoolSize: Int, + @JvmField val maxPoolSize: Int, + @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, + @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { init { require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) { @@ -78,23 +110,26 @@ internal class CoroutineScheduler( } } - private val globalQueue: GlobalQueue = GlobalQueue() + @JvmField + val globalCpuQueue = GlobalQueue() + @JvmField + val globalBlockingQueue = GlobalQueue() - /** - * Permits to execute non-blocking (~CPU-intensive) tasks. - * If worker owns a permit, it can schedule non-blocking tasks to its queue and steal work from other workers. - * If worker doesn't, it can execute only blocking tasks (and non-blocking leftovers from its local queue) - * and will try to park as soon as its queue is empty. - */ - private val cpuPermits = Semaphore(corePoolSize, false) + private fun addToGlobalQueue(task: Task): Boolean { + return if (task.isBlocking) { + globalBlockingQueue.addLast(task) + } else { + globalCpuQueue.addLast(task) + } + } /** * The stack of parker workers. - * Every worker registers itself in a stack before parking (if it was not previously registered) - * and callers of [requestCpuWorker] will try to unpark a thread from the top of a stack. - * This is a form of intrusive garbage-free Treiber stack where Worker also is a stack node. + * Every worker registers itself in a stack before parking (if it was not previously registered), + * so it can be signalled when new tasks arrive. + * This is a form of intrusive garbage-free Treiber stack where [Worker] also is a stack node. * - * The stack is better than a queue (even with contention on top) because it unparks threads + * The stack is better than a queue (even with the contention on top) because it unparks threads * in most-recently used order, improving both performance and locality. * Moreover, it decreases threads thrashing, if the pool has n threads when only n / 2 is required, * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS]. @@ -111,7 +146,7 @@ internal class CoroutineScheduler( * * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]). 
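The `parkedWorkersStack` documented above is an intrusive, versioned Treiber stack in which the workers themselves are the nodes. As an illustration only (a hypothetical `TreiberStack` class, not the scheduler's actual data structure), the simplified, non-intrusive and non-versioned sketch below shows the CAS-based LIFO that yields the most-recently-used unpark order; the real implementation additionally packs a version counter into the top value and links workers through `nextParkedWorker` instead of allocating nodes.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Simplified Treiber stack: a lock-free LIFO maintained with CAS on the head.
class TreiberStack<T> {
    private class Node<T>(val value: T, val next: Node<T>?)
    private val head = AtomicReference<Node<T>?>(null)

    fun push(value: T) {
        while (true) {
            val cur = head.get()
            if (head.compareAndSet(cur, Node(value, cur))) return
        }
    }

    fun pop(): T? {
        while (true) {
            val cur = head.get() ?: return null
            if (head.compareAndSet(cur, cur.next)) return cur.value
        }
    }
}
```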
*/ - private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) { + internal fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) { parkedWorkersStack.loop { top -> val index = (top and PARKED_INDEX_MASK).toInt() val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK @@ -135,9 +170,12 @@ internal class CoroutineScheduler( * This method is invoked only from the worker thread itself. * This invocation always precedes [LockSupport.parkNanos]. * See [Worker.doPark]. + * + * Returns `true` if worker was added to the stack by this invocation, `false` if it was already + * registered in the stack. */ - private fun parkedWorkersStackPush(worker: Worker) { - if (worker.nextParkedWorker !== NOT_IN_STACK) return // already in stack, bail out + internal fun parkedWorkersStackPush(worker: Worker): Boolean { + if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out /* * The below loop can be entered only if this worker was not in the stack and, since no other thread * can add it to the stack (only the worker itself), this invariant holds while this loop executes. @@ -153,7 +191,7 @@ internal class CoroutineScheduler( * also invokes parkedWorkersStackTopUpdate which updates version to make next CAS fail. * Successful CAS of the stack top completes successful push. */ - if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return + if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true } } @@ -216,70 +254,66 @@ internal class CoroutineScheduler( * State of worker threads. * [workers] is array of lazily created workers up to [maxPoolSize] workers. * [createdWorkers] is count of already created workers (worker with index lesser than [createdWorkers] exists). - * [blockingWorkers] is count of running workers which are executing [TaskMode.PROBABLY_BLOCKING] task. - * All mutations of array's content are guarded by lock. + * [blockingTasks] is count of pending (either in the queue or being executed) tasks * * **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so * workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination * works properly */ - private val workers: Array = arrayOfNulls(maxPoolSize + 1) + @JvmField + val workers = AtomicReferenceArray(maxPoolSize + 1) /** * Long describing state of workers in this pool. - * Currently includes created and blocking workers each occupying [BLOCKING_SHIFT] bits. + * Currently includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits. 
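`controlState` now packs three counters into a single `Long`: created workers, blocking tasks, and available CPU permits, each in a `BLOCKING_SHIFT`-wide (21-bit) field, as the masks in the companion object below define. A standalone sketch of that packing, using hypothetical names and a plain `AtomicLong` instead of atomicfu:

```kotlin
import java.util.concurrent.atomic.AtomicLong

const val SHIFT = 21                          // mirrors BLOCKING_SHIFT
const val FIELD_MASK = (1L shl SHIFT) - 1     // mirrors CREATED_MASK

// Illustrative packing of created / blocking / cpu-permit counters into one Long.
class PackedState(corePoolSize: Int) {
    // CPU permits start at corePoolSize, created workers and blocking tasks at zero,
    // just like `controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)`.
    private val state = AtomicLong(corePoolSize.toLong() shl (2 * SHIFT))

    fun createdWorkers(): Int = (state.get() and FIELD_MASK).toInt()
    fun blockingTasks(): Int = ((state.get() shr SHIFT) and FIELD_MASK).toInt()
    fun availableCpuPermits(): Int = ((state.get() shr (2 * SHIFT)) and FIELD_MASK).toInt()

    fun incrementBlockingTasks(): Long = state.addAndGet(1L shl SHIFT)
    fun decrementBlockingTasks(): Long = state.addAndGet(-(1L shl SHIFT))

    // CAS loop that claims one CPU permit, as tryAcquireCpuPermit() does.
    fun tryAcquireCpuPermit(): Boolean {
        while (true) {
            val s = state.get()
            if (((s shr (2 * SHIFT)) and FIELD_MASK) == 0L) return false
            if (state.compareAndSet(s, s - (1L shl (2 * SHIFT)))) return true
        }
    }
}
```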
*/ - private val controlState = atomic(0L) - + private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT) private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt() - private val blockingWorkers: Int inline get() = (controlState.value and BLOCKING_MASK shr BLOCKING_SHIFT).toInt() + private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value) private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt() - private inline fun blockingWorkers(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt() + private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt() + public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt() // Guarded by synchronization private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet()) private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement()) - private inline fun incrementBlockingWorkers() { controlState.addAndGet(1L shl BLOCKING_SHIFT) } - private inline fun decrementBlockingWorkers() { controlState.addAndGet(-(1L shl BLOCKING_SHIFT)) } + private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT) - private val random = Random() - - // This is used a "stop signal" for close and shutdown functions - private val _isTerminated = atomic(0) // todo: replace with atomic boolean on new versions of atomicFu - private val isTerminated: Boolean get() = _isTerminated.value != 0 + private inline fun decrementBlockingTasks() { + controlState.addAndGet(-(1L shl BLOCKING_SHIFT)) + } - companion object { - private val MAX_SPINS = systemProp("kotlinx.coroutines.scheduler.spins", 1000, minValue = 1) - private val MAX_YIELDS = MAX_SPINS + systemProp("kotlinx.coroutines.scheduler.yields", 0, minValue = 0) + private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state -> + val available = availableCpuPermits(state) + if (available == 0) return false + val update = state - (1L shl CPU_PERMITS_SHIFT) + if (controlState.compareAndSet(state, update)) return true + } - @JvmStatic // Note that is fits into Int (it is equal to 10^9) - private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt() + private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT) - @JvmStatic - private val MIN_PARK_TIME_NS = (WORK_STEALING_TIME_RESOLUTION_NS / 4) - .coerceAtLeast(10) - .coerceAtMost(MAX_PARK_TIME_NS.toLong()).toInt() + // This is used a "stop signal" for close and shutdown functions + private val _isTerminated = atomic(false) + val isTerminated: Boolean get() = _isTerminated.value + companion object { // A symbol to mark workers that are not in parkedWorkersStack - private val NOT_IN_STACK = Symbol("NOT_IN_STACK") - - // Local queue 'add' results - private const val ADDED = -1 - // Added to the local queue, but pool requires additional worker to keep up - private const val ADDED_REQUIRES_HELP = 0 - private const val NOT_ADDED = 1 + @JvmField + val NOT_IN_STACK = Symbol("NOT_IN_STACK") - // Worker termination states - private const val FORBIDDEN = -1 - private const val ALLOWED = 0 + // Worker ctl states + private const val PARKED = -1 + private const val CLAIMED = 0 private const val TERMINATED = 1 // Masks of control state private const val BLOCKING_SHIFT = 21 // 2M threads max private const val CREATED_MASK: Long = 
(1L shl BLOCKING_SHIFT) - 1 private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT + private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2 + private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 @@ -297,7 +331,7 @@ internal class CoroutineScheduler( // Shuts down current scheduler and waits until all work is done and all threads are stopped. fun shutdown(timeout: Long) { // atomically set termination flag which is checked when workers are added or removed - if (!_isTerminated.compareAndSet(0, 1)) return + if (!_isTerminated.compareAndSet(false, true)) return // make sure we are not waiting for the current thread val currentWorker = currentWorker() // Capture # of created workers that cannot change anymore (mind the synchronized block!) @@ -312,20 +346,24 @@ internal class CoroutineScheduler( } val state = worker.state assert { state === WorkerState.TERMINATED } // Expected TERMINATED state - worker.localQueue.offloadAllWork(globalQueue) + worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use } } // Make sure no more work is added to GlobalQueue from anywhere - globalQueue.close() + globalBlockingQueue.close() + globalCpuQueue.close() // Finish processing tasks from globalQueue and/or from this worker's local queue while (true) { - val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break + val task = currentWorker?.findTask(true) + ?: globalCpuQueue.removeFirstOrNull() + ?: globalBlockingQueue.removeFirstOrNull() + ?: break runSafely(task) } // Shutdown current thread currentWorker?.tryReleaseCpu(WorkerState.TERMINATED) // check & cleanup state - assert { cpuPermits.availablePermits() == corePoolSize } + assert { availableCpuPermits == corePoolSize } parkedWorkersStack.value = 0L controlState.value = 0L } @@ -334,7 +372,6 @@ internal class CoroutineScheduler( * Dispatches execution of a runnable [block] with a hint to a scheduler whether * this [block] may execute blocking operations (IO, system calls, locking primitives etc.) 
* - * @param block runnable to be dispatched * @param taskContext concurrency context of given [block] * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO) */ @@ -342,17 +379,18 @@ internal class CoroutineScheduler( trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result - when (submitToLocalQueue(task, fair)) { - ADDED -> return - NOT_ADDED -> { - // try to offload task to global queue - if (!globalQueue.addLast(task)) { - // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted - throw RejectedExecutionException("$schedulerName was terminated") - } - requestCpuWorker() + val notAdded = submitToLocalQueue(task, fair) + if (notAdded != null) { + if (!addToGlobalQueue(notAdded)) { + // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted + throw RejectedExecutionException("$schedulerName was terminated") } - else -> requestCpuWorker() // ask for help + } + // Checking 'task' instead of 'notAdded' is completely okay + if (task.mode == TaskMode.NON_BLOCKING) { + signalCpuWork() + } else { + signalBlockingWork() } } @@ -366,33 +404,24 @@ internal class CoroutineScheduler( return TaskImpl(block, nanoTime, taskContext) } - /** - * Unparks or creates a [Worker] for executing non-blocking tasks if there are idle cores - */ - private fun requestCpuWorker() { - // No CPU available -- nothing to request - if (cpuPermits.availablePermits() == 0) { - tryUnpark() - return - } - /* - * Fast path -- we have retired or parked worker, unpark it and we're done. - * The data race here: when only one permit is available, multiple retired workers - * can be unparked, but only one will continue execution, so we're overproviding with threads - * in case of race to avoid spurious starvation - */ + private fun signalBlockingWork() { + // Use state snapshot to avoid thread overprovision + val stateSnapshot = incrementBlockingTasks() if (tryUnpark()) return - /* - * Create a thread. - * It's not preferable to use 'cpuWorkersCounter' here (moreover, it's implicitly here as corePoolSize - cpuPermits.availableTokens), - * cpuWorkersCounter doesn't take into account threads which are created (and either running or parked), but haven't - * CPU token: retiring workers, recently unparked workers before `findTask` call, etc. - * So if we will use cpuWorkersCounter, we start to overprovide with threads too much. - */ - val state = controlState.value + if (tryCreateWorker(stateSnapshot)) return + tryUnpark() // Try unpark again in case there was race between permit release and parking + } + + internal fun signalCpuWork() { + if (tryUnpark()) return + if (tryCreateWorker()) return + tryUnpark() + } + + private fun tryCreateWorker(state: Long = controlState.value): Boolean { val created = createdWorkers(state) - val blocking = blockingWorkers(state) - val cpuWorkers = created - blocking + val blocking = blockingTasks(state) + val cpuWorkers = (created - blocking).coerceAtLeast(0) /* * We check how many threads are there to handle non-blocking work, * and create one more if we have not enough of them. 
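For intuition about the `created - blocking` arithmetic above: blocking tasks effectively consume workers, so only the remainder counts toward the CPU quota. A small worked example with assumed numbers (hypothetical helper, mirroring `tryCreateWorker`):

```kotlin
// Returns true when the scheduler should create another worker for CPU-bound work.
fun needsNewCpuWorker(corePoolSize: Int, createdWorkers: Int, blockingTasks: Int): Boolean {
    val cpuWorkers = (createdWorkers - blockingTasks).coerceAtLeast(0)
    return cpuWorkers < corePoolSize
}

fun main() {
    // 5 workers exist, but 3 are tied up in blocking tasks -> only 2 can serve CPU work.
    println(needsNewCpuWorker(corePoolSize = 4, createdWorkers = 5, blockingTasks = 3)) // true
    // All 4 workers are free for CPU work -> no new worker is needed.
    println(needsNewCpuWorker(corePoolSize = 4, createdWorkers = 4, blockingTasks = 0)) // false
}
```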
@@ -402,53 +431,18 @@ internal class CoroutineScheduler( // If we've created the first cpu worker and corePoolSize > 1 then create // one more (second) cpu worker, so that stealing between them is operational if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker() - if (newCpuWorkers > 0) return + if (newCpuWorkers > 0) return true } - // Try unpark again in case there was race between permit release and parking - tryUnpark() + return false } private fun tryUnpark(): Boolean { while (true) { val worker = parkedWorkersStackPop() ?: return false - /* - * If we successfully took the worker out of the queue, it could be in the following states: - * 1) Worker is parked, then we'd like to reset its spin and park counters, so after - * unpark it will try to steal from every worker at least once - * 2) Worker is not parked, but it actually idle and - * tries to find work. Then idle reset is required as well. - * Worker state may be either PARKING or CPU_ACQUIRED (from `findTask`) - * 3) Worker is active (unparked itself from `idleCpuWorker`), found tasks to do and is currently busy. - * Then `idleResetBeforeUnpark` will do nothing, but we can't distinguish this state from previous - * one, so just retry. - * 4) Worker is terminated. No harm in resetting its counters either. - */ - worker.idleResetBeforeUnpark() - /* - * Check that the thread we've found in the queue was indeed in parking state, before we - * actually try to unpark it. - */ - val wasParking = worker.isParking - /* - * Send unpark signal anyway, because the thread may have made decision to park but have not yet set its - * state to parking and this could be the last thread we have (unparking random thread would not harm). - */ - LockSupport.unpark(worker) - /* - * If this thread was not in parking state then we definitely need to find another thread. - * We err on the side of unparking more threads than needed here. - */ - if (!wasParking) continue - /* - * Terminating worker could be selected. - * If it's already TERMINATED or we cannot forbid it from terminating, then try find another worker. - */ - if (!worker.tryForbidTermination()) continue - /* - * Here we've successfully unparked a thread that was parked and had forbidden it from making - * decision to terminate, so we are now sure we've got some help. 
- */ - return true + if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) { + LockSupport.unpark(worker) + return true + } } } @@ -462,73 +456,44 @@ internal class CoroutineScheduler( if (isTerminated) return -1 val state = controlState.value val created = createdWorkers(state) - val blocking = blockingWorkers(state) - val cpuWorkers = created - blocking + val blocking = blockingTasks(state) + val cpuWorkers = (created - blocking).coerceAtLeast(0) // Double check for overprovision if (cpuWorkers >= corePoolSize) return 0 - if (created >= maxPoolSize || cpuPermits.availablePermits() == 0) return 0 + if (created >= maxPoolSize) return 0 // start & register new worker, commit index only after successful creation val newIndex = createdWorkers + 1 require(newIndex > 0 && workers[newIndex] == null) - val worker = Worker(newIndex).apply { start() } - require(newIndex == incrementCreatedWorkers()) + /* + * 1) Claim the slot (under a lock) by the newly created worker + * 2) Make it observable by increment created workers count + * 3) Only then start the worker, otherwise it may miss its own creation + */ + val worker = Worker(newIndex) workers[newIndex] = worker + require(newIndex == incrementCreatedWorkers()) + worker.start() return cpuWorkers + 1 } } /** - * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP]. + * Returns `null` if task was successfully added or an instance of the + * task that was not added or replaced (thus should be added to global queue). */ - private fun submitToLocalQueue(task: Task, fair: Boolean): Int { - val worker = currentWorker() ?: return NOT_ADDED - + private fun submitToLocalQueue(task: Task, fair: Boolean): Task? { + val worker = currentWorker() ?: return task /* * This worker could have been already terminated from this thread by close/shutdown and it should not * accept any more tasks into its local queue. 
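The reworked `submitToLocalQueue` now returns `null` on success and otherwise a task that still has to go to a global queue (in the real `WorkQueue` this may be a different, displaced task). A hedged sketch of that calling convention with stand-in types, not the actual `Task`/`WorkQueue` classes:

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for the worker-local queue: accepts a task or hands one back for offloading.
class LocalQueueSketch(private val capacity: Int = 128) {
    private val buffer = ArrayDeque<Runnable>()

    /** Returns null if the task was accepted, otherwise the task to push to a global queue. */
    fun add(task: Runnable): Runnable? {
        if (buffer.size >= capacity) return task
        buffer.addLast(task)
        return null
    }
}

// Mirrors the dispatch() flow: no local queue (external thread) or a rejected task
// means the task goes to the global queue instead.
fun dispatchSketch(local: LocalQueueSketch?, global: ConcurrentLinkedQueue<Runnable>, task: Runnable) {
    val notAdded = if (local == null) task else local.add(task)
    if (notAdded != null) global.add(notAdded)
}
```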
*/ - if (worker.state === WorkerState.TERMINATED) return NOT_ADDED - - var result = ADDED - if (task.mode == TaskMode.NON_BLOCKING) { - /* - * If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons: - * 1) Blocking worker is finishing its block and resumes non-blocking continuation - * 2) Blocking worker starts to create non-blocking jobs - * - * First use-case is expected (as recommended way of using blocking contexts), - * so we add non-blocking task to local queue, but also request CPU worker to mitigate second case - */ - if (worker.isBlocking) { - result = ADDED_REQUIRES_HELP - } else { - /* - * If thread is not blocking, then it's just tries to finish its - * local work in order to park (or grab another blocking task), do not add non-blocking tasks - * to its local queue if it can't acquire CPU - */ - val hasPermit = worker.tryAcquireCpuPermit() - if (!hasPermit) { - return NOT_ADDED - } - } - } - - val noOffloadingHappened = if (fair) { - worker.localQueue.addLast(task, globalQueue) - } else { - worker.localQueue.add(task, globalQueue) - } - - if (noOffloadingHappened) { - // When we're close to queue capacity, wake up anyone to steal work - // Note: non-atomic bufferSize here is Ok (it is just a performance optimization) - if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) { - return ADDED_REQUIRES_HELP - } - return result + if (worker.state === WorkerState.TERMINATED) return task + // Do not add CPU tasks in local queue if we are not able to execute it + if (task.mode === TaskMode.NON_BLOCKING && worker.state === WorkerState.BLOCKING) { + return task } - return ADDED_REQUIRES_HELP + worker.mayHaveLocalTasks = true + return worker.localQueue.add(task, fair = fair) } private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this } @@ -539,20 +504,20 @@ internal class CoroutineScheduler( * * State of the queues: * b for blocking, c for CPU, r for retiring. - * E.g. for [1b, 1b, 2c, 1r] means that pool has + * E.g. for [1b, 1b, 2c, 1d] means that pool has * two blocking workers with queue size 1, one worker with CPU permit and queue size 1 - * and one retiring (executing his local queue before parking) worker with queue size 1. + * and one dormant (executing his local queue before parking) worker with queue size 1. 
*/ override fun toString(): String { var parkedWorkers = 0 var blockingWorkers = 0 var cpuWorkers = 0 - var retired = 0 + var dormant = 0 var terminated = 0 val queueSizes = arrayListOf() - for (worker in workers) { - if (worker == null) continue - val queueSize = worker.localQueue.size() + for (index in 1 until workers.length()) { + val worker = workers[index] ?: continue + val queueSize = worker.localQueue.size when (worker.state) { WorkerState.PARKING -> ++parkedWorkers WorkerState.BLOCKING -> { @@ -563,9 +528,9 @@ internal class CoroutineScheduler( ++cpuWorkers queueSizes += queueSize.toString() + "c" // CPU } - WorkerState.RETIRING -> { - ++retired - if (queueSize > 0) queueSizes += queueSize.toString() + "r" // Retiring + WorkerState.DORMANT -> { + ++dormant + if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Retiring } WorkerState.TERMINATED -> ++terminated } @@ -579,17 +544,19 @@ internal class CoroutineScheduler( "CPU = $cpuWorkers, " + "blocking = $blockingWorkers, " + "parked = $parkedWorkers, " + - "retired = $retired, " + + "dormant = $dormant, " + "terminated = $terminated}, " + "running workers queues = $queueSizes, "+ - "global queue size = ${globalQueue.size}, " + - "Control State Workers {" + - "created = ${createdWorkers(state)}, " + - "blocking = ${blockingWorkers(state)}}" + - "]" + "global CPU queue size = ${globalCpuQueue.size}, " + + "global blocking queue size = ${globalBlockingQueue.size}, " + + "Control State {" + + "created workers= ${createdWorkers(state)}, " + + "blocking tasks = ${blockingTasks(state)}, " + + "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" + + "}]" } - private fun runSafely(task: Task) { + fun runSafely(task: Task) { try { task.run() } catch (e: Throwable) { @@ -617,36 +584,30 @@ internal class CoroutineScheduler( indexInArray = index } - val scheduler get() = this@CoroutineScheduler + inline val scheduler get() = this@CoroutineScheduler + @JvmField val localQueue: WorkQueue = WorkQueue() /** * Worker state. **Updated only by this worker thread**. - * By default, worker is in RETIRING state in the case when it was created, but all CPU tokens or tasks were taken. + * By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken. + * Is used locally by the worker to maintain its own invariants. */ - @Volatile - var state = WorkerState.RETIRING - - val isParking: Boolean get() = state == WorkerState.PARKING - val isBlocking: Boolean get() = state == WorkerState.BLOCKING + @JvmField + var state = WorkerState.DORMANT /** - * Small state machine for termination. - * Followed states are allowed: - * [ALLOWED] -- worker can wake up and terminate itself - * [FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help) - * [TERMINATED] -- final state, thread is terminating and cannot be resurrected - * - * Allowed transitions: - * [ALLOWED] -> [FORBIDDEN] - * [ALLOWED] -> [TERMINATED] - * [FORBIDDEN] -> [ALLOWED] + * Worker control state responsible for worker claiming, parking and termination. + * List of states: + * [PARKED] -- worker is parked and can self-terminate after a termination deadline. + * [CLAIMED] -- worker is claimed by an external submitter. + * [TERMINATED] -- worker is terminated and no longer usable. 
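`workerCtl` replaces the old `terminationState` machine: `tryUnpark` wakes only a worker it manages to move from `PARKED` to `CLAIMED`, and a parked worker may self-terminate only while it is still `PARKED`. A minimal sketch of that CAS protocol using a plain `AtomicInteger` and illustrative names:

```kotlin
import java.util.concurrent.atomic.AtomicInteger

const val PARKED = -1
const val CLAIMED = 0
const val TERMINATED = 1

class WorkerCtlSketch {
    private val ctl = AtomicInteger(CLAIMED)

    // The worker marks itself parked right before LockSupport.parkNanos.
    fun beforePark() = ctl.set(PARKED)

    // An unparker claims the worker first, so a claimed worker can never terminate.
    fun tryClaimForUnpark(): Boolean = ctl.compareAndSet(PARKED, CLAIMED)

    // A parked worker self-terminates only if nobody claimed it in the meantime.
    fun tryTerminate(): Boolean = ctl.compareAndSet(PARKED, TERMINATED)
}
```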
*/ - private val terminationState = atomic(ALLOWED) + val workerCtl = atomic(CLAIMED) /** - * It is set to the termination deadline when started doing [blockingWorkerIdle] and it reset + * It is set to the termination deadline when started doing [park] and it reset * when there is a task. It servers as protection against spurious wakeups of parkNanos. */ private var terminationDeadline = 0L @@ -659,128 +620,130 @@ internal class CoroutineScheduler( @Volatile var nextParkedWorker: Any? = NOT_IN_STACK - /** - * Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails. - * This attempt may fail either because worker terminated itself or because someone else - * claimed this worker (though this case is rare, because require very bad timings) + /* + * The delay until at least one task in other worker queues will become stealable. */ - fun tryForbidTermination(): Boolean = - when (val state = terminationState.value) { - TERMINATED -> false // already terminated - FORBIDDEN -> false // already forbidden, someone else claimed this worker - ALLOWED -> terminationState.compareAndSet( - ALLOWED, - FORBIDDEN - ) - else -> error("Invalid terminationState = $state") - } + private var minDelayUntilStealableTaskNs = 0L + + private var rngState = Random.nextInt() /** * Tries to acquire CPU token if worker doesn't have one - * @return whether worker has CPU token + * @return whether worker acquired (or already had) CPU token */ - fun tryAcquireCpuPermit(): Boolean { - return when { - state == WorkerState.CPU_ACQUIRED -> true - cpuPermits.tryAcquire() -> { - state = WorkerState.CPU_ACQUIRED - true - } - else -> false + private fun tryAcquireCpuPermit(): Boolean = when { + state == WorkerState.CPU_ACQUIRED -> true + this@CoroutineScheduler.tryAcquireCpuPermit() -> { + state = WorkerState.CPU_ACQUIRED + true } + else -> false } /** - * Releases CPU token if worker has any and changes state to [newState] - * @return whether worker had CPU token + * Releases CPU token if worker has any and changes state to [newState]. + * Returns `true` if CPU permit was returned to the pool */ internal fun tryReleaseCpu(newState: WorkerState): Boolean { val previousState = state val hadCpu = previousState == WorkerState.CPU_ACQUIRED - if (hadCpu) cpuPermits.release() + if (hadCpu) releaseCpuPermit() if (previousState != newState) state = newState return hadCpu } - /** - * Time of the last call to [requestCpuWorker] due to missing tasks deadlines. 
- * Used as throttling mechanism to avoid unparking multiple threads when it's not necessary - */ - private var lastExhaustionTime = 0L - - @Volatile // Required for concurrent idleResetBeforeUnpark - private var spins = 0 // spins until MAX_SPINS, then yields until MAX_YIELDS - - // Note: it is concurrently reset by idleResetBeforeUnpark - private var parkTimeNs = MIN_PARK_TIME_NS + override fun run() = runWorker() + @JvmField + var mayHaveLocalTasks = false - private var rngState = random.nextInt() - private var lastStealIndex = 0 // try in order repeated, reset when unparked - - override fun run() { - var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive + private fun runWorker() { + var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { - val task = findTask() - if (task == null) { - // Wait for a job with potential park - if (state == WorkerState.CPU_ACQUIRED) { - cpuWorkerIdle() - } else { - blockingWorkerIdle() - } - wasIdle = true + val task = findTask(mayHaveLocalTasks) + // Task found. Execute and repeat + if (task != null) { + rescanned = false + minDelayUntilStealableTaskNs = 0L + executeTask(task) + continue } else { - // Note: read task.mode before running the task, because Task object will be reused after run - val taskMode = task.mode - if (wasIdle) { - idleReset(taskMode) - wasIdle = false + mayHaveLocalTasks = false + } + /* + * No tasks were found: + * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline. + * Then its deadline is stored in [minDelayUntilStealableTask] + * + * Then just park for that duration (ditto re-scanning). + * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations, + * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve + * it with "spinning via scans" mechanism. + * NB: this short potential parking does not interfere with `tryUnpark` + */ + if (minDelayUntilStealableTaskNs != 0L) { + if (!rescanned) { + rescanned = true + } else { + rescanned = false + tryReleaseCpu(WorkerState.PARKING) + interrupted() + LockSupport.parkNanos(minDelayUntilStealableTaskNs) + minDelayUntilStealableTaskNs = 0L } - beforeTask(taskMode, task.submissionTime) - runSafely(task) - afterTask(taskMode) + continue } + /* + * 2) Or no tasks available, time to park and, potentially, shut down the thread. + * Add itself to the stack of parked workers, re-scans all the queues + * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks. + */ + tryPark() } tryReleaseCpu(WorkerState.TERMINATED) } - private fun beforeTask(taskMode: TaskMode, taskSubmissionTime: Long) { - if (taskMode != TaskMode.NON_BLOCKING) { - /* - * We should release CPU *before* checking for CPU starvation, - * otherwise requestCpuWorker() will not count current thread as blocking - */ - incrementBlockingWorkers() - if (tryReleaseCpu(WorkerState.BLOCKING)) { - requestCpuWorker() - } + // Counterpart to "tryUnpark" + private fun tryPark() { + if (!inStack()) { + parkedWorkersStackPush(this) return } - /* - * If we have idle CPU and the current worker is exhausted, wake up one more worker. 
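/*
 * A compilable sketch (all parameters are stand-ins, not the scheduler API) of the loop shape that
 * runWorker above describes: run tasks while they are found; when the only obstacle is a task that is
 * not yet stealable, rescan once ("spinning via scans") and only then park for exactly the reported
 * delay; when there is nothing at all, fall through to the long-term parking path.
 */
import java.util.concurrent.locks.LockSupport

fun workerLoopSketch(
    isTerminated: () -> Boolean,
    findTask: () -> Runnable?,     // next task, or null if none was found
    stealableInNs: () -> Long,     // 0 if nothing is pending, otherwise ns until a steal may succeed
    parkForWork: () -> Unit        // regular parking/termination path
) {
    var rescanned = false
    while (!isTerminated()) {
        val task = findTask()
        if (task != null) {
            rescanned = false
            task.run()
            continue
        }
        val delay = stealableInNs()
        if (delay != 0L) {
            if (!rescanned) {
                rescanned = true               // one more full scan before giving up the CPU
            } else {
                rescanned = false
                LockSupport.parkNanos(delay)   // short park until the victim's task becomes stealable
            }
            continue
        }
        parkForWork()
    }
}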
- * Check last exhaustion time to avoid the race between steal and next task execution - */ - if (cpuPermits.availablePermits() == 0) { - return + assert { localQueue.size == 0 } + workerCtl.value = PARKED // Update value once + while (inStack()) { // Prevent spurious wakeups + if (isTerminated || state == WorkerState.TERMINATED) break + tryReleaseCpu(WorkerState.PARKING) + interrupted() // Cleanup interruptions + park() } - val now = schedulerTimeSource.nanoTime() - if (now - taskSubmissionTime >= WORK_STEALING_TIME_RESOLUTION_NS && - now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5 - ) { - lastExhaustionTime = now - requestCpuWorker() + } + + private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK + + private fun executeTask(task: Task) { + val taskMode = task.mode + idleReset(taskMode) + beforeTask(taskMode) + runSafely(task) + afterTask(taskMode) + } + + private fun beforeTask(taskMode: TaskMode) { + if (taskMode == TaskMode.NON_BLOCKING) return + // Always notify about new work when releasing CPU-permit to execute some blocking task + if (tryReleaseCpu(WorkerState.BLOCKING)) { + signalCpuWork() } } private fun afterTask(taskMode: TaskMode) { - if (taskMode != TaskMode.NON_BLOCKING) { - decrementBlockingWorkers() - val currentState = state - // Shutdown sequence of blocking dispatcher - if (currentState !== WorkerState.TERMINATED) { - assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState" - state = WorkerState.RETIRING - } + if (taskMode == TaskMode.NON_BLOCKING) return + decrementBlockingTasks() + val currentState = state + // Shutdown sequence of blocking dispatcher + if (currentState !== WorkerState.TERMINATED) { + assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState" + state = WorkerState.DORMANT } } @@ -789,47 +752,24 @@ internal class CoroutineScheduler( * ThreadLocalRandom cannot be used to support Android and ThreadLocal is up to 15% slower on Ktor benchmarks */ internal fun nextInt(upperBound: Int): Int { - rngState = rngState xor (rngState shl 13) - rngState = rngState xor (rngState shr 17) - rngState = rngState xor (rngState shl 5) + var r = rngState + r = r xor (r shl 13) + r = r xor (r shr 17) + r = r xor (r shl 5) + rngState = r val mask = upperBound - 1 // Fast path for power of two bound if (mask and upperBound == 0) { - return rngState and mask + return r and mask } - return (rngState and Int.MAX_VALUE) % upperBound + return (r and Int.MAX_VALUE) % upperBound } - private fun cpuWorkerIdle() { - /* - * Simple adaptive await of work: - * Spin on the volatile field with an empty loop in hope that new work will arrive, - * then start yielding to reduce CPU pressure, and finally start adaptive parking. 
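/*
 * Standalone version of the xorshift generator used by nextInt above (the class name is ours, the
 * update steps mirror the code): three shift/xor steps advance the state, and a power-of-two bound
 * is served with a mask instead of the slower modulo.
 */
import kotlin.random.Random

class XorShiftSketch(seed: Int = Random.nextInt()) {
    private var rngState = if (seed != 0) seed else 1 // xorshift state must never be zero

    fun nextInt(upperBound: Int): Int {
        var r = rngState
        r = r xor (r shl 13)
        r = r xor (r shr 17)
        r = r xor (r shl 5)
        rngState = r
        val mask = upperBound - 1
        if (mask and upperBound == 0) return r and mask // fast path for power-of-two bounds
        return (r and Int.MAX_VALUE) % upperBound
    }
}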
- * - * The main idea is not to park while it's possible (otherwise throughput on asymmetric workloads suffers due to too frequent - * park/unpark calls and delays between job submission and thread queue checking) - */ - val spins = this.spins // volatile read - if (spins <= MAX_YIELDS) { - this.spins = spins + 1 // volatile write - if (spins >= MAX_SPINS) yield() - } else { - if (parkTimeNs < MAX_PARK_TIME_NS) { - parkTimeNs = (parkTimeNs * 3 ushr 1).coerceAtMost(MAX_PARK_TIME_NS) - } - tryReleaseCpu(WorkerState.PARKING) - doPark(parkTimeNs.toLong()) - } - } - - private fun blockingWorkerIdle() { - tryReleaseCpu(WorkerState.PARKING) - if (!blockingQuiescence()) return - terminationState.value = ALLOWED + private fun park() { // set termination deadline the first time we are here (it is reset in idleReset) if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs // actually park - if (!doPark(idleWorkerKeepAliveNs)) return + LockSupport.parkNanos(idleWorkerKeepAliveNs) // try terminate when we are idle past termination deadline // note that comparison is written like this to protect against potential nanoTime wraparound if (System.nanoTime() - terminationDeadline >= 0) { @@ -838,17 +778,6 @@ internal class CoroutineScheduler( } } - private fun doPark(nanos: Long): Boolean { - /* - * Here we are trying to park, then check whether there are new blocking tasks - * (because submitting thread could have missed this thread in tryUnpark) - */ - parkedWorkersStackPush(this) - if (!blockingQuiescence()) return false - LockSupport.parkNanos(nanos) - return true - } - /** * Stops execution of current thread and removes it from [createdWorkers]. */ @@ -858,13 +787,11 @@ internal class CoroutineScheduler( if (isTerminated) return // Someone else terminated, bail out if (createdWorkers <= corePoolSize) return - // Try to find blocking task before termination - if (!blockingQuiescence()) return /* * See tryUnpark for state reasoning. * If this CAS fails, then we were successfully unparked by other worker and cannot terminate. */ - if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return + if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return /* * At this point this thread is no longer considered as usable for scheduling. * We need multi-step choreography to reindex workers. @@ -907,92 +834,88 @@ internal class CoroutineScheduler( state = WorkerState.TERMINATED } - /** - * Checks whether new blocking tasks arrived to the pool when worker decided - * it can go to deep park/termination and puts recently arrived task to its local queue. - * Returns `true` if there is no blocking tasks in the queue. - */ - private fun blockingQuiescence(): Boolean { - globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)?.let { - localQueue.add(it, globalQueue) - return false - } - return true - } - // It is invoked by this worker when it finds a task private fun idleReset(mode: TaskMode) { terminationDeadline = 0L // reset deadline for termination - lastStealIndex = 0 // reset steal index (next time try random) if (state == WorkerState.PARKING) { assert { mode == TaskMode.PROBABLY_BLOCKING } state = WorkerState.BLOCKING - parkTimeNs = MIN_PARK_TIME_NS } - spins = 0 } - // It is invoked by other thread before this worker is unparked - fun idleResetBeforeUnpark() { - parkTimeNs = MIN_PARK_TIME_NS - spins = 0 // Volatile write, should be written last + fun findTask(scanLocalQueue: Boolean): Task? 
{ + if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) + // If we can't acquire a CPU permit -- attempt to find blocking task + val task = if (scanLocalQueue) { + localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() + } else { + globalBlockingQueue.removeFirstOrNull() + } + return task ?: trySteal(blockingOnly = true) } - internal fun findTask(): Task? { - if (tryAcquireCpuPermit()) return findTaskWithCpuPermit() + private fun findAnyTask(scanLocalQueue: Boolean): Task? { /* - * If the local queue is empty, try to extract blocking task from global queue. - * It's helpful for two reasons: - * 1) We won't call excess park/unpark here and someone's else CPU token won't be transferred, - * which is a performance win - * 2) It helps with rare race when external submitter sends depending blocking tasks - * one by one and one of the requested workers may miss CPU token + * Anti-starvation mechanism: probabilistically poll either local + * or global queue to ensure progress for both external and internal tasks. */ - return localQueue.poll() ?: globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING) + if (scanLocalQueue) { + val globalFirst = nextInt(2 * corePoolSize) == 0 + if (globalFirst) pollGlobalQueues()?.let { return it } + localQueue.poll()?.let { return it } + if (!globalFirst) pollGlobalQueues()?.let { return it } + } else { + pollGlobalQueues()?.let { return it } + } + return trySteal(blockingOnly = false) } - private fun findTaskWithCpuPermit(): Task? { - /* - * Anti-starvation mechanism: if pool is overwhelmed by external work - * or local work is frequently offloaded, global queue polling will - * starve tasks from local queue. But if we never poll global queue, - * then local tasks may starve global queue, so poll global queue - * once per two core pool size iterations. - * Poll global queue only for non-blocking tasks as for blocking task a separate thread was woken up. - * If current thread is woken up, then its local queue is empty and it will poll global queue anyway, - * otherwise current thread may already have blocking task in its local queue. - */ - val globalFirst = nextInt(2 * corePoolSize) == 0 - if (globalFirst) globalQueue.removeFirstWithModeOrNull(TaskMode.NON_BLOCKING)?.let { return it } - localQueue.poll()?.let { return it } - if (!globalFirst) globalQueue.removeFirstOrNull()?.let { return it } - return trySteal() + private fun pollGlobalQueues(): Task? { + if (nextInt(2) == 0) { + globalCpuQueue.removeFirstOrNull()?.let { return it } + return globalBlockingQueue.removeFirstOrNull() + } else { + globalBlockingQueue.removeFirstOrNull()?.let { return it } + return globalCpuQueue.removeFirstOrNull() + } } - private fun trySteal(): Task? { + private fun trySteal(blockingOnly: Boolean): Task? 
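/*
 * A simplified model (single global queue and plain collections are assumptions, not the real
 * scheduler structures) of the anti-starvation order used by findAnyTask above: roughly once per
 * 2 * corePoolSize lookups the global queue is consulted before the local one, so neither externally
 * submitted nor locally produced tasks can be starved indefinitely.
 */
import java.util.ArrayDeque
import kotlin.random.Random

class PollOrderSketch(private val corePoolSize: Int) {
    val localQueue = ArrayDeque<Runnable>()
    val globalQueue = ArrayDeque<Runnable>()

    fun findAnyTask(): Runnable? {
        val globalFirst = Random.nextInt(2 * corePoolSize) == 0
        if (globalFirst) globalQueue.pollFirst()?.let { return it }
        localQueue.pollFirst()?.let { return it }
        if (!globalFirst) globalQueue.pollFirst()?.let { return it }
        return null // a real worker would proceed to work-stealing here
    }
}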
{ + assert { localQueue.size == 0 } val created = createdWorkers // 0 to await an initialization and 1 to avoid excess stealing on single-core machines - if (created < 2) return null - - // TODO to guarantee quiescence it's probably worth to do a full scan - var stealIndex = lastStealIndex - if (stealIndex == 0) stealIndex = nextInt(created) // start with random steal index - stealIndex++ // then go sequentially - if (stealIndex > created) stealIndex = 1 - lastStealIndex = stealIndex - val worker = workers[stealIndex] - if (worker !== null && worker !== this) { - if (localQueue.trySteal(worker.localQueue, globalQueue)) { - return localQueue.poll() + if (created < 2) { + return null + } + + var currentIndex = nextInt(created) + var minDelay = Long.MAX_VALUE + repeat(created) { + ++currentIndex + if (currentIndex > created) currentIndex = 1 + val worker = workers[currentIndex] + if (worker !== null && worker !== this) { + assert { localQueue.size == 0 } + val stealResult = if (blockingOnly) { + localQueue.tryStealBlockingFrom(victim = worker.localQueue) + } else { + localQueue.tryStealFrom(victim = worker.localQueue) + } + if (stealResult == TASK_STOLEN) { + return localQueue.poll() + } else if (stealResult > 0) { + minDelay = min(minDelay, stealResult) + } } } + minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0 return null } } enum class WorkerState { /** - * Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to steal one + * Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to find one. */ CPU_ACQUIRED, @@ -1009,7 +932,7 @@ internal class CoroutineScheduler( /** * Tries to execute its local work and then goes to infinite sleep as no longer needed worker. */ - RETIRING, + DORMANT, /** * Terminal state, will no longer be used diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index d7cb64ab2a..c0a3e6435d 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -19,11 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L ) -@JvmField -internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = systemProp( - "kotlinx.coroutines.scheduler.offload.threshold", 96, maxValue = BUFFER_CAPACITY -) - @JvmField internal val BLOCKING_DEFAULT_PARALLELISM = systemProp( "kotlinx.coroutines.scheduler.blocking.parallelism", 16 @@ -50,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp( @JvmField internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos( - systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 5L) + systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L) ) @JvmField @@ -87,9 +82,11 @@ internal abstract class Task( @JvmField var taskContext: TaskContext ) : Runnable { constructor() : this(0, NonBlockingContext) - val mode: TaskMode get() = taskContext.taskMode + inline val mode: TaskMode get() = taskContext.taskMode } +internal inline val Task.isBlocking get() = taskContext.taskMode == TaskMode.PROBABLY_BLOCKING + // Non-reusable Task implementation to wrap Runnable instances that do not otherwise implement task internal class TaskImpl( @JvmField val block: Runnable, @@ -109,10 +106,7 @@ internal class TaskImpl( } // Open for tests -internal open class GlobalQueue : LockFreeTaskQueue(singleConsumer = false) { - public fun removeFirstWithModeOrNull(mode: TaskMode): Task? 
= - removeFirstOrNullIf { it.mode == mode } -} +internal class GlobalQueue : LockFreeTaskQueue(singleConsumer = false) internal abstract class TimeSource { abstract fun nanoTime(): Long diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index a9aa86d4b3..1a0603e413 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -12,6 +12,9 @@ internal const val BUFFER_CAPACITY_BASE = 7 internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default +internal const val TASK_STOLEN = -1L +internal const val NOTHING_TO_STEAL = -2L + /** * Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity. * At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue) @@ -23,15 +26,13 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default * that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast. * E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order. * - * ### Work offloading - * - * When the queue is full, half of existing tasks are offloaded to global queue which is regularly polled by other pool workers. - * Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occurs only in specific use-cases - * (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important. - * As an alternative, offloading directly to some [CoroutineScheduler.Worker] may be used, but then the strategy of selecting any idle worker - * should be implemented and implementation should be aware multiple producers. - * - * @suppress **This is unstable API and it is subject to change.** + * ### Algorithm and implementation details + * This is a regular SPMC bounded queue with the additional property that tasks can be removed from the middle of the queue + * (scheduler workers without a CPU permit steal blocking tasks via this mechanism). This property forces us to use CAS in + * order to properly claim a value from the buffer. + * Moreover, [Task] objects are reusable, so it may seem that this queue is prone to the ABA problem. + * Indeed, it formally has the ABA problem, but the whole processing logic is written in such a way that this ABA is harmless. + * I have discovered a truly marvelous proof of this, which this KDoc is too narrow to contain.
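/*
 * Self-contained model of the buffer shape described above (the capacity handling and all names are
 * illustrative, not the real WorkQueue): a single producer publishes over producerIndex, consumers race
 * on consumerIndex with CAS, and because thieves may CAS elements out of the middle of the buffer, a
 * claimed slot can already be null -- poll() simply skips such holes.
 */
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReferenceArray

class SpmcBufferSketch<T : Any>(capacity: Int) { // capacity must be a power of two
    private val mask = capacity - 1
    private val buffer = AtomicReferenceArray<T?>(capacity)
    private val producerIndex = AtomicInteger(0)
    private val consumerIndex = AtomicInteger(0)

    // Owner (single producer) path; returns false instead of spinning when the slot is still occupied
    fun add(element: T): Boolean {
        if (producerIndex.get() - consumerIndex.get() == mask) return false // buffer is full
        val index = producerIndex.get() and mask
        if (buffer.get(index) != null) return false // a slow consumer has not released the slot yet
        buffer.lazySet(index, element)
        producerIndex.incrementAndGet()
        return true
    }

    // Consumer path: claim the index with CAS, then skip slots emptied by mid-queue steals
    fun poll(): T? {
        while (true) {
            val tail = consumerIndex.get()
            if (tail - producerIndex.get() == 0) return null
            val index = tail and mask
            if (consumerIndex.compareAndSet(tail, tail + 1)) {
                return buffer.getAndSet(index, null) ?: continue
            }
        }
    }

    // Thief path: remove the first matching element from the middle, leaving a hole for poll() to skip
    fun stealFirstMatching(predicate: (T) -> Boolean): T? {
        var i = consumerIndex.get()
        val end = producerIndex.get()
        while (i != end) {
            val index = i and mask
            val value = buffer.get(index)
            if (value != null && predicate(value) && buffer.compareAndSet(index, value, null)) return value
            i++
        }
        return null
    }
}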
*/ internal class WorkQueue { @@ -49,154 +50,151 @@ internal class WorkQueue { * This is in general harmless because steal will be blocked by timer */ internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value - - // TODO replace with inlined array when atomicfu will support it + internal val size: Int get() = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize private val buffer: AtomicReferenceArray = AtomicReferenceArray(BUFFER_CAPACITY) - private val lastScheduledTask = atomic(null) private val producerIndex = atomic(0) private val consumerIndex = atomic(0) + // Shortcut to avoid scanning queue without blocking tasks + private val blockingTasksInBuffer = atomic(0) /** * Retrieves and removes task from the head of the queue - * Invariant: this method is called only by the owner of the queue ([pollExternal] is not) + * Invariant: this method is called only by the owner of the queue. */ - fun poll(): Task? = - lastScheduledTask.getAndSet(null) ?: pollExternal() + fun poll(): Task? = lastScheduledTask.getAndSet(null) ?: pollBuffer() /** - * Invariant: this method is called only by the owner of the queue - * - * @param task task to put into local queue - * @param globalQueue fallback queue which is used when the local queue is overflown - * @return true if no offloading happened, false otherwise + * Invariant: Called only by the owner of the queue, returns + * `null` if task was added, task that wasn't added otherwise. */ - fun add(task: Task, globalQueue: GlobalQueue): Boolean { - val previous = lastScheduledTask.getAndSet(task) ?: return true - return addLast(previous, globalQueue) + fun add(task: Task, fair: Boolean = false): Task? { + if (fair) return addLast(task) + val previous = lastScheduledTask.getAndSet(task) ?: return null + return addLast(previous) } - // Called only by the owner, returns true if no offloading happened, false otherwise - fun addLast(task: Task, globalQueue: GlobalQueue): Boolean { - var noOffloadingHappened = true + /** + * Invariant: Called only by the owner of the queue, returns + * `null` if task was added, task that wasn't added otherwise. + */ + private fun addLast(task: Task): Task? { + if (task.isBlocking) blockingTasksInBuffer.incrementAndGet() + if (bufferSize == BUFFER_CAPACITY - 1) return task + val nextIndex = producerIndex.value and MASK /* - * We need the loop here because race possible not only on full queue, - * but also on queue with one element during stealing + * If current element is not null then we're racing with a really slow consumer that committed the consumer index, + * but hasn't yet nulled out the slot, effectively preventing us from using it. + * Such situations are very rare in practise (although possible) and we decided to give up a progress guarantee + * to have a stronger invariant "add to queue with bufferSize == 0 is always successful". + * This algorithm can still be wait-free for add, but if and only if tasks are not reusable, otherwise + * nulling out the buffer wouldn't be possible. */ - while (!tryAddLast(task)) { - offloadWork(globalQueue) - noOffloadingHappened = false + while (buffer[nextIndex] != null) { + Thread.yield() } - return noOffloadingHappened + buffer.lazySet(nextIndex, task) + producerIndex.incrementAndGet() + return null } /** - * Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case of current queue overflow. + * Tries stealing from [victim] queue into this queue. 
* - * @return whether any task was stolen + * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen + * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. */ - fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean { - val time = schedulerTimeSource.nanoTime() - val bufferSize = victim.bufferSize - if (bufferSize == 0) return tryStealLastScheduled(time, victim, globalQueue) - /* - * Invariant: time is monotonically increasing (thanks to nanoTime), so we can stop as soon as we find the first task not satisfying a predicate. - * If queue size is larger than QUEUE_SIZE_OFFLOAD_THRESHOLD then unconditionally steal tasks over this limit to prevent possible queue overflow - */ - var wasStolen = false - repeat(((bufferSize / 2).coerceAtLeast(1))) { - val task = victim.pollExternal { task -> - time - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS || victim.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD - } - ?: return wasStolen // non-local return from trySteal as we're done - wasStolen = true - add(task, globalQueue) + fun tryStealFrom(victim: WorkQueue): Long { + assert { bufferSize == 0 } + val task = victim.pollBuffer() + if (task != null) { + val notAdded = add(task) + assert { notAdded == null } + return TASK_STOLEN } - return wasStolen + return tryStealLastScheduled(victim, blockingOnly = false) } - private fun tryStealLastScheduled( - time: Long, - victim: WorkQueue, - globalQueue: GlobalQueue - ): Boolean { - val lastScheduled = victim.lastScheduledTask.value ?: return false - if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) { - return false + fun tryStealBlockingFrom(victim: WorkQueue): Long { + assert { bufferSize == 0 } + var start = victim.consumerIndex.value + val end = victim.producerIndex.value + val buffer = victim.buffer + + while (start != end) { + val index = start and MASK + if (victim.blockingTasksInBuffer.value == 0) break + val value = buffer[index] + if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { + victim.blockingTasksInBuffer.decrementAndGet() + add(value) + return TASK_STOLEN + } else { + ++start + } } + return tryStealLastScheduled(victim, blockingOnly = true) + } - if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { - add(lastScheduled, globalQueue) - return true + fun offloadAllWorkTo(globalQueue: GlobalQueue) { + lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) } + while (pollTo(globalQueue)) { + // Steal everything } - return false } - internal fun size(): Int = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize - /** - * Offloads half of the current buffer to [globalQueue] + * Contract on return value is the same as for [tryStealFrom] */ - private fun offloadWork(globalQueue: GlobalQueue) { - repeat((bufferSize / 2).coerceAtLeast(1)) { - val task = pollExternal() ?: return - addToGlobalQueue(globalQueue, task) - } - } + private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long { + while (true) { + val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL + if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL + + // TODO time wraparound ? 
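/*
 * The staleness rule used by tryStealLastScheduled, extracted into a tiny helper for clarity (the
 * function name is ours; 100_000 ns mirrors the default work-stealing resolution mentioned above):
 * a task younger than the resolution is not stolen, and the thief instead learns how many nanoseconds
 * to wait until stealing it can succeed.
 */
const val STEAL_RESOLUTION_NS = 100_000L

fun nanosUntilStealable(submissionTimeNs: Long, nowNs: Long = System.nanoTime()): Long {
    val staleness = nowNs - submissionTimeNs
    return if (staleness >= STEAL_RESOLUTION_NS) 0L else STEAL_RESOLUTION_NS - staleness
}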
+ val time = schedulerTimeSource.nanoTime() + val staleness = time - lastScheduled.submissionTime + if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) { + return WORK_STEALING_TIME_RESOLUTION_NS - staleness + } - private fun addToGlobalQueue(globalQueue: GlobalQueue, task: Task) { - /* - * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had - * been already shutdown (with the only exception of the last worker thread that might be performing - * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet. - */ - check(globalQueue.addLast(task)) { "GlobalQueue could not be closed yet" } + /* + * If CAS has failed, either someone else had stolen this task or the owner executed this task + * and dispatched another one. In the latter case we should retry to avoid missing task. + */ + if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { + add(lastScheduled) + return TASK_STOLEN + } + continue + } } - internal fun offloadAllWork(globalQueue: GlobalQueue) { - lastScheduledTask.getAndSet(null)?.let { addToGlobalQueue(globalQueue, it) } - while (true) { - addToGlobalQueue(globalQueue, pollExternal() ?: return) - } + private fun pollTo(queue: GlobalQueue): Boolean { + val task = pollBuffer() ?: return false + queue.addLast(task) + return true } - /** - * [poll] for external (not owning this queue) workers - */ - private inline fun pollExternal(predicate: (Task) -> Boolean = { true }): Task? { + private fun pollBuffer(): Task? { while (true) { val tailLocal = consumerIndex.value if (tailLocal - producerIndex.value == 0) return null val index = tailLocal and MASK - val element = buffer[index] ?: continue - if (!predicate(element)) { - return null - } if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) { - // 1) Help GC 2) Signal producer that this slot is consumed and may be used - return buffer.getAndSet(index, null) + // Nulls are allowed when blocking tasks are stolen from the middle of the queue. + val value = buffer.getAndSet(index, null) ?: continue + value.decrementIfBlocking() + return value } } } - // Called only by the owner - private fun tryAddLast(task: Task): Boolean { - if (bufferSize == BUFFER_CAPACITY - 1) return false - val headLocal = producerIndex.value - val nextIndex = headLocal and MASK - - /* - * If current element is not null then we're racing with consumers for the tail. If we skip this check then - * the consumer can null out current element and it will be lost. 
If we're racing for tail then - * the queue is close to overflowing => it's fine to offload work to global queue - */ - if (buffer[nextIndex] != null) { - return false + private fun Task?.decrementIfBlocking() { + if (this != null && isBlocking) { + val value = blockingTasksInBuffer.decrementAndGet() + assert { value >= 0 } } - - buffer.lazySet(nextIndex, task) - producerIndex.incrementAndGet() - return true } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt similarity index 77% rename from kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt rename to kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt index 2b5a8968ac..7fc212f59f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt @@ -6,9 +6,15 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.* import org.junit.* +import org.junit.Test import java.util.concurrent.atomic.* +import kotlin.test.* -class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() { +/** + * Test that ensures implementation correctness of [LimitingDispatcher] and + * designed to stress its particular implementation details. + */ +class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() { private val concurrentWorkers = AtomicInteger(0) @Before @@ -27,36 +33,32 @@ class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() { async(limitingDispatcher) { try { val currentlyExecuting = concurrentWorkers.incrementAndGet() - require(currentlyExecuting == 1) + assertEquals(1, currentlyExecuting) } finally { concurrentWorkers.decrementAndGet() } } } - tasks.forEach { it.await() } } - - checkPoolThreadsCreated(2..4) } @Test fun testPingPongThreadsCount() = runBlocking { corePoolSize = CORES_COUNT val iterations = 100_000 * stressTestMultiplier - // Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang. + val completed = AtomicInteger(0) for (i in 1..iterations) { val tasks = (1..2).map { async(dispatcher) { // Useless work concurrentWorkers.incrementAndGet() concurrentWorkers.decrementAndGet() + completed.incrementAndGet() } } - tasks.forEach { it.await() } } - - checkPoolThreadsCreated(CORES_COUNT) + assertEquals(2 * iterations, completed.get()) } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt new file mode 100644 index 0000000000..1fe0d8386d --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.* + +/** + * Specific test that was designed to expose inference between stealing/polling of blocking and non-blocking tasks.RunningThreadStackMergeTest + */ +class BlockingCoroutineDispatcherMixedStealingStressTest : SchedulerTestBase() { + + private val iterations = 10_000 + + @Before + fun setUp() { + idleWorkerKeepAliveNs = Long.MAX_VALUE + } + + @Test + fun testBlockingProgressPreventedInternal() { + val blocking = blockingDispatcher(corePoolSize).asExecutor() + val regular = dispatcher.asExecutor() + repeat(iterations * stressTestMultiplier) { + val cpuBlocker = CyclicBarrier(corePoolSize + 1) + val blockingBlocker = CyclicBarrier(2) + regular.execute(Runnable { + // Block all CPU cores except current one + repeat(corePoolSize - 1) { + regular.execute(Runnable { + cpuBlocker.await() + }) + } + + blocking.execute(Runnable { + blockingBlocker.await() + }) + + regular.execute(Runnable { + blockingBlocker.await() + cpuBlocker.await() + }) + }) + cpuBlocker.await() + } + } + + @Test + fun testBlockingProgressPreventedExternal() { + val blocking = blockingDispatcher(corePoolSize).asExecutor() + val regular = dispatcher.asExecutor() + repeat(iterations / 2 * stressTestMultiplier) { + val cpuBlocker = CyclicBarrier(corePoolSize + 1) + val blockingBlocker = CyclicBarrier(2) + repeat(corePoolSize) { + regular.execute(Runnable { + cpuBlocker.await() + }) + } + // Wait for all threads to park + while (true) { + val waiters = Thread.getAllStackTraces().keys.count { (it.state == Thread.State.TIMED_WAITING || it.state == Thread.State.WAITING) + && it is CoroutineScheduler.Worker } + if (waiters >= corePoolSize) break + Thread.yield() + } + blocking.execute(Runnable { + blockingBlocker.await() + }) + regular.execute(Runnable { + }) + + blockingBlocker.await() + cpuBlocker.await() + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingIOTerminationStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt similarity index 93% rename from kotlinx-coroutines-core/jvm/test/scheduling/BlockingIOTerminationStressTest.kt rename to kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt index de59a84a99..9c17e6988d 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingIOTerminationStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt @@ -9,7 +9,7 @@ import org.junit.* import java.util.* import java.util.concurrent.* -class BlockingIOTerminationStressTest : TestBase() { +class BlockingCoroutineDispatcherTerminationStressTest : TestBase() { private val baseDispatcher = ExperimentalCoroutineDispatcher( 2, 20, TimeUnit.MILLISECONDS.toNanos(10) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index ce5ed99983..66b93be9cf 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -6,11 +6,15 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.* import org.junit.* +import org.junit.rules.* import java.util.concurrent.* class BlockingCoroutineDispatcherTest : SchedulerTestBase() { - @Test(timeout = 1_000) + @get:Rule + val 
timeout = Timeout.seconds(10L)!! + + @Test fun testNonBlockingWithBlockingExternal() = runBlocking { val barrier = CyclicBarrier(2) @@ -24,10 +28,10 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { nonBlockingJob.join() blockingJob.join() - checkPoolThreadsCreated(2) + checkPoolThreadsCreated(2..3) } - @Test(timeout = 10_000) + @Test fun testNonBlockingFromBlocking() = runBlocking { val barrier = CyclicBarrier(2) @@ -41,10 +45,10 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { } blocking.join() - checkPoolThreadsCreated(2) + checkPoolThreadsCreated(2..3) } - @Test(timeout = 1_000) + @Test fun testScheduleBlockingThreadCount() = runTest { // After first iteration pool is idle, repeat, no new threads should be created repeat(2) { @@ -59,7 +63,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { } } - @Test(timeout = 1_000) + @Test fun testNoCpuStarvation() = runBlocking { val tasksNum = 100 val barrier = CyclicBarrier(tasksNum + 1) @@ -73,10 +77,9 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { tasks.forEach { require(it.isActive) } barrier.await() tasks.joinAll() - checkPoolThreadsCreated(101) } - @Test(timeout = 1_000) + @Test fun testNoCpuStarvationWithMultipleBlockingContexts() = runBlocking { val firstBarrier = CyclicBarrier(11) val secondBarrier = CyclicBarrier(11) @@ -101,7 +104,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { checkPoolThreadsCreated(21..22) } - @Test(timeout = 1_000) + @Test fun testNoExcessThreadsCreated() = runBlocking { corePoolSize = 4 @@ -221,4 +224,4 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { fun testZeroParallelism() { blockingDispatcher(0) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherThreadLimitStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherThreadLimitStressTest.kt new file mode 100644 index 0000000000..123fe3c9c4 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherThreadLimitStressTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import org.junit.Ignore +import org.junit.Test +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.test.* + +class BlockingCoroutineDispatcherThreadLimitStressTest : SchedulerTestBase() { + + init { + corePoolSize = CORES_COUNT + } + + private val observedConcurrency = ConcurrentHashMap() + private val concurrentWorkers = AtomicInteger(0) + + @Test + @Ignore + fun testLimitParallelismToOne() = runTest { + val limitingDispatcher = blockingDispatcher(1) + // Do in bursts to avoid OOM + repeat(100 * stressTestMultiplierSqrt) { + val iterations = 1_000 * stressTestMultiplierSqrt + val tasks = (1..iterations).map { + async(limitingDispatcher) { + try { + val currentlyExecuting = concurrentWorkers.incrementAndGet() + observedConcurrency[currentlyExecuting] = true + assertTrue(currentlyExecuting <= CORES_COUNT) + } finally { + concurrentWorkers.decrementAndGet() + } + } + } + tasks.forEach { it.await() } + for (i in CORES_COUNT + 1..CORES_COUNT * 2) { + require(i !in observedConcurrency.keys) { "Unexpected state: $observedConcurrency" } + } + checkPoolThreadsCreated(0..CORES_COUNT + 1) + } + } + + @Test + @Ignore + fun testLimitParallelism() = runBlocking { + val limitingDispatcher = blockingDispatcher(CORES_COUNT) + val iterations = 50_000 * stressTestMultiplier + val tasks = (1..iterations).map { + async(limitingDispatcher) { + try { + val currentlyExecuting = concurrentWorkers.incrementAndGet() + observedConcurrency[currentlyExecuting] = true + assertTrue(currentlyExecuting <= CORES_COUNT) + } finally { + concurrentWorkers.decrementAndGet() + } + } + } + tasks.forEach { it.await() } + for (i in CORES_COUNT + 1..CORES_COUNT * 2) { + require(i !in observedConcurrency.keys) { "Unexpected state: $observedConcurrency" } + } + checkPoolThreadsCreated(CORES_COUNT..CORES_COUNT * 3) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt similarity index 60% rename from kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherStressTest.kt rename to kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt index 08b4914c4c..3280527f2a 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt @@ -7,48 +7,15 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.* -import org.junit.* +import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.atomic.* +import kotlin.test.* -class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() { - - init { - corePoolSize = CORES_COUNT - } - - private val observedConcurrency = ConcurrentHashMap() - private val concurrentWorkers = AtomicInteger(0) - - @Test - fun testLimitParallelism() = runBlocking { - val limitingDispatcher = blockingDispatcher(CORES_COUNT) - val iterations = 50_000 * stressTestMultiplier - val tasks = (1..iterations).map { - async(limitingDispatcher) { - try { - val currentlyExecuting = concurrentWorkers.incrementAndGet() - observedConcurrency[currentlyExecuting] = true - require(currentlyExecuting <= CORES_COUNT) - } finally { - concurrentWorkers.decrementAndGet() - } - } - } - - tasks.forEach { it.await() } - 
require(tasks.isNotEmpty()) - for (i in CORES_COUNT + 1..CORES_COUNT * 2) { - require(i !in observedConcurrency.keys) { "Unexpected state: $observedConcurrency" } - } - - checkPoolThreadsCreated(CORES_COUNT..CORES_COUNT + CORES_COUNT * 2) - } +class BlockingCoroutineDispatcherWorkSignallingStressTest : SchedulerTestBase() { @Test fun testCpuTasksStarvation() = runBlocking { val iterations = 1000 * stressTestMultiplier - repeat(iterations) { // Create a dispatcher every iteration to increase probability of race val dispatcher = ExperimentalCoroutineDispatcher(CORES_COUNT) @@ -63,28 +30,36 @@ class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() { repeat(CORES_COUNT) { async(dispatcher) { // These two will be stolen first - blockingTasks += async(blockingDispatcher) { blockingBarrier.await() } - blockingTasks += async(blockingDispatcher) { blockingBarrier.await() } - - - // Empty on CPU job which should be executed while blocked tasks are hang - cpuTasks += async(dispatcher) { cpuBarrier.await() } - + blockingTasks += blockingAwait(blockingDispatcher, blockingBarrier) + blockingTasks += blockingAwait(blockingDispatcher, blockingBarrier) + // Empty on CPU job which should be executed while blocked tasks are waiting + cpuTasks += cpuAwait(dispatcher, cpuBarrier) // Block with next task. Block cores * 3 threads in total - blockingTasks += async(blockingDispatcher) { blockingBarrier.await() } + blockingTasks += blockingAwait(blockingDispatcher, blockingBarrier) } } cpuTasks.forEach { require(it.isActive) } cpuBarrier.await() - cpuTasks.forEach { it.await() } + cpuTasks.awaitAll() blockingTasks.forEach { require(it.isActive) } blockingBarrier.await() - blockingTasks.forEach { it.await() } + blockingTasks.awaitAll() dispatcher.close() } } + private fun CoroutineScope.blockingAwait( + blockingDispatcher: CoroutineDispatcher, + blockingBarrier: CyclicBarrier + ) = async(blockingDispatcher) { blockingBarrier.await() } + + + private fun CoroutineScope.cpuAwait( + blockingDispatcher: CoroutineDispatcher, + blockingBarrier: CyclicBarrier + ) = async(blockingDispatcher) { blockingBarrier.await() } + @Test fun testBlockingTasksStarvation() = runBlocking { corePoolSize = 2 // Easier to reproduce race with unparks @@ -96,8 +71,7 @@ class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() { val barrier = CyclicBarrier(blockingLimit + 1) // Should eat all limit * 3 cpu without any starvation val tasks = (1..blockingLimit).map { async(blocking) { barrier.await() } } - - tasks.forEach { require(it.isActive) } + tasks.forEach { assertTrue(it.isActive) } barrier.await() tasks.joinAll() } @@ -112,12 +86,10 @@ class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() { repeat(iterations) { // Overwhelm global queue with external CPU tasks val cpuTasks = (1..CORES_COUNT).map { async(dispatcher) { while (true) delay(1) } } - val barrier = CyclicBarrier(blockingLimit + 1) // Should eat all limit * 3 cpu without any starvation val tasks = (1..blockingLimit).map { async(blocking) { barrier.await() } } - - tasks.forEach { require(it.isActive) } + tasks.forEach { assertTrue(it.isActive) } barrier.await() tasks.joinAll() cpuTasks.forEach { it.cancelAndJoin() } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index da6ef2051f..062b849c0a 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ 
b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -19,6 +19,7 @@ class CoroutineDispatcherTest : SchedulerTestBase() { @Test fun testSingleThread() = runBlocking { + corePoolSize = 1 expect(1) withContext(dispatcher) { require(Thread.currentThread() is CoroutineScheduler.Worker) @@ -41,14 +42,12 @@ class CoroutineDispatcherTest : SchedulerTestBase() { fun testFairScheduling() = runBlocking { corePoolSize = 1 expect(1) - val outerJob = launch(dispatcher) { val d1 = launch(dispatcher) { expect(3) } val d2 = launch(dispatcher) { expect(4) } val d3 = launch(dispatcher) { expect(2) } listOf(d1, d2, d3).joinAll() } - outerJob.join() finish(5) } @@ -57,13 +56,12 @@ class CoroutineDispatcherTest : SchedulerTestBase() { fun testStealing() = runBlocking { corePoolSize = 2 val flag = AtomicBoolean(false) - val job = async(context = dispatcher) { + val job = async(dispatcher) { expect(1) val innerJob = async { expect(2) flag.set(true) } - while (!flag.get()) { Thread.yield() // Block current thread, submitted inner job will be stolen } @@ -71,33 +69,11 @@ class CoroutineDispatcherTest : SchedulerTestBase() { innerJob.await() expect(3) } - job.await() finish(4) checkPoolThreadsCreated(2) } - @Test - fun testNoStealing() = runBlocking { - corePoolSize = CORES_COUNT - schedulerTimeSource = TestTimeSource(0L) - withContext(dispatcher) { - val thread = Thread.currentThread() - val job = async(dispatcher) { - assertEquals(thread, Thread.currentThread()) - val innerJob = async(dispatcher) { - assertEquals(thread, Thread.currentThread()) - } - innerJob.await() - } - - job.await() - assertEquals(thread, Thread.currentThread()) - } - - checkPoolThreadsCreated(1..2) - } - @Test fun testDelay() = runBlocking { corePoolSize = 2 @@ -106,37 +82,10 @@ class CoroutineDispatcherTest : SchedulerTestBase() { delay(10) expect(2) } - finish(3) checkPoolThreadsCreated(2) } - @Test - fun testWithTimeout() = runBlocking { - corePoolSize = CORES_COUNT - withContext(dispatcher) { - expect(1) - val result = withTimeoutOrNull(1000) { - expect(2) - yield() // yield only now - "OK" - } - assertEquals("OK", result) - - val nullResult = withTimeoutOrNull(1000) { - expect(3) - while (true) { - yield() - } - } - - assertNull(nullResult) - finish(4) - } - - checkPoolThreadsCreated(1..CORES_COUNT) - } - @Test fun testMaxSize() = runBlocking { corePoolSize = 1 @@ -164,7 +113,6 @@ class CoroutineDispatcherTest : SchedulerTestBase() { expect(4) innerJob.join() } - outerJob.join() finish(5) } @@ -183,6 +131,5 @@ class CoroutineDispatcherTest : SchedulerTestBase() { .count { it is CoroutineScheduler.Worker && it.name.contains("SomeTestName") } assertEquals(1, count) } - } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt new file mode 100644 index 0000000000..b7677bef29 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerLivenessStressTest.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import kotlinx.coroutines.scheduling.CoroutineScheduler.Companion.MAX_SUPPORTED_POOL_SIZE +import org.junit.* +import java.util.concurrent.* + +class CoroutineSchedulerLivenessStressTest : TestBase() { + private val scheduler = lazy { CoroutineScheduler(CORE_POOL_SIZE, MAX_SUPPORTED_POOL_SIZE, Long.MAX_VALUE) } + private val iterations = 1000 * stressTestMultiplier + + @After + fun tearDown() { + if (scheduler.isInitialized()) { + scheduler.value.close() + } + } + + @Test + fun testInternalSubmissions() { + Assume.assumeTrue(CORE_POOL_SIZE >= 2) + repeat(iterations) { + val barrier = CyclicBarrier(CORE_POOL_SIZE + 1) + scheduler.value.execute { + repeat(CORE_POOL_SIZE) { + scheduler.value.execute { + barrier.await() + } + } + } + barrier.await() + } + } + + @Test + fun testExternalSubmissions() { + Assume.assumeTrue(CORE_POOL_SIZE >= 2) + repeat(iterations) { + val barrier = CyclicBarrier(CORE_POOL_SIZE + 1) + repeat(CORE_POOL_SIZE) { + scheduler.value.execute { + barrier.await() + } + } + barrier.await() + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerShrinkTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerShrinkTest.kt deleted file mode 100644 index 50090b533e..0000000000 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerShrinkTest.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.scheduling - -import kotlinx.coroutines.* -import org.junit.* -import java.util.concurrent.* -import kotlin.coroutines.* - -@Ignore // these tests are too unstable on Windows, should be virtualized -class CoroutineSchedulerShrinkTest : SchedulerTestBase() { - - private val blockingTasksCount = CORES_COUNT * 3 - private val blockingTasksBarrier = CyclicBarrier(blockingTasksCount + 1) - lateinit var blocking: CoroutineContext - - @Before - fun setUp() { - corePoolSize = CORES_COUNT - // shutdown after 100ms - idleWorkerKeepAliveNs = TimeUnit.MILLISECONDS.toNanos(100) - blocking = blockingDispatcher(100) - } - - @Test(timeout = 10_000) - fun testShrinkOnlyBlockingTasks() = runBlocking { - // Init dispatcher - async(dispatcher) { }.await() - // Pool is initialized with core size in the beginning - checkPoolThreadsExist(1..2) - - // Run blocking tasks and check increased threads count - val blockingTasks = launchBlocking() - checkBlockingTasks(blockingTasks) - - delay(2000) - // Pool should shrink to core size +- eps - checkPoolThreadsExist(CORES_COUNT..CORES_COUNT + 3) - } - - @Test(timeout = 10_000) - fun testShrinkMixedWithWorkload() = runBlocking { - // Block blockingTasksCount cores in blocking dispatcher - val blockingTasks = launchBlocking() - - // Block cores count CPU threads - val nonBlockingBarrier = CyclicBarrier(CORES_COUNT + 1) - val nonBlockingTasks = (1..CORES_COUNT).map { - async(dispatcher) { - nonBlockingBarrier.await() - } - } - - // Check CPU tasks succeeded properly even though blocking tasks acquired everything - nonBlockingTasks.forEach { require(it.isActive) } - nonBlockingBarrier.await() - nonBlockingTasks.joinAll() - - // Check blocking tasks succeeded properly - checkBlockingTasks(blockingTasks) - - delay(2000) - // Pool should shrink to core size - checkPoolThreadsExist(CORES_COUNT..CORES_COUNT + 3) - } - - private suspend fun checkBlockingTasks(blockingTasks: List>) { - 
checkPoolThreadsExist(blockingTasksCount..corePoolSize + blockingTasksCount) - blockingTasksBarrier.await() - blockingTasks.joinAll() - } - - @Test(timeout = 10_000) - fun testShrinkWithExternalTasks() = runBlocking { - val nonBlockingBarrier = CyclicBarrier(CORES_COUNT + 1) - val blockingTasks = launchBlocking() - - val nonBlockingTasks = (1..CORES_COUNT).map { - async(dispatcher) { - nonBlockingBarrier.await() - } - } - - // Tasks that burn CPU. Delay is important so tasks will be scheduled from external thread - val busySpinTasks = (1..2).map { - async(dispatcher) { - while (true) { - yield() - } - } - } - - nonBlockingTasks.forEach { require(it.isActive) } - nonBlockingBarrier.await() - nonBlockingTasks.joinAll() - - checkBlockingTasks(blockingTasks) - - delay(2000) - // Pool should shrink almost to core size (+/- eps) - checkPoolThreadsExist(CORES_COUNT..CORES_COUNT + 3) - - busySpinTasks.forEach { - require(it.isActive) - it.cancelAndJoin() - } - } - - private suspend fun launchBlocking(): List> { - val result = (1..blockingTasksCount).map { - GlobalScope.async(blocking) { - blockingTasksBarrier.await() - } - } - - while (blockingTasksBarrier.numberWaiting != blockingTasksCount) { - delay(1) - } - - return result - } -} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt index 683a889efa..cb49f054ce 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt @@ -4,11 +4,10 @@ package kotlinx.coroutines.scheduling +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* -import kotlinx.coroutines.scheduling.SchedulerTestBase.Companion.checkPoolThreadsCreated import org.junit.* -import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.atomic.* @@ -37,19 +36,27 @@ class CoroutineSchedulerStressTest : TestBase() { } @Test - @Suppress("DEPRECATION") - @Ignore // this test often fails on windows, todo: figure out how to fix it. 
See issue #904 - fun testExternalTasksSubmission() { - stressTest(CommonPool) - } + fun testInternalTasksSubmissionProgress() { + /* + * Run a lot of tasks and validate that + * 1) All of them are completed successfully + * 2) Every thread executed task at least once + */ + dispatcher.dispatch(EmptyCoroutineContext, Runnable { + for (i in 1..tasksNum) { + dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable()) + } + }) - @Test - fun testInternalTasksSubmission() { - stressTest(dispatcher) + finishLatch.await() + val observed = observedThreads.size + // on slow machines not all threads can be observed + assertTrue(observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1), "Observed $observed threads with $AVAILABLE_PROCESSORS available processors") + validateResults() } @Test - fun testStealingFromBlocking() { + fun testStealingFromNonProgressing() { /* * Work-stealing stress test, * one thread submits pack of tasks, waits until they are completed (to avoid work offloading) @@ -63,50 +70,24 @@ class CoroutineSchedulerStressTest : TestBase() { while (submittedTasks < tasksNum) { ++submittedTasks - dispatcher.dispatch(EmptyCoroutineContext, Runnable { - processTask() - }) - + dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable()) while (submittedTasks - processed.get() > 100) { Thread.yield() } } - // Block current thread finishLatch.await() }) finishLatch.await() - require(!observedThreads.containsKey(blockingThread!!)) - validateResults() - } - - private fun stressTest(submissionInitiator: CoroutineDispatcher) { - /* - * Run 2 million tasks and validate that - * 1) All of them are completed successfully - * 2) Every thread executed task at least once - */ - submissionInitiator.dispatch(EmptyCoroutineContext, Runnable { - for (i in 1..tasksNum) { - dispatcher.dispatch(EmptyCoroutineContext, Runnable { - processTask() - }) - } - }) - - finishLatch.await() - val observed = observedThreads.size - // on slow machines not all threads can be observed - assertTrue(observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1), "Observed $observed threads with $AVAILABLE_PROCESSORS available processors") + assertFalse(observedThreads.containsKey(blockingThread!!)) validateResults() } private fun processTask() { val counter = observedThreads[Thread.currentThread()] ?: 0L observedThreads[Thread.currentThread()] = counter + 1 - if (processed.incrementAndGet() == tasksNum) { finishLatch.countDown() } @@ -115,6 +96,13 @@ class CoroutineSchedulerStressTest : TestBase() { private fun validateResults() { val result = observedThreads.values.sum() assertEquals(tasksNum.toLong(), result) - checkPoolThreadsCreated(AVAILABLE_PROCESSORS) + } + + private inner class ValidatingRunnable : Runnable { + private val invoked = atomic(false) + override fun run() { + if (!invoked.compareAndSet(false, true)) error("The same runnable was invoked twice") + processTask() + } } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt index 780ec1b95b..ff831950b5 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt @@ -4,12 +4,12 @@ package kotlinx.coroutines.scheduling -import kotlinx.coroutines.TestBase +import kotlinx.coroutines.* import org.junit.Test import java.lang.Runnable import java.util.concurrent.* -import java.util.concurrent.CountDownLatch import kotlin.coroutines.* +import 
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt
index 780ec1b95b..ff831950b5 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt
@@ -4,12 +4,12 @@
 
 package kotlinx.coroutines.scheduling
 
-import kotlinx.coroutines.TestBase
+import kotlinx.coroutines.*
 import org.junit.Test
 import java.lang.Runnable
 import java.util.concurrent.*
-import java.util.concurrent.CountDownLatch
 import kotlin.coroutines.*
+import kotlin.test.*
 
 class CoroutineSchedulerTest : TestBase() {
 
@@ -128,6 +128,29 @@ class CoroutineSchedulerTest : TestBase() {
         latch.await()
     }
 
+    @Test
+    fun testInterruptionCleanup() {
+        ExperimentalCoroutineDispatcher(1, 1).use {
+            val executor = it.executor
+            var latch = CountDownLatch(1)
+            executor.execute {
+                Thread.currentThread().interrupt()
+                latch.countDown()
+            }
+            latch.await()
+            Thread.sleep(100) // I am really sorry
+            latch = CountDownLatch(1)
+            executor.execute {
+                try {
+                    assertFalse(Thread.currentThread().isInterrupted)
+                } finally {
+                    latch.countDown()
+                }
+            }
+            latch.await()
+        }
+    }
+
     private fun testUniformDistribution(worker: CoroutineScheduler.Worker, bound: Int) {
         val result = IntArray(bound)
         val iterations = 10_000_000
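testInterruptionCleanup asserts that an interrupted status set by one task is not visible to the next task that runs on the same worker. The shape of that check against any single-threaded java.util.concurrent.Executor, as a hedged sketch (whether a particular executor passes is exactly what such a test is meant to establish; the function name is illustrative):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor

fun checkInterruptionCleanup(executor: Executor) {
    // Only meaningful for a single-threaded executor, where both tasks share a worker thread.
    val first = CountDownLatch(1)
    executor.execute {
        Thread.currentThread().interrupt() // leave the worker in interrupted state
        first.countDown()
    }
    first.await()
    val second = CountDownLatch(1)
    var leaked = true
    executor.execute {
        leaked = Thread.currentThread().isInterrupted
        second.countDown()
    }
    second.await() // countDown/await provides the happens-before needed to read 'leaked'
    check(!leaked) { "Interrupted status leaked into the next task" }
}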
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
index e01f027f55..bfabf5b2f3 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
@@ -11,6 +11,7 @@ import kotlinx.coroutines.*
 import kotlinx.coroutines.internal.*
 import org.junit.*
 import kotlin.coroutines.*
+import kotlin.test.*
 
 abstract class SchedulerTestBase : TestBase() {
     companion object {
@@ -22,17 +23,20 @@ abstract class SchedulerTestBase : TestBase() {
         */
        fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) {
            val threadsCount = maxSequenceNumber()!!
-            require(threadsCount == expectedThreadsCount)
-                { "Expected $expectedThreadsCount pool threads, but has $threadsCount" }
+            assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount")
        }
 
        /**
         * Asserts that any number of pool worker threads in [range] were created.
         * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
         */
-        fun checkPoolThreadsCreated(range: IntRange) {
+        fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) {
            val maxSequenceNumber = maxSequenceNumber()!!
-            require(maxSequenceNumber in range) { "Expected pool threads to be in interval $range, but has $maxSequenceNumber" }
+            val r = (range.first)..(range.last + base)
+            assertTrue(
+                maxSequenceNumber in r,
+                "Expected pool threads to be in interval $r, but has $maxSequenceNumber"
+            )
        }
 
        /**
@@ -40,7 +44,7 @@ abstract class SchedulerTestBase : TestBase() {
         */
        fun checkPoolThreadsExist(range: IntRange) {
            val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count()
-            require(threads in range) { "Expected threads in $range interval, but has $threads" }
+            assertTrue(threads in range, "Expected threads in $range interval, but has $threads")
        }
 
        private fun maxSequenceNumber(): Int? {
@@ -61,15 +65,12 @@ abstract class SchedulerTestBase : TestBase() {
        suspend fun Iterable<Job>.joinAll() = forEach { it.join() }
    }
 
-    private val exception = atomic<Throwable?>(null)
-    private val handler = CoroutineExceptionHandler { _, e -> exception.value = e }
-
-    protected var corePoolSize = 1
+    protected var corePoolSize = CORES_COUNT
    protected var maxPoolSize = 1024
    protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS
 
    private var _dispatcher: ExperimentalCoroutineDispatcher? = null
-    protected val dispatcher: CoroutineContext
+    protected val dispatcher: CoroutineDispatcher
        get() {
            if (_dispatcher == null) {
                _dispatcher = ExperimentalCoroutineDispatcher(
@@ -79,21 +80,21 @@ abstract class SchedulerTestBase : TestBase() {
                )
            }
 
-            return _dispatcher!! + handler
+            return _dispatcher!!
        }
 
    protected var blockingDispatcher = lazy {
        blockingDispatcher(1000)
    }
 
-    protected fun blockingDispatcher(parallelism: Int): CoroutineContext {
+    protected fun blockingDispatcher(parallelism: Int): CoroutineDispatcher {
        val intitialize = dispatcher
-        return _dispatcher!!.blocking(parallelism) + handler
+        return _dispatcher!!.blocking(parallelism)
    }
 
-    protected fun view(parallelism: Int): CoroutineContext {
+    protected fun view(parallelism: Int): CoroutineDispatcher {
        val intitialize = dispatcher
-        return _dispatcher!!.limited(parallelism) + handler
+        return _dispatcher!!.limited(parallelism)
    }
 
    @After
@@ -103,6 +104,5 @@ abstract class SchedulerTestBase : TestBase() {
                _dispatcher?.close()
            }
        }
-        exception.value?.let { throw it }
    }
}
\ No newline at end of file
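With dispatcher and blockingDispatcher(parallelism) now typed as CoroutineDispatcher and the exception-collecting handler removed, subclasses can pass these views straight to coroutine builders. A minimal usage sketch, assuming only the members visible in the diff above; the test class and its body are illustrative, not part of the change:

import kotlinx.coroutines.*
import org.junit.Test

class ExampleSchedulerUsageTest : SchedulerTestBase() {
    @Test
    fun testMixedBlockingAndCpuWork() = runBlocking {
        val blocking = blockingDispatcher(parallelism = 4)                 // view for blocking tasks
        val blockingJobs = (1..4).map { launch(blocking) { Thread.sleep(10) } }
        val cpuJobs = (1..4).map { launch(dispatcher) { /* CPU-bound work */ } }
        (blockingJobs + cpuJobs).forEach { it.join() }                     // dispatcher is closed by @After in the base class
    }
}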
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt
index 41adddce7c..5e170c9f6b 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt
@@ -7,7 +7,6 @@ package kotlinx.coroutines.scheduling
 import kotlinx.coroutines.*
 import org.junit.*
 import org.junit.Test
-import java.util.*
 import java.util.concurrent.*
 import kotlin.concurrent.*
 import kotlin.test.*
@@ -17,8 +16,8 @@ class WorkQueueStressTest : TestBase() {
     private val threads = mutableListOf<Thread>()
     private val offerIterations = 100_000 * stressTestMultiplierSqrt // memory pressure, not CPU time
     private val stealersCount = 6
-    private val stolenTasks = Array(stealersCount) { Queue() }
-    private val globalQueue = Queue() // only producer will use it
+    private val stolenTasks = Array(stealersCount) { GlobalQueue() }
+    private val globalQueue = GlobalQueue() // only producer will use it
     private val producerQueue = WorkQueue()
 
     @Volatile
@@ -45,7 +44,7 @@ class WorkQueueStressTest : TestBase() {
                     Thread.yield()
                 }
 
-                producerQueue.add(task(i.toLong()), globalQueue)
+                producerQueue.add(task(i.toLong()))?.let { globalQueue.addLast(it) }
             }
 
             producerFinished = true
@@ -55,12 +54,14 @@ class WorkQueueStressTest : TestBase() {
             threads += thread(name = "stealer $i") {
                 val myQueue = WorkQueue()
                 startLatch.await()
-                while (!producerFinished || producerQueue.size() != 0) {
-                    myQueue.trySteal(producerQueue, stolenTasks[i])
+                while (!producerFinished || producerQueue.size != 0) {
+                    stolenTasks[i].addAll(myQueue.drain().map { task(it) })
+                    myQueue.tryStealFrom(victim = producerQueue)
                 }
 
                 // Drain last element which is not counted in buffer
-                myQueue.trySteal(producerQueue, stolenTasks[i])
+                stolenTasks[i].addAll(myQueue.drain().map { task(it) })
+                myQueue.tryStealFrom(producerQueue)
                 stolenTasks[i].addAll(myQueue.drain().map { task(it) })
             }
         }
@@ -73,7 +74,6 @@ class WorkQueueStressTest : TestBase() {
     @Test
     fun testSingleProducerSingleStealer() {
         val startLatch = CountDownLatch(1)
-        val fakeQueue = Queue()
         threads += thread(name = "producer") {
             startLatch.await()
             for (i in 1..offerIterations) {
@@ -82,16 +82,16 @@ class WorkQueueStressTest : TestBase() {
                 }
 
                 // No offloading to global queue here
-                producerQueue.add(task(i.toLong()), fakeQueue)
+                producerQueue.add(task(i.toLong()))
             }
         }
 
-        val stolen = Queue()
+        val stolen = GlobalQueue()
         threads += thread(name = "stealer") {
             val myQueue = WorkQueue()
             startLatch.await()
             while (stolen.size != offerIterations) {
-                if (!myQueue.trySteal(producerQueue, stolen)) {
+                if (myQueue.tryStealFrom(producerQueue) != NOTHING_TO_STEAL) {
                     stolen.addAll(myQueue.drain().map { task(it) })
                 }
             }
@@ -114,10 +114,8 @@ class WorkQueueStressTest : TestBase() {
         val expected = (1L..offerIterations).toSet()
         assertEquals(expected, result, "Following elements are missing: ${(expected - result)}")
     }
-}
 
-internal class Queue : GlobalQueue() {
-    fun addAll(tasks: Collection<Task>) {
+    private fun GlobalQueue.addAll(tasks: Collection<Task>) {
         tasks.forEach { addLast(it) }
     }
 }
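Both stress tests above exercise the reworked internal WorkQueue API: add() now returns the task that did not fit into the bounded local buffer instead of pushing overflow into a GlobalQueue parameter, and tryStealFrom() replaces trySteal(). The submitter-side pattern, sketched using only calls that appear in this diff (the helper function itself is illustrative):

// Illustrative helper, not part of the change: how a submitter handles overflow with the new API.
internal fun submitLocally(local: WorkQueue, global: GlobalQueue, task: Task) {
    val overflow = local.add(task) // null means the task fit into the local buffer
    if (overflow != null) {
        global.addLast(overflow)   // explicit offload replaces the old globalQueue parameter
    }
}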
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt
index 0c55b184d1..7acd1620f4 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt
@@ -26,110 +26,49 @@ class WorkQueueTest : TestBase() {
 
     @Test
     fun testLastScheduledComesFirst() {
         val queue = WorkQueue()
-        val globalQueue = GlobalQueue()
-        (1L..4L).forEach { queue.add(task(it), globalQueue) }
+        (1L..4L).forEach { queue.add(task(it)) }
         assertEquals(listOf(4L, 1L, 2L, 3L), queue.drain())
     }
 
     @Test
-    fun testWorkOffload() {
+    fun testAddWithOffload() {
         val queue = WorkQueue()
-        val globalQueue = GlobalQueue()
-        (1L..130L).forEach { queue.add(task(it), globalQueue) }
-
-        val expectedLocalResults = (64L..129L).toMutableList()
-        expectedLocalResults.add(0, 130L)
-        assertEquals(expectedLocalResults, queue.drain())
-        assertEquals((1L..63L).toList(), globalQueue.asTimeList())
+        val size = 130L
+        val offload = GlobalQueue()
+        (0 until size).forEach { queue.add(task(it))?.let { t -> offload.addLast(t) } }
+
+        val expectedResult = listOf(129L) + (0L..126L).toList()
+        val actualResult = queue.drain()
+        assertEquals(expectedResult, actualResult)
+        assertEquals((0L until size).toSet().minus(expectedResult), offload.drain().toSet())
     }
 
     @Test
     fun testWorkOffloadPrecision() {
         val queue = WorkQueue()
         val globalQueue = GlobalQueue()
-        repeat(128) { require(queue.add(task(0), globalQueue)) }
-        require(globalQueue.isEmpty)
-        require(!queue.add(task(0), globalQueue))
-        require(globalQueue.size == 63)
-    }
-
-    @Test
-    fun testTimelyStealing() {
-        val victim = WorkQueue()
-        val globalQueue = GlobalQueue()
-
-        (1L..96L).forEach { victim.add(task(it), globalQueue) }
-
-        timeSource.step()
-        timeSource.step(2)
-
-        val stealer = WorkQueue()
-        require(stealer.trySteal(victim, globalQueue))
-        assertEquals(arrayListOf(2L, 1L), stealer.drain())
-
-        require(!stealer.trySteal(victim, globalQueue))
-        assertEquals(emptyList(), stealer.drain())
-
-        timeSource.step(3)
-        require(stealer.trySteal(victim, globalQueue))
-        assertEquals(arrayListOf(5L, 3L, 4L), stealer.drain())
-        require(globalQueue.isEmpty)
-        assertEquals((6L..96L).toSet(), victim.drain().toSet())
-    }
-
-    @Test
-    fun testStealingBySize() {
-        val victim = WorkQueue()
-        val globalQueue = GlobalQueue()
-
-        (1L..110L).forEach { victim.add(task(it), globalQueue) }
-        val stealer = WorkQueue()
-        require(stealer.trySteal(victim, globalQueue))
-        assertEquals((1L..13L).toSet(), stealer.drain().toSet())
-
-        require(!stealer.trySteal(victim, globalQueue))
-        require(stealer.drain().isEmpty())
-
-
-        timeSource.step()
-        timeSource.step(13)
-        require(!stealer.trySteal(victim, globalQueue))
-        require(stealer.drain().isEmpty())
-
-        timeSource.step(1)
-        require(stealer.trySteal(victim, globalQueue))
-        assertEquals(arrayListOf(14L), stealer.drain())
-
+        repeat(128) { assertNull(queue.add(task(it.toLong()))) }
+        assertTrue(globalQueue.isEmpty)
+        assertEquals(127L, queue.add(task(0))?.submissionTime)
     }
 
     @Test
     fun testStealingFromHead() {
         val victim = WorkQueue()
-        val globalQueue = GlobalQueue()
-        (1L..2L).forEach { victim.add(task(it), globalQueue) }
+        victim.add(task(1L))
+        victim.add(task(2L))
         timeSource.step()
         timeSource.step(3)
 
         val stealer = WorkQueue()
-        require(stealer.trySteal(victim, globalQueue))
+        assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim))
         assertEquals(arrayListOf(1L), stealer.drain())
 
-        require(stealer.trySteal(victim, globalQueue))
+        assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim))
        assertEquals(arrayListOf(2L), stealer.drain())
     }
 }
 
-internal fun GlobalQueue.asTimeList(): List<Long> {
-    val result = mutableListOf<Long>()
-    var next = removeFirstOrNull()
-    while (next != null) {
-        result += next.submissionTime
-        next = removeFirstOrNull()
-    }
-
-    return result
-}
-
 internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext)
 
 internal fun WorkQueue.drain(): List<Long> {
@@ -139,6 +78,15 @@ internal fun WorkQueue.drain(): List<Long> {
         result += task.submissionTime
         task = poll()
     }
+    return result
+}
+
+internal fun GlobalQueue.drain(): List<Long> {
+    var task: Task? = removeFirstOrNull()
+    val result = arrayListOf<Long>()
+    while (task != null) {
+        result += task.submissionTime
+        task = removeFirstOrNull()
+    }
     return result
 }
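The drain() helpers turn queue contents into lists of submission times for assertions, with GlobalQueue.drain() mirroring the existing WorkQueue.drain(). A short usage sketch in the style of the tests above; the test body is illustrative and assumes the helpers and imports of this file:

@Test
fun exampleOffloadAssertion() {
    val queue = WorkQueue()
    val offload = GlobalQueue()
    // Push more tasks than the local buffer holds; overflow goes to the global queue.
    (0L until 200L).forEach { queue.add(task(it))?.let { t -> offload.addLast(t) } }
    val local = queue.drain().toSet()
    val offloaded = offload.drain().toSet()
    // Every submitted task ends up in exactly one of the two queues.
    assertEquals((0L until 200L).toSet(), local + offloaded)
    assertTrue(local.intersect(offloaded).isEmpty())
}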