Skip to content

Commit ca035a0

Browse files
committed
Fix oversubscription in CoroutineScheduler (-> Dispatchers.Default)
Previously, a worker thread unconditionally processed tasks from its own local queue, even when those tasks were CPU-intensive and the worker had not acquired a CPU token. Fixes #3418
1 parent 54f5766 commit ca035a0

File tree

2 files changed

+92
-11
lines changed

2 files changed

+92
-11
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+3-11
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,6 @@ internal class CoroutineScheduler(
726726
parkedWorkersStackPush(this)
727727
return
728728
}
729-
assert { localQueue.size == 0 }
730729
workerCtl.value = PARKED // Update value once
731730
/*
732731
* inStack() prevents spurious wakeups, while workerCtl.value == PARKED
@@ -873,15 +872,10 @@ internal class CoroutineScheduler(
873872
}
874873
}
875874

876-
fun findTask(scanLocalQueue: Boolean): Task? {
877-
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
875+
fun findTask(mayHaveLocalTasks: Boolean): Task? {
876+
if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
878877
// If we can't acquire a CPU permit -- attempt to find blocking task
879-
val task = if (scanLocalQueue) {
880-
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
881-
} else {
882-
globalBlockingQueue.removeFirstOrNull()
883-
}
884-
return task ?: trySteal(blockingOnly = true)
878+
return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true)
885879
}
886880

887881
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
@@ -911,7 +905,6 @@ internal class CoroutineScheduler(
911905
}
912906

913907
private fun trySteal(blockingOnly: Boolean): Task? {
914-
assert { localQueue.size == 0 }
915908
val created = createdWorkers
916909
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
917910
if (created < 2) {
@@ -925,7 +918,6 @@ internal class CoroutineScheduler(
925918
if (currentIndex > created) currentIndex = 1
926919
val worker = workers[currentIndex]
927920
if (worker !== null && worker !== this) {
928-
assert { localQueue.size == 0 }
929921
val stealResult = if (blockingOnly) {
930922
worker.localQueue.tryStealBlocking(stolenTask)
931923
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.scheduling
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.Test
9+
import java.util.concurrent.*
10+
import java.util.concurrent.atomic.AtomicInteger
11+
12+
/**
 * Checks that probe tasks launched on [Dispatchers.Default] are never in flight in numbers
 * greater than `CORE_POOL_SIZE`, i.e. that the dispatcher is not oversubscribed.
 *
 * Every probe task goes through [runAndCheck], which counts concurrent entrants and then
 * blocks its thread on a latch until the test opens it.
 */
class CoroutineSchedulerOversubscriptionTest : TestBase() {
13+
14+
// Number of tasks currently inside runAndCheck: incremented on entry, decremented after the latch opens.
private val inDefault = AtomicInteger(0)
15+
16+
/**
 * Registers the caller as running and fails with "Oversubscription detected" if that brings
 * the number of concurrent entrants above `CORE_POOL_SIZE`; otherwise blocks the current
 * thread on this latch and unregisters the caller once the latch is released.
 *
 * Note: on the failure path the counter is left incremented.
 */
private fun CountDownLatch.runAndCheck() {
17+
if (inDefault.incrementAndGet() > CORE_POOL_SIZE) {
18+
error("Oversubscription detected")
19+
}
20+
21+
// Blocking wait: the thread stays occupied until the test calls countDown()
await()
22+
inDefault.decrementAndGet()
23+
}
24+
25+
/**
 * Occupies `CORE_POOL_SIZE - 1` tasks on [Dispatchers.Default], launches two more probe tasks
 * from inside `withContext(Dispatchers.Default)`, then switches to [Dispatchers.IO], waits
 * 100 ms and opens the latch. Any probe that observes more than `CORE_POOL_SIZE` concurrent
 * entrants fails the test.
 */
@Test
26+
fun testOverSubscriptionDeterministic() = runTest {
27+
// Latch that keeps every probe task blocked until the end of the scenario
val barrier = CountDownLatch(1)
28+
// CORE_POOL_SIZE parties: the CORE_POOL_SIZE - 1 launched tasks plus this test coroutine
val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
29+
// All threads but one
30+
repeat(CORE_POOL_SIZE - 1) {
31+
launch(Dispatchers.Default) {
32+
threadsOccupiedBarrier.await()
33+
barrier.runAndCheck()
34+
}
35+
}
36+
// Last party of the barrier: returns once every launched task has started and reached it
threadsOccupiedBarrier.await()
37+
withContext(Dispatchers.Default) {
38+
// Put a task in a local queue, it will be stolen
39+
launch(Dispatchers.Default) {
40+
barrier.runAndCheck()
41+
}
42+
// Put one more task to trick the local queue check
43+
launch(Dispatchers.Default) {
44+
barrier.runAndCheck()
45+
}
46+
47+
withContext(Dispatchers.IO) {
48+
try {
49+
// Release the thread
50+
delay(100)
51+
} finally {
52+
// Always open the latch so tasks blocked in runAndCheck can finish
barrier.countDown()
53+
}
54+
}
55+
}
56+
}
57+
58+
/**
 * Runs the same scenario `1000 * stressTestMultiplierSqrt` times. The counter is reset before
 * each iteration, and the latch is opened after a `yield()` instead of a fixed delay.
 */
@Test
59+
fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) {
60+
// Start every iteration from a clean counter
inDefault.set(0)
61+
runTest {
62+
val barrier = CountDownLatch(1)
63+
// CORE_POOL_SIZE parties: the CORE_POOL_SIZE - 1 launched tasks plus this test coroutine
val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
64+
// All threads but one
65+
repeat(CORE_POOL_SIZE - 1) {
66+
launch(Dispatchers.Default) {
67+
threadsOccupiedBarrier.await()
68+
barrier.runAndCheck()
69+
}
70+
}
71+
// Last party of the barrier: returns once every launched task has started and reached it
threadsOccupiedBarrier.await()
72+
withContext(Dispatchers.Default) {
73+
// Put a task in a local queue
74+
launch(Dispatchers.Default) {
75+
barrier.runAndCheck()
76+
}
77+
// Put one more task to trick the local queue check
78+
launch(Dispatchers.Default) {
79+
barrier.runAndCheck()
80+
}
81+
82+
withContext(Dispatchers.IO) {
83+
yield()
84+
// Open the latch so tasks blocked in runAndCheck can finish (not in a finally block here)
barrier.countDown()
85+
}
86+
}
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)