Skip to content

Commit 0b5c4d4

Browse files
elizarovqwwdfsad
authored andcommitted
~ Rollback manual continuation dispose on suspend block exception
1 parent 33e4ef0 commit 0b5c4d4

File tree

10 files changed

+34
-197
lines changed

10 files changed

+34
-197
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2-5
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
8080

8181
public final class kotlinx/coroutines/CancellableContinuationKt {
8282
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
83+
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8384
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8485
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
86+
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8587
}
8688

8789
public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle {
@@ -501,11 +503,6 @@ public final class kotlinx/coroutines/SupervisorKt {
501503
public static final fun supervisorScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
502504
}
503505

504-
public final class kotlinx/coroutines/SuspendCancellableCoroutineKt {
505-
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
506-
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
507-
}
508-
509506
public abstract interface class kotlinx/coroutines/ThreadContextElement : kotlin/coroutines/CoroutineContext$Element {
510507
public abstract fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V
511508
public abstract fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;

kotlinx-coroutines-core/common/src/Builders.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ internal class DispatchedCoroutine<in T>(
295295
fun getResult(): Any? {
296296
if (trySuspend()) return COROUTINE_SUSPENDED
297297
// When scope coroutine does not suspend on Kotlin/Native it shall dispose its continuation which it will not use
298-
disposeContinuation { uCont }
298+
// disposeContinuation { uCont }
299299
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
300300
val state = this.state.unboxState()
301301
if (state is CompletedExceptionally) throw state.cause

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+31-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines
66

77
import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
9+
import kotlin.coroutines.intrinsics.*
910

1011
// --------------- cancellable continuations ---------------
1112

@@ -182,9 +183,20 @@ public interface CancellableContinuation<in T> : Continuation<T> {
182183
* Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
183184
* the [block]. This function throws a [CancellationException] if the coroutine is cancelled or completed while suspended.
184185
*/
185-
public expect suspend inline fun <T> suspendCancellableCoroutine(
186+
public suspend inline fun <T> suspendCancellableCoroutine(
186187
crossinline block: (CancellableContinuation<T>) -> Unit
187-
): T
188+
): T =
189+
suspendCoroutineUninterceptedOrReturn { uCont ->
190+
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
191+
/*
192+
* For non-atomic cancellation we setup parent-child relationship immediately
193+
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
194+
* properly supports cancellation.
195+
*/
196+
cancellable.initCancellability()
197+
block(cancellable)
198+
cancellable.getResult()
199+
}
188200

189201
/**
190202
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
@@ -197,19 +209,31 @@ public expect suspend inline fun <T> suspendCancellableCoroutine(
197209
* @suppress **This an internal API and should not be used from general code.**
198210
*/
199211
@InternalCoroutinesApi
200-
public expect suspend inline fun <T> suspendAtomicCancellableCoroutine(
212+
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
201213
crossinline block: (CancellableContinuation<T>) -> Unit
202-
): T
214+
): T =
215+
suspendCoroutineUninterceptedOrReturn { uCont ->
216+
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
217+
block(cancellable)
218+
cancellable.getResult()
219+
}
203220

204221
/**
205222
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
206223
*/
207-
internal expect suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
224+
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
208225
crossinline block: (CancellableContinuation<T>) -> Unit
209-
): T
226+
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
227+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
228+
block(cancellable)
229+
cancellable.getResult()
230+
}
210231

211232
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
212-
if (delegate !is DispatchedContinuation<T>) {
233+
// If used outside of our dispatcher
234+
// NOTE: Reuse is not support on Kotlin/Native due to platform peculiarities making it had to properly
235+
// split DispatchedContinuation / CancellableContinuationImpl state across workers.
236+
if (!isReuseSupportedInPlatform() || delegate !is DispatchedContinuation<T> ) {
213237
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
214238
}
215239
/*

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
2020
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21-
internal expect fun <T> Continuation<T>.shareableDispose()
2221
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2322
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
2423

kotlinx-coroutines-core/js/src/SuspendCancellableCoroutine.kt

-59
This file was deleted.

kotlinx-coroutines-core/js/src/internal/Sharing.kt

-3
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ internal actual inline fun <T> Continuation<T>.shareableInterceptedResumeWith(re
4545
intercepted().resumeWith(result)
4646
}
4747

48-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
49-
internal actual inline fun <T> Continuation<T>.shareableDispose() {}
50-
5148
@Suppress("NOTHING_TO_INLINE") // Should be NOP
5249
internal actual inline fun disposeContinuation(cont: () -> Continuation<*>) {}
5350

kotlinx-coroutines-core/jvm/src/SuspendCancellableCoroutine.kt

-59
This file was deleted.

kotlinx-coroutines-core/jvm/src/internal/Sharing.kt

-4
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ internal actual inline fun <T> Continuation<T>.shareableInterceptedResumeWith(re
5858
intercepted().resumeWith(result)
5959
}
6060

61-
@InlineOnly
62-
@Suppress("NOTHING_TO_INLINE") // Should be NOP
63-
internal actual inline fun <T> Continuation<T>.shareableDispose() {}
64-
6561
@InlineOnly
6662
@Suppress("NOTHING_TO_INLINE") // Should be NOP
6763
internal actual inline fun disposeContinuation(cont: () -> Continuation<*>) {}

kotlinx-coroutines-core/native/src/SuspendCancellableCoroutine.kt

-53
This file was deleted.

kotlinx-coroutines-core/native/src/internal/Sharing.kt

-5
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,6 @@ internal actual fun <T> Continuation<T>.shareableInterceptedResumeWith(result: R
7878
}
7979
}
8080

81-
internal actual inline fun <T> Continuation<T>.shareableDispose() {
82-
this as ShareableContinuation<T> // must have been shared
83-
disposeRef()
84-
}
85-
8681
internal actual fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R =
8782
ShareableBlock(this)
8883

0 commit comments

Comments
 (0)