Skip to content

Commit cac1a0f

Browse files
qwwdfsadelizarov
authored andcommitted
Provide API to create limited view of experimental dispatcher
Fixes #475
1 parent 8dd5b06 commit cac1a0f

File tree

4 files changed

+123
-7
lines changed

4 files changed

+123
-7
lines changed

core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt

+19-6
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,27 @@ class ExperimentalCoroutineDispatcher(
4848

4949
/**
5050
* Creates new coroutine execution context with limited parallelism to execute tasks which may potentially block.
51-
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and piggybacks on the original [ExperimentalCoroutineDispatcher],
52-
* executing tasks in this context, giving original dispatcher hint to adjust its behaviour.
51+
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
52+
* giving it additional hints to adjust its behaviour.
5353
*
54-
* @param parallelism parallelism level, indicating how many threads can execute tasks in given context in parallel.
54+
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel.
5555
*/
56-
fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
56+
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
5757
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
58-
return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
58+
return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
59+
}
60+
61+
/**
62+
* Creates new coroutine execution context with limited parallelism to execute CPU-intensive tasks.
63+
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
64+
* giving it additional hints to adjust its behaviour.
65+
*
66+
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel.
67+
*/
68+
public fun limited(parallelism: Int): CoroutineDispatcher {
69+
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
70+
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
71+
return LimitingDispatcher(this, parallelism, TaskMode.NON_BLOCKING)
5972
}
6073

6174
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
@@ -73,7 +86,7 @@ class ExperimentalCoroutineDispatcher(
7386
}
7487
}
7588

76-
private class LimitingBlockingDispatcher(
89+
private class LimitingDispatcher(
7790
val dispatcher: ExperimentalCoroutineDispatcher,
7891
val parallelism: Int,
7992
override val taskMode: TaskMode
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package kotlinx.coroutines.experimental.scheduling
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.experimental.*
5+
import org.junit.Test
6+
import kotlin.coroutines.experimental.*
7+
import kotlin.test.*
8+
9+
class LimitingCoroutineDispatcherStressTest : SchedulerTestBase() {
10+
11+
init {
12+
corePoolSize = 3
13+
}
14+
15+
private val blocking = blockingDispatcher(2)
16+
private val cpuView = view(2)
17+
private val cpuView2 = view(2)
18+
private val concurrentWorkers = atomic(0)
19+
private val iterations = 25_000 * stressTestMultiplierSqrt
20+
21+
@Test
22+
fun testCpuLimitNotExtended() = runBlocking<Unit> {
23+
val tasks = ArrayList<Deferred<*>>(iterations * 2)
24+
repeat(iterations) {
25+
tasks += task(cpuView, 3)
26+
tasks += task(cpuView2, 3)
27+
}
28+
29+
tasks.awaitAll()
30+
}
31+
32+
@Test
33+
fun testCpuLimitWithBlocking() = runBlocking<Unit> {
34+
val tasks = ArrayList<Deferred<*>>(iterations * 2)
35+
repeat(iterations) {
36+
tasks += task(cpuView, 4)
37+
tasks += task(blocking, 4)
38+
}
39+
40+
tasks.awaitAll()
41+
}
42+
43+
private fun task(ctx: CoroutineContext, maxLimit: Int): Deferred<Unit> {
44+
return async(ctx) {
45+
try {
46+
val currentlyExecuting = concurrentWorkers.incrementAndGet()
47+
assertTrue(currentlyExecuting <= maxLimit, "Executing: $currentlyExecuting, max limit: $maxLimit")
48+
} finally {
49+
concurrentWorkers.decrementAndGet()
50+
}
51+
}
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package kotlinx.coroutines.experimental.scheduling
2+
3+
import kotlinx.coroutines.experimental.*
4+
import org.junit.*
5+
import java.util.concurrent.*
6+
7+
class LimitingDispatcherTest : SchedulerTestBase() {
8+
9+
@Test(expected = IllegalArgumentException::class)
10+
fun testTooLargeView() {
11+
view(corePoolSize + 1)
12+
}
13+
14+
@Test(expected = IllegalArgumentException::class)
15+
fun testNegativeView() {
16+
view(-1)
17+
}
18+
19+
@Test(expected = IllegalArgumentException::class)
20+
fun testZeroView() {
21+
view(0)
22+
}
23+
24+
@Test(timeout = 10_000)
25+
fun testBlockingInterleave() = runBlocking {
26+
corePoolSize = 3
27+
val view = view(2)
28+
val blocking = blockingDispatcher(4)
29+
val barrier = CyclicBarrier(6)
30+
val tasks = ArrayList<Job>(6)
31+
repeat(2) {
32+
tasks += async(view) {
33+
barrier.await()
34+
}
35+
36+
repeat(2) {
37+
tasks += async(blocking) {
38+
barrier.await()
39+
}
40+
}
41+
}
42+
43+
tasks.joinAll()
44+
}
45+
}

core/kotlinx-coroutines-core/test/scheduling/SchedulerTestBase.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ abstract class SchedulerTestBase : TestBase() {
6060
}
6161

6262
private val exception = atomic<Throwable?>(null)
63-
private val handler = CoroutineExceptionHandler({ _, e -> exception.value = e })
63+
private val handler = CoroutineExceptionHandler { _, e -> exception.value = e }
6464

6565
protected var corePoolSize = 1
6666
protected var maxPoolSize = 1024
@@ -89,6 +89,11 @@ abstract class SchedulerTestBase : TestBase() {
8989
return _dispatcher!!.blocking(parallelism) + handler
9090
}
9191

92+
protected fun view(parallelism: Int): CoroutineContext {
93+
val intitialize = dispatcher
94+
return _dispatcher!!.limited(parallelism) + handler
95+
}
96+
9297
fun initialPoolSize() = corePoolSize.coerceAtMost(2)
9398

9499
@After

0 commit comments

Comments
 (0)