Skip to content

Commit e1d15b3

Browse files
committed
Remove atomic cancellation support
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. * Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine * Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable * Remove atomic cancellation from docs * Ensures that flowOn does not resume downstream after cancellation. * MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC * Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable * Better documentation for MODE_XXX constants. * Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult * Added test for Flow.combine that should be fixed * Support extended invokeOnCancellation contract * Introduced internal tryResumeAtomic Fixes #1265 Fixes #1813 Fixes #1915
1 parent e0dc7f4 commit e1d15b3

31 files changed

+492
-374
lines changed

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
7070
}
7171

7272
class CancellableChannel : SimpleChannel() {
73-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
73+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
7474
consumer = it.intercepted()
7575
COROUTINE_SUSPENDED
7676
}
7777

78-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
78+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
7979
enqueuedValue = element
8080
producer = it.intercepted()
8181
COROUTINE_SUSPENDED
@@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {
8484

8585
class CancellableReusableChannel : SimpleChannel() {
8686
@Suppress("INVISIBLE_MEMBER")
87-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
87+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
8888
consumer = it.intercepted()
8989
COROUTINE_SUSPENDED
9090
}
9191

9292
@Suppress("INVISIBLE_MEMBER")
93-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
93+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
9494
enqueuedValue = element
9595
producer = it.intercepted()
9696
COROUTINE_SUSPENDED

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
8080

8181
public final class kotlinx/coroutines/CancellableContinuationKt {
8282
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
83-
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
84-
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
85-
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
8683
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8784
}
8885

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+27-44
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
7878
@InternalCoroutinesApi
7979
public fun tryResume(value: T, idempotent: Any? = null): Any?
8080

81+
/**
82+
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
83+
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
84+
* guaranteed can be provided by having a cancellation fallback.
85+
*/
86+
@InternalCoroutinesApi
87+
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?
88+
8189
/**
8290
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
8391
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
@@ -118,8 +126,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
118126
public fun cancel(cause: Throwable? = null): Boolean
119127

120128
/**
121-
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
122-
* When the continuation is already cancelled, the handler will be immediately invoked
129+
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
130+
* When the continuation is already cancelled, the handler is immediately invoked
123131
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
124132
* continuation is cancelled.
125133
*
@@ -128,7 +136,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
128136
* processed as an uncaught exception in the context of the current coroutine
129137
* (see [CoroutineExceptionHandler]).
130138
*
131-
* At most one [handler] can be installed on a continuation.
139+
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
140+
* time produces [IllegalStateException].
141+
*
142+
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
143+
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
144+
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
132145
*
133146
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
134147
* This `handler` can be invoked concurrently with the surrounding code.
@@ -204,40 +217,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
204217
}
205218

206219
/**
207-
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
208-
*
209-
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
210-
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
211-
* continue to execute even after it was cancelled from the same thread in the case when the continuation
212-
* was already resumed and was posted for execution to the thread's queue.
213-
*
214-
* @suppress **This an internal API and should not be used from general code.**
215-
*/
216-
@InternalCoroutinesApi
217-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
218-
crossinline block: (CancellableContinuation<T>) -> Unit
219-
): T =
220-
suspendCoroutineUninterceptedOrReturn { uCont ->
221-
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
222-
block(cancellable)
223-
cancellable.getResult()
224-
}
225-
226-
/**
227-
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
220+
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
221+
* [CancellableContinuationImpl] is reused.
228222
*/
229-
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
223+
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
230224
crossinline block: (CancellableContinuation<T>) -> Unit
231225
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
232-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
226+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
233227
block(cancellable)
234228
cancellable.getResult()
235229
}
236230

237-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
231+
internal fun <T> getOrCreateCancellableContinuation(
232+
delegate: Continuation<T>, resumeMode: Int
233+
): CancellableContinuationImpl<T> {
234+
assert { resumeMode.isReusableMode }
238235
// If used outside of our dispatcher
239236
if (delegate !is DispatchedContinuation<T>) {
240-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
237+
return CancellableContinuationImpl(delegate, resumeMode)
241238
}
242239
/*
243240
* Attempt to claim reusable instance.
@@ -253,24 +250,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
253250
* thus leaking CC instance for indefinite time.
254251
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
255252
*/
256-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
257-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
253+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
254+
?: return CancellableContinuationImpl(delegate, resumeMode)
258255
}
259256

260-
/**
261-
* @suppress **Deprecated**
262-
*/
263-
@Deprecated(
264-
message = "holdCancellability parameter is deprecated and is no longer used",
265-
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
266-
)
267-
@InternalCoroutinesApi
268-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
269-
holdCancellability: Boolean = false,
270-
crossinline block: (CancellableContinuation<T>) -> Unit
271-
): T =
272-
suspendAtomicCancellableCoroutine(block)
273-
274257
/**
275258
* Removes the specified [node] on cancellation.
276259
*/

0 commit comments

Comments
 (0)