Skip to content

Release reusability token when suspendCancellableCoroutineReusable's … #3634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,19 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
block(cancellable)
try {
block(cancellable)
} catch (e: Throwable) {
// Here we catch any unexpected exception from user-supplied block (e.g. invariant violation)
// and release claimed continuation in order to leave it in a reasonable state (see #3613)
cancellable.releaseClaimedReusableContinuation()
throw e
}
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
// If used outside our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ internal open class CancellableContinuationImpl<in T>(
* Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
* in which case it detaches from the parent and cancels this continuation.
*/
private fun releaseClaimedReusableContinuation() {
// Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
internal fun releaseClaimedReusableContinuation() {
// Cannot be cast if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
detachChild()
cancel(cancellationCause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,8 @@ internal open class BufferedChannel<E>(
* In case of coroutine cancellation, the element may be undelivered --
* the [onUndeliveredElement] feature is unsupported in this implementation.
*
* Note that this implementation always invokes [suspendCancellableCoroutineReusable],
* as we do not care about broadcasts performance -- they are already deprecated.
*/
internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutineReusable { cont ->
internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
check(onUndeliveredElement == null) {
"the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.*

// Stresses scenario from #3613
class ReusableCancellableContinuationInvariantStressTest : TestBase() {

// Tests have a timeout 10 sec because the bug they catch leads to an infinite spin-loop

@Test(timeout = 10_000)
fun testExceptionFromSuspendReusable() = doTest { /* nothing */ }


@Test(timeout = 10_000)
fun testExceptionFromCancelledSuspendReusable() = doTest { it.cancel() }


@Suppress("SuspendFunctionOnCoroutineScope")
private inline fun doTest(crossinline block: (Job) -> Unit) {
runTest {
repeat(10_000) {
val latch = CountDownLatch(1)
val continuationToResume = AtomicReference<Continuation<Unit>?>(null)
val j1 = launch(Dispatchers.Default) {
latch.await()
suspendCancellableCoroutineReusable {
continuationToResume.set(it)
block(coroutineContext.job)
throw CancellationException() // Don't let getResult() chance to execute
}
}

val j2 = launch(Dispatchers.Default) {
latch.await()
while (continuationToResume.get() == null) {
// spin
}
continuationToResume.get()!!.resume(Unit)
}

latch.countDown()
joinAll(j1, j2)
}
}
}
}