Skip to content

Coroutine scheduler is used by default instead of deprecated CommonPool #633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ public object Dispatchers {
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM.
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
* to use an experimental coroutine dispatcher that shares threads with [Dispatchers.IO] and thus can switch to
* context without performing an actual thread context switch.
* It is backed by a shared pool of threads on JVM. By default, the maximal number of threads used
* by this dispatcher is equal to the number CPU cores, but is at least two.
*/
@JvmField
public val Default: CoroutineDispatcher =
Expand Down
13 changes: 5 additions & 8 deletions core/kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tasks.withType(Test) {

test {
exclude '**/*LFTest.*'
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
}

task lockFreedomTest(type: Test, dependsOn: testClasses) {
Expand All @@ -48,18 +49,14 @@ task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) {
exclude '**/exceptions/.*'
}

task schedulerTest(type: Test, dependsOn: testClasses) {
systemProperty 'kotlinx.coroutines.scheduler', ''
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
}

// Always run those tests
task moreTest(dependsOn: [lockFreedomTest])

build.dependsOn moreTest

// Run these tests only during nightly stress test
task extraTest(dependsOn: [jdk16Test, schedulerTest])
task extraTest(dependsOn: [jdk16Test])


build.dependsOn moreTest

extraTest.onlyIf { project.properties['stressTest'] != null }

Expand Down
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/CommonPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object CommonPool : ExecutorCoroutineDispatcher() {
pool ?: createPool().also { pool = it }

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
Expand Down
6 changes: 3 additions & 3 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.sche

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null -> false
"", "on" -> true
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
Expand All @@ -39,7 +39,7 @@ public actual val DefaultDispatcher: CoroutineDispatcher
get() = Dispatchers.Default

internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool
if (useCoroutinesScheduler) DefaultScheduler else CommonPool

/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
Expand Down
6 changes: 5 additions & 1 deletion core/kotlinx-coroutines-core/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.paralleli
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread —
* typically execution continues in the same thread.
*/
public val Dispatchers.IO: CoroutineDispatcher
get() = BackgroundDispatcher.IO
get() = DefaultScheduler.IO
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatc
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
try { executor.execute(timeSource.wrapTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
Expand Down
6 changes: 4 additions & 2 deletions core/kotlinx-coroutines-core/src/TimeSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import java.util.concurrent.locks.LockSupport
internal interface TimeSource {
fun currentTimeMillis(): Long
fun nanoTime(): Long
fun trackTask(block: Runnable): Runnable
fun wrapTask(block: Runnable): Runnable
fun trackTask()
fun unTrackTask()
fun registerTimeLoopThread()
fun unregisterTimeLoopThread()
Expand All @@ -20,7 +21,8 @@ internal interface TimeSource {
internal object DefaultTimeSource : TimeSource {
override fun currentTimeMillis(): Long = System.currentTimeMillis()
override fun nanoTime(): Long = System.nanoTime()
override fun trackTask(block: Runnable): Runnable = block
override fun wrapTask(block: Runnable): Runnable = block
override fun trackTask() {}
override fun unTrackTask() {}
override fun registerTimeLoopThread() {}
override fun unregisterTimeLoopThread() {}
Expand Down
109 changes: 52 additions & 57 deletions core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ internal class CoroutineScheduler(

private val random = Random()

// This is used a "stop signal" for debugging/tests only
private val isTerminated = atomic(false)
// This is used a "stop signal" for close and shutdown functions
private val _isTerminated = atomic(0) // todo: replace with atomic boolean on new versions of atomicFu
private val isTerminated: Boolean get() = _isTerminated.value != 0

companion object {
private const val MAX_SPINS = 1000
Expand Down Expand Up @@ -290,35 +291,33 @@ internal class CoroutineScheduler(

override fun execute(command: Runnable) = dispatch(command)

override fun close() = shutdown(1000L)
override fun close() = shutdown(10_000L)

/*
* Shuts down current scheduler and waits until all threads are stopped.
* This method uses unsafe API (does unconditional unparks)
* and intended to be used only for testing. Invocation has no additional effect if already closed.
*/
// Shuts down current scheduler and waits until all work is done and all threads are stopped.
fun shutdown(timeout: Long) {
// atomically set termination flag which is checked when workers are added or removed
if (!isTerminated.compareAndSet(false, true)) return

/*
* Shutdown current thread. Note that shutdown is testing utility,
* so we don't do anything special to properly verify that no tasks are submitted after close()
*/
val thread = Thread.currentThread()
(thread as? Worker)?.tryReleaseCpu(WorkerState.TERMINATED)

if (!_isTerminated.compareAndSet(0, 1)) return
// make sure we are not waiting for the current thread
val currentWorker = Thread.currentThread() as? Worker
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
val created = synchronized(workers) { createdWorkers }
for (i in 1..created) {
val worker = workers[i]!!
if (worker.isAlive) {
// Unparking alive thread is unsafe in general, but acceptable for testing purposes
if (worker.isAlive && worker !== currentWorker) {
LockSupport.unpark(worker)
worker.join(timeout)
worker.localQueue.offloadAllWork(globalQueue)
}

}
// Finish processing tasks from globalQueue and/or from this worker's local queue
while (true) {
val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break
runSafely(task)
}
// cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated
// Shutdown current thread
currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
// cleanup state to make sure that tryUnpark tries to create new threads and fails because isTerminated
assert(cpuPermits.availablePermits() == corePoolSize)
parkedWorkersStack.value = 0L
controlState.value = 0L
Expand All @@ -333,6 +332,7 @@ internal class CoroutineScheduler(
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
*/
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
timeSource.trackTask() // this is needed for virtual time support
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
// try to submit the task to the local queue and act depending on the result
Expand Down Expand Up @@ -439,7 +439,7 @@ internal class CoroutineScheduler(
private fun createNewWorker(): Int {
synchronized(workers) {
// Make sure we're not trying to resurrect terminated scheduler
if (isTerminated.value) throw ShutdownException()
if (isTerminated) throw RejectedExecutionException("$schedulerName was terminated")
val state = controlState.value
val created = createdWorkers(state)
val blocking = blockingWorkers(state)
Expand All @@ -456,9 +456,6 @@ internal class CoroutineScheduler(
}
}

// Is thrown when attempting to create new worker, but this scheduler isTerminated
private class ShutdownException : RuntimeException()

/**
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
*/
Expand Down Expand Up @@ -565,6 +562,17 @@ internal class CoroutineScheduler(
"]"
}

private fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
timeSource.unTrackTask()
}
}

internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
Expand Down Expand Up @@ -685,41 +693,28 @@ internal class CoroutineScheduler(
private var lastStealIndex = 0 // try in order repeated, reset when unparked

override fun run() {
try {
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
while (!isTerminated.value && state != WorkerState.TERMINATED) {
val task = findTask()
if (task == null) {
// Wait for a job with potential park
if (state == WorkerState.CPU_ACQUIRED) {
cpuWorkerIdle()
} else {
blockingWorkerIdle()
}
wasIdle = true
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask()
if (task == null) {
// Wait for a job with potential park
if (state == WorkerState.CPU_ACQUIRED) {
cpuWorkerIdle()
} else {
if (wasIdle) {
idleReset(task.mode)
wasIdle = false
}
beforeTask(task)
runSafely(task)
afterTask(task)
blockingWorkerIdle()
}
wasIdle = true
} else {
if (wasIdle) {
idleReset(task.mode)
wasIdle = false
}
beforeTask(task)
runSafely(task)
afterTask(task)
}
} catch (e: ShutdownException) {
// race with shutdown -- ignore exception and don't print it on the console
} finally {
tryReleaseCpu(WorkerState.TERMINATED)
}
}

private fun runSafely(task: Task) {
try {
task.run()
} catch (t: Throwable) {
uncaughtExceptionHandler.uncaughtException(this, t)
}
tryReleaseCpu(WorkerState.TERMINATED)
}

private fun beforeTask(task: Task) {
Expand Down Expand Up @@ -823,7 +818,7 @@ internal class CoroutineScheduler(
private fun tryTerminateWorker() {
synchronized(workers) {
// Make sure we're not trying race with termination of scheduler
if (isTerminated.value) throw ShutdownException()
if (isTerminated) return
// Someone else terminated, bail out
if (createdWorkers <= corePoolSize) return
// Try to find blocking task before termination
Expand Down Expand Up @@ -906,7 +901,7 @@ internal class CoroutineScheduler(
spins = 0 // Volatile write, should be written last
}

private fun findTask(): Task? {
internal fun findTask(): Task? {
if (tryAcquireCpuPermit()) return findTaskWithCpuPermit()
/*
* If the local queue is empty, try to extract blocking task from global queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ package kotlinx.coroutines.experimental.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import java.lang.UnsupportedOperationException
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

/**
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
* Default instance of coroutine dispatcher.
*/
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher() {
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
}

/**
Expand All @@ -39,15 +44,14 @@ open class ExperimentalCoroutineDispatcher(
get() = coroutineScheduler

// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
private var coroutineScheduler = createScheduler()

override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block)

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block, fair = true)

// TODO throw error when this API becomes public and close it in tests via another method
override fun close() = coroutineScheduler.close()

override fun toString(): String {
Expand Down Expand Up @@ -82,16 +86,23 @@ open class ExperimentalCoroutineDispatcher(
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
coroutineScheduler.dispatch(block, context, fair)

private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)

// fot tests only
@Synchronized
internal fun usePrivateScheduler() {
coroutineScheduler.shutdown(1000L)
coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
coroutineScheduler.shutdown(10_000L)
coroutineScheduler = createScheduler()
}

// for tests only
@Synchronized
internal fun shutdown(timeout: Long) {
coroutineScheduler.shutdown(timeout)
}

// for tests only
internal fun restore() = usePrivateScheduler() // recreate scheduler
}

private class LimitingDispatcher(
Expand Down
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/scheduling/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.concurrent.*

// TODO most of these fields will be moved to 'object ExperimentalDispatcher'

internal const val DEFAULT_SCHEDULER_NAME = "CoroutineScheduler"
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"

// 100us as default
@JvmField
Expand Down
7 changes: 7 additions & 0 deletions core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ internal class WorkQueue {
}
}

internal fun offloadAllWork(globalQueue: GlobalQueue) {
while (true) {
val task = pollExternal() ?: return
globalQueue.addLast(task)
}
}

/**
* [poll] for external (not owning this queue) workers
*/
Expand Down
Loading