Skip to content

Commit 1df0be5

Browse files
authored
Introduce CoroutineDispatcher.limitedParallelism and make Dispatchers.IO unbounded for limited parallelism (Kotlin#2918)
* Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control * Elastic Dispatchers.IO: * Extract Ktor-obsolete API to a separate file for backwards compatibility * Make Dispatchers.IO being a slice of unlimited blocking scheduler * Make Dispatchers.IO.limitParallelism take slices from the same internal scheduler Fixes Kotlin#2943 Fixes Kotlin#2919
1 parent 8d1ee7d commit 1df0be5

29 files changed

+755
-347
lines changed

benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {
3030
coroutineContext = when {
3131
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
3232
dispatcher == "scheduler" -> {
33-
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
33+
Dispatchers.Default
3434
}
3535
dispatcher.startsWith("ftp") -> {
3636
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }

benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+4-7
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@ package benchmarks
66

77
import benchmarks.common.*
88
import kotlinx.coroutines.*
9-
import kotlinx.coroutines.channels.Channel
10-
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
11-
import kotlinx.coroutines.sync.Semaphore
12-
import kotlinx.coroutines.sync.withPermit
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.sync.*
1311
import org.openjdk.jmh.annotations.*
14-
import java.util.concurrent.ForkJoinPool
15-
import java.util.concurrent.TimeUnit
12+
import java.util.concurrent.*
1613

1714
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
1815
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@@ -84,7 +81,7 @@ open class SemaphoreBenchmark {
8481

8582
enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
8683
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
87-
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
84+
EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
8885
}
8986

9087
private const val WORK_INSIDE = 80

benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,8 @@ import kotlin.coroutines.*
2727
@State(Scope.Benchmark)
2828
open class PingPongWithBlockingContext {
2929

30-
@UseExperimental(InternalCoroutinesApi::class)
31-
private val experimental = ExperimentalCoroutineDispatcher(8)
32-
@UseExperimental(InternalCoroutinesApi::class)
33-
private val blocking = experimental.blocking(8)
30+
private val experimental = Dispatchers.Default
31+
private val blocking = Dispatchers.IO.limitedParallelism(8)
3432
private val threadPool = newFixedThreadPoolContext(8, "PongCtx")
3533

3634
@TearDown

integration/kotlinx-coroutines-play-services/test/TaskTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ class TaskTest : TestBase() {
4545
}
4646

4747
@Test
48-
fun testCancelledAsTask() {
49-
val deferred = GlobalScope.async {
48+
fun testCancelledAsTask() = runTest {
49+
val deferred = async(Dispatchers.Default) {
5050
delay(100)
5151
}.apply { cancel() }
5252

@@ -60,8 +60,8 @@ class TaskTest : TestBase() {
6060
}
6161

6262
@Test
63-
fun testThrowingAsTask() {
64-
val deferred = GlobalScope.async<Int> {
63+
fun testThrowingAsTask() = runTest({ e -> e is TestException }) {
64+
val deferred = async<Int>(Dispatchers.Default) {
6565
throw TestException("Fail")
6666
}
6767

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
156156
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
157157
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
158158
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
159+
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
159160
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
160161
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
161162
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
@@ -447,6 +448,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
447448
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
448449
public fun <init> ()V
449450
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
451+
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
450452
public fun toString ()Ljava/lang/String;
451453
protected final fun toStringInternalImpl ()Ljava/lang/String;
452454
}

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+39
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,45 @@ public abstract class CoroutineDispatcher :
6161
*/
6262
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
6363

64+
/**
65+
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
66+
* The resulting view uses the original dispatcher for execution, but with the guarantee that
67+
* no more than [parallelism] coroutines are executed at the same time.
68+
*
69+
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
70+
* each view controls its own parallelism independently with the guarantee that the effective parallelism
71+
* of all views cannot exceed the actual parallelism of the original dispatcher.
72+
*
73+
* ### Limitations
74+
*
75+
* The default implementation of `limitedParallelism` does not support direct dispatchers,
76+
* such as executing the given runnable in place during [dispatch] calls.
77+
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
78+
* For direct dispatchers, it is recommended to override this method
79+
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
80+
*
81+
* ### Example of usage
82+
* ```
83+
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
84+
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
85+
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
86+
* // At most 3 threads will be processing JSON to avoid image processing starvation
87+
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
88+
* // At most 1 thread will be doing IO
89+
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
90+
* ```
91+
* is 6. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
92+
*
93+
* Note that this example was structured in such a way that it illustrates the parallelism guarantees.
94+
* In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
95+
* `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
96+
*/
97+
@ExperimentalCoroutinesApi
98+
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
99+
parallelism.checkParallelism()
100+
return LimitedDispatcher(this, parallelism)
101+
}
102+
64103
/**
65104
* Dispatches execution of a runnable [block] onto another thread in the given [context].
66105
* This method should guarantee that the given [block] will be eventually invoked,

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() {
115115
}
116116
}
117117

118+
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
119+
parallelism.checkParallelism()
120+
return this
121+
}
122+
118123
open fun shutdown() {}
119124
}
120125

@@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long
525530
internal expect object DefaultExecutor {
526531
public fun enqueue(task: Runnable)
527532
}
528-

kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt

+8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
8+
79
/**
810
* Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread
911
* and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main].
@@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
5153
*/
5254
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"
5355

56+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
57+
parallelism.checkParallelism()
58+
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
59+
return this
60+
}
61+
5462
/**
5563
* Internal method for more specific [toString] implementations. It returns non-null
5664
* string if this dispatcher is set in the platform as the main one.

kotlinx-coroutines-core/common/src/Unconfined.kt

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ import kotlin.jvm.*
1111
* A coroutine dispatcher that is not confined to any specific thread.
1212
*/
1313
internal object Unconfined : CoroutineDispatcher() {
14+
15+
@ExperimentalCoroutinesApi
16+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
17+
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
18+
}
19+
1420
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
1521

1622
override fun dispatch(context: CoroutineContext, block: Runnable) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.coroutines.*
9+
import kotlin.jvm.*
10+
11+
/**
12+
* The result of .limitedParallelism(x) call, a dispatcher
13+
* that wraps the given dispatcher, but limits the parallelism level, while
14+
* trying to emulate fairness.
15+
*/
16+
internal class LimitedDispatcher(
17+
private val dispatcher: CoroutineDispatcher,
18+
private val parallelism: Int
19+
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
20+
21+
@Volatile
22+
private var runningWorkers = 0
23+
24+
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
25+
26+
@ExperimentalCoroutinesApi
27+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
28+
parallelism.checkParallelism()
29+
if (parallelism >= this.parallelism) return this
30+
return super.limitedParallelism(parallelism)
31+
}
32+
33+
override fun run() {
34+
var fairnessCounter = 0
35+
while (true) {
36+
val task = queue.removeFirstOrNull()
37+
if (task != null) {
38+
try {
39+
task.run()
40+
} catch (e: Throwable) {
41+
handleCoroutineException(EmptyCoroutineContext, e)
42+
}
43+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
44+
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
45+
// Do "yield" to let other views to execute their runnable as well
46+
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
47+
dispatcher.dispatch(this, this)
48+
return
49+
}
50+
continue
51+
}
52+
53+
@Suppress("CAST_NEVER_SUCCEEDS")
54+
synchronized(this as SynchronizedObject) {
55+
--runningWorkers
56+
if (queue.size == 0) return
57+
++runningWorkers
58+
fairnessCounter = 0
59+
}
60+
}
61+
}
62+
63+
override fun dispatch(context: CoroutineContext, block: Runnable) {
64+
dispatchInternal(block) {
65+
dispatcher.dispatch(this, this)
66+
}
67+
}
68+
69+
@InternalCoroutinesApi
70+
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
71+
dispatchInternal(block) {
72+
dispatcher.dispatchYield(this, this)
73+
}
74+
}
75+
76+
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
77+
// Add task to queue so running workers will be able to see that
78+
if (addAndTryDispatching(block)) return
79+
/*
80+
* Protect against the race when the number of workers is enough,
81+
* but one (because of synchronized serialization) attempts to complete,
82+
* and we just observed the number of running workers smaller than the actual
83+
* number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
84+
*/
85+
if (!tryAllocateWorker()) return
86+
dispatch()
87+
}
88+
89+
private fun tryAllocateWorker(): Boolean {
90+
@Suppress("CAST_NEVER_SUCCEEDS")
91+
synchronized(this as SynchronizedObject) {
92+
if (runningWorkers >= parallelism) return false
93+
++runningWorkers
94+
return true
95+
}
96+
}
97+
98+
private fun addAndTryDispatching(block: Runnable): Boolean {
99+
queue.addLast(block)
100+
return runningWorkers >= parallelism
101+
}
102+
}
103+
104+
// Save a few bytecode ops
105+
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }

kotlinx-coroutines-core/js/src/JSDispatcher.kt

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
3131

3232
abstract fun scheduleQueueProcessing()
3333

34+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
35+
parallelism.checkParallelism()
36+
return this
37+
}
38+
3439
override fun dispatch(context: CoroutineContext, block: Runnable) {
3540
messageQueue.enqueue(block)
3641
}

kotlinx-coroutines-core/jvm/src/Dispatchers.kt

+23-7
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public actual object Dispatchers {
8686
* Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
8787
* but still want to execute it in the current call-frame until its first suspension, then you can use
8888
* an optional [CoroutineStart] parameter in coroutine builders like
89-
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the
89+
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to
9090
* the value of [CoroutineStart.UNDISPATCHED].
9191
*/
9292
@JvmStatic
@@ -100,22 +100,38 @@ public actual object Dispatchers {
100100
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
101101
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
102102
*
103-
* Moreover, the maximum configurable number of threads is capped by the
104-
* `kotlinx.coroutines.scheduler.max.pool.size` system property.
105-
* If you need a higher number of parallel threads,
106-
* you should use a custom dispatcher backed by your own thread pool.
103+
* ### Elasticity for limited parallelism
104+
*
105+
* `Dispatchers.IO` has a unique property of elasticity: its views
106+
* obtained with [CoroutineDispatcher.limitedParallelism] are
107+
* not restricted by the `Dispatchers.IO` parallelism. Conceptually, there is
108+
* a dispatcher backed by an unlimited pool of threads, and both `Dispatchers.IO`
109+
* and views of `Dispatchers.IO` are actually views of that dispatcher. In practice
110+
* this means that, despite not abiding by `Dispatchers.IO`'s parallelism
111+
* restrictions, its views share threads and resources with it.
112+
*
113+
* In the following example
114+
* ```
115+
* // 100 threads for MySQL connection
116+
* val myMysqlDbDispatcher = Dispatchers.IO.limitedParallelism(100)
117+
* // 60 threads for MongoDB connection
118+
* val myMongoDbDispatcher = Dispatchers.IO.limitedParallelism(60)
119+
* ```
120+
* the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads,
121+
* but during its steady state there is only a small number of threads shared
122+
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`.
107123
*
108124
* ### Implementation note
109125
*
110-
* This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
126+
* This dispatcher and its views share threads with the [Default][Dispatchers.Default] dispatcher, so using
111127
* `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
112128
* dispatcher does not lead to an actual switching to another thread &mdash; typically execution
113129
* continues in the same thread.
114130
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
115131
* during operations over IO dispatcher.
116132
*/
117133
@JvmStatic
118-
public val IO: CoroutineDispatcher = DefaultScheduler.IO
134+
public val IO: CoroutineDispatcher = DefaultIoScheduler
119135

120136
/**
121137
* Shuts down built-in dispatchers, such as [Default] and [IO],

kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt

+3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ private class MissingMainCoroutineDispatcher(
9393
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
9494
missing()
9595

96+
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
97+
missing()
98+
9699
override suspend fun delay(time: Long) =
97100
missing()
98101

0 commit comments

Comments
 (0)