Skip to content

Commit 2740aa5

Browse files
committed
Fix a race in Job.join that sporadically results in normal completion
The race happens in the slow-path of 'join' implementation when parent invokes join on a child coroutines that crashes and cancels the parent. Fixes #1123
1 parent c961fb6 commit 2740aa5

File tree

3 files changed

+113
-0
lines changed

3 files changed

+113
-0
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+12
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ internal open class CancellableContinuationImpl<in T>(
170170
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
171171
val state = this.state
172172
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
173+
// if the parent job was already cancelled, then throw the corresponding cancellation exception
174+
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
175+
// before the block returns. This getResult would return a result as opposed to cancellation
176+
// exception that should have happened if the continuation is dispatched for execution later.
177+
if (resumeMode == MODE_CANCELLABLE) {
178+
val job = context[Job]
179+
if (job != null && !job.isActive) {
180+
val cause = job.getCancellationException()
181+
cancelResult(state, cause)
182+
throw cause
183+
}
184+
}
173185
return getSuccessfulResult(state)
174186
}
175187

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.atomicfu.*
8+
import org.junit.*
9+
import java.util.concurrent.*
10+
import kotlin.test.*
11+
import kotlin.test.Test
12+
13+
class CancellableContinuationResumeCloseStressTest : TestBase() {
14+
private val dispatcher =
15+
newFixedThreadPoolContext(2, "CancellableContinuationResumeCloseStressTest")
16+
private val startBarrier = CyclicBarrier(3)
17+
private val doneBarrier = CyclicBarrier(2)
18+
private val nRepeats = 1_000 * stressTestMultiplier
19+
20+
private val closed = atomic(false)
21+
private var returnedOk = false
22+
23+
@After
24+
fun tearDown() {
25+
dispatcher.close()
26+
}
27+
28+
@Test
29+
@Suppress("BlockingMethodInNonBlockingContext")
30+
fun testStress() = runTest {
31+
repeat(nRepeats) {
32+
closed.value = false
33+
returnedOk = false
34+
val job = testJob()
35+
startBarrier.await()
36+
job.cancel() // (1) cancel job
37+
job.join()
38+
// check consistency
39+
doneBarrier.await()
40+
if (returnedOk) {
41+
assertFalse(closed.value, "should not have closed resource -- returned Ok")
42+
} else {
43+
assertTrue(closed.value, "should have closed resource -- was cancelled")
44+
}
45+
}
46+
}
47+
48+
private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) {
49+
val ok = resumeClose() // might be cancelled
50+
assertEquals("OK", ok)
51+
returnedOk = true
52+
}
53+
54+
private suspend fun resumeClose() = suspendCancellableCoroutine<String> { cont ->
55+
dispatcher.executor.execute {
56+
startBarrier.await() // (2) resume at the same time
57+
cont.resume("OK") {
58+
close()
59+
}
60+
doneBarrier.await()
61+
}
62+
startBarrier.await() // (3) return at the same time
63+
}
64+
65+
fun close() {
66+
assertFalse(closed.getAndSet(true))
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import org.junit.*
8+
9+
/**
10+
* Test a race between job failure and join.
11+
*
12+
* See [#1123](https://github.com/Kotlin/kotlinx.coroutines/issues/1123).
13+
*/
14+
class JobStructuredJoinStressTest : TestBase() {
15+
private val nRepeats = 1_000 * stressTestMultiplier
16+
17+
@Test
18+
fun testStress() {
19+
repeat(nRepeats) {
20+
assertFailsWith<TestException> {
21+
runBlocking {
22+
// launch in background
23+
val job = launch(Dispatchers.Default) {
24+
throw TestException("OK") // crash
25+
}
26+
assertFailsWith<CancellationException> {
27+
job.join()
28+
}
29+
}
30+
}
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)