Skip to content

Commit dfbd4a8

Browse files
authored
Add optional name parameter to .limitedParallelism (#4106)
Fixes #4023
1 parent a9bb5d3 commit dfbd4a8

File tree

14 files changed

+104
-32
lines changed

14 files changed

+104
-32
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+4-2
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
170170
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
171171
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
172172
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
173-
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
173+
public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
174+
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
175+
public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
174176
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
175177
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
176178
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
@@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
502504
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
503505
public fun <init> ()V
504506
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
505-
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
507+
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
506508
public fun toString ()Ljava/lang/String;
507509
protected final fun toStringInternalImpl ()Ljava/lang/String;
508510
}

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

+2-1
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,15 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra
5959
open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0]
6060
open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0]
6161
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
62+
open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
6263
open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0]
6364
}
6465
abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0]
6566
abstract val immediate // kotlinx.coroutines/MainCoroutineDispatcher.immediate|{}immediate[0]
6667
abstract fun <get-immediate>(): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.<get-immediate>|<get-immediate>(){}[0]
6768
constructor <init>() // kotlinx.coroutines/MainCoroutineDispatcher.<init>|<init>(){}[0]
6869
final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0]
69-
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
70+
open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
7071
open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0]
7172
}
7273
abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0]

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+15-6
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ public abstract class CoroutineDispatcher :
8383
* // Background dispatcher for the application
8484
* val dispatcher = newFixedThreadPoolContext(4, "App Background")
8585
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
86-
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2)
86+
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
8787
* // At most 3 threads will be processing JSON to avoid image processing starvation
88-
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3)
88+
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
8989
* // At most 1 thread will be doing IO
90-
* val fileWriterDispatcher = dispatcher.limitedParallelism(1)
90+
* val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
9191
* ```
9292
* Note how in this example the application has an executor with 4 threads, but the total sum of all limits
9393
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
@@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher :
105105
* is established between them:
106106
*
107107
* ```
108-
* val confined = Dispatchers.Default.limitedParallelism(1)
108+
* val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
109109
* var counter = 0
110110
*
111111
* // Invoked from arbitrary coroutines
@@ -135,15 +135,24 @@ public abstract class CoroutineDispatcher :
135135
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
136136
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
137137
*
138+
* @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
139+
* Implementations are free to ignore this parameter.
138140
* @throws IllegalArgumentException if the given [parallelism] is non-positive
139141
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
140142
*/
141143
@ExperimentalCoroutinesApi
142-
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
144+
public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
143145
parallelism.checkParallelism()
144-
return LimitedDispatcher(this, parallelism)
146+
return LimitedDispatcher(this, parallelism, name)
145147
}
146148

149+
// Was experimental since 1.6.0, deprecated since 1.8.x
150+
@Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
151+
level = DeprecationLevel.HIDDEN,
152+
replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
153+
)
154+
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null)
155+
147156
/**
148157
* Requests execution of a runnable [block].
149158
* The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ internal abstract class EventLoop : CoroutineDispatcher() {
111111
}
112112
}
113113

114-
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
114+
final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
115115
parallelism.checkParallelism()
116-
return this
116+
return namedOrThis(name) // Single-threaded, short-circuit
117117
}
118118

119119
open fun shutdown() {}

kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
4949
*/
5050
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"
5151

52-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
52+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
5353
parallelism.checkParallelism()
5454
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
55-
return this
55+
return namedOrThis(name)
5656
}
5757

5858
/**

kotlinx-coroutines-core/common/src/Unconfined.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import kotlin.jvm.*
99
internal object Unconfined : CoroutineDispatcher() {
1010

1111
@ExperimentalCoroutinesApi
12-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
12+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
1313
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
1414
}
1515

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+11-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import kotlin.coroutines.*
2121
*/
2222
internal class LimitedDispatcher(
2323
private val dispatcher: CoroutineDispatcher,
24-
private val parallelism: Int
24+
private val parallelism: Int,
25+
private val name: String?
2526
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {
2627

2728
// Atomic is necessary here for the sake of K/N memory ordering,
@@ -34,10 +35,10 @@ internal class LimitedDispatcher(
3435
private val workerAllocationLock = SynchronizedObject()
3536

3637
@ExperimentalCoroutinesApi
37-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
38+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
3839
parallelism.checkParallelism()
39-
if (parallelism >= this.parallelism) return this
40-
return super.limitedParallelism(parallelism)
40+
if (parallelism >= this.parallelism) return namedOrThis(name)
41+
return super.limitedParallelism(parallelism, name)
4142
}
4243

4344
override fun dispatch(context: CoroutineContext, block: Runnable) {
@@ -95,7 +96,7 @@ internal class LimitedDispatcher(
9596
}
9697
}
9798

98-
override fun toString() = "$dispatcher.limitedParallelism($parallelism)"
99+
override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"
99100

100101
/**
101102
* A worker that polls the queue and runs tasks until there are no more of them.
@@ -128,3 +129,8 @@ internal class LimitedDispatcher(
128129
}
129130

130131
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
132+
133+
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
134+
if (name != null) return NamedDispatcher(this, name)
135+
return this
136+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.DefaultDelay
5+
import kotlin.coroutines.*
6+
7+
/**
8+
* Wrapping dispatcher that has a nice user-supplied `toString()` representation
9+
*/
10+
internal class NamedDispatcher(
11+
private val dispatcher: CoroutineDispatcher,
12+
private val name: String
13+
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {
14+
15+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
16+
17+
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)
18+
19+
@InternalCoroutinesApi
20+
override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block)
21+
22+
override fun toString(): String {
23+
return name
24+
}
25+
}

kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay
3030

3131
abstract fun scheduleQueueProcessing()
3232

33-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
33+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
3434
parallelism.checkParallelism()
35-
return this
35+
return namedOrThis(name)
3636
}
3737

