Skip to content

Commit 63ebdda

Browse files
committed
Remove atomic cancellation support
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. * Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine * Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable * Remove atomic cancellation from docs * Ensures that flowOn does not resume downstream after cancellation. * MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC * Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable * Better documentation for MODE_XXX constants. * Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult * Added test for Flow.combine that should be fixed * Support extended invokeOnCancellation contract * Introduced internal tryResumeAtomic Fixes #1265 Fixes #1813 Fixes #1915
1 parent 38d7747 commit 63ebdda

31 files changed

+491
-371
lines changed

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
7070
}
7171

7272
class CancellableChannel : SimpleChannel() {
73-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
73+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
7474
consumer = it.intercepted()
7575
COROUTINE_SUSPENDED
7676
}
7777

78-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
78+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
7979
enqueuedValue = element
8080
producer = it.intercepted()
8181
COROUTINE_SUSPENDED
@@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {
8484

8585
class CancellableReusableChannel : SimpleChannel() {
8686
@Suppress("INVISIBLE_MEMBER")
87-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
87+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
8888
consumer = it.intercepted()
8989
COROUTINE_SUSPENDED
9090
}
9191

9292
@Suppress("INVISIBLE_MEMBER")
93-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
93+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
9494
enqueuedValue = element
9595
producer = it.intercepted()
9696
COROUTINE_SUSPENDED

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
8080

8181
public final class kotlinx/coroutines/CancellableContinuationKt {
8282
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
83-
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
84-
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
85-
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
8683
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8784
}
8885

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+27-44
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
7878
@InternalCoroutinesApi
7979
public fun tryResume(value: T, idempotent: Any? = null): Any?
8080

81+
/**
82+
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
83+
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
84+
* guaranteed can be provided by having a cancellation fallback.
85+
*/
86+
@InternalCoroutinesApi
87+
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?
88+
8189
/**
8290
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
8391
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
@@ -112,8 +120,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
112120
public fun cancel(cause: Throwable? = null): Boolean
113121

114122
/**
115-
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
116-
* When the continuation is already cancelled, the handler will be immediately invoked
123+
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
124+
* When the continuation is already cancelled, the handler is immediately invoked
117125
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
118126
* continuation is cancelled.
119127
*
@@ -122,7 +130,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
122130
* processed as an uncaught exception in the context of the current coroutine
123131
* (see [CoroutineExceptionHandler]).
124132
*
125-
* At most one [handler] can be installed on a continuation.
133+
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
134+
* time produces [IllegalStateException].
135+
*
136+
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
137+
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
138+
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
126139
*
127140
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
128141
* This `handler` can be invoked concurrently with the surrounding code.
@@ -201,40 +214,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
201214
}
202215

203216
/**
204-
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
205-
*
206-
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
207-
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
208-
* continue to execute even after it was cancelled from the same thread in the case when the continuation
209-
* was already resumed and was posted for execution to the thread's queue.
210-
*
211-
* @suppress **This an internal API and should not be used from general code.**
212-
*/
213-
@InternalCoroutinesApi
214-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
215-
crossinline block: (CancellableContinuation<T>) -> Unit
216-
): T =
217-
suspendCoroutineUninterceptedOrReturn { uCont ->
218-
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
219-
block(cancellable)
220-
cancellable.getResult()
221-
}
222-
223-
/**
224-
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
217+
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
218+
* [CancellableContinuationImpl] is reused.
225219
*/
226-
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
220+
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
227221
crossinline block: (CancellableContinuation<T>) -> Unit
228222
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
229-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
223+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
230224
block(cancellable)
231225
cancellable.getResult()
232226
}
233227

234-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
228+
internal fun <T> getOrCreateCancellableContinuation(
229+
delegate: Continuation<T>, resumeMode: Int
230+
): CancellableContinuationImpl<T> {
231+
assert { resumeMode.isReusableMode }
235232
// If used outside of our dispatcher
236233
if (delegate !is DispatchedContinuation<T>) {
237-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
234+
return CancellableContinuationImpl(delegate, resumeMode)
238235
}
239236
/*
240237
* Attempt to claim reusable instance.
@@ -250,24 +247,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
250247
* thus leaking CC instance for indefinite time.
251248
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
252249
*/
253-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
254-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
250+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
251+
?: return CancellableContinuationImpl(delegate, resumeMode)
255252
}
256253

257-
/**
258-
* @suppress **Deprecated**
259-
*/
260-
@Deprecated(
261-
message = "holdCancellability parameter is deprecated and is no longer used",
262-
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
263-
)
264-
@InternalCoroutinesApi
265-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
266-
holdCancellability: Boolean = false,
267-
crossinline block: (CancellableContinuation<T>) -> Unit
268-
): T =
269-
suspendAtomicCancellableCoroutine(block)
270-
271254
/**
272255
* Removes the specified [node] on cancellation.
273256
*/

0 commit comments

Comments
 (0)