@@ -303,14 +303,10 @@ internal class CoroutineScheduler(
303
303
@JvmField
304
304
val NOT_IN_STACK = Symbol (" NOT_IN_STACK" )
305
305
306
- // Worker termination states
307
- private const val TERMINATION_FORBIDDEN = - 1
308
- private const val TERMINATION_ALLOWED = 0
306
+ // Worker ctl states
307
+ private const val PARKED = - 1
308
+ private const val CLAIMED = 0
309
309
private const val TERMINATED = 1
310
- // Worker parking states
311
- private const val PARKING_FORBIDDEN = - 1
312
- private const val PARKING_ALLOWED = 0
313
- private const val PARKED = 1
314
310
315
311
// Masks of control state
316
312
private const val BLOCKING_SHIFT = 21 // 2M threads max
@@ -443,10 +439,10 @@ internal class CoroutineScheduler(
443
439
private fun tryUnpark (): Boolean {
444
440
while (true ) {
445
441
val worker = parkedWorkersStackPop() ? : return false
446
- if (! worker.parkingState .compareAndSet(PARKING_ALLOWED , PARKING_FORBIDDEN )) {
442
+ if (worker.workerCtl .compareAndSet(PARKED , CLAIMED )) {
447
443
LockSupport .unpark(worker)
444
+ return true
448
445
}
449
- if (worker.tryForbidTermination()) return true
450
446
}
451
447
}
452
448
@@ -596,23 +592,19 @@ internal class CoroutineScheduler(
596
592
/* *
597
593
* Worker state. **Updated only by this worker thread**.
598
594
* By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
595
+ * Is used locally by the worker to maintain its own invariants.
599
596
*/
600
597
@JvmField
601
598
var state = WorkerState .DORMANT
602
599
603
600
/* *
604
- * Small state machine for termination.
605
- * Followed states are allowed:
606
- * [TERMINATION_ALLOWED] -- worker can wake up and terminate itself
607
- * [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
608
- * [TERMINATED] -- final state, thread is terminating and cannot be resurrected
609
- *
610
- * Allowed transitions:
611
- * [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN]
612
- * [TERMINATION_ALLOWED] -> [TERMINATED]
613
- * [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED]
601
+ * Worker control state responsible for worker claiming, parking and termination.
602
+ * List of states:
603
+ * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
604
+ * [CLAIMED] -- worker is claimed by an external submitter.
605
+ * [TERMINATED] -- worker is terminated and no longer usable.
614
606
*/
615
- private val terminationState = atomic(TERMINATION_ALLOWED )
607
+ val workerCtl = atomic(CLAIMED )
616
608
617
609
/* *
618
610
* It is set to the termination deadline when started doing [park] and it reset
@@ -632,25 +624,8 @@ internal class CoroutineScheduler(
632
624
* The delay until at least one task in other worker queues will become stealable.
633
625
*/
634
626
private var minDelayUntilStealableTaskNs = 0L
635
- // PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED
636
- val parkingState = atomic(PARKING_ALLOWED )
637
627
638
628
private var rngState = Random .nextInt()
639
- /* *
640
- * Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails.
641
- * This attempt may fail either because worker terminated itself or because someone else
642
- * claimed this worker (though this case is rare, because require very bad timings)
643
- */
644
- fun tryForbidTermination (): Boolean =
645
- when (val state = terminationState.value) {
646
- TERMINATED -> false // already terminated
647
- TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker
648
- TERMINATION_ALLOWED -> terminationState.compareAndSet(
649
- TERMINATION_ALLOWED ,
650
- TERMINATION_FORBIDDEN
651
- )
652
- else -> error(" Invalid terminationState = $state " )
653
- }
654
629
655
630
/* *
656
631
* Tries to acquire CPU token if worker doesn't have one
@@ -730,23 +705,16 @@ internal class CoroutineScheduler(
730
705
// Counterpart to "tryUnpark"
731
706
private fun tryPark () {
732
707
if (! inStack()) {
733
- parkingState.value = PARKING_ALLOWED
734
708
parkedWorkersStackPush(this )
735
709
return
736
-
737
710
}
738
711
assert { localQueue.size == 0 }
739
- // Failed to get a parking permit => we are not in the stack
740
- while (inStack()) {
712
+ workerCtl.value = PARKED // Update value once
713
+ while (inStack()) { // Prevent spurious wakeups
741
714
if (isTerminated || state == WorkerState .TERMINATED ) break
742
- if (parkingState.value != PARKED && ! parkingState.compareAndSet(PARKING_ALLOWED , PARKED )) {
743
- return
744
- }
745
715
tryReleaseCpu(WorkerState .PARKING )
746
716
interrupted() // Cleanup interruptions
747
- if (inStack()) {
748
- park()
749
- }
717
+ park()
750
718
}
751
719
}
752
720
@@ -798,7 +766,6 @@ internal class CoroutineScheduler(
798
766
}
799
767
800
768
private fun park () {
801
- terminationState.value = TERMINATION_ALLOWED
802
769
// set termination deadline the first time we are here (it is reset in idleReset)
803
770
if (terminationDeadline == 0L ) terminationDeadline = System .nanoTime() + idleWorkerKeepAliveNs
804
771
// actually park
@@ -824,7 +791,7 @@ internal class CoroutineScheduler(
824
791
* See tryUnpark for state reasoning.
825
792
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
826
793
*/
827
- if (! terminationState .compareAndSet(TERMINATION_ALLOWED , TERMINATED )) return
794
+ if (! workerCtl .compareAndSet(PARKED , TERMINATED )) return
828
795
/*
829
796
* At this point this thread is no longer considered as usable for scheduling.
830
797
* We need multi-step choreography to reindex workers.
@@ -923,7 +890,7 @@ internal class CoroutineScheduler(
923
890
924
891
var currentIndex = nextInt(created)
925
892
var minDelay = Long .MAX_VALUE
926
- repeat(workers.length() ) {
893
+ repeat(created ) {
927
894
++ currentIndex
928
895
if (currentIndex > created) currentIndex = 1
929
896
val worker = workers[currentIndex]
0 commit comments