Skip to content

Commit b71481d

Browse files
elizarovLouisCADqwwdfsad
authored andcommitted
Breaking: Get rid of atomic cancellation and provide a replacement (Kotlin#1937)
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. * Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine * Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable * Remove atomic cancellation from docs * Ensures that flowOn does not resume downstream after cancellation. * MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC * Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable * Better documentation for MODE_XXX constants. * Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult * Added test for Flow.combine that should be fixed * Support extended invokeOnCancellation contract * Introduced internal tryResumeAtomic * Channel onUnderliveredElement is introduced as a replacement. Fixes Kotlin#1265 Fixes Kotlin#1813 Fixes Kotlin#1915 Fixes Kotlin#1936 Co-authored-by: Louis CAD <[email protected]> Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
1 parent 892f278 commit b71481d

File tree

72 files changed

+2098
-679
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2098
-679
lines changed

benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
7070
}
7171

7272
class CancellableChannel : SimpleChannel() {
73-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
73+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
7474
consumer = it.intercepted()
7575
COROUTINE_SUSPENDED
7676
}
7777

78-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
78+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
7979
enqueuedValue = element
8080
producer = it.intercepted()
8181
COROUTINE_SUSPENDED
@@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {
8484

8585
class CancellableReusableChannel : SimpleChannel() {
8686
@Suppress("INVISIBLE_MEMBER")
87-
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
87+
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
8888
consumer = it.intercepted()
8989
COROUTINE_SUSPENDED
9090
}
9191

9292
@Suppress("INVISIBLE_MEMBER")
93-
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
93+
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
9494
enqueuedValue = element
9595
producer = it.intercepted()
9696
COROUTINE_SUSPENDED

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+7-8
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
4646
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
4747
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
4848
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
49+
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
4950
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
5051
}
5152

@@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5657

5758
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
5859
public fun <init> (Lkotlin/coroutines/Continuation;I)V
60+
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
61+
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
5962
public fun cancel (Ljava/lang/Throwable;)Z
6063
public fun completeResume (Ljava/lang/Object;)V
6164
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
@@ -75,14 +78,12 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
7578
public fun resumeWith (Ljava/lang/Object;)V
7679
public fun toString ()Ljava/lang/String;
7780
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
81+
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
7882
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
7983
}
8084

8185
public final class kotlinx/coroutines/CancellableContinuationKt {
8286
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
83-
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
84-
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
85-
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
8687
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8788
}
8889

@@ -272,10 +273,6 @@ public final class kotlinx/coroutines/DelayKt {
272273
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
273274
}
274275

275-
public final class kotlinx/coroutines/DispatchedContinuationKt {
276-
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
277-
}
278-
279276
public final class kotlinx/coroutines/Dispatchers {
280277
public static final field INSTANCE Lkotlinx/coroutines/Dispatchers;
281278
public static final fun getDefault ()Lkotlinx/coroutines/CoroutineDispatcher;
@@ -613,8 +610,10 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {
613610
}
614611

615612
public final class kotlinx/coroutines/channels/ChannelKt {
616-
public static final fun Channel (I)Lkotlinx/coroutines/channels/Channel;
613+
public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
614+
public static final fun Channel (ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
617615
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
616+
public static synthetic fun Channel$default (ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
618617
}
619618

620619
public final class kotlinx/coroutines/channels/ChannelsKt {

kotlinx-coroutines-core/common/src/Await.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.channels.*
89
import kotlin.coroutines.*
910

1011
/**
@@ -18,6 +19,8 @@ import kotlin.coroutines.*
1819
* This suspending function is cancellable.
1920
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
2021
* this function immediately resumes with [CancellationException].
22+
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
23+
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
2124
*/
2225
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
2326
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
@@ -33,6 +36,8 @@ public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
3336
* This suspending function is cancellable.
3437
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
3538
* this function immediately resumes with [CancellationException].
39+
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
40+
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
3641
*/
3742
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
3843
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
@@ -41,17 +46,23 @@ public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
4146
* Suspends current coroutine until all given jobs are complete.
4247
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
4348
*
44-
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
49+
* This suspending function is cancellable.
50+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
4551
* this function immediately resumes with [CancellationException].
52+
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
53+
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
4654
*/
4755
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
4856

4957
/**
5058
* Suspends current coroutine until all given jobs are complete.
5159
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
5260
*
53-
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
61+
* This suspending function is cancellable.
62+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
5463
* this function immediately resumes with [CancellationException].
64+
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
65+
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
5566
*/
5667
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
5768

kotlinx-coroutines-core/common/src/Builders.common.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,9 @@ private class LazyDeferredCoroutine<T>(
129129
* This function uses dispatcher from the new context, shifting execution of the [block] into the
130130
* different thread if a new dispatcher is specified, and back to the original dispatcher
131131
* when it completes. Note that the result of `withContext` invocation is
132-
* dispatched into the original context in a cancellable way, which means that if the original [coroutineContext],
133-
* in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code,
132+
* dispatched into the original context in a cancellable way with a **prompt cancellation guarantee**,
133+
* which means that if the original [coroutineContext], in which `withContext` was invoked,
134+
* is cancelled by the time its dispatcher starts to execute the code,
134135
* it discards the result of `withContext` and throws [CancellationException].
135136
*/
136137
public suspend fun <T> withContext(

0 commit comments

Comments
 (0)