Skip to content

Commit dd7ad95

Browse files
committed
Merge termination state machine and parking state machine into one, get rid of long-scanning sequence
1 parent 31f71dd commit dd7ad95

File tree

1 file changed

+17
-49
lines changed

1 file changed

+17
-49
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+17-49
Original file line numberDiff line numberDiff line change
@@ -303,14 +303,10 @@ internal class CoroutineScheduler(
303303
@JvmField
304304
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
305305

306-
// Worker termination states
307-
private const val TERMINATION_FORBIDDEN = -1
308-
private const val TERMINATION_ALLOWED = 0
306+
// Worker ctl states
307+
private const val PARKED = -1
308+
private const val CLAIMED = 0
309309
private const val TERMINATED = 1
310-
// Worker parking states
311-
private const val PARKING_FORBIDDEN = -1
312-
private const val PARKING_ALLOWED = 0
313-
private const val PARKED = 1
314310

315311
// Masks of control state
316312
private const val BLOCKING_SHIFT = 21 // 2M threads max
@@ -443,10 +439,10 @@ internal class CoroutineScheduler(
443439
private fun tryUnpark(): Boolean {
444440
while (true) {
445441
val worker = parkedWorkersStackPop() ?: return false
446-
if (!worker.parkingState.compareAndSet(PARKING_ALLOWED, PARKING_FORBIDDEN)) {
442+
if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
447443
LockSupport.unpark(worker)
444+
return true
448445
}
449-
if (worker.tryForbidTermination()) return true
450446
}
451447
}
452448

@@ -596,23 +592,19 @@ internal class CoroutineScheduler(
596592
/**
597593
* Worker state. **Updated only by this worker thread**.
598594
* By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
595+
* Is used locally by the worker to maintain its own invariants.
599596
*/
600597
@JvmField
601598
var state = WorkerState.DORMANT
602599

603600
/**
604-
* Small state machine for termination.
605-
* Followed states are allowed:
606-
* [TERMINATION_ALLOWED] -- worker can wake up and terminate itself
607-
* [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
608-
* [TERMINATED] -- final state, thread is terminating and cannot be resurrected
609-
*
610-
* Allowed transitions:
611-
* [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN]
612-
* [TERMINATION_ALLOWED] -> [TERMINATED]
613-
* [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED]
601+
* Worker control state responsible for worker claiming, parking and termination.
602+
* List of states:
603+
* [PARKED] -- worker is parked and can self-terminate after a termination deadline.
604+
* [CLAIMED] -- worker is claimed by an external submitter.
605+
* [TERMINATED] -- worker is terminated and no longer usable.
614606
*/
615-
private val terminationState = atomic(TERMINATION_ALLOWED)
607+
val workerCtl = atomic(CLAIMED)
616608

617609
/**
618610
* It is set to the termination deadline when started doing [park] and it reset
@@ -632,25 +624,8 @@ internal class CoroutineScheduler(
632624
* The delay until at least one task in other worker queues will become stealable.
633625
*/
634626
private var minDelayUntilStealableTaskNs = 0L
635-
// PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED
636-
val parkingState = atomic(PARKING_ALLOWED)
637627

638628
private var rngState = Random.nextInt()
639-
/**
640-
* Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails.
641-
* This attempt may fail either because worker terminated itself or because someone else
642-
* claimed this worker (though this case is rare, because require very bad timings)
643-
*/
644-
fun tryForbidTermination(): Boolean =
645-
when (val state = terminationState.value) {
646-
TERMINATED -> false // already terminated
647-
TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker
648-
TERMINATION_ALLOWED -> terminationState.compareAndSet(
649-
TERMINATION_ALLOWED,
650-
TERMINATION_FORBIDDEN
651-
)
652-
else -> error("Invalid terminationState = $state")
653-
}
654629

655630
/**
656631
* Tries to acquire CPU token if worker doesn't have one
@@ -730,23 +705,17 @@ internal class CoroutineScheduler(
730705
// Counterpart to "tryUnpark"
731706
private fun tryPark() {
732707
if (!inStack()) {
733-
parkingState.value = PARKING_ALLOWED
734708
parkedWorkersStackPush(this)
735709
return
736-
737710
}
738711
assert { localQueue.size == 0 }
739-
// Failed to get a parking permit => we are not in the stack
740712
while (inStack()) {
741713
if (isTerminated || state == WorkerState.TERMINATED) break
742-
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
743-
return
744-
}
745714
tryReleaseCpu(WorkerState.PARKING)
715+
workerCtl.value = PARKED
716+
if (!inStack()) return
746717
interrupted() // Cleanup interruptions
747-
if (inStack()) {
748-
park()
749-
}
718+
park()
750719
}
751720
}
752721

@@ -798,7 +767,6 @@ internal class CoroutineScheduler(
798767
}
799768

800769
private fun park() {
801-
terminationState.value = TERMINATION_ALLOWED
802770
// set termination deadline the first time we are here (it is reset in idleReset)
803771
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
804772
// actually park
@@ -824,7 +792,7 @@ internal class CoroutineScheduler(
824792
* See tryUnpark for state reasoning.
825793
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
826794
*/
827-
if (!terminationState.compareAndSet(TERMINATION_ALLOWED, TERMINATED)) return
795+
if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
828796
/*
829797
* At this point this thread is no longer considered as usable for scheduling.
830798
* We need multi-step choreography to reindex workers.
@@ -923,7 +891,7 @@ internal class CoroutineScheduler(
923891

924892
var currentIndex = nextInt(created)
925893
var minDelay = Long.MAX_VALUE
926-
repeat(workers.length()) {
894+
repeat(created) {
927895
++currentIndex
928896
if (currentIndex > created) currentIndex = 1
929897
val worker = workers[currentIndex]

0 commit comments

Comments
 (0)