Skip to content

Commit 7d01777

Browse files
committed
Ensure that flowOn does not resume downstream after cancellation
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation. Fixes #1265
1 parent bf9509d commit 7d01777

File tree

12 files changed

+73
-33
lines changed

12 files changed

+73
-33
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+6-1
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx
537537

538538
public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
539539
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
540+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
540541
}
541542

542543
public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
@@ -572,6 +573,7 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co
572573

573574
public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
574575
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
576+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
575577
}
576578

577579
public final class kotlinx/coroutines/channels/Channel$Factory {
@@ -768,14 +770,17 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
768770
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
769771
public abstract fun poll ()Ljava/lang/Object;
770772
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
771-
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
773+
public abstract synthetic fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
774+
public abstract fun receiveOrClosed (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
772775
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
773776
}
774777

775778
public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
776779
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
777780
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
778781
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
782+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
783+
public static synthetic fun receiveOrClosed$default (Lkotlinx/coroutines/channels/ReceiveChannel;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
779784
}
780785

781786
public abstract interface class kotlinx/coroutines/channels/SendChannel {

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+8-5
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,20 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
227227
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
228228
*/
229229
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
230+
resumeMode: Int = MODE_ATOMIC_DEFAULT,
230231
crossinline block: (CancellableContinuation<T>) -> Unit
231232
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
232-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
233+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode)
233234
block(cancellable)
234235
cancellable.getResult()
235236
}
236237

237-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
238+
internal fun <T> getOrCreateCancellableContinuation(
239+
delegate: Continuation<T>, resumeMode: Int
240+
): CancellableContinuationImpl<T> {
238241
// If used outside of our dispatcher
239242
if (delegate !is DispatchedContinuation<T>) {
240-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
243+
return CancellableContinuationImpl(delegate, resumeMode)
241244
}
242245
/*
243246
* Attempt to claim reusable instance.
@@ -253,8 +256,8 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
253256
* thus leaking CC instance for indefinite time.
254257
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
255258
*/
256-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
257-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
259+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
260+
?: return CancellableContinuationImpl(delegate, resumeMode)
258261
}
259262

