Skip to content

Commit d30af7c

Browse files
authored
Document and fix dispatchYield codepath (Kotlin#4255)
* Treat dispatchYield as regular dispatch in CoroutineScheduler * Always unpark a worker to avoid potential starvation in cases when coroutine was launched via UNDISPATCHED mechanism Fixes Kotlin#4248
1 parent 46f9ccc commit d30af7c

File tree

4 files changed

+28
-19
lines changed

4 files changed

+28
-19
lines changed

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+3
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ public abstract class CoroutineDispatcher :
222222
* Though the `yield` marker may be passed as a part of [context], this
223223
* is a separate method for performance reasons.
224224
*
225+
* Implementation note: this entry-point is used for `Dispatchers.IO` and [Dispatchers.Default]
226+
* unerlying implementations, see overrides for this method.
227+
*
225228
* @suppress **This an internal API and should not be used from general code.**
226229
*/
227230
@InternalCoroutinesApi

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+10-13
Original file line numberDiff line numberDiff line change
@@ -384,14 +384,14 @@ internal class CoroutineScheduler(
384384
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
385385
*
386386
* [taskContext] -- concurrency context of given [block].
387-
* [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
388-
* If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
389-
* but only if the current thread is a corresponding worker thread.
387+
* [fair] -- whether this [dispatch] call is fair.
388+
* If `true` then the task will be dispatched in a FIFO manner.
390389
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
391390
* - [CoroutineStart.UNDISPATCHED]
392-
* - Concurrent [close] that effectively shutdowns the worker thread
391+
* - Concurrent [close] that effectively shutdowns the worker thread.
392+
* Used for [yield].
393393
*/
394-
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
394+
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
395395
trackTask() // this is needed for virtual time support
396396
val task = createTask(block, taskContext)
397397
val isBlockingTask = task.isBlocking
@@ -400,20 +400,18 @@ internal class CoroutineScheduler(
400400
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
401401
// try to submit the task to the local queue and act depending on the result
402402
val currentWorker = currentWorker()
403-
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
403+
val notAdded = currentWorker.submitToLocalQueue(task, fair)
404404
if (notAdded != null) {
405405
if (!addToGlobalQueue(notAdded)) {
406406
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
407407
throw RejectedExecutionException("$schedulerName was terminated")
408408
}
409409
}
410-
val skipUnpark = tailDispatch && currentWorker != null
411410
// Checking 'task' instead of 'notAdded' is completely okay
412411
if (isBlockingTask) {
413412
// Use state snapshot to better estimate the number of running threads
414-
signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
413+
signalBlockingWork(stateSnapshot)
415414
} else {
416-
if (skipUnpark) return
417415
signalCpuWork()
418416
}
419417
}
@@ -429,8 +427,7 @@ internal class CoroutineScheduler(
429427
}
430428

431429
// NB: should only be called from 'dispatch' method due to blocking tasks increment
432-
private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
433-
if (skipUnpark) return
430+
private fun signalBlockingWork(stateSnapshot: Long) {
434431
if (tryUnpark()) return
435432
// Use state snapshot to avoid accidental thread overprovision
436433
if (tryCreateWorker(stateSnapshot)) return
@@ -506,7 +503,7 @@ internal class CoroutineScheduler(
506503
* Returns `null` if task was successfully added or an instance of the
507504
* task that was not added or replaced (thus should be added to global queue).
508505
*/
509-
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
506+
private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
510507
if (this == null) return task
511508
/*
512509
* This worker could have been already terminated from this thread by close/shutdown and it should not
@@ -518,7 +515,7 @@ internal class CoroutineScheduler(
518515
return task
519516
}
520517
mayHaveLocalTasks = true
521-
return localQueue.add(task, fair = tailDispatch)
518+
return localQueue.add(task, fair = fair)
522519
}
523520

524521
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+14-4
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,21 @@ internal open class SchedulerCoroutineDispatcher(
113113

114114
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
115115

116-
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
117-
coroutineScheduler.dispatch(block, tailDispatch = true)
116+
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit {
117+
/*
118+
* 'dispatchYield' implementation is needed to address the scheduler's scheduling policy.
119+
* By default, the scheduler dispatches tasks in a semi-LIFO order, meaning that for the
120+
* task sequence [#1, #2, #3], the scheduling of task #4 will produce
121+
* [#4, #1, #2, #3], allocates new worker and makes #4 stealable after some time.
122+
* On a fast enough system, it means that `while (true) { yield() }` might obstruct the progress
123+
* of the system and potentially starve it.
124+
* To mitigate that, `dispatchYield` is a dedicated entry point that produces [#1, #2, #3, #4]
125+
*/
126+
coroutineScheduler.dispatch(block, fair = true)
127+
}
118128

119-
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
120-
coroutineScheduler.dispatch(block, context, tailDispatch)
129+
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
130+
coroutineScheduler.dispatch(block, context, fair)
121131
}
122132

123133
override fun close() {

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kotlinx.coroutines.scheduling
22

33
import kotlinx.coroutines.testing.*
4-
import kotlinx.coroutines.*
54
import org.junit.Test
65
import java.lang.Runnable
76
import java.util.concurrent.*
@@ -80,7 +79,7 @@ class CoroutineSchedulerTest : TestBase() {
8079
it.dispatch(Runnable {
8180
expect(2)
8281
finishLatch.countDown()
83-
}, tailDispatch = true)
82+
}, fair = true)
8483
})
8584

8685
startLatch.countDown()

0 commit comments

Comments
 (0)