Skip to content

Commit d1ab4df

Browse files
committed
Coroutine scheduler is used by default instead of deprecated CommonPool
* Documentation and guide are updated correspondingly * "DefaultDispatcher" is used as a public name of the default impl * Implementation is integrated with virtual time-source * Shutdown sequence is reimplemented in a safe way for tests, makes "close" safe to use on custom instances. * "close" on DefaultDispatcher throws exception just in case * -Dkotlinx.coroutines.scheduler=off can be used to switch back to CommonPool Fixes #198
1 parent 18cc588 commit d1ab4df

22 files changed

+132
-108
lines changed

common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ public object Dispatchers {
1616
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
1717
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
1818
*
19-
* It is backed by a shared pool of threads on JVM.
20-
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
21-
* to use an experimental coroutine dispatcher that shares threads with [Dispatchers.IO] and thus can switch to
22-
* context without performing an actual thread context switch.
19+
* It is backed by a shared pool of threads on JVM. By default, the maximal number of threads used
20+
* by this dispatcher is equal to the number CPU cores, but is at least two.
2321
*/
2422
@JvmField
2523
public val Default: CoroutineDispatcher =

core/kotlinx-coroutines-core/build.gradle

+5-8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ tasks.withType(Test) {
3232

3333
test {
3434
exclude '**/*LFTest.*'
35+
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
3536
}
3637

3738
task lockFreedomTest(type: Test, dependsOn: testClasses) {
@@ -48,18 +49,14 @@ task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) {
4849
exclude '**/exceptions/.*'
4950
}
5051

51-
task schedulerTest(type: Test, dependsOn: testClasses) {
52-
systemProperty 'kotlinx.coroutines.scheduler', ''
53-
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
54-
}
55-
5652
// Always run those tests
5753
task moreTest(dependsOn: [lockFreedomTest])
5854

59-
build.dependsOn moreTest
60-
6155
// Run these tests only during nightly stress test
62-
task extraTest(dependsOn: [jdk16Test, schedulerTest])
56+
task extraTest(dependsOn: [jdk16Test])
57+
58+
59+
build.dependsOn moreTest
6360

6461
extraTest.onlyIf { project.properties['stressTest'] != null }
6562

core/kotlinx-coroutines-core/src/CommonPool.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ object CommonPool : ExecutorCoroutineDispatcher() {
106106
pool ?: createPool().also { pool = it }
107107

108108
override fun dispatch(context: CoroutineContext, block: Runnable) =
109-
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
109+
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) }
110110
catch (e: RejectedExecutionException) {
111111
timeSource.unTrackTask()
112112
DefaultExecutor.execute(block)

core/kotlinx-coroutines-core/src/CoroutineContext.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.sche
2020

2121
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
2222
when (value) {
23-
null -> false
24-
"", "on" -> true
23+
null, "", "on" -> true
24+
"off" -> false
2525
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
2626
}
2727
}
@@ -39,7 +39,7 @@ public actual val DefaultDispatcher: CoroutineDispatcher
3939
get() = Dispatchers.Default
4040

4141
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
42-
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool
42+
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
4343

4444
/**
4545
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.

core/kotlinx-coroutines-core/src/Dispatchers.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.paralleli
2020
* The number of threads used by this dispatcher is limited by the value of
2121
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
2222
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
23+
*
24+
* This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
25+
* `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread —
26+
* typically execution continues in the same thread.
2327
*/
2428
public val Dispatchers.IO: CoroutineDispatcher
25-
get() = BackgroundDispatcher.IO
29+
get() = DefaultScheduler.IO

core/kotlinx-coroutines-core/src/Executors.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) :
8383
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
8484

