Skip to content

Commit 86c39d6

Browse files
committed
Fix oversubscription in CoroutineScheduler (-> Dispatchers.Default)
Previously, a worker thread unconditionally processed tasks from its own local queue, even if tasks were CPU-intensive, but CPU token was not acquired. Fixes #3418
1 parent ca035a0 commit 86c39d6

File tree

4 files changed

+52
-7
lines changed

4 files changed

+52
-7
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -874,8 +874,14 @@ internal class CoroutineScheduler(
874874

875875
fun findTask(mayHaveLocalTasks: Boolean): Task? {
876876
if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
877-
// If we can't acquire a CPU permit -- attempt to find blocking task
878-
return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true)
877+
/*
878+
* If we can't acquire a CPU permit, attempt to find blocking task:
879+
* * Check if our queue has one (maybe mixed in with CPU tasks)
880+
* * Poll global and try steal
881+
*/
882+
return localQueue.pollBlocking()
883+
?: globalBlockingQueue.removeFirstOrNull()
884+
?: trySteal(blockingOnly = true)
879885
}
880886

881887
private fun findAnyTask(scanLocalQueue: Boolean): Task? {

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+31-3
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,12 @@ internal class WorkQueue {
4747
* [T2] changeProducerIndex (3)
4848
* [T3] changeConsumerIndex (4)
4949
*
50-
* Which can lead to resulting size bigger than actual size at any moment of time.
51-
* This is in general harmless because steal will be blocked by timer
50+
* Which can lead to resulting size being negative or bigger than actual size at any moment of time.
51+
* This is in general harmless because steal will be blocked by timer.
52+
* Negative sizes can be observed only when non-owner reads the size, which happens only
53+
* for diagnostic toString().
5254
*/
53-
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
55+
private val bufferSize: Int get() = producerIndex.value - consumerIndex.value
5456
internal val size: Int get() = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize
5557
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
5658
private val lastScheduledTask = atomic<Task?>(null)
@@ -134,6 +136,32 @@ internal class WorkQueue {
134136
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)
135137
}
136138

139+
// Polls for blocking task, invoked only by the owner
140+
fun pollBlocking(): Task? {
141+
while (true) { // Poll the slot
142+
val lastScheduled = lastScheduledTask.value ?: break
143+
if (!lastScheduled.isBlocking) break
144+
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
145+
return lastScheduled
146+
} // Failed -> someone else stole it
147+
}
148+
149+
val start = consumerIndex.value
150+
var end = producerIndex.value
151+
152+
while (start != end) {
153+
--end
154+
val index = end and MASK
155+
if (blockingTasksInBuffer.value == 0) break
156+
val value = buffer[index]
157+
if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
158+
blockingTasksInBuffer.decrementAndGet()
159+
return value
160+
}
161+
}
162+
return null
163+
}
164+
137165
fun offloadAllWorkTo(globalQueue: GlobalQueue) {
138166
lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) }
139167
while (pollTo(globalQueue)) {

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class WorkQueueStressTest : TestBase() {
4141
threads += thread(name = "producer") {
4242
startLatch.await()
4343
for (i in 1..offerIterations) {
44-
while (producerQueue.bufferSize > BUFFER_CAPACITY / 2) {
44+
while (producerQueue.size > BUFFER_CAPACITY / 2) {
4545
Thread.yield()
4646
}
4747

@@ -79,7 +79,7 @@ class WorkQueueStressTest : TestBase() {
7979
threads += thread(name = "producer") {
8080
startLatch.await()
8181
for (i in 1..offerIterations) {
82-
while (producerQueue.bufferSize == BUFFER_CAPACITY - 1) {
82+
while (producerQueue.size == BUFFER_CAPACITY - 1) {
8383
Thread.yield()
8484
}
8585

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,20 @@ class WorkQueueTest : TestBase() {
6969
assertEquals(TASK_STOLEN, victim.trySteal(ref))
7070
assertEquals(arrayListOf(2L), stealer.drain(ref))
7171
}
72+
73+
@Test
74+
fun testPollBlocking() {
75+
val queue = WorkQueue()
76+
assertNull(queue.pollBlocking())
77+
val blockingTask = blockingTask(1L)
78+
queue.add(blockingTask)
79+
queue.add(task(1L))
80+
assertSame(blockingTask, queue.pollBlocking())
81+
}
7282
}
7383

7484
internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext)
85+
internal fun blockingTask(n: Long) = TaskImpl(Runnable {}, n, BlockingContext)
7586

7687
internal fun WorkQueue.drain(ref: ObjectRef<Task?>): List<Long> {
7788
var task: Task? = poll()

0 commit comments

Comments
 (0)