Skip to content

Commit 2fd02cd

Browse files
committed
Documentation improvements, naming, proper spurious wakeup check
1 parent 81b95f0 commit 2fd02cd

File tree

2 files changed

+52
-46
lines changed

2 files changed

+52
-46
lines changed

kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt

-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ internal class LockFreeTaskQueueCore<E : Any>(
175175
}
176176
// element == Placeholder can only be when add has not finished yet
177177
if (element is Placeholder) return null // consider it not added yet
178-
// now we tentative know element to remove -- check predicate
179178
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
180179
val newHead = (head + 1) and MAX_CAPACITY_MASK
181180
if (_state.compareAndSet(state, state.updateHead(newHead))) {

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+52-45
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import kotlin.random.*
2525
* ### Structural overview
2626
*
2727
* Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
28-
* to execute blocking tasks. Every worker a has local queue in addition to a global scheduler queue and the global queue
28+
* to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
2929
* has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
3030
* Work-stealing is implemented on top of that queues to provide even load distribution and illusion of centralized run queue.
3131
*
@@ -245,7 +245,7 @@ internal class CoroutineScheduler(
245245
*/
246246
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
247247
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
248-
private val availableCpuPermits: Int inline get() = (controlState.value and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
248+
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
249249

250250
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
251251
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
@@ -261,14 +261,11 @@ internal class CoroutineScheduler(
261261
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
262262
}
263263

264-
private inline fun tryAcquireCpuPermit(): Boolean {
265-
while (true) {
266-
val state = controlState.value
267-
val available = availableCpuPermits(state)
268-
if (available == 0) return false
269-
val update = state - (1L shl CPU_PERMITS_SHIFT)
270-
if (controlState.compareAndSet(state, update)) return true
271-
}
264+
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
265+
val available = availableCpuPermits(state)
266+
if (available == 0) return false
267+
val update = state - (1L shl CPU_PERMITS_SHIFT)
268+
if (controlState.compareAndSet(state, update)) return true
272269
}
273270

274271
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
@@ -283,9 +280,12 @@ internal class CoroutineScheduler(
283280
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
284281

285282
// Worker termination states
286-
private const val FORBIDDEN = -1
287-
private const val ALLOWED = 0
283+
private const val TERMINATION_FORBIDDEN = -1
284+
private const val TERMINATION_ALLOWED = 0
288285
private const val TERMINATED = 1
286+
// Worker parking states
287+
private const val PARKING_FORBIDDEN = -1
288+
private const val PARKING_ALLOWED = 0
289289
private const val PARKED = 1
290290

291291
// Masks of control state
@@ -419,7 +419,7 @@ internal class CoroutineScheduler(
419419
private fun tryUnpark(): Boolean {
420420
while (true) {
421421
val worker = parkedWorkersStackPop() ?: return false
422-
if (!worker.parkingState.compareAndSet(ALLOWED, FORBIDDEN)) {
422+
if (!worker.parkingState.compareAndSet(PARKING_ALLOWED, PARKING_FORBIDDEN)) {
423423
LockSupport.unpark(worker)
424424
}
425425
if (worker.tryForbidTermination()) return true
@@ -525,7 +525,7 @@ internal class CoroutineScheduler(
525525
"CPU = $cpuWorkers, " +
526526
"blocking = $blockingWorkers, " +
527527
"parked = $parkedWorkers, " +
528-
"retired = $dormant, " +
528+
"dormant = $dormant, " +
529529
"terminated = $terminated}, " +
530530
"running workers queues = $queueSizes, "+
531531
"global CPU queue size = ${globalCpuQueue.size}, " +
@@ -581,16 +581,16 @@ internal class CoroutineScheduler(
581581
/**
582582
* Small state machine for termination.
583583
* Followed states are allowed:
584-
* [ALLOWED] -- worker can wake up and terminate itself
585-
* [FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
584+
* [TERMINATION_ALLOWED] -- worker can wake up and terminate itself
585+
* [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
586586
* [TERMINATED] -- final state, thread is terminating and cannot be resurrected
587587
*
588588
* Allowed transitions:
589-
* [ALLOWED] -> [FORBIDDEN]
590-
* [ALLOWED] -> [TERMINATED]
591-
* [FORBIDDEN] -> [ALLOWED]
589+
* [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN]
590+
* [TERMINATION_ALLOWED] -> [TERMINATED]
591+
* [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED]
592592
*/
593-
private val terminationState = atomic(ALLOWED)
593+
private val terminationState = atomic(TERMINATION_ALLOWED)
594594

595595
/**
596596
* It is set to the termination deadline when started doing [park] and it reset
@@ -610,22 +610,22 @@ internal class CoroutineScheduler(
610610
* The delay until at least one task in other worker queues will become stealable.
611611
*/
612612
private var minDelayUntilStealableTaskNs = 0L
613-
// ALLOWED | PARKED | FORBIDDEN
614-
val parkingState = atomic(ALLOWED)
613+
// PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED
614+
val parkingState = atomic(PARKING_ALLOWED)
615615

616616
private var rngState = Random.nextInt()
617617
/**
618-
* Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
618+
* Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails.
619619
* This attempt may fail either because worker terminated itself or because someone else
620620
* claimed this worker (though this case is rare, because require very bad timings)
621621
*/
622622
fun tryForbidTermination(): Boolean =
623623
when (val state = terminationState.value) {
624624
TERMINATED -> false // already terminated
625-
FORBIDDEN -> false // already forbidden, someone else claimed this worker
626-
ALLOWED -> terminationState.compareAndSet(
627-
ALLOWED,
628-
FORBIDDEN
625+
TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker
626+
TERMINATION_ALLOWED -> terminationState.compareAndSet(
627+
TERMINATION_ALLOWED,
628+
TERMINATION_FORBIDDEN
629629
)
630630
else -> error("Invalid terminationState = $state")
631631
}
@@ -679,42 +679,49 @@ internal class CoroutineScheduler(
679679
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
680680
* excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
681681
* it with "spinning via scans" mechanism.
682+
* NB: this short potential parking does not interfere with `tryUnpark`
682683
*/
683684
if (minDelayUntilStealableTaskNs != 0L) {
684685
if (!rescanned) {
685686
rescanned = true
686687
continue
687688
} else {
688689
tryReleaseCpu(WorkerState.PARKING)
690+
interrupted()
689691
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
690692
minDelayUntilStealableTaskNs = 0L
691693
}
692694
}
693695
/*
694696
* 2) No tasks available, time to park and, potentially, shut down the thread.
695-
*
696697
* Add itself to the stack of parked workers, re-scans all the queues
697698
* to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
698699
*/
699-
parkingState.value = ALLOWED
700-
if (parkedWorkersStackPush(this)) {
701-
continue
702-
} else {
703-
assert { localQueue.size == 0 }
704-
tryReleaseCpu(WorkerState.PARKING)
705-
interrupted() // Cleanup interruptions
706-
while (inStack()) { // Prevent spurious wakeups
707-
if (isTerminated) break
708-
if (!parkingState.compareAndSet(ALLOWED, PARKED)) {
709-
break
710-
}
711-
park()
712-
}
713-
}
700+
tryPark()
714701
}
715702
tryReleaseCpu(WorkerState.TERMINATED)
716703
}
717704

705+
// Counterpart to "tryUnpark"
706+
private fun tryPark() {
707+
parkingState.value = PARKING_ALLOWED
708+
if (parkedWorkersStackPush(this)) {
709+
return
710+
} else {
711+
assert { localQueue.size == 0 }
712+
tryReleaseCpu(WorkerState.PARKING)
713+
interrupted() // Cleanup interruptions
714+
// Failed to get a parking permit, bailout
715+
if (!parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
716+
return
717+
}
718+
while (inStack()) { // Prevent spurious wakeups
719+
if (isTerminated) break
720+
park()
721+
}
722+
}
723+
}
724+
718725
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
719726

720727
private fun executeTask(task: Task) {
@@ -763,7 +770,7 @@ internal class CoroutineScheduler(
763770
}
764771

765772
private fun park() {
766-
terminationState.value = ALLOWED
773+
terminationState.value = TERMINATION_ALLOWED
767774
// set termination deadline the first time we are here (it is reset in idleReset)
768775
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
769776
// actually park
@@ -789,7 +796,7 @@ internal class CoroutineScheduler(
789796
* See tryUnpark for state reasoning.
790797
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
791798
*/
792-
if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
799+
if (!terminationState.compareAndSet(TERMINATION_ALLOWED, TERMINATED)) return
793800
/*
794801
* At this point this thread is no longer considered as usable for scheduling.
795802
* We need multi-step choreography to reindex workers.

0 commit comments

Comments
 (0)