Skip to content

Commit dd30af0

Browse files
committed
Improve performance of task acquisition
* Do not push the worker onto the stack during the second pass of "min duration scanning"
* Locally cache whether the local queue has any work to execute, to save an atomic getAndSet and a bunch of atomic loads
* More precise rendezvous for parking
* Long-scanning stealing to emulate spinning
1 parent a5fa571 commit dd30af0

File tree

7 files changed

+51
-37
lines changed

7 files changed

+51
-37
lines changed

benchmarks/src/jmh/kotlin/benchmarks/scheduler/DispatchersContextSwitchBenchmark.kt

+2-5
Original file line number | Diff line number | Diff line change
@@ -64,13 +64,10 @@ open class DispatchersContextSwitchBenchmark {
6464
repeat(nCoroutines) {
6565
launch(dispatcher) {
6666
repeat(nRepeatDelay) {
67-
delayOrYield()
67+
delay(delayTimeMs)
6868
}
6969
}
7070
}
7171
}
72+
}
7273

73-
private suspend fun delayOrYield() {
74-
delay(delayTimeMs)
75-
}
76-
}

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/CycledActorsBenchmark.kt

+3-3
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,8 @@ import java.util.concurrent.*
2929
* CycledActorsBenchmark.cycledActors 262144 experimental avgt 14 1804.146 ± 57.275 ms/op
3030
*/
3131
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
32-
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
33-
@Fork(value = 3)
32+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
33+
@Fork(value = 1)
3434
@BenchmarkMode(Mode.AverageTime)
3535
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3636
@State(Scope.Benchmark)
@@ -43,7 +43,7 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() {
4343
@Param("fjp", "ftp_1", "scheduler")
4444
override var dispatcher: String = "fjp"
4545

46-
@Param("524288")
46+
@Param("1", "1024")
4747
var actorStateSize = 1
4848

4949
@Benchmark

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongActorBenchmark.kt

+1-1
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ import java.util.concurrent.*
2727
*/
2828
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
2929
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
30-
@Fork(value = 2)
30+
@Fork(value = 1)
3131
@BenchmarkMode(Mode.AverageTime)
3232
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3333
@State(Scope.Benchmark)

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt

+2-2
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,8 @@ import kotlin.coroutines.*
2020
* PingPongWithBlockingContext.withContextPingPong avgt 20 761.669 ± 41.371 ms/op
2121
*/
2222
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
23-
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
24-
@Fork(value = 2)
23+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
24+
@Fork(value = 1)
2525
@BenchmarkMode(Mode.AverageTime)
2626
@OutputTimeUnit(TimeUnit.MILLISECONDS)
2727
@State(Scope.Benchmark)

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/StatefulActorBenchmark.kt

+1-1
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ import java.util.concurrent.*
3333
*/
3434
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
3535
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
36-
@Fork(value = 2)
36+
@Fork(value = 1)
3737
@BenchmarkMode(Mode.AverageTime)
3838
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3939
@State(Scope.Benchmark)

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+41-24
Original file line number | Diff line number | Diff line change
@@ -334,7 +334,7 @@ internal class CoroutineScheduler(
334334
globalCpuQueue.close()
335335
// Finish processing tasks from globalQueue and/or from this worker's local queue
336336
while (true) {
337-
val task = currentWorker?.findTask()
337+
val task = currentWorker?.findTask(true)
338338
?: globalCpuQueue.removeFirstOrNull()
339339
?: globalBlockingQueue.removeFirstOrNull()
340340
?: break
@@ -469,10 +469,10 @@ internal class CoroutineScheduler(
469469
*/
470470
if (worker.state === WorkerState.TERMINATED) return task
471471
// Do not add CPU tasks to the local queue if we are not able to execute them
472-
// TODO discuss: maybe add it to the local queue and offload back in the global queue iff permit wasn't acquired?
473472
if (task.mode == TaskMode.NON_BLOCKING && worker.isBlocking) {
474473
return task
475474
}
475+
worker.mayHaveLocalTasks = true
476476
return worker.localQueue.add(task, fair = fair)
477477
}
478478

@@ -658,42 +658,47 @@ internal class CoroutineScheduler(
658658
}
659659

660660
override fun run() = runWorker()
661+
@JvmField
662+
var mayHaveLocalTasks = false
661663

662664
private fun runWorker() {
663665
var rescanned = false
664666
while (!isTerminated && state != WorkerState.TERMINATED) {
665-
val task = findTask()
667+
val task = findTask(mayHaveLocalTasks)
666668
// Task found. Execute and repeat
667669
if (task != null) {
668670
rescanned = false
669671
minDelayUntilStealableTaskNs = 0L
670672
executeTask(task)
671673
continue
674+
} else {
675+
mayHaveLocalTasks = false
672676
}
673677
/*
674678
* No tasks were found:
675679
* 1) Either at least one of the workers has a stealable task in its FIFO-buffer with a stealing deadline.
676680
* Then its deadline is stored in [minDelayUntilStealableTaskNs]
677681
*
678682
* Then just park for that duration (ditto re-scanning).
679-
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
683+
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
680684
* excess unparks and managing the "one unpark per signalling" invariant become infeasible; instead we are going to resolve
681685
* it with "spinning via scans" mechanism.
682686
* NB: this short potential parking does not interfere with `tryUnpark`
683687
*/
684688
if (minDelayUntilStealableTaskNs != 0L) {
685689
if (!rescanned) {
686690
rescanned = true
687-
continue
688691
} else {
692+
rescanned = false
689693
tryReleaseCpu(WorkerState.PARKING)
690694
interrupted()
691695
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
692696
minDelayUntilStealableTaskNs = 0L
693697
}
698+
continue
694699
}
695700
/*
696-
* 2) No tasks available, time to park and, potentially, shut down the thread.
701+
* 2) Or no tasks available, time to park and, potentially, shut down the thread.
697702
* Add itself to the stack of parked workers, re-scans all the queues
698703
* to avoid a missed wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting new tasks.
699704
*/
@@ -704,20 +709,24 @@ internal class CoroutineScheduler(
704709

705710
// Counterpart to "tryUnpark"
706711
private fun tryPark() {
707-
parkingState.value = PARKING_ALLOWED
712+
if (!inStack()) {
713+
parkingState.value = PARKING_ALLOWED
714+
}
708715
if (parkedWorkersStackPush(this)) {
709716
return
710717
} else {
711718
assert { localQueue.size == 0 }
712-
tryReleaseCpu(WorkerState.PARKING)
713-
interrupted() // Cleanup interruptions
714-
// Failed to get a parking permit, bailout
715-
if (!parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
716-
return
717-
}
718-
while (inStack()) { // Prevent spurious wakeups
719+
// Failed to get a parking permit => we are not in the stack
720+
while (inStack()) {
719721
if (isTerminated || state == WorkerState.TERMINATED) break
720-
park()
722+
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
723+
return
724+
}
725+
tryReleaseCpu(WorkerState.PARKING)
726+
interrupted() // Cleanup interruptions
727+
if (inStack()) {
728+
park()
729+
}
721730
}
722731
}
723732
}
@@ -848,22 +857,30 @@ internal class CoroutineScheduler(
848857
}
849858
}
850859

851-
fun findTask(): Task? {
852-
if (tryAcquireCpuPermit()) return findAnyTask()
860+
fun findTask(scanLocalQueue: Boolean): Task? {
861+
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
853862
// If we can't acquire a CPU permit -- attempt to find blocking task
854-
val task = localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
863+
val task = if (scanLocalQueue) {
864+
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
865+
} else {
866+
globalBlockingQueue.removeFirstOrNull()
867+
}
855868
return task ?: trySteal(blockingOnly = true)
856869
}
857870

858-
private fun findAnyTask(): Task? {
871+
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
859872
/*
860873
* Anti-starvation mechanism: probabilistically poll either local
861874
* or global queue to ensure progress for both external and internal tasks.
862875
*/
863-
val globalFirst = nextInt(2 * corePoolSize) == 0
864-
if (globalFirst) pollGlobalQueues()?.let { return it }
865-
localQueue.poll()?.let { return it }
866-
if (!globalFirst) pollGlobalQueues()?.let { return it }
876+
if (scanLocalQueue) {
877+
val globalFirst = nextInt(2 * corePoolSize) == 0
878+
if (globalFirst) pollGlobalQueues()?.let { return it }
879+
localQueue.poll()?.let { return it }
880+
if (!globalFirst) pollGlobalQueues()?.let { return it }
881+
} else {
882+
pollGlobalQueues()?.let { return it }
883+
}
867884
return trySteal(blockingOnly = false)
868885
}
869886

@@ -887,7 +904,7 @@ internal class CoroutineScheduler(
887904

888905
var currentIndex = nextInt(created)
889906
var minDelay = Long.MAX_VALUE
890-
repeat(created) {
907+
repeat(workers.length()) {
891908
++currentIndex
892909
if (currentIndex > created) currentIndex = 1
893910
val worker = workers[currentIndex]

kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp(
4545

4646
@JvmField
4747
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
48-
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 100000L)
48+
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
4949
)
5050

5151
@JvmField

0 commit comments

Comments
 (0)