3838
override fun dispatch(context: CoroutineContext, block: Runnable) {

kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher(
9191
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
9292
missing()
9393

94-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
94+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
9595
missing()
9696

9797
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher(
1212
) {
1313

1414
@ExperimentalCoroutinesApi
15-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
15+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
1616
parallelism.checkParallelism()
17-
if (parallelism >= CORE_POOL_SIZE) return this
18-
return super.limitedParallelism(parallelism)
17+
if (parallelism >= CORE_POOL_SIZE) {
18+
return namedOrThis(name)
19+
}
20+
return super.limitedParallelism(parallelism, name)
1921
}
2022

2123
// Shuts down the dispatcher, used only by Dispatchers.shutdown()
@@ -44,10 +46,12 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
4446
}
4547

4648
@ExperimentalCoroutinesApi
47-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
49+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
4850
parallelism.checkParallelism()
49-
if (parallelism >= MAX_POOL_SIZE) return this
50-
return super.limitedParallelism(parallelism)
51+
if (parallelism >= MAX_POOL_SIZE) {
52+
return namedOrThis(name)
53+
}
54+
return super.limitedParallelism(parallelism, name)
5155
}
5256

5357
// This name only leaks to user code as part of .limitedParallelism machinery
@@ -72,9 +76,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
7276
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)
7377

7478
@ExperimentalCoroutinesApi
75-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
79+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
7680
// See documentation to Dispatchers.IO for the rationale
77-
return UnlimitedIoScheduler.limitedParallelism(parallelism)
81+
return UnlimitedIoScheduler.limitedParallelism(parallelism, name)
7882
}
7983

8084
override fun dispatch(context: CoroutineContext, block: Runnable) {

kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt

+24
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
@file:OptIn(ExperimentalStdlibApi::class)
2+
13
package kotlinx.coroutines
24

35
import kotlin.test.*
@@ -18,5 +20,27 @@ class DispatchersToStringTest {
1820
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
1921
// Not overridden at all, limited parallelism returns `this`
2022
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
23+
24+
assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString())
25+
assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString())
26+
assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString())
27+
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
28+
29+
val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited")
30+
assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString())
31+
assertEquals("2", limitedNamed.limitedParallelism(2, "2").toString())
32+
// We asked for too many threads with no name, this was returned
33+
assertEquals("limited", limitedNamed.limitedParallelism(12).toString())
34+
assertEquals("12", limitedNamed.limitedParallelism(12, "12").toString())
35+
36+
runBlocking {
37+
val d = coroutineContext[CoroutineDispatcher]!!
38+
assertContains(d.toString(), "BlockingEventLoop")
39+
val limited = d.limitedParallelism(2)
40+
assertContains(limited.toString(), "BlockingEventLoop")
41+
assertFalse(limited.toString().contains("limitedParallelism"))
42+
val named = d.limitedParallelism(2, "Named")
43+
assertEquals("Named", named.toString())
44+
}
2145
}
2246
}

kotlinx-coroutines-core/native/src/Dispatchers.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() {
2828
private val io = unlimitedPool.limitedParallelism(64) // Default JVM size
2929

3030
@ExperimentalCoroutinesApi
31-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
31+
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
3232
// See documentation to Dispatchers.IO for the rationale
33-
return unlimitedPool.limitedParallelism(parallelism)
33+
return unlimitedPool.limitedParallelism(parallelism, name)
3434
}
3535

3636
override fun dispatch(context: CoroutineContext, block: Runnable) {

test-utils/common/src/TestBase.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ public class TestRuntimeException(message: String? = null, private val data: Any
252252
public class RecoverableTestException(message: String? = null) : RuntimeException(message)
253253
public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message)
254254

255+
// Erases identity and equality checks for tests
255256
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
256257
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
257258
return object : CoroutineDispatcher() {

0 commit comments

Comments
 (0)