Skip to content

Commit e68cb3b

Browse files
committed
Remove atomic cancellation support
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. * Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine * Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable * Remove atomic cancellation from docs * Ensures that flowOn does not resume downstream after cancellation. * MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC * Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable * Better documentation for MODE_XXX constants. * Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult * Added test for Flow.combine that should be fixed * Support extended invokeOnCancellation contract * Introduced internal tryResumeAtomic Fixes #1265 Fixes #1813 Fixes #1915
1 parent 074ea3d commit e68cb3b

31 files changed

+491
-371
lines changed

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
7070
}
7171

7272
class CancellableChannel : SimpleChannel() {
73-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
73+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
7474
consumer = it.intercepted()
7575
COROUTINE_SUSPENDED
7676
}
7777

78-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
78+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
7979
enqueuedValue = element
8080
producer = it.intercepted()
8181
COROUTINE_SUSPENDED
@@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {
8484

8585
class CancellableReusableChannel : SimpleChannel() {
8686
@Suppress("INVISIBLE_MEMBER")
87-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
87+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
8888
consumer = it.intercepted()
8989
COROUTINE_SUSPENDED
9090
}
9191

9292
@Suppress("INVISIBLE_MEMBER")
93-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
93+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
9494
enqueuedValue = element
9595
producer = it.intercepted()
9696
COROUTINE_SUSPENDED

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
8080

8181
public final class kotlinx/coroutines/CancellableContinuationKt {
8282
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
83-
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
84-
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
85-
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
8683
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8784
}
8885

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+27-44
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
7676
@InternalCoroutinesApi
7777
public fun tryResume(value: T, idempotent: Any? = null): Any?
7878

79+
/**
80+
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
81+
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
82+
* guaranteed can be provided by having a cancellation fallback.
83+
*/
84+
@InternalCoroutinesApi
85+
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?
86+
7987
/**
8088
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
8189
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
@@ -110,8 +118,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
110118
public fun cancel(cause: Throwable? = null): Boolean
111119

112120
/**
113-
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
114-
* When the continuation is already cancelled, the handler will be immediately invoked
121+
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
122+
* When the continuation is already cancelled, the handler is immediately invoked
115123
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
116124
* continuation is cancelled.
117125
*
@@ -120,7 +128,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
120128
* processed as an uncaught exception in the context of the current coroutine
121129
* (see [CoroutineExceptionHandler]).
122130
*
123-
* At most one [handler] can be installed on a continuation.
131+
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
132+
* time produces [IllegalStateException].
133+
*
134+
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
135+
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
136+
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
124137
*
125138
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
126139
* This `handler` can be invoked concurrently with the surrounding code.
@@ -199,40 +212,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
199212
}
200213

201214
/**
202-
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
203-
*
204-
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
205-
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
206-
* continue to execute even after it was cancelled from the same thread in the case when the continuation
207-
* was already resumed and was posted for execution to the thread's queue.
208-
*
209-
* @suppress **This an internal API and should not be used from general code.**
210-
*/
211-
@InternalCoroutinesApi
212-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
213-
crossinline block: (CancellableContinuation<T>) -> Unit
214-
): T =
215-
suspendCoroutineUninterceptedOrReturn { uCont ->
216-
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
217-
block(cancellable)
218-
cancellable.getResult()
219-
}
220-
221-
/**
222-
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
215+
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
216+
* [CancellableContinuationImpl] is reused.
223217
*/
224-
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
218+
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
225219
crossinline block: (CancellableContinuation<T>) -> Unit
226220
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
227-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
221+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
228222
block(cancellable)
229223
cancellable.getResult()
230224
}
231225

232-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
226+
internal fun <T> getOrCreateCancellableContinuation(
227+
delegate: Continuation<T>, resumeMode: Int
228+
): CancellableContinuationImpl<T> {
229+
assert { resumeMode.isReusableMode }
233230
// If used outside of our dispatcher
234231
if (delegate !is DispatchedContinuation<T>) {
235-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
232+
return CancellableContinuationImpl(delegate, resumeMode)
236233
}
237234
/*
238235
* Attempt to claim reusable instance.
@@ -248,24 +245,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
248245
* thus leaking CC instance for indefinite time.
249246
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
250247
*/
251-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
252-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
248+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
249+
?: return CancellableContinuationImpl(delegate, resumeMode)
253250
}
254251

255-
/**
256-
* @suppress **Deprecated**
257-
*/
258-
@Deprecated(
259-
message = "holdCancellability parameter is deprecated and is no longer used",
260-
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
261-
)
262-
@InternalCoroutinesApi
263-
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
264-
holdCancellability: Boolean = false,
265-
crossinline block: (CancellableContinuation<T>) -> Unit
266-
): T =
267-
suspendAtomicCancellableCoroutine(block)
268-
269252
/**
270253
* Removes the specified [node] on cancellation.
271254
*/

0 commit comments

Comments
 (0)