From bc658d75e4ad678fccf48344dfc28749512b8bf0 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 21 Feb 2023 18:10:45 +0100 Subject: [PATCH 1/3] Release reusability token when suspendCancellableCoroutineReusable's block throws an exception Otherwise, continuation instance is left in REUSABLE_CLAIMED state that asynchronous resumer awaits in an infinite spin-loop, potentially causing deadlock with 100% CPU consumption. Originally, the bug was reproduced on old (pre-#3020) implementation where this very pattern was encountered: it was possible to fail owner's invariant check right in the supplied 'block'. This is no longer the case, so the situation is emulated manually (but still is possible in production environments, e.g. when OOM is thrown). Also, suspendCancellableCoroutineReusable is removed from obsolete BroadcastChannel implementation. Fixes #3613 --- .../common/src/CancellableContinuation.kt | 11 +++- .../common/src/CancellableContinuationImpl.kt | 2 +- .../common/src/channels/BufferedChannel.kt | 2 +- ...cellableContinuationInvariantStressTest.kt | 53 +++++++++++++++++++ 4 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 5e8d7f9102..daabce10ad 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -331,12 +331,19 @@ internal suspend inline fun suspendCancellableCoroutineReusable( crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) - block(cancellable) + try { + block(cancellable) + } catch (e: Throwable) { + // Here we catch any unexpected exception from user-supplied block (e.g. invariant violation) + // and release claimed continuation in order to leave it in a reasonable state (see #3613) + cancellable.releaseClaimedReusableContinuation() + throw e + } cancellable.getResult() } internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { - // If used outside of our dispatcher + // If used outside our dispatcher if (delegate !is DispatchedContinuation) { return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 423cb05d18..06dfcd512b 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -337,7 +337,7 @@ internal open class CancellableContinuationImpl( * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, * in which case it detaches from the parent and cancels this continuation. */ - private fun releaseClaimedReusableContinuation() { + fun releaseClaimedReusableContinuation() { // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return detachChild() diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index e30486fa8c..7e8d48c69a 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -248,7 +248,7 @@ internal open class BufferedChannel( * Note that this implementation always invokes [suspendCancellableCoroutineReusable], * as we do not care about broadcasts performance -- they are already deprecated. */ - internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutineReusable { cont -> + internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont -> check(onUndeliveredElement == null) { "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`" } diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt new file mode 100644 index 0000000000..4d8116c982 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.* + +// Stresses scenario from #3613 +class ReusableCancellableContinuationInvariantStressTest : TestBase() { + + // Tests have a timeout 10 sec because the bug they catch leads to an infinite spin-loop + + @Test(timeout = 10_000) + fun testExceptionFromSuspendReusable() = doTest { /* nothing */ } + + + @Test(timeout = 10_000) + fun testExceptionFromCancelledSuspendReusable() = doTest { it.cancel() } + + + @Suppress("SuspendFunctionOnCoroutineScope") + private inline fun doTest(crossinline block: (Job) -> Unit) { + runTest { + repeat(10_000) { + val latch = CountDownLatch(1) + val continuationToResume = AtomicReference?>(null) + val j1 = launch(Dispatchers.Default) { + latch.await() + suspendCancellableCoroutineReusable { + continuationToResume.set(it) + block(coroutineContext.job) + throw CancellationException() // Don't let getResult() chance to execute + } + } + + val j2 = launch(Dispatchers.Default) { + latch.await() + while (continuationToResume.get() == null) { + // spin + } + continuationToResume.get()!!.resume(Unit) + } + + latch.countDown() + joinAll(j1, j2) + } + } + } +} From c2dd7b8d982c3cd110555097d04954804974d030 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 21 Feb 2023 18:37:34 +0100 Subject: [PATCH 2/3] ~make releaseClaimedReusableContinuation internal in order to reduce public ABI --- .../common/src/CancellableContinuationImpl.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 06dfcd512b..e59c6f6426 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -337,8 +337,8 @@ internal open class CancellableContinuationImpl( * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, * in which case it detaches from the parent and cancels this continuation. */ - fun releaseClaimedReusableContinuation() { - // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + internal fun releaseClaimedReusableContinuation() { + // Cannot be cast if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return detachChild() cancel(cancellationCause) From f4df9fd36807d8b4fa5cd08cfbc87d784868fc48 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 3 Mar 2023 18:23:09 +0300 Subject: [PATCH 3/3] Update kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 7e8d48c69a..2f92338856 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -245,8 +245,6 @@ internal open class BufferedChannel( * In case of coroutine cancellation, the element may be undelivered -- * the [onUndeliveredElement] feature is unsupported in this implementation. * - * Note that this implementation always invokes [suspendCancellableCoroutineReusable], - * as we do not care about broadcasts performance -- they are already deprecated. */ internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont -> check(onUndeliveredElement == null) {