diff --git a/common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt b/common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt index 54268911a0..4e723eddd4 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatchers.common.kt @@ -16,10 +16,8 @@ public object Dispatchers { * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. * - * It is backed by a shared pool of threads on JVM. - * You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`") - * to use an experimental coroutine dispatcher that shares threads with [Dispatchers.IO] and thus can switch to - * context without performing an actual thread context switch. + * It is backed by a shared pool of threads on JVM. By default, the maximal number of threads used + * by this dispatcher is equal to the number CPU cores, but is at least two. */ @JvmField public val Default: CoroutineDispatcher = diff --git a/core/kotlinx-coroutines-core/build.gradle b/core/kotlinx-coroutines-core/build.gradle index 8756e18175..d2f2f5ef95 100644 --- a/core/kotlinx-coroutines-core/build.gradle +++ b/core/kotlinx-coroutines-core/build.gradle @@ -32,6 +32,7 @@ tasks.withType(Test) { test { exclude '**/*LFTest.*' + systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test } task lockFreedomTest(type: Test, dependsOn: testClasses) { @@ -48,18 +49,14 @@ task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) { exclude '**/exceptions/.*' } -task schedulerTest(type: Test, dependsOn: testClasses) { - systemProperty 'kotlinx.coroutines.scheduler', '' - systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test -} - // Always run those tests task moreTest(dependsOn: [lockFreedomTest]) -build.dependsOn moreTest - // Run these tests only during nightly stress test -task extraTest(dependsOn: [jdk16Test, schedulerTest]) +task extraTest(dependsOn: [jdk16Test]) + + +build.dependsOn moreTest extraTest.onlyIf { project.properties['stressTest'] != null } diff --git a/core/kotlinx-coroutines-core/src/CommonPool.kt b/core/kotlinx-coroutines-core/src/CommonPool.kt index 27e3991c1d..966fa50225 100644 --- a/core/kotlinx-coroutines-core/src/CommonPool.kt +++ b/core/kotlinx-coroutines-core/src/CommonPool.kt @@ -106,7 +106,7 @@ object CommonPool : ExecutorCoroutineDispatcher() { pool ?: createPool().also { pool = it } override fun dispatch(context: CoroutineContext, block: Runnable) = - try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) } + try { (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) } catch (e: RejectedExecutionException) { timeSource.unTrackTask() DefaultExecutor.execute(block) diff --git a/core/kotlinx-coroutines-core/src/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/CoroutineContext.kt index c2af9732db..669ff1da17 100644 --- a/core/kotlinx-coroutines-core/src/CoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/CoroutineContext.kt @@ -20,8 +20,8 @@ internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.sche internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value -> when (value) { - null -> false - "", "on" -> true + null, "", "on" -> true + "off" -> false else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'") } } @@ -39,7 +39,7 @@ public actual val DefaultDispatcher: CoroutineDispatcher get() = Dispatchers.Default internal actual fun createDefaultDispatcher(): CoroutineDispatcher = - if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool + if (useCoroutinesScheduler) DefaultScheduler else CommonPool /** * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. diff --git a/core/kotlinx-coroutines-core/src/Dispatchers.kt b/core/kotlinx-coroutines-core/src/Dispatchers.kt index 8c8a21057e..c689b7e738 100644 --- a/core/kotlinx-coroutines-core/src/Dispatchers.kt +++ b/core/kotlinx-coroutines-core/src/Dispatchers.kt @@ -20,6 +20,10 @@ public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.paralleli * The number of threads used by this dispatcher is limited by the value of * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property. * It defaults to the limit of 64 threads or the number of cores (whichever is larger). + * + * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using + * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — + * typically execution continues in the same thread. */ public val Dispatchers.IO: CoroutineDispatcher - get() = BackgroundDispatcher.IO + get() = DefaultScheduler.IO diff --git a/core/kotlinx-coroutines-core/src/Executors.kt b/core/kotlinx-coroutines-core/src/Executors.kt index dc0ba5f55b..698649e512 100644 --- a/core/kotlinx-coroutines-core/src/Executors.kt +++ b/core/kotlinx-coroutines-core/src/Executors.kt @@ -95,7 +95,7 @@ public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatc } override fun dispatch(context: CoroutineContext, block: Runnable) = - try { executor.execute(timeSource.trackTask(block)) } + try { executor.execute(timeSource.wrapTask(block)) } catch (e: RejectedExecutionException) { timeSource.unTrackTask() DefaultExecutor.execute(block) diff --git a/core/kotlinx-coroutines-core/src/TimeSource.kt b/core/kotlinx-coroutines-core/src/TimeSource.kt index f32dc30a65..7bda699a21 100644 --- a/core/kotlinx-coroutines-core/src/TimeSource.kt +++ b/core/kotlinx-coroutines-core/src/TimeSource.kt @@ -9,7 +9,8 @@ import java.util.concurrent.locks.LockSupport internal interface TimeSource { fun currentTimeMillis(): Long fun nanoTime(): Long - fun trackTask(block: Runnable): Runnable + fun wrapTask(block: Runnable): Runnable + fun trackTask() fun unTrackTask() fun registerTimeLoopThread() fun unregisterTimeLoopThread() @@ -20,7 +21,8 @@ internal interface TimeSource { internal object DefaultTimeSource : TimeSource { override fun currentTimeMillis(): Long = System.currentTimeMillis() override fun nanoTime(): Long = System.nanoTime() - override fun trackTask(block: Runnable): Runnable = block + override fun wrapTask(block: Runnable): Runnable = block + override fun trackTask() {} override fun unTrackTask() {} override fun registerTimeLoopThread() {} override fun unregisterTimeLoopThread() {} diff --git a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt index 37e193271b..23547acd9c 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt @@ -245,8 +245,9 @@ internal class CoroutineScheduler( private val random = Random() - // This is used a "stop signal" for debugging/tests only - private val isTerminated = atomic(false) + // This is used a "stop signal" for close and shutdown functions + private val _isTerminated = atomic(0) // todo: replace with atomic boolean on new versions of atomicFu + private val isTerminated: Boolean get() = _isTerminated.value != 0 companion object { private const val MAX_SPINS = 1000 @@ -290,35 +291,33 @@ internal class CoroutineScheduler( override fun execute(command: Runnable) = dispatch(command) - override fun close() = shutdown(1000L) + override fun close() = shutdown(10_000L) - /* - * Shuts down current scheduler and waits until all threads are stopped. - * This method uses unsafe API (does unconditional unparks) - * and intended to be used only for testing. Invocation has no additional effect if already closed. - */ + // Shuts down current scheduler and waits until all work is done and all threads are stopped. fun shutdown(timeout: Long) { // atomically set termination flag which is checked when workers are added or removed - if (!isTerminated.compareAndSet(false, true)) return - - /* - * Shutdown current thread. Note that shutdown is testing utility, - * so we don't do anything special to properly verify that no tasks are submitted after close() - */ - val thread = Thread.currentThread() - (thread as? Worker)?.tryReleaseCpu(WorkerState.TERMINATED) - + if (!_isTerminated.compareAndSet(0, 1)) return + // make sure we are not waiting for the current thread + val currentWorker = Thread.currentThread() as? Worker // Capture # of created workers that cannot change anymore (mind the synchronized block!) val created = synchronized(workers) { createdWorkers } for (i in 1..created) { val worker = workers[i]!! - if (worker.isAlive) { - // Unparking alive thread is unsafe in general, but acceptable for testing purposes + if (worker.isAlive && worker !== currentWorker) { LockSupport.unpark(worker) worker.join(timeout) + worker.localQueue.offloadAllWork(globalQueue) } + + } + // Finish processing tasks from globalQueue and/or from this worker's local queue + while (true) { + val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break + runSafely(task) } - // cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated + // Shutdown current thread + currentWorker?.tryReleaseCpu(WorkerState.TERMINATED) + // cleanup state to make sure that tryUnpark tries to create new threads and fails because isTerminated assert(cpuPermits.availablePermits() == corePoolSize) parkedWorkersStack.value = 0L controlState.value = 0L @@ -333,6 +332,7 @@ internal class CoroutineScheduler( * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO) */ fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { + timeSource.trackTask() // this is needed for virtual time support // TODO at some point make DispatchTask extend Task and make its field settable to save an allocation val task = Task(block, schedulerTimeSource.nanoTime(), taskContext) // try to submit the task to the local queue and act depending on the result @@ -439,7 +439,7 @@ internal class CoroutineScheduler( private fun createNewWorker(): Int { synchronized(workers) { // Make sure we're not trying to resurrect terminated scheduler - if (isTerminated.value) throw ShutdownException() + if (isTerminated) throw RejectedExecutionException("$schedulerName was terminated") val state = controlState.value val created = createdWorkers(state) val blocking = blockingWorkers(state) @@ -456,9 +456,6 @@ internal class CoroutineScheduler( } } - // Is thrown when attempting to create new worker, but this scheduler isTerminated - private class ShutdownException : RuntimeException() - /** * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP]. */ @@ -565,6 +562,17 @@ internal class CoroutineScheduler( "]" } + private fun runSafely(task: Task) { + try { + task.run() + } catch (e: Throwable) { + val thread = Thread.currentThread() + thread.uncaughtExceptionHandler.uncaughtException(thread, e) + } finally { + timeSource.unTrackTask() + } + } + internal inner class Worker private constructor() : Thread() { init { isDaemon = true @@ -685,41 +693,28 @@ internal class CoroutineScheduler( private var lastStealIndex = 0 // try in order repeated, reset when unparked override fun run() { - try { - var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive - while (!isTerminated.value && state != WorkerState.TERMINATED) { - val task = findTask() - if (task == null) { - // Wait for a job with potential park - if (state == WorkerState.CPU_ACQUIRED) { - cpuWorkerIdle() - } else { - blockingWorkerIdle() - } - wasIdle = true + var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive + while (!isTerminated && state != WorkerState.TERMINATED) { + val task = findTask() + if (task == null) { + // Wait for a job with potential park + if (state == WorkerState.CPU_ACQUIRED) { + cpuWorkerIdle() } else { - if (wasIdle) { - idleReset(task.mode) - wasIdle = false - } - beforeTask(task) - runSafely(task) - afterTask(task) + blockingWorkerIdle() } + wasIdle = true + } else { + if (wasIdle) { + idleReset(task.mode) + wasIdle = false + } + beforeTask(task) + runSafely(task) + afterTask(task) } - } catch (e: ShutdownException) { - // race with shutdown -- ignore exception and don't print it on the console - } finally { - tryReleaseCpu(WorkerState.TERMINATED) - } - } - - private fun runSafely(task: Task) { - try { - task.run() - } catch (t: Throwable) { - uncaughtExceptionHandler.uncaughtException(this, t) } + tryReleaseCpu(WorkerState.TERMINATED) } private fun beforeTask(task: Task) { @@ -823,7 +818,7 @@ internal class CoroutineScheduler( private fun tryTerminateWorker() { synchronized(workers) { // Make sure we're not trying race with termination of scheduler - if (isTerminated.value) throw ShutdownException() + if (isTerminated) return // Someone else terminated, bail out if (createdWorkers <= corePoolSize) return // Try to find blocking task before termination @@ -906,7 +901,7 @@ internal class CoroutineScheduler( spins = 0 // Volatile write, should be written last } - private fun findTask(): Task? { + internal fun findTask(): Task? { if (tryAcquireCpuPermit()) return findTaskWithCpuPermit() /* * If the local queue is empty, try to extract blocking task from global queue. diff --git a/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt similarity index 91% rename from core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt rename to core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt index 776b7af5bf..a21c7685af 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt @@ -7,14 +7,19 @@ package kotlinx.coroutines.experimental.scheduling import kotlinx.atomicfu.* import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* +import java.lang.UnsupportedOperationException import java.util.concurrent.* import kotlin.coroutines.experimental.* /** - * Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines). + * Default instance of coroutine dispatcher. */ -internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher() { +internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS))) + + override fun close() { + throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed") + } } /** @@ -39,7 +44,7 @@ open class ExperimentalCoroutineDispatcher( get() = coroutineScheduler // This is variable for test purposes, so that we can reinitialize from clean state - private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs) + private var coroutineScheduler = createScheduler() override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) @@ -47,7 +52,6 @@ open class ExperimentalCoroutineDispatcher( override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true) - // TODO throw error when this API becomes public and close it in tests via another method override fun close() = coroutineScheduler.close() override fun toString(): String { @@ -82,16 +86,23 @@ open class ExperimentalCoroutineDispatcher( internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair) + private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs) + // fot tests only + @Synchronized internal fun usePrivateScheduler() { - coroutineScheduler.shutdown(1000L) - coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs) + coroutineScheduler.shutdown(10_000L) + coroutineScheduler = createScheduler() } // for tests only + @Synchronized internal fun shutdown(timeout: Long) { coroutineScheduler.shutdown(timeout) } + + // for tests only + internal fun restore() = usePrivateScheduler() // recreate scheduler } private class LimitingDispatcher( diff --git a/core/kotlinx-coroutines-core/src/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/scheduling/Tasks.kt index 2b73f72259..545a5bb9e8 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/Tasks.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/Tasks.kt @@ -11,7 +11,7 @@ import java.util.concurrent.* // TODO most of these fields will be moved to 'object ExperimentalDispatcher' -internal const val DEFAULT_SCHEDULER_NAME = "CoroutineScheduler" +internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher" // 100us as default @JvmField diff --git a/core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt b/core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt index b86ef4ccc4..f41b0f7e4e 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt @@ -144,6 +144,13 @@ internal class WorkQueue { } } + internal fun offloadAllWork(globalQueue: GlobalQueue) { + while (true) { + val task = pollExternal() ?: return + globalQueue.addLast(task) + } + } + /** * [poll] for external (not owning this queue) workers */ diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt index 7d576cf165..e92afeaf7d 100644 --- a/core/kotlinx-coroutines-core/test/TestBase.kt +++ b/core/kotlinx-coroutines-core/test/TestBase.kt @@ -5,8 +5,8 @@ package kotlinx.coroutines.experimental import kotlinx.coroutines.experimental.internal.* -import org.junit.* import kotlinx.coroutines.experimental.scheduling.* +import org.junit.* import java.util.concurrent.atomic.* private val VERBOSE = systemProp("test.verbose", false) @@ -124,13 +124,15 @@ public actual open class TestBase actual constructor() { fun initPoolsBeforeTest() { CommonPool.usePrivatePool() - BackgroundDispatcher.usePrivateScheduler() + DefaultScheduler.usePrivateScheduler() } fun shutdownPoolsAfterTest() { CommonPool.shutdown(SHUTDOWN_TIMEOUT) - BackgroundDispatcher.shutdown(SHUTDOWN_TIMEOUT) + DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + CommonPool.restore() + DefaultScheduler.restore() } @Suppress("ACTUAL_WITHOUT_EXPECT", "ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS") diff --git a/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt b/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt index 370c871243..bd9fe03566 100644 --- a/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt +++ b/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt @@ -58,15 +58,19 @@ internal class VirtualTimeSource( override fun currentTimeMillis(): Long = TimeUnit.NANOSECONDS.toMillis(time) override fun nanoTime(): Long = time - @Synchronized - override fun trackTask(block: Runnable): Runnable { - trackedTasks++ + override fun wrapTask(block: Runnable): Runnable { + trackTask() return Runnable { try { block.run() } finally { unTrackTask() } } } + @Synchronized + override fun trackTask() { + trackedTasks++ + } + @Synchronized override fun unTrackTask() { assert(trackedTasks > 0) diff --git a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt index 73557cce1b..30bd1b31dc 100644 --- a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt +++ b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt @@ -11,7 +11,7 @@ import kotlin.test.* /** * Tests that various operators on channels properly consume (close) their source channels. */ -class ChannelsConsumeTest { +class ChannelsConsumeTest : TestBase() { private val sourceList = (1..10).toList() // test source with numbers 1..10 diff --git a/core/kotlinx-coroutines-core/test/channels/SimpleSendReceiveJvmTest.kt b/core/kotlinx-coroutines-core/test/channels/SimpleSendReceiveJvmTest.kt index da5f7cdcff..ceb630c4e5 100644 --- a/core/kotlinx-coroutines-core/test/channels/SimpleSendReceiveJvmTest.kt +++ b/core/kotlinx-coroutines-core/test/channels/SimpleSendReceiveJvmTest.kt @@ -16,7 +16,7 @@ class SimpleSendReceiveJvmTest( private val kind: TestChannelKind, val n: Int, val concurrent: Boolean -) { +) : TestBase() { companion object { @Parameterized.Parameters(name = "{0}, n={1}, concurrent={2}") @JvmStatic diff --git a/core/kotlinx-coroutines-core/test/guide/example-context-01.kt b/core/kotlinx-coroutines-core/test/guide/example-context-01.kt index c417dda7bb..7058f5d162 100644 --- a/core/kotlinx-coroutines-core/test/guide/example-context-01.kt +++ b/core/kotlinx-coroutines-core/test/guide/example-context-01.kt @@ -15,7 +15,7 @@ fun main(args: Array) = runBlocking { launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println("Unconfined : I'm working in thread ${Thread.currentThread().name}") } - launch(Dispatchers.Default) { // will get dispatched to ForkJoinPool.commonPool (or equivalent) + launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher println("Default : I'm working in thread ${Thread.currentThread().name}") } launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread diff --git a/core/kotlinx-coroutines-core/test/guide/test/DispatcherGuideTest.kt b/core/kotlinx-coroutines-core/test/guide/test/DispatcherGuideTest.kt index c59745736d..f45bc1353f 100644 --- a/core/kotlinx-coroutines-core/test/guide/test/DispatcherGuideTest.kt +++ b/core/kotlinx-coroutines-core/test/guide/test/DispatcherGuideTest.kt @@ -9,7 +9,7 @@ class DispatchersGuideTest { fun testKotlinxCoroutinesExperimentalGuideContext01() { test("KotlinxCoroutinesExperimentalGuideContext01") { kotlinx.coroutines.experimental.guide.context01.main(emptyArray()) }.verifyLinesStartUnordered( "Unconfined : I'm working in thread main", - "Default : I'm working in thread CommonPool-worker-1", + "Default : I'm working in thread DefaultDispatcher-worker-1", "newSingleThreadContext: I'm working in thread MyOwnThread", "main runBlocking : I'm working in thread main" ) @@ -84,7 +84,7 @@ class DispatchersGuideTest { @Test fun testKotlinxCoroutinesExperimentalGuideContext09() { test("KotlinxCoroutinesExperimentalGuideContext09") { kotlinx.coroutines.experimental.guide.context09.main(emptyArray()) }.verifyLinesFlexibleThread( - "I'm working in thread CommonPool-worker-1 @test#2" + "I'm working in thread DefaultDispatcher-worker-1 @test#2" ) } @@ -102,8 +102,8 @@ class DispatchersGuideTest { fun testKotlinxCoroutinesExperimentalGuideContext11() { test("KotlinxCoroutinesExperimentalGuideContext11") { kotlinx.coroutines.experimental.guide.context11.main(emptyArray()) }.verifyLinesFlexibleThread( "Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'", - "Launch start, current thread: Thread[CommonPool-worker-1 @coroutine#2,5,main], thread local value: 'launch'", - "After yield, current thread: Thread[CommonPool-worker-2 @coroutine#2,5,main], thread local value: 'launch'", + "Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'", + "After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'", "Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'" ) } diff --git a/core/kotlinx-coroutines-core/test/guide/test/ExceptionsGuideTest.kt b/core/kotlinx-coroutines-core/test/guide/test/ExceptionsGuideTest.kt index 5b2ae61fff..e5c069051a 100644 --- a/core/kotlinx-coroutines-core/test/guide/test/ExceptionsGuideTest.kt +++ b/core/kotlinx-coroutines-core/test/guide/test/ExceptionsGuideTest.kt @@ -9,7 +9,7 @@ class ExceptionsGuideTest { fun testKotlinxCoroutinesExperimentalGuideExceptions01() { test("KotlinxCoroutinesExperimentalGuideExceptions01") { kotlinx.coroutines.experimental.guide.exceptions01.main(emptyArray()) }.verifyExceptions( "Throwing exception from launch", - "Exception in thread \"ForkJoinPool.commonPool-worker-2 @coroutine#2\" java.lang.IndexOutOfBoundsException", + "Exception in thread \"DefaultDispatcher-worker-2 @coroutine#2\" java.lang.IndexOutOfBoundsException", "Joined failed job", "Throwing exception from async", "Caught ArithmeticException" diff --git a/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt b/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt index 8b3c05165c..2d9b5265ed 100644 --- a/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt +++ b/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt @@ -6,11 +6,12 @@ package kotlinx.coroutines.experimental.guide.test import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* +import kotlinx.coroutines.experimental.scheduling.* import org.junit.Assert.* import java.io.* import java.util.concurrent.* -fun trackTask(block: Runnable) = timeSource.trackTask(block) +fun wrapTask(block: Runnable) = timeSource.wrapTask(block) // helper function to dump exception to stdout for ease of debugging failed tests private inline fun outputException(name: String, block: () -> T): T = @@ -37,6 +38,7 @@ fun test(name: String, block: () -> Unit): List = outputException(name) System.setErr(ps) System.setOut(ps) CommonPool.usePrivatePool() + DefaultScheduler.usePrivateScheduler() DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() @@ -53,9 +55,9 @@ fun test(name: String, block: () -> Unit): List = outputException(name) oldOut.println("--- shutting down") // the shutdown CommonPool.shutdown(SHUTDOWN_TIMEOUT) + DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks - CommonPool.restore() if (tee.flushLine()) oldOut.println() oldOut.println("--- done") System.setOut(sout) @@ -63,6 +65,8 @@ fun test(name: String, block: () -> Unit): List = outputException(name) checkTestThreads(threadsBefore) } } + CommonPool.restore() + DefaultScheduler.restore() return ByteArrayInputStream(bytes).bufferedReader().readLines() } @@ -122,9 +126,10 @@ private fun sanitize(s: String, mode: SanitizeMode): String { res = res.replace(Regex(" [0-9]+ ms"), " xxx ms") } SanitizeMode.FLEXIBLE_THREAD -> { - res = res.replace(Regex("ForkJoinPool\\.commonPool-worker-[0-9]+"), "CommonPool") - res = res.replace(Regex("ForkJoinPool-[0-9]+-worker-[0-9]+"), "CommonPool") - res = res.replace(Regex("CommonPool-worker-[0-9]+"), "CommonPool") + res = res.replace(Regex("ForkJoinPool\\.commonPool-worker-[0-9]+"), "DefaultDispatcher") + res = res.replace(Regex("ForkJoinPool-[0-9]+-worker-[0-9]+"), "DefaultDispatcher") + res = res.replace(Regex("CommonPool-worker-[0-9]+"), "DefaultDispatcher") + res = res.replace(Regex("DefaultDispatcher-worker-[0-9]+"), "DefaultDispatcher") res = res.replace(Regex("RxComputationThreadPool-[0-9]+"), "RxComputationThreadPool") res = res.replace(Regex("Test( worker)?"), "main") } diff --git a/docs/coroutine-context-and-dispatchers.md b/docs/coroutine-context-and-dispatchers.md index e955901efa..25761b091a 100644 --- a/docs/coroutine-context-and-dispatchers.md +++ b/docs/coroutine-context-and-dispatchers.md @@ -72,7 +72,7 @@ fun main(args: Array) = runBlocking { launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println("Unconfined : I'm working in thread ${Thread.currentThread().name}") } - launch(Dispatchers.Default) { // will get dispatched to ForkJoinPool.commonPool (or equivalent) + launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher println("Default : I'm working in thread ${Thread.currentThread().name}") } launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread @@ -89,7 +89,7 @@ It produces the following output (maybe in different order): ```text Unconfined : I'm working in thread main -Default : I'm working in thread CommonPool-worker-1 +Default : I'm working in thread DefaultDispatcher-worker-1 newSingleThreadContext: I'm working in thread MyOwnThread main runBlocking : I'm working in thread main ``` @@ -475,7 +475,7 @@ fun main(args: Array) = runBlocking { The output of this code with `-Dkotlinx.coroutines.debug` JVM option is: ```text -I'm working in thread CommonPool-worker-1 @test#2 +I'm working in thread DefaultDispatcher-worker-1 @test#2 ``` @@ -632,8 +632,8 @@ Thus, output (with [debug](#debugging-coroutines-and-threads)) is: ```text Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main' -Launch start, current thread: Thread[CommonPool-worker-1 @coroutine#2,5,main], thread local value: 'launch' -After yield, current thread: Thread[CommonPool-worker-2 @coroutine#2,5,main], thread local value: 'launch' +Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch' +After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch' Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main' ``` diff --git a/docs/exception-handling.md b/docs/exception-handling.md index e2772e332c..bf454a1d78 100644 --- a/docs/exception-handling.md +++ b/docs/exception-handling.md @@ -81,7 +81,7 @@ The output of this code is (with [debug](https://github.com/Kotlin/kotlinx.corou ```text Throwing exception from launch -Exception in thread "ForkJoinPool.commonPool-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException +Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException Joined failed job Throwing exception from async Caught ArithmeticException diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt b/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt index d065d61c57..538de3d5d0 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt @@ -43,5 +43,5 @@ private class WrapperWorker(private val worker: Scheduler.Worker) : Scheduler.Wo override fun isDisposed(): Boolean = worker.isDisposed override fun dispose() = worker.dispose() override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable = - worker.schedule(trackTask(run), delay, unit) + worker.schedule(wrapTask(run), delay, unit) }