Skip to content

Commit c526c3f

Browse files
authored
Do not request additional worker from 'yield' calls and during post-e… (#1728)
Do not request additional worker from 'yield' calls and during post-execution phase in LimitingDispatcher Fixes #1704 Fixes #1706
1 parent 70e3583 commit c526c3f

7 files changed

+66
-23
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+22-12
Original file line numberDiff line numberDiff line change
@@ -372,25 +372,34 @@ internal class CoroutineScheduler(
372372
* Dispatches execution of a runnable [block] with a hint to a scheduler whether
373373
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
374374
*
375-
* @param taskContext concurrency context of given [block]
376-
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
375+
* [taskContext] -- concurrency context of given [block].
376+
* [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
377+
* If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
378+
* but only if the current thread is a corresponding worker thread.
379+
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
380+
* * [CoroutineStart.UNDISPATCHED]
381+
* * Concurrent [close] that effectively shutdowns the worker thread
377382
*/
378-
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
383+
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
379384
trackTask() // this is needed for virtual time support
380385
val task = createTask(block, taskContext)
381386
// try to submit the task to the local queue and act depending on the result
382-
val notAdded = submitToLocalQueue(task, fair)
387+
val currentWorker = currentWorker()
388+
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
383389
if (notAdded != null) {
384390
if (!addToGlobalQueue(notAdded)) {
385391
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
386392
throw RejectedExecutionException("$schedulerName was terminated")
387393
}
388394
}
395+
val skipUnpark = tailDispatch && currentWorker != null
389396
// Checking 'task' instead of 'notAdded' is completely okay
390397
if (task.mode == TaskMode.NON_BLOCKING) {
398+
if (skipUnpark) return
391399
signalCpuWork()
392400
} else {
393-
signalBlockingWork()
401+
// Increment blocking tasks anyway
402+
signalBlockingWork(skipUnpark = skipUnpark)
394403
}
395404
}
396405

@@ -404,9 +413,10 @@ internal class CoroutineScheduler(
404413
return TaskImpl(block, nanoTime, taskContext)
405414
}
406415

407-
private fun signalBlockingWork() {
416+
private fun signalBlockingWork(skipUnpark: Boolean) {
408417
// Use state snapshot to avoid thread overprovision
409418
val stateSnapshot = incrementBlockingTasks()
419+
if (skipUnpark) return
410420
if (tryUnpark()) return
411421
if (tryCreateWorker(stateSnapshot)) return
412422
tryUnpark() // Try unpark again in case there was race between permit release and parking
@@ -481,19 +491,19 @@ internal class CoroutineScheduler(
481491
* Returns `null` if task was successfully added or an instance of the
482492
* task that was not added or replaced (thus should be added to global queue).
483493
*/
484-
private fun submitToLocalQueue(task: Task, fair: Boolean): Task? {
485-
val worker = currentWorker() ?: return task
494+
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
495+
if (this == null) return task
486496
/*
487497
* This worker could have been already terminated from this thread by close/shutdown and it should not
488498
* accept any more tasks into its local queue.
489499
*/
490-
if (worker.state === WorkerState.TERMINATED) return task
500+
if (state === WorkerState.TERMINATED) return task
491501
// Do not add CPU tasks in local queue if we are not able to execute it
492-
if (task.mode === TaskMode.NON_BLOCKING && worker.state === WorkerState.BLOCKING) {
502+
if (task.mode === TaskMode.NON_BLOCKING && state === WorkerState.BLOCKING) {
493503
return task
494504
}
495-
worker.mayHaveLocalTasks = true
496-
return worker.localQueue.add(task, fair = fair)
505+
mayHaveLocalTasks = true
506+
return localQueue.add(task, fair = tailDispatch)
497507
}
498508

499509
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ open class ExperimentalCoroutineDispatcher(
6565

6666
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
6767
try {
68-
coroutineScheduler.dispatch(block, fair = true)
68+
coroutineScheduler.dispatch(block, tailDispatch = true)
6969
} catch (e: RejectedExecutionException) {
7070
DefaultExecutor.dispatchYield(context, block)
7171
}
@@ -101,9 +101,9 @@ open class ExperimentalCoroutineDispatcher(
101101
return LimitingDispatcher(this, parallelism, TaskMode.NON_BLOCKING)
102102
}
103103

104-
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
104+
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
105105
try {
106-
coroutineScheduler.dispatch(block, context, fair)
106+
coroutineScheduler.dispatch(block, context, tailDispatch)
107107
} catch (e: RejectedExecutionException) {
108108
// Context shouldn't be lost here to properly invoke before/after task
109109
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
@@ -147,15 +147,15 @@ private class LimitingDispatcher(
147147

148148
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
149149

150-
private fun dispatch(block: Runnable, fair: Boolean) {
150+
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
151151
var taskToSchedule = block
152152
while (true) {
153153
// Commit in-flight tasks slot
154154
val inFlight = inFlightTasks.incrementAndGet()
155155

156156
// Fast path, if parallelism limit is not reached, dispatch task and return
157157
if (inFlight <= parallelism) {
158-
dispatcher.dispatchWithContext(taskToSchedule, this, fair)
158+
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
159159
return
160160
}
161161

@@ -185,6 +185,10 @@ private class LimitingDispatcher(
185185
}
186186
}
187187

188+
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
189+
dispatch(block, tailDispatch = true)
190+
}
191+
188192
override fun toString(): String {
189193
return "${super.toString()}[dispatcher = $dispatcher]"
190194
}

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt

+18-3
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,10 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
194194
fun testYield() = runBlocking {
195195
corePoolSize = 1
196196
maxPoolSize = 1
197-
val ds = blockingDispatcher(1)
198-
val outerJob = launch(ds) {
197+
val bd = blockingDispatcher(1)
198+
val outerJob = launch(bd) {
199199
expect(1)
200-
val innerJob = launch(ds) {
200+
val innerJob = launch(bd) {
201201
// Do nothing
202202
expect(3)
203203
}
@@ -215,6 +215,21 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
215215
finish(5)
216216
}
217217

218+
@Test
219+
fun testUndispatchedYield() = runTest {
220+
expect(1)
221+
corePoolSize = 1
222+
maxPoolSize = 1
223+
val blockingDispatcher = blockingDispatcher(1)
224+
val job = launch(blockingDispatcher, CoroutineStart.UNDISPATCHED) {
225+
expect(2)
226+
yield()
227+
}
228+
expect(3)
229+
job.join()
230+
finish(4)
231+
}
232+
218233
@Test(expected = IllegalArgumentException::class)
219234
fun testNegativeParallelism() {
220235
blockingDispatcher(-1)

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherThreadLimitStressTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ class BlockingCoroutineDispatcherThreadLimitStressTest : SchedulerTestBase() {
2121
private val concurrentWorkers = AtomicInteger(0)
2222

2323
@Test
24-
@Ignore
2524
fun testLimitParallelismToOne() = runTest {
2625
val limitingDispatcher = blockingDispatcher(1)
2726
// Do in bursts to avoid OOM

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ class CoroutineDispatcherTest : SchedulerTestBase() {
117117
finish(5)
118118
}
119119

120+
@Test
121+
fun testUndispatchedYield() = runTest {
122+
expect(1)
123+
val job = launch(dispatcher, CoroutineStart.UNDISPATCHED) {
124+
expect(2)
125+
yield()
126+
}
127+
expect(3)
128+
job.join()
129+
finish(4)
130+
}
131+
120132
@Test
121133
fun testThreadName() = runBlocking {
122134
val initialCount = Thread.getAllStackTraces().keys.asSequence()

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerCloseStressTest.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import org.junit.Test
1010
import org.junit.runner.*
1111
import org.junit.runners.*
1212
import java.util.*
13-
import java.util.concurrent.*
1413
import kotlin.test.*
1514

1615
@RunWith(Parameterized::class)
@@ -79,6 +78,10 @@ class CoroutineSchedulerCloseStressTest(private val mode: Mode) : TestBase() {
7978
} else {
8079
if (rnd.nextBoolean()) {
8180
delay(1000)
81+
val t = Thread.currentThread()
82+
if (!t.name.contains("DefaultDispatcher-worker")) {
83+
val a = 2
84+
}
8285
} else {
8386
yield()
8487
}

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class CoroutineSchedulerTest : TestBase() {
8282
it.dispatch(Runnable {
8383
expect(2)
8484
finishLatch.countDown()
85-
}, fair = true)
85+
}, tailDispatch = true)
8686
})
8787

8888
startLatch.countDown()

0 commit comments

Comments
 (0)