Skip to content

Commit 8853126

Browse files
committed
Do not probabilistically steal blocking tasks as part of anti-starvation mechanism: stealing thread might be woken up in blockingQuiescence and already have a blocking task in its local queue
1 parent db5bd54 commit 8853126

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

+15-6
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ internal class CoroutineScheduler(
831831
// set termination deadline the first time we are here (it is reset in idleReset)
832832
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
833833
// actually park
834-
doPark(idleWorkerKeepAliveNs)
834+
if (!doPark(idleWorkerKeepAliveNs)) return
835835
// try terminate when we are idle past termination deadline
836836
// note that the comparison is written this way to protect against potential nanoTime wraparound
837837
if (System.nanoTime() - terminationDeadline >= 0) {
@@ -840,9 +840,15 @@ internal class CoroutineScheduler(
840840
}
841841
}
842842

843-
private fun doPark(nanos: Long) {
843+
private fun doPark(nanos: Long): Boolean {
844+
/*
845+
* Here we push this worker onto the parked workers stack, then, before actually parking, check whether there are new blocking tasks
846+
* (because submitting thread could have missed this thread in tryUnpark)
847+
*/
844848
parkedWorkersStackPush(this)
849+
if (!blockingQuiescence()) return false
845850
LockSupport.parkNanos(nanos)
851+
return true
846852
}
847853

848854
/**
@@ -909,7 +915,7 @@ internal class CoroutineScheduler(
909915
* Returns `true` if there are no blocking tasks in the queue.
910916
*/
911917
private fun blockingQuiescence(): Boolean {
912-
globalQueue.removeFirstBlockingModeOrNull()?.let {
918+
globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)?.let {
913919
localQueue.add(it, globalQueue)
914920
return false
915921
}
@@ -944,7 +950,7 @@ internal class CoroutineScheduler(
944950
* 2) It helps with a rare race when an external submitter sends dependent blocking tasks
945951
* one by one and one of the requested workers may miss CPU token
946952
*/
947-
return localQueue.poll() ?: globalQueue.removeFirstBlockingModeOrNull()
953+
return localQueue.poll() ?: globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)
948954
}
949955

950956
private fun findTaskWithCpuPermit(): Task? {
@@ -953,10 +959,13 @@ internal class CoroutineScheduler(
953959
* or local work is frequently offloaded, global queue polling will
954960
* starve tasks from local queue. But if we never poll global queue,
955961
* then local tasks may starve global queue, so poll global queue
956-
* once per two core pool size iterations
962+
* once per two core pool size iterations.
963+
* Poll the global queue only for non-blocking tasks, because for a blocking task a separate thread was woken up.
964+
* If current thread is woken up, then its local queue is empty and it will poll global queue anyway,
965+
* otherwise current thread may already have blocking task in its local queue.
957966
*/
958967
val globalFirst = nextInt(2 * corePoolSize) == 0
959-
if (globalFirst) globalQueue.removeFirstOrNull()?.let { return it }
968+
if (globalFirst) globalQueue.removeFirstWithModeOrNull(TaskMode.NON_BLOCKING)?.let { return it }
960969
localQueue.poll()?.let { return it }
961970
if (!globalFirst) globalQueue.removeFirstOrNull()?.let { return it }
962971
return trySteal()

core/kotlinx-coroutines-core/src/scheduling/Tasks.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,8 @@ internal class TaskImpl(
110110

111111
// Open for tests
112112
internal open class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false) {
113-
// Open for tests
114-
public open fun removeFirstBlockingModeOrNull(): Task? =
115-
removeFirstOrNullIf { it.mode == TaskMode.PROBABLY_BLOCKING }
113+
public fun removeFirstWithModeOrNull(mode: TaskMode): Task? =
114+
removeFirstOrNullIf { it.mode == mode }
116115
}
117116

118117
internal abstract class TimeSource {

core/kotlinx-coroutines-core/test/scheduling/WorkQueueStressTest.kt

-2
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ class WorkQueueStressTest : TestBase() {
117117
}
118118

119119
internal class Queue : GlobalQueue() {
120-
override fun removeFirstBlockingModeOrNull(): Task? = error("Should not be called")
121-
122120
fun addAll(tasks: Collection<Task>) {
123121
tasks.forEach { addLast(it) }
124122
}

0 commit comments

Comments
 (0)