Skip to content

Ensure that flowOn does not resume downstream after cancellation #1730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit>(MODE_ATOMIC_REUSABLE) {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
10 changes: 6 additions & 4 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -548,6 +545,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx

public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
Expand Down Expand Up @@ -583,6 +581,7 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co

public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/Channel$Factory {
Expand Down Expand Up @@ -779,14 +778,17 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract synthetic fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun receiveOrClosed$default (Lkotlinx/coroutines/channels/ReceiveChannel;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/SendChannel {
Expand Down
58 changes: 18 additions & 40 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -204,40 +204,32 @@ public suspend inline fun <T> suspendCancellableCoroutine(
}

/**
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
* [CancellableContinuationImpl] is reused.
*
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
block(cancellable)
cancellable.getResult()
}

/**
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
* * when [resumeMode] is [MODE_CANCELLABLE_REUSABLE] works like [suspendCancellableCoroutine].
* * when [resumeMode] is [MODE_ATOMIC_REUSABLE] it has *atomic cancellation*.
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
resumeMode: Int,
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode)
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
assert { resumeMode.isReusableMode }
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
return CancellableContinuationImpl(delegate, resumeMode)
}
/*
* Attempt to claim reusable instance.
Expand All @@ -253,24 +245,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes the specified [node] on cancellation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ internal open class CancellableContinuationImpl<in T>(
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)

/**
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
* Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
@JvmName("resetState") // Prettier stack traces
internal fun resetState(): Boolean {
internal fun resetState(resumeMode: Int): Boolean {
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
Expand All @@ -102,6 +102,7 @@ internal open class CancellableContinuationImpl<in T>(
}
_decision.value = UNDECIDED
_state.value = Active
this.resumeMode = resumeMode
return true
}

Expand Down Expand Up @@ -129,7 +130,7 @@ internal open class CancellableContinuationImpl<in T>(

private fun checkCompleted(): Boolean {
val completed = isCompleted
if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
Expand Down Expand Up @@ -158,7 +159,7 @@ internal open class CancellableContinuationImpl<in T>(
* Attempt to postpone cancellation for reusable cancellable continuation
*/
private fun cancelLater(cause: Throwable): Boolean {
if (resumeMode != MODE_ATOMIC_DEFAULT) return false
if (!resumeMode.isReusableMode) return false
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
return dispatched.postponeCancellation(cause)
}
Expand All @@ -173,7 +174,7 @@ internal open class CancellableContinuationImpl<in T>(
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
// Complete state update
detachChildIfNonResuable()
dispatchResume(mode = MODE_ATOMIC_DEFAULT)
dispatchResume(mode = MODE_ATOMIC) // no need for additional cancellation checks
return true
}
}
Expand Down Expand Up @@ -231,10 +232,10 @@ internal open class CancellableContinuationImpl<in T>(
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// before the block returns. This getResult would return a result as opposed to cancellation
// exception that should have happened if the continuation is dispatched for execution later.
if (resumeMode == MODE_CANCELLABLE) {
if (resumeMode.isCancellableMode) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
Expand Down
16 changes: 9 additions & 7 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
return closed.sendException
}

private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
val send = SendElement(element, cont)
Expand Down Expand Up @@ -543,11 +543,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun <R> receiveSuspend(
receiveMode: Int, resumeMode: Int
): R = suspendCancellableCoroutineReusable(resumeMode) sc@ { cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
while (true) {
if (enqueueReceive(receive)) {
Expand Down Expand Up @@ -581,7 +583,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
Expand All @@ -594,12 +596,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
public final override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
// slow-path does suspend
return receiveSuspend(RECEIVE_RESULT)
return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_REUSABLE else MODE_CANCELLABLE_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -814,7 +816,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
return true
}

private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable(MODE_ATOMIC_REUSABLE) sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
Expand Down
9 changes: 6 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ public interface ReceiveChannel<out E> {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
*
* *Cancellation of suspended `receive` is atomic*: when this function
* throws a [CancellationException], it means that the element was not retrieved from this channel.
* If [atomic] is set to `true` (by default) then *cancellation of suspended `receive` is atomic*:
* when this function throws a [CancellationException], it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
Expand All @@ -267,7 +267,10 @@ public interface ReceiveChannel<out E> {
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
public suspend fun receiveOrClosed(): ValueOrClosed<E>
public suspend fun receiveOrClosed(atomic: Boolean = true): ValueOrClosed<E>

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility") // Since version 1.4.0
public suspend fun receiveOrClosed(): ValueOrClosed<E> = receiveOrClosed(atomic = true)

/**
* Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
Expand Down
9 changes: 6 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* the channel afterwards. If you need to iterate over the channel without consuming it,
* a regular `for` loop should be used instead.
*
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
* See [consumeEach][ReceiveChannel.consumeEach].
* Note, that emitting values from a channel into a flow is not atomic. A value that was received from the
* channel many not reach the flow collector if it was cancelled and will be lost.
*
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }` modulo
* atomicity. See [consumeEach][ReceiveChannel.consumeEach].
*/
@ExperimentalCoroutinesApi // since version 1.3.0
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
Expand All @@ -45,7 +48,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
// L$1 <- channel
// L$2 <- cause
// L$3 <- this$run (actually equal to this)
val result = run { channel.receiveOrClosed() }
val result = run { channel.receiveOrClosed(atomic = false) }
if (result.isClosed) {
result.closeCause?.let { throw it }
break // returns normally when result.closeCause == null
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,17 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
*
* For more explanation of context preservation please refer to [Flow] documentation.
*
* This operators retains a _sequential_ nature of flow if changing the context does not call for changing
* This operator retains a _sequential_ nature of flow if changing the context does not call for changing
* the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
* flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
* with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
* between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
* before or after `flowOn`, which requests buffering behavior and specifies channel size.
*
* Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
* In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
* was already emitted by the upstream flow.
*
* ### Operator fusion
*
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
Expand Down
Loading