-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathLimitedParallelismStressTest.kt
148 lines (136 loc) · 4.94 KB
/
LimitedParallelismStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package kotlinx.coroutines
import kotlinx.coroutines.testing.*
import org.junit.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.*
/**
 * Stress tests for dispatcher views created with `limitedParallelism`.
 *
 * The suite is parameterized: every test runs once per parallelism value returned by [params].
 * Tests either verify that a view never runs more than [targetParallelism] tasks at the same time
 * (see [checkParallelism]) or that the view is actually able to run [targetParallelism] tasks at the same time.
 */
@RunWith(Parameterized::class)
class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() {

    companion object {
        /** Parallelism limits the suite is executed with; the value is also used as the test name. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = listOf(1, 2, 3, 4).map { arrayOf(it) }
    }

    // NOTE(review): presumably the argument is the number of executor threads, i.e. the backing executor is
    // twice as wide as the limit under test -- confirm against ExecutorRule.
    @get:Rule
    val executor = ExecutorRule(targetParallelism * 2)

    /** Number of coroutines (or barrier rounds) started per stress round. */
    private val iterations = 100_000

    /** Number of tasks that are currently inside [checkParallelism]. */
    private val parallelism = AtomicInteger(0)

    /**
     * Registers the calling task as running, gives other threads a chance to run, and fails the test
     * if more than [targetParallelism] tasks were registered at the moment this one entered.
     *
     * The counter is restored in `finally`, so a single violation does not leave it inflated and
     * trigger follow-up failures in unrelated tasks.
     */
    private fun checkParallelism() {
        val value = parallelism.incrementAndGet()
        try {
            // Widen the window in which concurrently running tasks overlap.
            Thread.yield()
            if (value > targetParallelism) {
                fail("Observed $value concurrently running tasks, but the limit is $targetParallelism")
            }
        } finally {
            parallelism.decrementAndGet()
        }
    }

    /** A limited view of [executor] must never run more than [targetParallelism] coroutines at once. */
    @Test
    fun testLimitedExecutor() = runTest {
        val view = executor.limitedParallelism(targetParallelism)
        doStress {
            repeat(iterations) {
                launch(view) {
                    checkParallelism()
                }
            }
        }
    }

    /** A limited view of [Dispatchers.IO] must never run more than [targetParallelism] coroutines at once. */
    @Test
    fun testLimitedDispatchersIo() = runTest {
        val view = Dispatchers.IO.limitedParallelism(targetParallelism)
        doStress {
            repeat(iterations) {
                launch(view) {
                    checkParallelism()
                }
            }
        }
    }

    /**
     * Same as [testLimitedDispatchersIo], but every coroutine calls [yield] before the parallelism check,
     * so the check runs after the coroutine has gone through a yield on the limited view.
     */
    @Test
    fun testLimitedDispatchersIoDispatchYield() = runTest {
        val view = Dispatchers.IO.limitedParallelism(targetParallelism)
        doStress {
            // Launch many coroutines per round, like the other limit tests: with a single coroutine
            // per round at most one task exists at a time and the limit could never be exceeded.
            repeat(iterations) {
                launch(view) {
                    yield()
                    checkParallelism()
                }
            }
        }
    }

    /**
     * A limited view of [executor] must be able to run [targetParallelism] coroutines at the same time.
     *
     * Every iteration uses a barrier for `targetParallelism + 1` parties: [targetParallelism] coroutines
     * on the view plus the test coroutine itself. The barrier only trips when all of them are waiting
     * simultaneously; otherwise the test hangs.
     */
    @Test
    fun testLimitedExecutorReachesTargetParallelism() = runTest {
        val view = executor.limitedParallelism(targetParallelism)
        doStress {
            repeat(iterations) {
                val barrier = CyclicBarrier(targetParallelism + 1)
                repeat(targetParallelism) {
                    launch(view) {
                        barrier.await()
                    }
                }
                // Returns only once all `targetParallelism` coroutines are waiting on the barrier as well.
                barrier.await()
                // Let the launched coroutines finish before the next iteration creates a new barrier.
                coroutineContext.job.children.toList().joinAll()
            }
        }
    }

    /**
     * Checks that dispatcher failures during fairness redispatches don't prevent reaching the target parallelism.
     */
    @Test
    fun testLimitedFailingDispatcherReachesTargetParallelism() = runTest {
        val keepFailing = AtomicBoolean(true)
        // Delegates to `executor`, but while `keepFailing` is set, throws from roughly half of the dispatch calls.
        val occasionallyFailing = object : CoroutineDispatcher() {
            override fun dispatch(context: CoroutineContext, block: Runnable) {
                if (keepFailing.get() && ThreadLocalRandom.current().nextBoolean()) throw TestException()
                executor.dispatch(context, block)
            }
        }.limitedParallelism(targetParallelism)
        doStress {
            repeat(1000) {
                keepFailing.set(true) // we want the next tasks to sporadically fail
                // Start some tasks to make sure redispatching for fairness is happening
                repeat(targetParallelism * 16 + 1) {
                    // targetParallelism * 16 + 1 because we need at least one worker to go through a fairness yield
                    // with high probability.
                    try {
                        occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
                            // do nothing.
                        })
                    } catch (_: DispatchException) {
                        // Expected while `keepFailing` is set; the failure itself is not what is being tested.
                    }
                }
                keepFailing.set(false) // we want the next tasks to succeed
                val barrier = CyclicBarrier(targetParallelism + 1)
                repeat(targetParallelism) {
                    launch(occasionallyFailing) {
                        barrier.await()
                    }
                }
                // The last barrier party waits on a different dispatcher, leaving this coroutine free
                // to keep feeding the limited view below.
                val success = launch(Dispatchers.Default) {
                    // Successfully awaited parallelism + 1
                    barrier.await()
                }
                // Feed the dispatcher with more tasks to make sure it's not stuck
                while (success.isActive) {
                    Thread.sleep(1)
                    repeat(targetParallelism) {
                        occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
                            // do nothing.
                        })
                    }
                }
                coroutineContext.job.children.toList().joinAll()
            }
        }
    }

    /**
     * Runs [block] `stressTestMultiplier` times (a value defined outside this file), each time in its own
     * [coroutineScope], so every coroutine launched by one round completes before the next round starts.
     */
    private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
        repeat(stressTestMultiplier) {
            coroutineScope {
                block()
            }
        }
    }
}