Skip to content

Commit 7048a1a

Browse files
committed
Fix limitedParallelism implementation on K/N
The initial implementation predates new memory model and was never working on it Fixes #3223
1 parent b545807 commit 7048a1a

File tree

4 files changed

+79
-12
lines changed

4 files changed

+79
-12
lines changed

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ internal class LimitedDispatcher(
2323

2424
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
2525

26+
// A separate object that we can synchronize on for K/N
27+
private val workerAllocationLock = SynchronizedObject()
28+
2629
@ExperimentalCoroutinesApi
2730
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
2831
parallelism.checkParallelism()
@@ -50,8 +53,7 @@ internal class LimitedDispatcher(
5053
continue
5154
}
5255

53-
@Suppress("CAST_NEVER_SUCCEEDS")
54-
synchronized(this as SynchronizedObject) {
56+
synchronized(workerAllocationLock) {
5557
--runningWorkers
5658
if (queue.size == 0) return
5759
++runningWorkers
@@ -87,8 +89,7 @@ internal class LimitedDispatcher(
8789
}
8890

8991
private fun tryAllocateWorker(): Boolean {
90-
@Suppress("CAST_NEVER_SUCCEEDS")
91-
synchronized(this as SynchronizedObject) {
92+
synchronized(workerAllocationLock) {
9293
if (runningWorkers >= parallelism) return false
9394
++runningWorkers
9495
return true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.test.*
8+
9+
class LimitedParallelismSharedTest : TestBase() {
10+
@Test
11+
fun testTaskFairness() = runTest {
12+
val view = Dispatchers.Default.limitedParallelism(1)
13+
val view2 = Dispatchers.Default.limitedParallelism(1)
14+
val j1 = launch(view) {
15+
while (true) {
16+
yield()
17+
}
18+
}
19+
val j2 = launch(view2) { j1.cancel() }
20+
joinAll(j1, j2)
21+
}
22+
23+
@Test
24+
fun testParallelismSpec() {
25+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
26+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
27+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
28+
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import kotlinx.atomicfu.*
6+
import kotlinx.coroutines.*
7+
import kotlinx.coroutines.exceptions.*
8+
import kotlin.test.*
9+
10+
class LimitedParallelismSharedStressTest : TestBase() {
11+
12+
private val targetParallelism = 4
13+
private val iterations = 100_000
14+
private val parallelism = atomic(0)
15+
16+
private fun checkParallelism() {
17+
val value = parallelism.incrementAndGet()
18+
randomWait()
19+
assertTrue { value <= targetParallelism }
20+
parallelism.decrementAndGet()
21+
}
22+
23+
@Test
24+
fun testLimitedExecutor() = runMtTest {
25+
val executor = newFixedThreadPoolContext(targetParallelism, "test")
26+
val view = executor.limitedParallelism(targetParallelism)
27+
doStress {
28+
repeat(iterations) {
29+
launch(view) {
30+
checkParallelism()
31+
}
32+
}
33+
}
34+
executor.close()
35+
}
36+
37+
private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
38+
repeat(stressTestMultiplier) {
39+
coroutineScope {
40+
block()
41+
}
42+
}
43+
}
44+
}

kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt

-8
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,6 @@ import kotlin.test.*
1111

1212
class LimitedParallelismTest : TestBase() {
1313

14-
@Test
15-
fun testParallelismSpec() {
16-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
17-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
18-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
19-
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
20-
}
21-
2214
@Test
2315
fun testTaskFairness() = runTest {
2416
val executor = newSingleThreadContext("test")

0 commit comments

Comments
 (0)