Skip to content

Commit 68da30d

Browse files
committed
Document dispatchYield
1 parent 1fcffbf commit 68da30d

File tree

4 files changed

+27
-14
lines changed

4 files changed

+27
-14
lines changed

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+3
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ public abstract class CoroutineDispatcher :
222222
* Though the `yield` marker may be passed as a part of [context], this
223223
* is a separate method for performance reasons.
224224
*
225+
* Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
226+
* unerlying implementations, see overrides for this method.
227+
*
225228
* @suppress **This an internal API and should not be used from general code.**
226229
*/
227230
@InternalCoroutinesApi

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+9-8
Original file line numberDiff line numberDiff line change
@@ -384,14 +384,15 @@ internal class CoroutineScheduler(
384384
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
385385
*
386386
* [taskContext] -- concurrency context of given [block].
387-
* [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
388-
* If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
387+
* [fair] -- whether this [dispatch] call is fair.
388+
* If `true` then the task will be dispatched in a FIFO manner and no additional workers will be requested,
389389
* but only if the current thread is a corresponding worker thread.
390390
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
391391
* - [CoroutineStart.UNDISPATCHED]
392-
* - Concurrent [close] that effectively shutdowns the worker thread
392+
* - Concurrent [close] that effectively shutdowns the worker thread.
393+
* Used for [yield].
393394
*/
394-
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
395+
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
395396
trackTask() // this is needed for virtual time support
396397
val task = createTask(block, taskContext)
397398
val isBlockingTask = task.isBlocking
@@ -400,14 +401,14 @@ internal class CoroutineScheduler(
400401
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
401402
// try to submit the task to the local queue and act depending on the result
402403
val currentWorker = currentWorker()
403-
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
404+
val notAdded = currentWorker.submitToLocalQueue(task, fair)
404405
if (notAdded != null) {
405406
if (!addToGlobalQueue(notAdded)) {
406407
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
407408
throw RejectedExecutionException("$schedulerName was terminated")
408409
}
409410
}
410-
val skipUnpark = tailDispatch && currentWorker != null
411+
val skipUnpark = fair && currentWorker != null
411412
// Checking 'task' instead of 'notAdded' is completely okay
412413
if (isBlockingTask) {
413414
// Use state snapshot to better estimate the number of running threads
@@ -506,7 +507,7 @@ internal class CoroutineScheduler(
506507
* Returns `null` if task was successfully added or an instance of the
507508
* task that was not added or replaced (thus should be added to global queue).
508509
*/
509-
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
510+
private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
510511
if (this == null) return task
511512
/*
512513
* This worker could have been already terminated from this thread by close/shutdown and it should not
@@ -518,7 +519,7 @@ internal class CoroutineScheduler(
518519
return task
519520
}
520521
mayHaveLocalTasks = true
521-
return localQueue.add(task, fair = tailDispatch)
522+
return localQueue.add(task, fair = fair)
522523
}
523524

524525
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+14-4
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,21 @@ internal open class SchedulerCoroutineDispatcher(
113113

114114
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
115115

116-
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
117-
coroutineScheduler.dispatch(block, tailDispatch = true)
116+
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit {
117+
/*
118+
* 'dispatchYield' implementation is needed to address the scheduler's scheduling policy.
119+
* By default, the scheduler dispatches tasks in a semi-LIFO order, meaning that for the
120+
* task sequence [#1, #2, #3], the scheduling of task #4 will produce
121+
* [#4, #1, #2, #3], allocates new worker and makes #4 stealable after some time.
122+
* On a fast enough system, it means that `while (true) { yield() }` might obstruct the progress
123+
* of the system and potentially starve it.
124+
* To mitigate that, `dispatchYield` is a dedicated entry point that produces [#1, #2, #3, #4]
125+
*/
126+
coroutineScheduler.dispatch(block, fair = true)
127+
}
118128

119-
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
120-
coroutineScheduler.dispatch(block, context, tailDispatch)
129+
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
130+
coroutineScheduler.dispatch(block, context, fair)
121131
}
122132

123133
override fun close() {

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kotlinx.coroutines.scheduling
22

33
import kotlinx.coroutines.testing.*
4-
import kotlinx.coroutines.*
54
import org.junit.Test
65
import java.lang.Runnable
76
import java.util.concurrent.*
@@ -80,7 +79,7 @@ class CoroutineSchedulerTest : TestBase() {
8079
it.dispatch(Runnable {
8180
expect(2)
8281
finishLatch.countDown()
83-
}, tailDispatch = true)
82+
}, fair = true)
8483
})
8584

8685
startLatch.countDown()

0 commit comments

Comments
 (0)