
Introduce a separate slot for stealing tasks into in CoroutineScheduler #3537

Merged
merged 9 commits on Jan 16, 2023
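
The core of the change: when a worker steals a task from a peer, the stolen task no longer passes through the thief's own local queue but lands in a dedicated single-task slot. This drops the invariant that a worker's local queue must be empty while it steals, which is why the asserts are deleted in the diff below. A minimal sketch of the idea with hypothetical names (the actual implementation lives in WorkQueue and CoroutineScheduler.Worker):

import java.util.concurrent.ConcurrentLinkedDeque

// Illustration only: simplified types, none of the scheduler's atomics.
// The point: a steal bypasses the thief's localQueue entirely and goes
// into a separate one-element slot, so stealing stays correct even
// while the thief's own queue is non-empty.
private class SketchWorker {
    val localQueue = ConcurrentLinkedDeque<Runnable>() // the worker's own tasks
    private var stolenTaskSlot: Runnable? = null       // the new, separate slot

    fun trySteal(victim: SketchWorker): Runnable? {
        // Take from the end opposite to the victim's own pop,
        // straight into the slot, never into our localQueue.
        stolenTaskSlot = victim.localQueue.pollFirst() ?: return null
        val task = stolenTaskSlot
        stolenTaskSlot = null
        return task
    }
}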
14 changes: 3 additions & 11 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
@@ -726,7 +726,6 @@ internal class CoroutineScheduler(
                parkedWorkersStackPush(this)
                return
            }
-           assert { localQueue.size == 0 }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
@@ -873,15 +872,10 @@
            }
        }

-       fun findTask(scanLocalQueue: Boolean): Task? {
-           if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
+       fun findTask(mayHaveLocalTasks: Boolean): Task? {
+           if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            // If we can't acquire a CPU permit -- attempt to find blocking task
-           val task = if (scanLocalQueue) {
-               localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
-           } else {
-               globalBlockingQueue.removeFirstOrNull()
-           }
-           return task ?: trySteal(blockingOnly = true)
+           return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
@@ -911,7 +905,6 @@
        }

        private fun trySteal(blockingOnly: Boolean): Task? {
-           assert { localQueue.size == 0 }
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
@@ -925,7 +918,6 @@
            if (currentIndex > created) currentIndex = 1
            val worker = workers[currentIndex]
            if (worker !== null && worker !== this) {
-               assert { localQueue.size == 0 }
                val stealResult = if (blockingOnly) {
                    worker.localQueue.tryStealBlocking(stolenTask)
                } else {
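Why the permit-less local-queue poll had to go: CPU permits are what bound the number of concurrently executing CPU-bound tasks, and the removed branch let a worker that failed to acquire a permit still poll a CPU-bound task out of its own localQueue and run it. That is exactly the oversubscription the regression test below provokes. A paraphrased before/after of findTask (my reading of the diff above, not the actual source):

// Old: with scanLocalQueue = true, a worker holding no CPU permit could
// still dequeue and run a CPU-bound task, exceeding CORE_POOL_SIZE.
fun findTaskOld(scanLocalQueue: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() // may be a CPU task
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    return task ?: trySteal(blockingOnly = true)
}

// New: without a CPU permit, only blocking work is eligible.
fun findTaskNew(mayHaveLocalTasks: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
    return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true)
}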
@@ -0,0 +1,89 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.scheduling

import kotlinx.coroutines.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger

class CoroutineSchedulerOversubscriptionTest : TestBase() {

    private val inDefault = AtomicInteger(0)

    private fun CountDownLatch.runAndCheck() {
        if (inDefault.incrementAndGet() > CORE_POOL_SIZE) {
            error("Oversubscription detected")
        }

        await()
        inDefault.decrementAndGet()
    }

    @Test
    fun testOverSubscriptionDeterministic() = runTest {
        val barrier = CountDownLatch(1)
        val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
        // All threads but one
        repeat(CORE_POOL_SIZE - 1) {
            launch(Dispatchers.Default) {
                threadsOccupiedBarrier.await()
                barrier.runAndCheck()
            }
        }
        threadsOccupiedBarrier.await()
        withContext(Dispatchers.Default) {
            // Put a task into the local queue; it will be stolen
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }
            // Put one more task to trick the local queue check
            launch(Dispatchers.Default) {
                barrier.runAndCheck()
            }

            withContext(Dispatchers.IO) {
                try {
                    // Release the thread
                    delay(100)
                } finally {
                    barrier.countDown()
                }
            }
        }
    }

    @Test
    fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) {
        inDefault.set(0)
        runTest {
            val barrier = CountDownLatch(1)
            val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE)
            // All threads but one
            repeat(CORE_POOL_SIZE - 1) {
                launch(Dispatchers.Default) {
                    threadsOccupiedBarrier.await()
                    barrier.runAndCheck()
                }
            }
            threadsOccupiedBarrier.await()
            withContext(Dispatchers.Default) {
                // Put a task in a local queue
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }
                // Put one more task to trick the local queue check
                launch(Dispatchers.Default) {
                    barrier.runAndCheck()
                }

                withContext(Dispatchers.IO) {
                    yield()
                    barrier.countDown()
                }
            }
        }
    }
}