From f1bfe148f6f99649b086776553fa82e0b8861f94 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Sun, 21 Apr 2019 12:06:09 +0300 Subject: [PATCH 1/2] Fix a race in Job.join that sporadically results in normal completion The race happens in the slow-path of 'join' implementation when parent invokes join on a child coroutines that crashes and cancels the parent. Fixes #1123 --- .../common/src/CancellableContinuationImpl.kt | 12 ++++ ...llableContinuationResumeCloseStressTest.kt | 68 +++++++++++++++++++ .../jvm/test/JobStructuredJoinStressTest.kt | 33 +++++++++ 3 files changed, 113 insertions(+) create mode 100644 kotlinx-coroutines-core/jvm/test/CancellableContinuationResumeCloseStressTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1686330c5a..530284944e 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -170,6 +170,18 @@ internal open class CancellableContinuationImpl( // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) + // if the parent job was already cancelled, then throw the corresponding cancellation exception + // otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...) + // before the block returns. This getResult would return a result as opposed to cancellation + // exception that should have happened if the continuation is dispatched for execution later. + if (resumeMode == MODE_CANCELLABLE) { + val job = context[Job] + if (job != null && !job.isActive) { + val cause = job.getCancellationException() + cancelResult(state, cause) + throw cause + } + } return getSuccessfulResult(state) } diff --git a/kotlinx-coroutines-core/jvm/test/CancellableContinuationResumeCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancellableContinuationResumeCloseStressTest.kt new file mode 100644 index 0000000000..7255b4ab2c --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/CancellableContinuationResumeCloseStressTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import org.junit.* +import java.util.concurrent.* +import kotlin.test.* +import kotlin.test.Test + +class CancellableContinuationResumeCloseStressTest : TestBase() { + private val dispatcher = + newFixedThreadPoolContext(2, "CancellableContinuationResumeCloseStressTest") + private val startBarrier = CyclicBarrier(3) + private val doneBarrier = CyclicBarrier(2) + private val nRepeats = 1_000 * stressTestMultiplier + + private val closed = atomic(false) + private var returnedOk = false + + @After + fun tearDown() { + dispatcher.close() + } + + @Test + @Suppress("BlockingMethodInNonBlockingContext") + fun testStress() = runTest { + repeat(nRepeats) { + closed.value = false + returnedOk = false + val job = testJob() + startBarrier.await() + job.cancel() // (1) cancel job + job.join() + // check consistency + doneBarrier.await() + if (returnedOk) { + assertFalse(closed.value, "should not have closed resource -- returned Ok") + } else { + assertTrue(closed.value, "should have closed resource -- was cancelled") + } + } + } + + private fun CoroutineScope.testJob(): Job = launch(dispatcher, start = CoroutineStart.ATOMIC) { + val ok = resumeClose() // might be cancelled + assertEquals("OK", ok) + returnedOk = true + } + + private suspend fun resumeClose() = suspendCancellableCoroutine { cont -> + dispatcher.executor.execute { + startBarrier.await() // (2) resume at the same time + cont.resume("OK") { + close() + } + doneBarrier.await() + } + startBarrier.await() // (3) return at the same time + } + + fun close() { + assertFalse(closed.getAndSet(true)) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt new file mode 100644 index 0000000000..ec3635ca36 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* + +/** + * Test a race between job failure and join. + * + * See [#1123](https://github.com/Kotlin/kotlinx.coroutines/issues/1123). + */ +class JobStructuredJoinStressTest : TestBase() { + private val nRepeats = 1_000 * stressTestMultiplier + + @Test + fun testStress() { + repeat(nRepeats) { + assertFailsWith { + runBlocking { + // launch in background + val job = launch(Dispatchers.Default) { + throw TestException("OK") // crash + } + assertFailsWith { + job.join() + } + } + } + } + } +} \ No newline at end of file From 01652510f69e4fd7ee81c308aa9702893db51ad1 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 24 Apr 2019 10:54:51 +0300 Subject: [PATCH 2/2] Recover stacktrace of CancellationException from suspendCancellable --- .../common/src/CancellableContinuationImpl.kt | 2 +- kotlinx-coroutines-core/common/src/Dispatched.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 530284944e..a1a9097a2f 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -179,7 +179,7 @@ internal open class CancellableContinuationImpl( if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelResult(state, cause) - throw cause + throw recoverStackTrace(cause, this) } } return getSuccessfulResult(state) diff --git a/kotlinx-coroutines-core/common/src/Dispatched.kt b/kotlinx-coroutines-core/common/src/Dispatched.kt index ffccf21368..b767cc135d 100644 --- a/kotlinx-coroutines-core/common/src/Dispatched.kt +++ b/kotlinx-coroutines-core/common/src/Dispatched.kt @@ -229,7 +229,7 @@ internal abstract class DispatchedTask( if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelResult(state, cause) - continuation.resumeWithException(cause) + continuation.resumeWithStackTrace(cause) } else { val exception = getExceptionalResult(state) if (exception != null)