diff --git a/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt index c217fcae91..7d3f3abc93 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() { } class CancellableChannel : SimpleChannel() { - override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine { + override suspend fun suspendReceive(): Int = suspendCancellableCoroutine { consumer = it.intercepted() COROUTINE_SUSPENDED } - override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine { + override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine { enqueuedValue = element producer = it.intercepted() COROUTINE_SUSPENDED @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() { class CancellableReusableChannel : SimpleChannel() { @Suppress("INVISIBLE_MEMBER") - override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable { + override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { consumer = it.intercepted() COROUTINE_SUSPENDED } @Suppress("INVISIBLE_MEMBER") - override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable { + override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { enqueuedValue = element producer = it.intercepted() COROUTINE_SUSPENDED diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 54e355ec37..cfd404ce18 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/ public final class kotlinx/coroutines/CancellableContinuationKt { public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V - public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -548,6 +545,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V + public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel { @@ -583,6 +581,7 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co public final class kotlinx/coroutines/channels/Channel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V + public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/Channel$Factory { @@ -779,7 +778,8 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel { public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator; public abstract fun poll ()Ljava/lang/Object; public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract synthetic fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun receiveOrClosed (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -787,6 +787,8 @@ public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V + public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun receiveOrClosed$default (Lkotlinx/coroutines/channels/ReceiveChannel;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/SendChannel { diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index e01dc82521..f0e3b9d2a2 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -204,40 +204,32 @@ public suspend inline fun suspendCancellableCoroutine( } /** - * Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*. + * Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of + * [CancellableContinuationImpl] is reused. * - * When the suspended function throws a [CancellationException], it means that the continuation was not resumed. - * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may - * continue to execute even after it was cancelled from the same thread in the case when the continuation - * was already resumed and was posted for execution to the thread's queue. - * - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public suspend inline fun suspendAtomicCancellableCoroutine( - crossinline block: (CancellableContinuation) -> Unit -): T = - suspendCoroutineUninterceptedOrReturn { uCont -> - val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT) - block(cancellable) - cancellable.getResult() - } - -/** - * Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible. + * * when [resumeMode] is [MODE_CANCELLABLE_REUSABLE] works like [suspendCancellableCoroutine]. + * * when [resumeMode] is [MODE_ATOMIC_REUSABLE] it has *atomic cancellation*. + * When the suspended function throws a [CancellationException], it means that the continuation was not resumed. + * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may + * continue to execute even after it was cancelled from the same thread in the case when the continuation + * was already resumed and was posted for execution to the thread's queue. */ -internal suspend inline fun suspendAtomicCancellableCoroutineReusable( +internal suspend inline fun suspendCancellableCoroutineReusable( + resumeMode: Int, crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> - val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) + val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode) block(cancellable) cancellable.getResult() } -internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { +internal fun getOrCreateCancellableContinuation( + delegate: Continuation, resumeMode: Int +): CancellableContinuationImpl { + assert { resumeMode.isReusableMode } // If used outside of our dispatcher if (delegate !is DispatchedContinuation) { - return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT) + return CancellableContinuationImpl(delegate, resumeMode) } /* * Attempt to claim reusable instance. @@ -253,24 +245,10 @@ internal fun getOrCreateCancellableContinuation(delegate: Continuation): * thus leaking CC instance for indefinite time. * 2) Continuation was cancelled. Then we should prevent any further reuse and bail out. */ - return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() } - ?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT) + return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) } + ?: return CancellableContinuationImpl(delegate, resumeMode) } -/** - * @suppress **Deprecated** - */ -@Deprecated( - message = "holdCancellability parameter is deprecated and is no longer used", - replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)") -) -@InternalCoroutinesApi -public suspend inline fun suspendAtomicCancellableCoroutine( - holdCancellability: Boolean = false, - crossinline block: (CancellableContinuation) -> Unit -): T = - suspendAtomicCancellableCoroutine(block) - /** * Removes the specified [node] on cancellation. */ diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1f67dd3c6c..9eedd3a79c 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -88,11 +88,11 @@ internal open class CancellableContinuationImpl( private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) /** - * Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work. - * Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. + * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work. + * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. */ @JvmName("resetState") // Prettier stack traces - internal fun resetState(): Boolean { + internal fun resetState(resumeMode: Int): Boolean { assert { parentHandle !== NonDisposableHandle } val state = _state.value assert { state !is NotCompleted } @@ -102,6 +102,7 @@ internal open class CancellableContinuationImpl( } _decision.value = UNDECIDED _state.value = Active + this.resumeMode = resumeMode return true } @@ -129,7 +130,7 @@ internal open class CancellableContinuationImpl( private fun checkCompleted(): Boolean { val completed = isCompleted - if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations + if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations val dispatched = delegate as? DispatchedContinuation<*> ?: return completed val cause = dispatched.checkPostponedCancellation(this) ?: return completed if (!completed) { @@ -158,7 +159,7 @@ internal open class CancellableContinuationImpl( * Attempt to postpone cancellation for reusable cancellable continuation */ private fun cancelLater(cause: Throwable): Boolean { - if (resumeMode != MODE_ATOMIC_DEFAULT) return false + if (!resumeMode.isReusableMode) return false val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false return dispatched.postponeCancellation(cause) } @@ -173,7 +174,7 @@ internal open class CancellableContinuationImpl( if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) } // Complete state update detachChildIfNonResuable() - dispatchResume(mode = MODE_ATOMIC_DEFAULT) + dispatchResume(mode = MODE_ATOMIC) // no need for additional cancellation checks return true } } @@ -231,10 +232,10 @@ internal open class CancellableContinuationImpl( val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) // if the parent job was already cancelled, then throw the corresponding cancellation exception - // otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...) + // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...) // before the block returns. This getResult would return a result as opposed to cancellation // exception that should have happened if the continuation is dispatched for execution later. - if (resumeMode == MODE_CANCELLABLE) { + if (resumeMode.isCancellableMode) { val job = context[Job] if (job != null && !job.isActive) { val cause = job.getCancellationException() diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 28c7ceabe1..89d05a55f9 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -165,7 +165,7 @@ internal abstract class AbstractSendChannel : SendChannel { return closed.sendException } - private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont -> + private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont -> loop@ while (true) { if (isFullImpl) { val send = SendElement(element, cont) @@ -543,11 +543,13 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) return result as E // slow-path does suspend - return receiveSuspend(RECEIVE_THROWS_ON_CLOSE) + return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_REUSABLE) } @Suppress("UNCHECKED_CAST") - private suspend fun receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont -> + private suspend fun receiveSuspend( + receiveMode: Int, resumeMode: Int + ): R = suspendCancellableCoroutineReusable(resumeMode) sc@ { cont -> val receive = ReceiveElement(cont as CancellableContinuation, receiveMode) while (true) { if (enqueueReceive(receive)) { @@ -581,7 +583,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) return result as E // slow-path does suspend - return receiveSuspend(RECEIVE_NULL_ON_CLOSE) + return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_REUSABLE) } @Suppress("UNCHECKED_CAST") @@ -594,12 +596,12 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel { + public final override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed { // fast path -- try poll non-blocking val result = pollInternal() if (result !== POLL_FAILED) return result.toResult() // slow-path does suspend - return receiveSuspend(RECEIVE_RESULT) + return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_REUSABLE else MODE_CANCELLABLE_REUSABLE) } @Suppress("UNCHECKED_CAST") @@ -814,7 +816,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel + private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont -> val receive = ReceiveHasNext(this, cont) while (true) { if (channel.enqueueReceive(receive)) { diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index c4b4a9b25e..351db86c30 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -251,8 +251,8 @@ public interface ReceiveChannel { * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with a [CancellationException]. * - * *Cancellation of suspended `receive` is atomic*: when this function - * throws a [CancellationException], it means that the element was not retrieved from this channel. + * If [atomic] is set to `true` (by default) then *cancellation of suspended `receive` is atomic*: + * when this function throws a [CancellationException], it means that the element was not retrieved from this channel. * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may * continue to execute even after it was cancelled from the same thread in the case when this receive operation * was already resumed and the continuation was posted for execution to the thread's queue. @@ -267,7 +267,10 @@ public interface ReceiveChannel { * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. */ @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public suspend fun receiveOrClosed(): ValueOrClosed + public suspend fun receiveOrClosed(atomic: Boolean = true): ValueOrClosed + + @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility") // Since version 1.4.0 + public suspend fun receiveOrClosed(): ValueOrClosed = receiveOrClosed(atomic = true) /** * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 130ffc72dc..0b4612c12c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -20,8 +20,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * the channel afterwards. If you need to iterate over the channel without consuming it, * a regular `for` loop should be used instead. * - * This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`. - * See [consumeEach][ReceiveChannel.consumeEach]. + * Note, that emitting values from a channel into a flow is not atomic. A value that was received from the + * channel many not reach the flow collector if it was cancelled and will be lost. + * + * This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }` modulo + * atomicity. See [consumeEach][ReceiveChannel.consumeEach]. */ @ExperimentalCoroutinesApi // since version 1.3.0 public suspend fun FlowCollector.emitAll(channel: ReceiveChannel): Unit = @@ -45,7 +48,7 @@ private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, // L$1 <- channel // L$2 <- cause // L$3 <- this$run (actually equal to this) - val result = run { channel.receiveOrClosed() } + val result = run { channel.receiveOrClosed(atomic = false) } if (result.isClosed) { result.closeCause?.let { throw it } break // returns normally when result.closeCause == null diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 4e6167f0cd..c0fe72fcb4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -170,13 +170,17 @@ public fun Flow.conflate(): Flow = buffer(CONFLATED) * * For more explanation of context preservation please refer to [Flow] documentation. * - * This operators retains a _sequential_ nature of flow if changing the context does not call for changing + * This operator retains a _sequential_ nature of flow if changing the context does not call for changing * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects * flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size * between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called * before or after `flowOn`, which requests buffering behavior and specifies channel size. * + * Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled. + * In particular, this operator ensures that downstream flow does not resume on cancellation even if the element + * was already emitted by the upstream flow. + * * ### Operator fusion * * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index cf31fcf07d..45b9a232a0 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -19,7 +19,7 @@ internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED") internal class DispatchedContinuation( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation -) : DispatchedTask(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation by continuation { +) : DispatchedTask(MODE_ATOMIC), CoroutineStackFrame, Continuation by continuation { @JvmField @Suppress("PropertyName") internal var _state: Any? = UNDEFINED @@ -43,7 +43,7 @@ internal class DispatchedContinuation( * } * // state == CC * ``` - * 4) [Throwable] continuation was cancelled with this cause while being in [suspendAtomicCancellableCoroutineReusable], + * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. @@ -83,7 +83,7 @@ internal class DispatchedContinuation( } /** - * Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block, + * Claims the continuation for [suspendCancellableCoroutineReusable] block, * so all cancellations will be postponed. */ @Suppress("UNCHECKED_CAST") @@ -180,10 +180,10 @@ internal class DispatchedContinuation( val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state - resumeMode = MODE_ATOMIC_DEFAULT + resumeMode = MODE_ATOMIC dispatcher.dispatch(context, this) } else { - executeUnconfined(state, MODE_ATOMIC_DEFAULT) { + executeUnconfined(state, MODE_ATOMIC) { withCoroutineContext(this.context, countOrElement) { continuation.resumeWith(result) } diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index 9588d22b17..99fbb09758 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -8,12 +8,44 @@ import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.jvm.* -@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine -@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine -@PublishedApi internal const val MODE_UNDISPATCHED = 2 // when the thread is right, but need to mark it with current coroutine +/** + * Non-cancellable dispatch mode. + * + * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling + * inline `suspendAtomicCancellableCoroutine` function and did not support reuse. + */ +internal const val MODE_ATOMIC = 0 + +/** + * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine]. + * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension. + * + * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine]. + */ +@PublishedApi +internal const val MODE_CANCELLABLE = 1 + +/** + * Atomic dispatch mode for [suspendCancellableCoroutineReusable]. + * Note, that implementation of reuse checks mode via [Int.isReusableMode] extension. + */ +internal const val MODE_ATOMIC_REUSABLE = 2 + +/** + * Cancellable dispatch mode for [suspendCancellableCoroutineReusable]. + * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension; + * implementation of reuse checks mode via [Int.isReusableMode] extension. + */ +internal const val MODE_CANCELLABLE_REUSABLE = 3 + +/** + * Undispatched mode for [CancellableContinuation.resumeUndispatched]. + * It is used when the thread is right, but it needs to be mark it with the current coroutine. + */ +internal const val MODE_UNDISPATCHED = 4 -internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE -internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE +internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE +internal val Int.isReusableMode get() = this == MODE_ATOMIC_REUSABLE || this == MODE_CANCELLABLE_REUSABLE internal abstract class DispatchedTask( @JvmField public var resumeMode: Int @@ -98,7 +130,7 @@ internal abstract class DispatchedTask( internal fun DispatchedTask.dispatch(mode: Int) { val delegate = this.delegate - if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { + if (mode != MODE_UNDISPATCHED && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { // dispatch directly using this instance's Runnable implementation val dispatcher = delegate.dispatcher val context = delegate.context @@ -119,8 +151,8 @@ internal fun DispatchedTask.resume(delegate: Continuation, useMode: In val exception = getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) } val result = if (exception != null) Result.failure(exception) else Result.success(state as T) when (useMode) { - MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result) - MODE_CANCELLABLE -> delegate.resumeCancellableWith(result) + MODE_ATOMIC, MODE_ATOMIC_REUSABLE -> delegate.resumeWith(result) + MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> delegate.resumeCancellableWith(result) MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result) else -> error("Invalid mode $useMode") } diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 1b11bc96cc..7411417314 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -188,7 +188,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { return lockSuspend(owner) } - private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutineReusable sc@ { cont -> + private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont -> val waiter = LockCont(owner, cont) _state.loop { state -> when (state) { diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index aa7ed63d3d..02e6b554ae 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -8,7 +8,6 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* -import kotlin.jvm.* import kotlin.math.* import kotlin.native.concurrent.* @@ -136,7 +135,7 @@ private class SemaphoreImpl( cur + 1 } - private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable sc@ { cont -> + private suspend fun addToQueueAndSuspend() = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont -> val last = this.tail val enqIdx = enqIdx.getAndIncrement() val segment = getSegment(last, enqIdx / SEGMENT_SIZE) diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 69d8fd03e3..344782316a 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -39,7 +39,7 @@ private class ChannelViaBroadcast( override suspend fun receive(): E = sub.receive() override suspend fun receiveOrNull(): E? = sub.receiveOrNull() - override suspend fun receiveOrClosed(): ValueOrClosed = sub.receiveOrClosed() + override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed = sub.receiveOrClosed(atomic) override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator = sub.iterator() diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 802ba1ef2f..eedfac2ea3 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -134,15 +134,14 @@ class CatchTest : TestBase() { // flowOn with a different dispatcher introduces asynchrony so that all exceptions in the // upstream flows are handled before they go downstream .onEach { value -> - expect(8) - assertEquals("OK", value) + expectUnreached() // already cancelled } .catch { e -> - expect(9) + expect(8) assertTrue(e is TestException) assertSame(d0, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext) } .collect() - finish(10) + finish(9) } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt index a619355b68..b4641f9c41 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow @@ -238,6 +238,20 @@ abstract class CombineTestBase : TestBase() { assertFailsWith(flow) finish(7) } + + @Test + fun testCancelledCombine() = runTest { + coroutineScope { + val flow = flow { + emit(Unit) // emit to buffer + cancel() // now cancel + } + flow.combineLatest(flow) { u, _ -> u }.collect { + // should not be reached, because cancelled before it runs + expectUnreached() + } + } + } } class CombineTest : CombineTestBase() { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt index f8350ff584..0eae1a3860 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt @@ -341,4 +341,19 @@ class FlowOnTest : TestBase() { assertEquals(expected, value) } } + + @Test + fun testCancelledFlowOn() = runTest { + assertFailsWith { + coroutineScope { + flow { + emit(Unit) // emit to buffer + cancel() // now cancel + }.flowOn(wrapperDispatcher()).collect { + // should not be reached, because cancelled before it runs + expectUnreached() + } + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt index ec3635ca36..9528efd71c 100644 --- a/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobStructuredJoinStressTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import org.junit.* +import kotlin.coroutines.* /** * Test a race between job failure and join. @@ -12,22 +13,52 @@ import org.junit.* * See [#1123](https://github.com/Kotlin/kotlinx.coroutines/issues/1123). */ class JobStructuredJoinStressTest : TestBase() { - private val nRepeats = 1_000 * stressTestMultiplier + private val nRepeats = 10_000 * stressTestMultiplier @Test - fun testStress() { - repeat(nRepeats) { + fun testStressRegularJoin() { + stress(Job::join) + } + + @Test + fun testStressSuspendCancellable() { + stress { job -> + suspendCancellableCoroutine { cont -> + job.invokeOnCompletion { cont.resume(Unit) } + } + } + } + + @Test + fun testStressSuspendCancellableReusable() { + stress { job -> + suspendCancellableCoroutineReusable(MODE_CANCELLABLE_REUSABLE) { cont -> + job.invokeOnCompletion { cont.resume(Unit) } + } + } + } + + private fun stress(join: suspend (Job) -> Unit) { + expect(1) + repeat(nRepeats) { index -> assertFailsWith { runBlocking { // launch in background val job = launch(Dispatchers.Default) { throw TestException("OK") // crash } - assertFailsWith { - job.join() + try { + join(job) + error("Should not complete successfully") + } catch (e: CancellationException) { + // must always crash with cancellation exception + expect(2 + index) + } catch (e: Throwable) { + error("Unexpected exception", e) } } } } + finish(2 + nRepeats) } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 5f5620c632..045dabdea0 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -11,15 +11,23 @@ import kotlin.coroutines.* import kotlin.test.* class ReusableCancellableContinuationTest : TestBase() { + @Test + fun testAtomicReusable() = runTest { + testContinuationsCount(10, 1) { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE, block = it) + } + } @Test - fun testReusable() = runTest { - testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable) + fun testCancellableReusable() = runTest { + testContinuationsCount(10, 1) { + suspendCancellableCoroutineReusable(MODE_CANCELLABLE_REUSABLE, block = it) + } } @Test fun testRegular() = runTest { - testContinuationsCount(10, 10, ::suspendAtomicCancellableCoroutine) + testContinuationsCount(10, 10, ::suspendCancellableCoroutine) } private suspend inline fun CoroutineScope.testContinuationsCount( @@ -51,7 +59,7 @@ class ReusableCancellableContinuationTest : TestBase() { fun testCancelledOnClaimedCancel() = runTest { expect(1) try { - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { it.cancel() } expectUnreached() @@ -65,7 +73,7 @@ class ReusableCancellableContinuationTest : TestBase() { expect(1) // Bind child at first var continuation: Continuation<*>? = null - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { expect(2) continuation = it launch { // Attach to the parent, avoid fast path @@ -77,7 +85,7 @@ class ReusableCancellableContinuationTest : TestBase() { ensureActive() // Verify child was bound FieldWalker.assertReachableCount(1, coroutineContext[Job]) { it === continuation } - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { expect(5) coroutineContext[Job]!!.cancel() it.resume(Unit) @@ -93,7 +101,7 @@ class ReusableCancellableContinuationTest : TestBase() { launch { cont!!.resumeWith(Result.success(Unit)) } - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { cont = it } ensureActive() @@ -108,7 +116,7 @@ class ReusableCancellableContinuationTest : TestBase() { launch { // Attach to the parent, avoid fast path cont!!.resumeWith(Result.success(Unit)) } - suspendAtomicCancellableCoroutine { + suspendCancellableCoroutine { cont = it } ensureActive() @@ -121,7 +129,7 @@ class ReusableCancellableContinuationTest : TestBase() { expect(1) var cont: Continuation<*>? = null try { - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { cont = it it.cancel() } @@ -137,7 +145,7 @@ class ReusableCancellableContinuationTest : TestBase() { val currentJob = coroutineContext[Job]!! expect(1) // Bind child at first - suspendAtomicCancellableCoroutineReusable { + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { expect(2) // Attach to the parent, avoid fast path launch { @@ -153,12 +161,12 @@ class ReusableCancellableContinuationTest : TestBase() { assertFalse(isActive) // Child detached FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> } - suspendAtomicCancellableCoroutineReusable { it.resume(Unit) } - suspendAtomicCancellableCoroutineReusable { it.resume(Unit) } + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { it.resume(Unit) } + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) { it.resume(Unit) } FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> } try { - suspendAtomicCancellableCoroutineReusable {} + suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) {} } catch (e: CancellationException) { FieldWalker.assertReachableCount(0, currentJob) { it is CancellableContinuation<*> } finish(5)