260263
/**

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ internal open class CancellableContinuationImpl<in T>(
9191
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
9292
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
9393
*/
94-
internal fun resetState(): Boolean {
94+
internal fun resetState(resumeMode: Int): Boolean {
9595
assert { parentHandle !== NonDisposableHandle }
9696
val state = _state.value
9797
assert { state !is NotCompleted }
@@ -101,6 +101,7 @@ internal open class CancellableContinuationImpl<in T>(
101101
}
102102
_decision.value = UNDECIDED
103103
_state.value = Active
104+
this.resumeMode = resumeMode
104105
return true
105106
}
106107

@@ -128,7 +129,7 @@ internal open class CancellableContinuationImpl<in T>(
128129

129130
private fun checkCompleted(): Boolean {
130131
val completed = isCompleted
131-
if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
132+
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
132133
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
133134
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
134135
if (!completed) {
@@ -157,7 +158,7 @@ internal open class CancellableContinuationImpl<in T>(
157158
* Attempt to postpone cancellation for reusable cancellable continuation
158159
*/
159160
private fun cancelLater(cause: Throwable): Boolean {
160-
if (resumeMode != MODE_ATOMIC_DEFAULT) return false
161+
if (!resumeMode.isReusableMode) return false
161162
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
162163
return dispatched.postponeCancellation(cause)
163164
}

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -559,11 +559,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
559559
@Suppress("UNCHECKED_CAST")
560560
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
561561
// slow-path does suspend
562-
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
562+
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_DEFAULT)
563563
}
564564

565565
@Suppress("UNCHECKED_CAST")
566-
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
566+
private suspend fun <R> receiveSuspend(
567+
receiveMode: Int, resumeMode: Int
568+
): R = suspendAtomicCancellableCoroutineReusable(resumeMode) sc@ { cont ->
567569
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
568570
while (true) {
569571
if (enqueueReceive(receive)) {
@@ -597,7 +599,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
597599
@Suppress("UNCHECKED_CAST")
598600
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
599601
// slow-path does suspend
600-
return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
602+
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_DEFAULT)
601603
}
602604

603605
@Suppress("UNCHECKED_CAST")
@@ -610,12 +612,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
610612
}
611613

612614
@Suppress("UNCHECKED_CAST")
613-
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
615+
public final override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed<E> {
614616
// fast path -- try poll non-blocking
615617
val result = pollInternal()
616618
if (result !== POLL_FAILED) return result.toResult()
617619
// slow-path does suspend
618-
return receiveSuspend(RECEIVE_RESULT)
620+
return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE_REUSABLE)
619621
}
620622

621623
@Suppress("UNCHECKED_CAST")

kotlinx-coroutines-core/common/src/channels/Channel.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ public interface ReceiveChannel<out E> {
251251
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
252252
* function is suspended, this function immediately resumes with a [CancellationException].
253253
*
254-
* *Cancellation of suspended `receive` is atomic*: when this function
255-
* throws a [CancellationException], it means that the element was not retrieved from this channel.
254+
* If [atomic] is set to `true` (by default) then *cancellation of suspended `receive` is atomic*:
255+
* when this function throws a [CancellationException], it means that the element was not retrieved from this channel.
256256
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
257257
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
258258
* was already resumed and the continuation was posted for execution to the thread's queue.
@@ -267,7 +267,10 @@ public interface ReceiveChannel<out E> {
267267
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
268268
*/
269269
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
270-
public suspend fun receiveOrClosed(): ValueOrClosed<E>
270+
public suspend fun receiveOrClosed(atomic: Boolean = true): ValueOrClosed<E>
271+
272+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility") // Since version 1.4.0
273+
public suspend fun receiveOrClosed(): ValueOrClosed<E> = receiveOrClosed(atomic = true)
271274

272275
/**
273276
* Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value

kotlinx-coroutines-core/common/src/flow/Channels.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2020
* the channel afterwards. If you need to iterate over the channel without consuming it,
2121
* a regular `for` loop should be used instead.
2222
*
23-
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
24-
* See [consumeEach][ReceiveChannel.consumeEach].
23+
* Note, that emitting values from a channel into a flow is not atomic. A value that was received from the
24+
* channel many not reach the flow collector if it was cancelled and will be lost.
25+
*
26+
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }` modulo
27+
* atomicity. See [consumeEach][ReceiveChannel.consumeEach].
2528
*/
2629
@ExperimentalCoroutinesApi // since version 1.3.0
2730
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) =
@@ -45,7 +48,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
4548
// L$1 <- channel
4649
// L$2 <- cause
4750
// L$3 <- this$run (actually equal to this)
48-
val result = run { channel.receiveOrClosed() }
51+
val result = run { channel.receiveOrClosed(atomic = false) }
4952
if (result.isClosed) {
5053
result.closeCause?.let { throw it }
5154
break // returns normally when result.closeCause == null

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,17 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
170170
*
171171
* For more explanation of context preservation please refer to [Flow] documentation.
172172
*
173-
* This operators retains a _sequential_ nature of flow if changing the context does not call for changing
173+
* This operator retains a _sequential_ nature of flow if changing the context does not call for changing
174174
* the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
175175
* flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
176176
* with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
177177
* between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
178178
* before or after `flowOn`, which requests buffering behavior and specifies channel size.
179179
*
180+
* Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
181+
* In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
182+
* was already emitted by the upstream flow.
183+
*
180184
* ### Operator fusion
181185
*
182186
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+11-6
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
99
import kotlin.jvm.*
1010

11-
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
12-
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
13-
@PublishedApi internal const val MODE_UNDISPATCHED = 2 // when the thread is right, but need to mark it with current coroutine
11+
@PublishedApi
12+
internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
13+
@PublishedApi
14+
internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
1415

15-
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
16-
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
16+
internal const val MODE_CANCELLABLE_REUSABLE = 2 // same as MODE_CANCELLABLE but supports reused
17+
internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
18+
19+
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
20+
internal val Int.isDispatchedMode get() = this != MODE_UNDISPATCHED
21+
internal val Int.isReusableMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE_REUSABLE
1722

1823
internal abstract class DispatchedTask<in T>(
1924
@JvmField public var resumeMode: Int
@@ -120,7 +125,7 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
120125
val result = if (exception != null) Result.failure(exception) else Result.success(state as T)
121126
when (useMode) {
122127
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
123-
MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
128+
MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> delegate.resumeCancellableWith(result)
124129
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
125130
else -> error("Invalid mode $useMode")
126131
}

kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private class ChannelViaBroadcast<E>(
3939

4040
override suspend fun receive(): E = sub.receive()
4141
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
42-
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
42+
override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed<E> = sub.receiveOrClosed(atomic)
4343
override fun poll(): E? = sub.poll()
4444
override fun iterator(): ChannelIterator<E> = sub.iterator()
4545

kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,14 @@ class CatchTest : TestBase() {
134134
// flowOn with a different dispatcher introduces asynchrony so that all exceptions in the
135135
// upstream flows are handled before they go downstream
136136
.onEach { value ->
137-
expect(8)
138-
assertEquals("OK", value)
137+
expectUnreached() // already cancelled
139138
}
140139
.catch { e ->
141-
expect(9)
140+
expect(8)
142141
assertTrue(e is TestException)
143142
assertSame(d0, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
144143
}
145144
.collect()
146-
finish(10)
145+
finish(9)
147146
}
148147
}

kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt

+15
Original file line numberDiff line numberDiff line change
@@ -314,4 +314,19 @@ class FlowOnTest : TestBase() {
314314
assertEquals(expected, value)
315315
}
316316
}
317+
318+
@Test
319+
fun testCancelledFlowOn() = runTest {
320+
assertFailsWith<CancellationException> {
321+
coroutineScope {
322+
flow {
323+
emit(Unit) // emit to buffer
324+
cancel() // now cancel
325+
}.flowOn(wrapperDispatcher()).collect {
326+
// should not be reached, because cancelled before it runs
327+
expectUnreached()
328+
}
329+
}
330+
}
331+
}
317332
}

kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class ReusableCancellableContinuationTest : TestBase() {
1414

1515
@Test
1616
fun testReusable() = runTest {
17-
testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable)
17+
testContinuationsCount(10, 1) { suspendAtomicCancellableCoroutineReusable(block = it) }
1818
}
1919

2020
@Test

0 commit comments

Comments
 (0)