Skip to content

Commit a9bb5d3

Browse files
Limited parallelism improvements (#4098)
* Provide toString implementation for limitedParallelism * Improve documentation of limitedParallelism Co-authored-by: Dmitry Khalanskiy <[email protected]>
1 parent 50a4d86 commit a9bb5d3

File tree

7 files changed

+89
-18
lines changed

7 files changed

+89
-18
lines changed

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

+56-15
Original file line numberDiff line numberDiff line change
@@ -65,37 +65,78 @@ public abstract class CoroutineDispatcher :
6565

6666
/**
6767
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
68-
* The resulting view uses the original dispatcher for execution, but with the guarantee that
68+
* The resulting view uses the original dispatcher for execution but with the guarantee that
6969
* no more than [parallelism] coroutines are executed at the same time.
7070
*
7171
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
7272
* each view controls its own parallelism independently with the guarantee that the effective parallelism
7373
* of all views cannot exceed the actual parallelism of the original dispatcher.
7474
*
75-
* ### Limitations
76-
*
77-
* The default implementation of `limitedParallelism` does not support direct dispatchers,
78-
* such as executing the given runnable in place during [dispatch] calls.
79-
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
80-
* For direct dispatchers, it is recommended to override this method
81-
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
75+
* The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same
76+
* subset of threads, it only guarantees that at most [parallelism] coroutines are executed at the same time,
77+
* and reuses threads from the original dispatchers.
78+
* It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away
79+
* and is not required to be closed.
8280
*
8381
* ### Example of usage
8482
* ```
85-
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
83+
* // Background dispatcher for the application
84+
* val dispatcher = newFixedThreadPoolContext(4, "App Background")
8685
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
87-
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
86+
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2)
8887
* // At most 3 threads will be processing JSON to avoid image processing starvation
89-
* private val jsonProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
88+
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3)
9089
* // At most 1 thread will be doing IO
91-
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
90+
* val fileWriterDispatcher = dispatcher.limitedParallelism(1)
9291
* ```
9392
* Note how in this example the application has an executor with 4 threads, but the total sum of all limits
94-
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
93+
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
94+
* and at most 4 threads can exist in the system.
9595
*
9696
* Note that this example was structured in such a way that it illustrates the parallelism guarantees.
97-
* In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
98-
* `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
97+
* In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a
98+
* `backgroundDispatcher`.
99+
*
100+
* ### `limitedParallelism(1)` pattern
101+
*
102+
* One of the common patterns is confining the execution of specific tasks to a sequential execution in background
103+
* with `limitedParallelism(1)` invocation.
104+
* For that purpose, the implementation guarantees that tasks are executed sequentially and that a happens-before relation
105+
* is established between them:
106+
*
107+
* ```
108+
* val confined = Dispatchers.Default.limitedParallelism(1)
109+
* var counter = 0
110+
*
111+
* // Invoked from arbitrary coroutines
112+
* launch(confined) {
113+
* // This increment is sequential and race-free
114+
* ++counter
115+
* }
116+
* ```
117+
* Note that there is no guarantee that the underlying system thread will always be the same.
118+
*
119+
* ### Dispatchers.IO
120+
*
121+
* `Dispatcher.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of
122+
* views is not restricted by the capacity of `Dispatchers.IO`.
123+
* It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with
124+
* `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads.
125+
* See `Dispatchers.IO` documentation for more details.
126+
*
127+
* ### Restrictions and implementation details
128+
*
129+
* The default implementation of `limitedParallelism` does not support direct dispatchers,
130+
* such as executing the given runnable in place during [dispatch] calls.
131+
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
132+
* For direct dispatchers, it is recommended to override this method
133+
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
134+
*
135+
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
136+
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
137+
*
138+
* @throws IllegalArgumentException if the given [parallelism] is non-positive
139+
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
99140
*/
100141
@ExperimentalCoroutinesApi
101142
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ internal class LimitedDispatcher(
9595
}
9696
}
9797

98+
override fun toString() = "$dispatcher.limitedParallelism($parallelism)"
99+
98100
/**
99101
* A worker that polls the queue and runs tasks until there are no more of them.
100102
*
@@ -125,5 +127,4 @@ internal class LimitedDispatcher(
125127
}
126128
}
127129

128-
// Save a few bytecode ops
129130
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }

kotlinx-coroutines-core/concurrent/src/Dispatchers.kt

+9
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ package kotlinx.coroutines
2626
* the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads,
2727
* but during its steady state there is only a small number of threads shared
2828
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`
29+
*
30+
* It is recommended to replace manually created thread-backed executors with `Dispatchers.IO.limitedParallelism` instead:
31+
* ```
32+
* // Requires manual closing, allocates resources for all threads
33+
* val databasePoolDispatcher = newFixedThreadPoolContext(128)
34+
*
35+
* // Provides the same number of threads as a resource but shares and caches them internally
36+
* val databasePoolDispatcher = Dispatchers.IO.limitedParallelism(128)
37+
* ```
2938
*/
3039
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
3140
public expect val Dispatchers.IO: CoroutineDispatcher

kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import kotlin.jvm.*
1919
* associated native resources (threads or native workers). It should not be allocated in place,
2020
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
2121
* If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher,
22-
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
22+
* it is recommended to use [`Dispatchers.IO.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism]
23+
* or [`Dispatchers.Default.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism] instead.
2324
*
2425
* If you need a completely separate thread pool with scheduling policy that is based on the standard
2526
* JDK executors, use the following expression:
@@ -48,7 +49,8 @@ public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher =
4849
* associated native resources (threads or native workers). It should not be allocated in place,
4950
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
5051
* If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher,
51-
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
52+
* it is recommended to use [`Dispatchers.IO.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism]
53+
* or [`Dispatchers.Default.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism] instead.
5254
*
5355
* If you need a completely separate thread pool with scheduling policy that is based on the standard
5456
* JDK executors, use the following expression:

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+5
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
184184
(this as Object).notifyAll()
185185
}
186186

187+
// User only for testing and nothing else
187188
internal val isThreadPresent
188189
get() = _thread != null
190+
191+
override fun toString(): String {
192+
return "DefaultExecutor"
193+
}
189194
}

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
4949
if (parallelism >= MAX_POOL_SIZE) return this
5050
return super.limitedParallelism(parallelism)
5151
}
52+
53+
// This name only leaks to user code as part of .limitedParallelism machinery
54+
override fun toString(): String {
55+
return "Dispatchers.IO"
56+
}
5257
}
5358

5459
// Dispatchers.IO

kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt

+8
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,12 @@ class DispatchersToStringTest {
1111
assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.toString())
1212
assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.immediate.toString())
1313
}
14+
15+
@Test
16+
fun testLimitedParallelism() {
17+
assertEquals("Dispatchers.IO.limitedParallelism(1)", Dispatchers.IO.limitedParallelism(1).toString())
18+
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
19+
// Not overridden at all, limited parallelism returns `this`
20+
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
21+
}
1422
}

0 commit comments

Comments
 (0)