Skip to content

Introduce IO dispatcher to offload blocking I/O-intensive tasks #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt {
public static final field DEBUG_PROPERTY_VALUE_AUTO Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_OFF Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_ON Ljava/lang/String;
public static final field IO_PARALLELISM_PROPERTY_NAME Ljava/lang/String;
public static final fun getDefaultDispatcher ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
public static final fun getIO ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
Expand Down
17 changes: 10 additions & 7 deletions common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import kotlin.coroutines.experimental.*
* Base class that shall be extended by all coroutine dispatcher implementations.
*
* The following standard implementations are provided by `kotlinx.coroutines`:
*
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
* is specified in their context. It is currently equal to [CommonPool] (subject to change in the future).
* This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
* * [CommonPool] -- schedules coroutine execution to a common pool of shared background threads designed
* to be used for compute-intensive code.
* * [IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
* operations (like file I/O and blocking socket I/O).
* * [Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
* On first suspension the coroutine builder function returns.
* The coroutine will resume in whatever thread that is used by the
* The coroutine resumes in whatever thread that is used by the
* corresponding suspending function, without confining it to any specific thread or pool.
* This in an appropriate choice for IO-intensive coroutines that do not consume CPU resources.
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
* is specified in their context. It is currently equal to [CommonPool] (subject to change).
* * [CommonPool] -- immediately returns from the coroutine builder and schedules coroutine execution to
* a common pool of shared background threads.
* This is an appropriate choice for compute-intensive coroutines that consume a lot of CPU resources.
* **Unconfined dispatcher should not be normally used in code**.
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
*
Expand Down
26 changes: 23 additions & 3 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.*
import kotlin.coroutines.experimental.*

/**
* Name of the property that control coroutine debugging. See [newCoroutineContext].
* Name of the property that controls coroutine debugging. See [newCoroutineContext].
*/
public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"

Expand Down Expand Up @@ -56,14 +56,34 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N
}

/**
* This is the default [CoroutineDispatcher] that is used by all standard builders like
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is currently equal to [CommonPool], but the value is subject to change in the future.
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
* to use an experimental coroutine dispatcher that shares threads with [IO] dispatcher and thus can switch to
* [IO] context without performing an actual thread context switch.
*/
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher =
if (useCoroutinesScheduler) ExperimentalCoroutineDispatcher() else CommonPool
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool

/**
* Name of the property that defines the maximal number of threads that are used by [IO] coroutines dispatcher.
*/
public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.parallelism"

/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
*
* Additional threads in this pool are created and are shutdown on demand.
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*/
public val IO by lazy {
BackgroundDispatcher.blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
}

/**
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import kotlinx.coroutines.experimental.*
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

/**
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
*/
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher()

/**
* @suppress **This is unstable API and it is subject to change.**
*/
// TODO make internal (and rename) after complete integration
class ExperimentalCoroutineDispatcher(
open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long
Expand Down
23 changes: 23 additions & 0 deletions core/kotlinx-coroutines-core/test/IODispatcherTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import kotlinx.coroutines.experimental.*
import org.junit.Test
import kotlin.test.*

class IODispatcherTest : TestBase() {
@Test
fun testWithIOContext() = runTest {
// just a very basic test that is dispatcher works and indeed uses background thread
val mainThread = Thread.currentThread()
expect(1)
withContext(IO) {
expect(2)
assertNotSame(mainThread, Thread.currentThread())
}
expect(3)
assertSame(mainThread, Thread.currentThread())
finish(4)
}
}
4 changes: 2 additions & 2 deletions core/kotlinx-coroutines-core/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ public actual open class TestBase actual constructor() {

fun initPoolsBeforeTest() {
CommonPool.usePrivatePool()
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).usePrivateScheduler()
BackgroundDispatcher.usePrivateScheduler()
}

fun shutdownPoolsAfterTest() {
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).shutdown(SHUTDOWN_TIMEOUT)
BackgroundDispatcher.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
}

Expand Down