Skip to content

Reach target parallelism in .limitedParallelism of failing dispatchers #4330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public abstract class CoroutineDispatcher :
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
* It is assumed that if any exceptions do get thrown from this method, then [block] will not be executed.
*
* This method must not immediately call [block]. Doing so may result in `StackOverflowError`
* when `dispatch` is invoked repeatedly, for example when [yield] is called in a loop.
Expand Down
48 changes: 33 additions & 15 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ internal class LimitedDispatcher(
// `runningWorkers` when they observed an empty queue.
if (!tryAllocateWorker()) return
val task = obtainTaskOrDeallocateWorker() ?: return
startWorker(Worker(task))
try {
startWorker(Worker(task))
} catch (e: Throwable) {
/* If we failed to start a worker, we should decrement the counter.
The queue is in an inconsistent state--it's non-empty despite the target parallelism not having been
reached--but at least a properly functioning worker will have a chance to correct this if some future
dispatch does succeed.
If we don't decrement the counter, it will be impossible to ever reach the target parallelism again. */
runningWorkers.decrementAndGet()
throw e
Copy link
Contributor

@JakeWharton JakeWharton Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to basically be a finally block. Any reason not to use that, instead of catch+rethrow? Same holds true below.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason to use catch/throw is that we don't want to deallocate the worker if startWorker finishes normally, as this means the worker is actually started. The same holds true below: if return is used, this means we've already deallocated the worker and don't need to do that again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right right. Thanks for clarifying.

}
}

/**
Expand Down Expand Up @@ -107,21 +117,29 @@ internal class LimitedDispatcher(
*/
private inner class Worker(private var currentTask: Runnable) : Runnable {
override fun run() {
var fairnessCounter = 0
while (true) {
try {
currentTask.run()
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
try {
var fairnessCounter = 0
while (true) {
try {
currentTask.run()
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
}
currentTask = obtainTaskOrDeallocateWorker() ?: return
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
// Do "yield" to let other views execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
dispatcher.safeDispatch(this@LimitedDispatcher, this)
return
}
}
currentTask = obtainTaskOrDeallocateWorker() ?: return
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
// Do "yield" to let other views execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
dispatcher.safeDispatch(this@LimitedDispatcher, this)
return
} catch (e: Throwable) {
// If the worker failed, we should deallocate its slot
synchronized(workerAllocationLock) {
runningWorkers.decrementAndGet()
}
throw e
}
}
}
Expand All @@ -132,4 +150,4 @@ internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive pa
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
if (name != null) return NamedDispatcher(this, name)
return this
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.*

class LimitedParallelismSharedTest : TestBase() {
Expand Down Expand Up @@ -28,4 +30,30 @@ class LimitedParallelismSharedTest : TestBase() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}

/**
* Checks that even if the dispatcher sporadically fails, the limited dispatcher will still allow reaching the
* target parallelism level.
*/
@Test
fun testLimitedParallelismOfOccasionallyFailingDispatcher() {
val limit = 5
var doFail = false
val workerQueue = mutableListOf<Runnable>()
val limited = object: CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (doFail) throw TestException()
workerQueue.add(block)
}
}.limitedParallelism(limit)
repeat(6 * limit) {
try {
limited.dispatch(EmptyCoroutineContext, Runnable { /* do nothing */ })
} catch (_: DispatchException) {
// ignore
}
doFail = !doFail
}
assertEquals(limit, workerQueue.size)
}
}
54 changes: 54 additions & 0 deletions kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import org.junit.runner.*
import org.junit.runners.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.*

@RunWith(Parameterized::class)
Expand Down Expand Up @@ -84,6 +86,58 @@ class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBas
}
}

/**
* Checks that dispatcher failures during fairness redispatches don't prevent reaching the target parallelism.
*/
@Test
fun testLimitedFailingDispatcherReachesTargetParallelism() = runTest {
val keepFailing = AtomicBoolean(true)
val occasionallyFailing = object: CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (keepFailing.get() && ThreadLocalRandom.current().nextBoolean()) throw TestException()
executor.dispatch(context, block)
}
}.limitedParallelism(targetParallelism)
doStress {
repeat(1000) {
keepFailing.set(true) // we want the next tasks to sporadically fail
// Start some tasks to make sure redispatching for fairness is happening
repeat(targetParallelism * 16 + 1) {
// targetParallelism * 16 + 1 because we need at least one worker to go through a fairness yield
// with high probability.
try {
occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
// do nothing.
})
} catch (_: DispatchException) {
// ignore
}
}
keepFailing.set(false) // we want the next tasks to succeed
val barrier = CyclicBarrier(targetParallelism + 1)
repeat(targetParallelism) {
launch(occasionallyFailing) {
barrier.await()
}
}
val success = launch(Dispatchers.Default) {
// Successfully awaited parallelism + 1
barrier.await()
}
// Feed the dispatcher with more tasks to make sure it's not stuck
while (success.isActive) {
Thread.sleep(1)
repeat(targetParallelism) {
occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
// do nothing.
})
}
}
coroutineContext.job.children.toList().joinAll()
}
}
}

private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
repeat(stressTestMultiplier) {
coroutineScope {
Expand Down