Skip to content

Commit fa85428

Browse files
committed
Fix a race in Job.join that sporadically results in normal completion
The race happens in the slow-path of 'join' implementation when parent invokes join on a child coroutines that crashes and cancels the parent. Fixes #1123
1 parent 9b6e311 commit fa85428

File tree

3 files changed

+108
-0
lines changed

3 files changed

+108
-0
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+12
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ internal open class CancellableContinuationImpl<in T>(
170170
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
171171
val state = this.state
172172
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
173+
// if the parent job was already cancelled, then throw the corresponding cancellation exception
174+
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
175+
// before the block returns. This getResult would return a result as opposed to cancellation
176+
// exception that should have happened if the continuation is dispatched for execution later.
177+
if (resumeMode == MODE_CANCELLABLE) {
178+
val job = context[Job]
179+
if (job != null && !job.isActive) {
180+
val cause = job.getCancellationException()
181+
cancelResult(state, cause)
182+
throw cause
183+
}
184+
}
173185
return getSuccessfulResult(state)
174186
}
175187

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.atomicfu.*
8+
import org.junit.*
9+
import java.util.concurrent.*
10+
import kotlin.test.*
11+
import kotlin.test.Test
12+
13+
class CancellableContinuationResumeCloseStressTest : TestBase() {
14+
private val dispatcher =
15+
newFixedThreadPoolContext(2, "CancellableContinuationResumeCloseStressTest")
16+
private val barrier = CyclicBarrier(3)
17+
private val nRepeats = 1_000 * stressTestMultiplier
18+
19+
private val closed = atomic(false)
20+
21+
@After
22+
fun tearDown() {
23+
dispatcher.close()
24+
}
25+
26+
@Test
27+
fun testStress() = runTest {
28+
repeat(nRepeats) {
29+
closed.value = false
30+
val job = launch(dispatcher) {
31+
try {
32+
val ok = resumeClose()
33+
assertEquals("OK", ok)
34+
assertFalse(closed.value)
35+
} catch (e: CancellationException) {
36+
assertTrue(closed.value)
37+
}
38+
}
39+
@Suppress("BlockingMethodInNonBlockingContext")
40+
barrier.await()
41+
job.cancel() // (1) cancel job
42+
job.join()
43+
}
44+
}
45+
46+
private suspend fun resumeClose() = suspendCancellableCoroutine<String> { cont ->
47+
dispatcher.executor.execute {
48+
barrier.await() // (2) resume at the same time
49+
cont.resume("OK") {
50+
close()
51+
}
52+
}
53+
barrier.await() // (3) return at the same time
54+
}
55+
56+
fun close() {
57+
assertFalse(closed.getAndSet(true))
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import org.junit.*
8+
9+
/**
10+
* Test a race between job failure and join.
11+
*
12+
* See [#1123](https://github.com/Kotlin/kotlinx.coroutines/issues/1123).
13+
*/
14+
class JobStructuredJoinStressTest : TestBase() {
15+
private val nRepeats = 1_000 * stressTestMultiplier
16+
17+
@Test
18+
fun testStress() {
19+
repeat(nRepeats) {
20+
assertFailsWith<TestException> {
21+
runBlocking {
22+
// launch in background
23+
val job = launch(Dispatchers.Default) {
24+
throw TestException("OK") // crash
25+
}
26+
try {
27+
job.join()
28+
} catch (e: CancellationException) {
29+
// expected -- proceed
30+
throw e
31+
}
32+
error("job.join returned normally, but it should not have")
33+
}
34+
}
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)