8585
override fun dispatch(context: CoroutineContext, block: Runnable) =
86-
try { executor.execute(timeSource.trackTask(block)) }
86+
try { executor.execute(timeSource.wrapTask(block)) }
8787
catch (e: RejectedExecutionException) {
8888
timeSource.unTrackTask()
8989
DefaultExecutor.execute(block)

core/kotlinx-coroutines-core/src/TimeSource.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import java.util.concurrent.locks.LockSupport
99
internal interface TimeSource {
1010
fun currentTimeMillis(): Long
1111
fun nanoTime(): Long
12-
fun trackTask(block: Runnable): Runnable
12+
fun wrapTask(block: Runnable): Runnable
13+
fun trackTask()
1314
fun unTrackTask()
1415
fun registerTimeLoopThread()
1516
fun unregisterTimeLoopThread()
@@ -20,7 +21,8 @@ internal interface TimeSource {
2021
internal object DefaultTimeSource : TimeSource {
2122
override fun currentTimeMillis(): Long = System.currentTimeMillis()
2223
override fun nanoTime(): Long = System.nanoTime()
23-
override fun trackTask(block: Runnable): Runnable = block
24+
override fun wrapTask(block: Runnable): Runnable = block
25+
override fun trackTask() {}
2426
override fun unTrackTask() {}
2527
override fun registerTimeLoopThread() {}
2628
override fun unregisterTimeLoopThread() {}

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

+48-54
Original file line numberDiff line numberDiff line change
@@ -290,35 +290,33 @@ internal class CoroutineScheduler(
290290

291291
override fun execute(command: Runnable) = dispatch(command)
292292

293-
override fun close() = shutdown(1000L)
293+
override fun close() = shutdown(10_000L)
294294

295-
/*
296-
* Shuts down current scheduler and waits until all threads are stopped.
297-
* This method uses unsafe API (does unconditional unparks)
298-
* and intended to be used only for testing. Invocation has no additional effect if already closed.
299-
*/
295+
// Shuts down current scheduler and waits until all work is done and all threads are stopped.
300296
fun shutdown(timeout: Long) {
301297
// atomically set termination flag which is checked when workers are added or removed
302298
if (!isTerminated.compareAndSet(false, true)) return
303-
304-
/*
305-
* Shutdown current thread. Note that shutdown is testing utility,
306-
* so we don't do anything special to properly verify that no tasks are submitted after close()
307-
*/
308-
val thread = Thread.currentThread()
309-
(thread as? Worker)?.tryReleaseCpu(WorkerState.TERMINATED)
310-
299+
// make sure we are not waiting for the current thread
300+
val currentWorker = Thread.currentThread() as? Worker
311301
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
312302
val created = synchronized(workers) { createdWorkers }
313303
for (i in 1..created) {
314304
val worker = workers[i]!!
315-
if (worker.isAlive) {
316-
// Unparking alive thread is unsafe in general, but acceptable for testing purposes
305+
if (worker.isAlive && worker !== currentWorker) {
317306
LockSupport.unpark(worker)
318307
worker.join(timeout)
308+
worker.localQueue.offloadAllWork(globalQueue)
319309
}
310+
311+
}
312+
// Finish processing tasks from globalQueue and/or from this worker's local queue
313+
while (true) {
314+
val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break
315+
runSafely(task)
320316
}
321-
// cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated
317+
// Shutdown current thread
318+
currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
319+
// cleanup state to make sure that tryUnpark tries to create new threads and fails because isTerminated
322320
assert(cpuPermits.availablePermits() == corePoolSize)
323321
parkedWorkersStack.value = 0L
324322
controlState.value = 0L
@@ -333,6 +331,7 @@ internal class CoroutineScheduler(
333331
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
334332
*/
335333
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
334+
timeSource.trackTask() // this is needed for virtual time support
336335
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
337336
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
338337
// try to submit the task to the local queue and act depending on the result
@@ -439,7 +438,7 @@ internal class CoroutineScheduler(
439438
private fun createNewWorker(): Int {
440439
synchronized(workers) {
441440
// Make sure we're not trying to resurrect terminated scheduler
442-
if (isTerminated.value) throw ShutdownException()
441+
if (isTerminated.value) throw RejectedExecutionException("$schedulerName was terminated")
443442
val state = controlState.value
444443
val created = createdWorkers(state)
445444
val blocking = blockingWorkers(state)
@@ -456,9 +455,6 @@ internal class CoroutineScheduler(
456455
}
457456
}
458457

459-
// Is thrown when attempting to create new worker, but this scheduler isTerminated
460-
private class ShutdownException : RuntimeException()
461-
462458
/**
463459
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
464460
*/
@@ -565,6 +561,17 @@ internal class CoroutineScheduler(
565561
"]"
566562
}
567563

564+
private fun runSafely(task: Task) {
565+
try {
566+
task.run()
567+
} catch (e: Throwable) {
568+
val thread = Thread.currentThread()
569+
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
570+
} finally {
571+
timeSource.unTrackTask()
572+
}
573+
}
574+
568575
internal inner class Worker private constructor() : Thread() {
569576
init {
570577
isDaemon = true
@@ -685,41 +692,28 @@ internal class CoroutineScheduler(
685692
private var lastStealIndex = 0 // try in order repeated, reset when unparked
686693

687694
override fun run() {
688-
try {
689-
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
690-
while (!isTerminated.value && state != WorkerState.TERMINATED) {
691-
val task = findTask()
692-
if (task == null) {
693-
// Wait for a job with potential park
694-
if (state == WorkerState.CPU_ACQUIRED) {
695-
cpuWorkerIdle()
696-
} else {
697-
blockingWorkerIdle()
698-
}
699-
wasIdle = true
695+
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
696+
while (!isTerminated.value && state != WorkerState.TERMINATED) {
697+
val task = findTask()
698+
if (task == null) {
699+
// Wait for a job with potential park
700+
if (state == WorkerState.CPU_ACQUIRED) {
701+
cpuWorkerIdle()
700702
} else {
701-
if (wasIdle) {
702-
idleReset(task.mode)
703-
wasIdle = false
704-
}
705-
beforeTask(task)
706-
runSafely(task)
707-
afterTask(task)
703+
blockingWorkerIdle()
708704
}
705+
wasIdle = true
706+
} else {
707+
if (wasIdle) {
708+
idleReset(task.mode)
709+
wasIdle = false
710+
}
711+
beforeTask(task)
712+
runSafely(task)
713+
afterTask(task)
709714
}
710-
} catch (e: ShutdownException) {
711-
// race with shutdown -- ignore exception and don't print it on the console
712-
} finally {
713-
tryReleaseCpu(WorkerState.TERMINATED)
714-
}
715-
}
716-
717-
private fun runSafely(task: Task) {
718-
try {
719-
task.run()
720-
} catch (t: Throwable) {
721-
uncaughtExceptionHandler.uncaughtException(this, t)
722715
}
716+
tryReleaseCpu(WorkerState.TERMINATED)
723717
}
724718

725719
private fun beforeTask(task: Task) {
@@ -823,7 +817,7 @@ internal class CoroutineScheduler(
823817
private fun tryTerminateWorker() {
824818
synchronized(workers) {
825819
// Make sure we're not trying race with termination of scheduler
826-
if (isTerminated.value) throw ShutdownException()
820+
if (isTerminated.value) return
827821
// Someone else terminated, bail out
828822
if (createdWorkers <= corePoolSize) return
829823
// Try to find blocking task before termination
@@ -906,7 +900,7 @@ internal class CoroutineScheduler(
906900
spins = 0 // Volatile write, should be written last
907901
}
908902

909-
private fun findTask(): Task? {
903+
internal fun findTask(): Task? {
910904
if (tryAcquireCpuPermit()) return findTaskWithCpuPermit()
911905
/*
912906
* If the local queue is empty, try to extract blocking task from global queue.

core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt renamed to core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt

+17-6
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@ package kotlinx.coroutines.experimental.scheduling
77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.experimental.*
99
import kotlinx.coroutines.experimental.internal.*
10+
import java.lang.UnsupportedOperationException
1011
import java.util.concurrent.*
1112
import kotlin.coroutines.experimental.*
1213

1314
/**
14-
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
15+
* Default instance of coroutine dispatcher.
1516
*/
16-
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher() {
17+
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
1718
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
19+
20+
override fun close() {
21+
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
22+
}
1823
}
1924

2025
/**
@@ -39,15 +44,14 @@ open class ExperimentalCoroutineDispatcher(
3944
get() = coroutineScheduler
4045

4146
// This is variable for test purposes, so that we can reinitialize from clean state
42-
private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
47+
private var coroutineScheduler = createScheduler()
4348

4449
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
4550
coroutineScheduler.dispatch(block)
4651

4752
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
4853
coroutineScheduler.dispatch(block, fair = true)
4954

50-
// TODO throw error when this API becomes public and close it in tests via another method
5155
override fun close() = coroutineScheduler.close()
5256

5357
override fun toString(): String {
@@ -82,16 +86,23 @@ open class ExperimentalCoroutineDispatcher(
8286
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
8387
coroutineScheduler.dispatch(block, context, fair)
8488

89+
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
90+
8591
// fot tests only
92+
@Synchronized
8693
internal fun usePrivateScheduler() {
87-
coroutineScheduler.shutdown(1000L)
88-
coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
94+
coroutineScheduler.shutdown(10_000L)
95+
coroutineScheduler = createScheduler()
8996
}
9097

9198
// for tests only
99+
@Synchronized
92100
internal fun shutdown(timeout: Long) {
93101
coroutineScheduler.shutdown(timeout)
94102
}
103+
104+
// for tests only
105+
internal fun restore() = usePrivateScheduler() // recreate scheduler
95106
}
96107

97108
private class LimitingDispatcher(

core/kotlinx-coroutines-core/src/scheduling/Tasks.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.util.concurrent.*
1111

1212
// TODO most of these fields will be moved to 'object ExperimentalDispatcher'
1313

14-
internal const val DEFAULT_SCHEDULER_NAME = "CoroutineScheduler"
14+
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"
1515

1616
// 100us as default
1717
@JvmField

core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt

+7
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ internal class WorkQueue {
144144
}
145145
}
146146

147+
internal fun offloadAllWork(globalQueue: GlobalQueue) {
148+
while (true) {
149+
val task = pollExternal() ?: return
150+
globalQueue.addLast(task)
151+
}
152+
}
153+
147154
/**
148155
* [poll] for external (not owning this queue) workers
149156
*/

0 commit comments

Comments
 (0)