Skip to content

Document and fix dispatchYield codepath #4255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ public abstract class CoroutineDispatcher :
* Though the `yield` marker may be passed as a part of [context], this
* is a separate method for performance reasons.
*
* Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
* unerlying implementations, see overrides for this method.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
Expand Down
23 changes: 10 additions & 13 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -384,14 +384,14 @@ internal class CoroutineScheduler(
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
*
* [taskContext] -- concurrency context of given [block].
* [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
* If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
* but only if the current thread is a corresponding worker thread.
* [fair] -- whether this [dispatch] call is fair.
* If `true` then the task will be dispatched in a FIFO manner.
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
* - [CoroutineStart.UNDISPATCHED]
* - Concurrent [close] that effectively shutdowns the worker thread
* - Concurrent [close] that effectively shutdowns the worker thread.
* Used for [yield].
*/
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
val isBlockingTask = task.isBlocking
Expand All @@ -400,20 +400,18 @@ internal class CoroutineScheduler(
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
val notAdded = currentWorker.submitToLocalQueue(task, fair)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (isBlockingTask) {
// Use state snapshot to better estimate the number of running threads
signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
signalBlockingWork(stateSnapshot)
} else {
if (skipUnpark) return
signalCpuWork()
}
}
Expand All @@ -429,8 +427,7 @@ internal class CoroutineScheduler(
}

// NB: should only be called from 'dispatch' method due to blocking tasks increment
private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
if (skipUnpark) return
private fun signalBlockingWork(stateSnapshot: Long) {
if (tryUnpark()) return
// Use state snapshot to avoid accidental thread overprovision
if (tryCreateWorker(stateSnapshot)) return
Expand Down Expand Up @@ -506,7 +503,7 @@ internal class CoroutineScheduler(
* Returns `null` if task was successfully added or an instance of the
* task that was not added or replaced (thus should be added to global queue).
*/
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
if (this == null) return task
/*
* This worker could have been already terminated from this thread by close/shutdown and it should not
Expand All @@ -518,7 +515,7 @@ internal class CoroutineScheduler(
return task
}
mayHaveLocalTasks = true
return localQueue.add(task, fair = tailDispatch)
return localQueue.add(task, fair = fair)
}

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
Expand Down
18 changes: 14 additions & 4 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,21 @@ internal open class SchedulerCoroutineDispatcher(

override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block, tailDispatch = true)
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit {
/*
* 'dispatchYield' implementation is needed to address the scheduler's scheduling policy.
* By default, the scheduler dispatches tasks in a semi-LIFO order, meaning that for the
* task sequence [#1, #2, #3], the scheduling of task #4 will produce
* [#4, #1, #2, #3], allocates new worker and makes #4 stealable after some time.
* On a fast enough system, it means that `while (true) { yield() }` might obstruct the progress
* of the system and potentially starve it.
Copy link
Collaborator

@dkhalanskyjb dkhalanskyjb Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you're proposing is better than what we have now, but it's still not ideal. We are just one step farther from seeing the same effect. LoggingDispatcher(Dispatchers.IO) will exhibit this behavior, as the author of LoggingDispatcher can't override dispatchYield. That's a problem we're also facing with Delay (people can't wrap Delay implementations and expect the same behavior).

I see the following options of dealing with this problem for good:

  • Make the YieldContext marker used by Dispatchers.Unconfined available to all dispatchers (and not just those not needing a dispatch) and check its presence when dispatch'ing to the coroutine scheduler; remove dispatchYield completely. Probably too slow to check that on every dispatch?
  • Apply the proposed fix, but also design and publish some support for wrapping coroutine dispatchers that takes care to propagate dispatchYield.
  • Make dispatchYield a valid optimization in a wide range of scenarios (including things like "skip suspensions on Dispatchers.Main if the looper is empty anyway") by recognizing the coroutine not being yet dispatched to the correct execution context as a separate state. EDIT: ... and make dispatchYield public, as it will no longer be deeply tied to our implementation details.

Thoughts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, and it seems like the proper solution includes a solution for Delay as well.
In general, it's not really sustainable to include @Internal* methods or superinterfaces into the class that we expect people to extend.

Probably too slow to check that on every dispatch?

Yes, this option we'd rather avoid -- quite a non-deterministic check for the sake of code path that is really rare.

I would merge this as is and, as a next step, would consider sticking to the third option.
Probably, making dispatchYield public should be enough, not sure about the yield optimization (as it's quite niche)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about the yield optimization (as it's quite niche)

My idea actually wasn't to introduce the optimizations (they are even more niche than even yield(), which itself is niche) (though introducing the optimizations actually shouldn't be difficult) but to ensure that if someone does override dispatchYield in favor of a non-trivial optimized implementation, that implementation wouldn't break the way ours did in #4248. If we introduce dispatchYield for public implementation without fixing the state machine, the most obvious usage of dispatchYield becomes a footgun.

* To mitigate that, `dispatchYield` is a dedicated entry point that produces [#1, #2, #3, #4]
*/
coroutineScheduler.dispatch(block, fair = true)
}

internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
coroutineScheduler.dispatch(block, context, tailDispatch)
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
coroutineScheduler.dispatch(block, context, fair)
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.Test
import java.lang.Runnable
import java.util.concurrent.*
Expand Down Expand Up @@ -80,7 +79,7 @@ class CoroutineSchedulerTest : TestBase() {
it.dispatch(Runnable {
expect(2)
finishLatch.countDown()
}, tailDispatch = true)
}, fair = true)
})

startLatch.countDown()
Expand Down