-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathThreadLocalStressTest.kt
165 lines (149 loc) · 5.61 KB
/
ThreadLocalStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.sync.*
import java.util.concurrent.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.test.*
class ThreadLocalStressTest : TestBase() {
private val threadLocal = ThreadLocal<String>()
// See the comment in doStress for the machinery
@Test
fun testStress() = runTest {
repeat (100 * stressTestMultiplierSqrt) {
withContext(Dispatchers.Default) {
repeat(100) {
launch {
doStress(null)
}
}
}
}
}
@Test
fun testStressWithOuterValue() = runTest {
repeat (100 * stressTestMultiplierSqrt) {
withContext(Dispatchers.Default + threadLocal.asContextElement("bar")) {
repeat(100) {
launch {
doStress("bar")
}
}
}
}
}
private suspend fun doStress(expectedValue: String?) {
assertEquals(expectedValue, threadLocal.get())
try {
/*
* Here we are using very specific code-path to trigger the execution we want to.
* The bug, in general, has a larger impact, but this particular code pinpoints it:
*
* 1) We use _undispatched_ withContext with thread element
* 2) We cancel the coroutine
* 3) We use 'suspendCancellableCoroutineReusable' that does _postponed_ cancellation check
* which makes the reproduction of this race pretty reliable.
*
* Now the following code path is likely to be triggered:
*
* T1 from within 'withContinuationContext' method:
* Finds 'oldValue', finds undispatched completion, invokes its 'block' argument.
* 'block' is this coroutine, it goes to 'trySuspend', checks for postponed cancellation and *dispatches* it.
* The execution stops _right_ before 'undispatchedCompletion.clearThreadContext()'.
*
* T2 now executes the dispatched cancellation and concurrently mutates the state of the undispatched completion.
* All bets are off, now both threads can leave the thread locals state inconsistent.
*/
withContext(threadLocal.asContextElement("foo")) {
yield()
cancel()
suspendCancellableCoroutineReusable<Unit> { }
}
} finally {
assertEquals(expectedValue, threadLocal.get())
}
}
/*
* Another set of tests for undispatcheable continuations that do not require stress test multiplier.
* Also note that `uncaughtExceptionHandler` is used as the only available mechanism to propagate error from
* `resumeWith`
*/
@Test
fun testNonDispatcheableLeak() {
repeat(100) {
doTestWithPreparation(
::doTest,
{ threadLocal.set(null) }) { threadLocal.get() != null }
assertNull(threadLocal.get())
}
}
@Test
fun testNonDispatcheableLeakWithInitial() {
repeat(100) {
doTestWithPreparation(::doTest, { threadLocal.set("initial") }) { threadLocal.get() != "initial" }
assertEquals("initial", threadLocal.get())
}
}
@Test
fun testNonDispatcheableLeakWithContextSwitch() {
repeat(100) {
doTestWithPreparation(
::doTestWithContextSwitch,
{ threadLocal.set(null) }) { threadLocal.get() != null }
assertNull(threadLocal.get())
}
}
@Test
fun testNonDispatcheableLeakWithInitialWithContextSwitch() {
repeat(100) {
doTestWithPreparation(
::doTestWithContextSwitch,
{ threadLocal.set("initial") }) { false /* can randomly wake up on the non-main thread */ }
// Here we are always on the main thread
assertEquals("initial", threadLocal.get())
}
}
private fun doTestWithPreparation(testBody: suspend () -> Unit, setup: () -> Unit, isInvalid: () -> Boolean) {
setup()
val latch = CountDownLatch(1)
testBody.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) {
if (isInvalid()) {
Thread.currentThread().uncaughtExceptionHandler.uncaughtException(
Thread.currentThread(),
IllegalStateException("Unexpected error: thread local was not cleaned")
)
}
latch.countDown()
})
latch.await()
}
private suspend fun doTest() {
withContext(threadLocal.asContextElement("foo")) {
try {
coroutineScope {
val semaphore = Semaphore(1, 1)
cancel()
semaphore.acquire()
}
} catch (e: CancellationException) {
// Ignore cancellation
}
}
}
private suspend fun doTestWithContextSwitch() {
withContext(threadLocal.asContextElement("foo")) {
try {
coroutineScope {
val semaphore = Semaphore(1, 1)
GlobalScope.launch { }.join()
cancel()
semaphore.acquire()
}
} catch (e: CancellationException) {
// Ignore cancellation
}
}
}
}