diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 5e8d7f9102..daabce10ad 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -331,12 +331,19 @@ internal suspend inline fun suspendCancellableCoroutineReusable( crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) - block(cancellable) + try { + block(cancellable) + } catch (e: Throwable) { + // Here we catch any unexpected exception from user-supplied block (e.g. invariant violation) + // and release claimed continuation in order to leave it in a reasonable state (see #3613) + cancellable.releaseClaimedReusableContinuation() + throw e + } cancellable.getResult() } internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { - // If used outside of our dispatcher + // If used outside our dispatcher if (delegate !is DispatchedContinuation) { return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 423cb05d18..e59c6f6426 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -337,8 +337,8 @@ internal open class CancellableContinuationImpl( * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, * in which case it detaches from the parent and cancels this continuation. */ - private fun releaseClaimedReusableContinuation() { - // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + internal fun releaseClaimedReusableContinuation() { + // Cannot be cast if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return detachChild() cancel(cancellationCause) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index e30486fa8c..2f92338856 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -245,10 +245,8 @@ internal open class BufferedChannel( * In case of coroutine cancellation, the element may be undelivered -- * the [onUndeliveredElement] feature is unsupported in this implementation. * - * Note that this implementation always invokes [suspendCancellableCoroutineReusable], - * as we do not care about broadcasts performance -- they are already deprecated. */ - internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutineReusable { cont -> + internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont -> check(onUndeliveredElement == null) { "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`" } diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt new file mode 100644 index 0000000000..4d8116c982 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.* + +// Stresses scenario from #3613 +class ReusableCancellableContinuationInvariantStressTest : TestBase() { + + // Tests have a timeout 10 sec because the bug they catch leads to an infinite spin-loop + + @Test(timeout = 10_000) + fun testExceptionFromSuspendReusable() = doTest { /* nothing */ } + + + @Test(timeout = 10_000) + fun testExceptionFromCancelledSuspendReusable() = doTest { it.cancel() } + + + @Suppress("SuspendFunctionOnCoroutineScope") + private inline fun doTest(crossinline block: (Job) -> Unit) { + runTest { + repeat(10_000) { + val latch = CountDownLatch(1) + val continuationToResume = AtomicReference?>(null) + val j1 = launch(Dispatchers.Default) { + latch.await() + suspendCancellableCoroutineReusable { + continuationToResume.set(it) + block(coroutineContext.job) + throw CancellationException() // Don't let getResult() chance to execute + } + } + + val j2 = launch(Dispatchers.Default) { + latch.await() + while (continuationToResume.get() == null) { + // spin + } + continuationToResume.get()!!.resume(Unit) + } + + latch.countDown() + joinAll(j1, j2) + } + } + } +}