Skip to content

Commit dab2195

Browse files
committed
Add and enhance stress tests for attaching completion handlers
1 parent a3f5532 commit dab2195

File tree

3 files changed

+227
-9
lines changed

3 files changed

+227
-9
lines changed

kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt

+31-8
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,36 @@
11
package kotlinx.coroutines
22

33
import kotlinx.coroutines.testing.*
4-
import org.junit.*
5-
import org.junit.Test
64
import java.util.concurrent.*
75
import java.util.concurrent.atomic.*
86
import kotlin.test.*
97

8+
/**
9+
* Testing the procedure of attaching a child to the parent job.
10+
*/
1011
class JobChildStressTest : TestBase() {
1112
private val N_ITERATIONS = 10_000 * stressTestMultiplier
12-
private val pool = newFixedThreadPoolContext(3, "JobChildStressTest")
13+
private val pool = newFixedThreadPoolContext(2, "JobChildStressTest")
1314

14-
@After
15+
@AfterTest
1516
fun tearDown() {
1617
pool.close()
1718
}
1819

1920
/**
20-
* Perform concurrent launch of a child job & cancellation of the explicit parent job
21+
* Tests attaching a child while the parent is trying to finalize its state.
22+
*
23+
* Checks the following interleavings:
24+
* - A child attaches before the parent is cancelled.
25+
* - A child attaches after the parent is cancelled, but before the parent notifies anyone about it.
26+
* - A child attaches after the parent notifies the children about being cancelled,
27+
* but before it starts waiting for its children.
28+
* - A child attempts to attach after the parent stops waiting for its children,
29+
* which immediately cancels the child.
2130
*/
2231
@Test
2332
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
24-
fun testChild() = runTest {
33+
fun testChildAttachmentRacingWithCancellation() = runTest {
2534
val barrier = CyclicBarrier(3)
2635
repeat(N_ITERATIONS) {
2736
var wasLaunched = false
@@ -30,7 +39,7 @@ class JobChildStressTest : TestBase() {
3039
unhandledException = ex
3140
}
3241
val scope = CoroutineScope(pool + handler)
33-
val parent = CompletableDeferred<Unit>()
42+
val parent = createCompletableDeferredForTesting(it)
3443
// concurrent child launcher
3544
val launcher = scope.launch {
3645
barrier.await()
@@ -56,13 +65,27 @@ class JobChildStressTest : TestBase() {
5665
}
5766
}
5867

68+
/**
69+
* Tests attaching a child while the parent is waiting for the last child job to complete.
70+
*
71+
* Checks the following interleavings:
72+
* - A child attaches while the parent is already completing, but is waiting for its children.
73+
* - A child attempts to attach after the parent stops waiting for its children,
74+
* which immediately cancels the child.
75+
*/
5976
@Test
60-
fun testFailingChildIsAddedWhenJobFinalizesItsState() {
77+
fun testChildAttachmentRacingWithLastChildCompletion() {
6178
// All exceptions should get aggregated here
6279
repeat(N_ITERATIONS) {
6380
runBlocking {
6481
val rogueJob = AtomicReference<Job?>()
82+
/** not using [createCompletableDeferredForTesting] because we don't need extra children. */
6583
val deferred = CompletableDeferred<Unit>()
84+
// optionally, add a completion handler to the parent job, so that the child tries to enter a list with
85+
// multiple elements, not just one.
86+
if (it.mod(2) == 0) {
87+
deferred.invokeOnCompletion { }
88+
}
6689
launch(pool + deferred) {
6790
deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child
6891
// **Asynchronously** submit task that launches a child so it races with completion

kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ class JobHandlersUpgradeStressTest : TestBase() {
3030
val state = atomic(0)
3131
}
3232

33+
/**
34+
* Tests handlers not being invoked more than once.
35+
*/
3336
@Test
3437
fun testStress() {
3538
println("--- JobHandlersUpgradeStressTest")
@@ -91,4 +94,4 @@ class JobHandlersUpgradeStressTest : TestBase() {
9194
println(" Fired handler ${fired.value} times")
9295

9396
}
94-
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.coroutines.channels.*
4+
import kotlinx.coroutines.testing.*
5+
import java.util.concurrent.CyclicBarrier
6+
import java.util.concurrent.atomic.*
7+
import kotlin.test.*
8+
import kotlin.time.Duration.Companion.seconds
9+
10+
class JobOnCompletionStressTest: TestBase() {
11+
private val N_ITERATIONS = 10_000 * stressTestMultiplier
12+
private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest")
13+
14+
private val completionHandlerSeesCompletedParent = AtomicBoolean(false)
15+
private val completionHandlerSeesCancelledParent = AtomicBoolean(false)
16+
private val encounteredException = AtomicReference<Throwable?>(null)
17+
18+
@AfterTest
19+
fun tearDown() {
20+
pool.close()
21+
}
22+
23+
@Test
24+
fun testOnCompletionRacingWithCompletion() = runTest {
25+
testHandlerRacingWithCancellation(
26+
onCancelling = false,
27+
invokeImmediately = true,
28+
parentCompletion = { complete(Unit) }
29+
) {
30+
assertNull(encounteredException.get())
31+
assertTrue(completionHandlerSeesCompletedParent.get())
32+
assertFalse(completionHandlerSeesCancelledParent.get())
33+
}
34+
}
35+
36+
@Test
37+
fun testOnCompletionRacingWithCancellation() = runTest {
38+
testHandlerRacingWithCancellation(
39+
onCancelling = false,
40+
invokeImmediately = true,
41+
parentCompletion = { completeExceptionally(TestException()) }
42+
) {
43+
assertIs<TestException>(encounteredException.get())
44+
assertTrue(completionHandlerSeesCompletedParent.get())
45+
assertTrue(completionHandlerSeesCancelledParent.get())
46+
}
47+
}
48+
49+
@Test
50+
fun testOnCancellingRacingWithCompletion() = runTest {
51+
testHandlerRacingWithCancellation(
52+
onCancelling = true,
53+
invokeImmediately = true,
54+
parentCompletion = { complete(Unit) }
55+
) {
56+
assertNull(encounteredException.get())
57+
assertTrue(completionHandlerSeesCompletedParent.get())
58+
assertFalse(completionHandlerSeesCancelledParent.get())
59+
}
60+
}
61+
62+
@Test
63+
fun testOnCancellingRacingWithCancellation() = runTest {
64+
testHandlerRacingWithCancellation(
65+
onCancelling = true,
66+
invokeImmediately = true,
67+
parentCompletion = { completeExceptionally(TestException()) }
68+
) {
69+
assertIs<TestException>(encounteredException.get())
70+
assertTrue(completionHandlerSeesCancelledParent.get())
71+
}
72+
}
73+
74+
@Test
75+
fun testNonImmediateOnCompletionRacingWithCompletion() = runTest {
76+
testHandlerRacingWithCancellation(
77+
onCancelling = false,
78+
invokeImmediately = false,
79+
parentCompletion = { complete(Unit) }
80+
) {
81+
assertNull(encounteredException.get())
82+
assertTrue(completionHandlerSeesCompletedParent.get())
83+
assertFalse(completionHandlerSeesCancelledParent.get())
84+
}
85+
}
86+
87+
@Test
88+
fun testNonImmediateOnCompletionRacingWithCancellation() = runTest {
89+
testHandlerRacingWithCancellation(
90+
onCancelling = false,
91+
invokeImmediately = false,
92+
parentCompletion = { completeExceptionally(TestException()) }
93+
) {
94+
assertIs<TestException>(encounteredException.get())
95+
assertTrue(completionHandlerSeesCompletedParent.get())
96+
assertTrue(completionHandlerSeesCancelledParent.get())
97+
}
98+
}
99+
100+
@Test
101+
fun testNonImmediateOnCancellingRacingWithCompletion() = runTest {
102+
testHandlerRacingWithCancellation(
103+
onCancelling = true,
104+
invokeImmediately = false,
105+
parentCompletion = { complete(Unit) }
106+
) {
107+
assertNull(encounteredException.get())
108+
assertTrue(completionHandlerSeesCompletedParent.get())
109+
assertFalse(completionHandlerSeesCancelledParent.get())
110+
}
111+
}
112+
113+
@Test
114+
fun testNonImmediateOnCancellingRacingWithCancellation() = runTest {
115+
testHandlerRacingWithCancellation(
116+
onCancelling = true,
117+
invokeImmediately = false,
118+
parentCompletion = { completeExceptionally(TestException()) }
119+
) {
120+
assertIs<TestException>(encounteredException.get())
121+
assertTrue(completionHandlerSeesCancelledParent.get())
122+
}
123+
}
124+
125+
private suspend fun testHandlerRacingWithCancellation(
126+
onCancelling: Boolean,
127+
invokeImmediately: Boolean,
128+
parentCompletion: CompletableDeferred<Unit>.() -> Unit,
129+
validate: () -> Unit,
130+
) {
131+
repeat(N_ITERATIONS) {
132+
val entered = Channel<Unit>(1)
133+
completionHandlerSeesCompletedParent.set(false)
134+
completionHandlerSeesCancelledParent.set(false)
135+
encounteredException.set(null)
136+
val parent = createCompletableDeferredForTesting(it)
137+
val barrier = CyclicBarrier(2)
138+
val handlerInstallJob = coroutineScope {
139+
launch(pool) {
140+
barrier.await()
141+
parent.parentCompletion()
142+
}
143+
async(pool) {
144+
barrier.await()
145+
parent.invokeOnCompletion(
146+
onCancelling = onCancelling,
147+
invokeImmediately = invokeImmediately,
148+
) { exception ->
149+
encounteredException.set(exception)
150+
completionHandlerSeesCompletedParent.set(parent.isCompleted)
151+
completionHandlerSeesCancelledParent.set(parent.isCancelled)
152+
entered.trySend(Unit)
153+
}
154+
}
155+
}
156+
if (invokeImmediately || handlerInstallJob.getCompleted() !== NonDisposableHandle) {
157+
withTimeout(1.seconds) {
158+
entered.receive()
159+
}
160+
try {
161+
validate()
162+
} catch (e: Throwable) {
163+
println("Iteration $it failed")
164+
println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}")
165+
throw e
166+
}
167+
} else {
168+
assertTrue(entered.isEmpty)
169+
}
170+
}
171+
}
172+
}
173+
174+
/**
175+
* Creates a [CompletableDeferred], optionally adding completion handlers and/or other children to the job depending
176+
* on [iteration].
177+
* The purpose is to test not just attaching completion handlers to empty or one-element lists (see the [JobSupport]
178+
* implementation for details on what this means), but also to lists with multiple elements.
179+
*/
180+
fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred<Unit> {
181+
val parent = CompletableDeferred<Unit>()
182+
/* We optionally add completion handlers and/or other children to the parent job
183+
to test the scenarios where a child is placed into an empty list, a single-element list,
184+
or a list with multiple elements. */
185+
if (iteration.mod(2) == 0) {
186+
parent.invokeOnCompletion { }
187+
}
188+
if (iteration.mod(3) == 0) {
189+
GlobalScope.launch(parent) { }
190+
}
191+
return parent
192+
}

0 commit comments

Comments
 (0)