Skip to content

Commit 1952649

Browse files
committed
Ensure that flowOn does not resume downstream after cancellation
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation. Fixes #1265
1 parent 532368f commit 1952649

File tree

12 files changed

+73
-33
lines changed

12 files changed

+73
-33
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+6-1
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx
548548

549549
public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
550550
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
551+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
551552
}
552553

553554
public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
@@ -583,6 +584,7 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co
583584

584585
public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
585586
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
587+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
586588
}
587589

588590
public final class kotlinx/coroutines/channels/Channel$Factory {
@@ -779,14 +781,17 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
779781
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
780782
public abstract fun poll ()Ljava/lang/Object;
781783
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
782-
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
784+
public abstract synthetic fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
785+
public abstract fun receiveOrClosed (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
783786
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
784787
}
785788

786789
public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
787790
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
788791
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
789792
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
793+
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
794+
public static synthetic fun receiveOrClosed$default (Lkotlinx/coroutines/channels/ReceiveChannel;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
790795
}
791796

792797
public abstract interface class kotlinx/coroutines/channels/SendChannel {

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+8-5
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,20 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
227227
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
228228
*/
229229
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
230+
resumeMode: Int = MODE_ATOMIC_DEFAULT,
230231
crossinline block: (CancellableContinuation<T>) -> Unit
231232
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
232-
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
233+
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode)
233234
block(cancellable)
234235
cancellable.getResult()
235236
}
236237

237-
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
238+
internal fun <T> getOrCreateCancellableContinuation(
239+
delegate: Continuation<T>, resumeMode: Int
240+
): CancellableContinuationImpl<T> {
238241
// If used outside of our dispatcher
239242
if (delegate !is DispatchedContinuation<T>) {
240-
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
243+
return CancellableContinuationImpl(delegate, resumeMode)
241244
}
242245
/*
243246
* Attempt to claim reusable instance.
@@ -253,8 +256,8 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
253256
* thus leaking CC instance for indefinite time.
254257
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
255258
*/
256-
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
257-
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
259+
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
260+
?: return CancellableContinuationImpl(delegate, resumeMode)
258261
}
259262

260263
/**

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ internal open class CancellableContinuationImpl<in T>(
9292
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
9393
*/
9494
@JvmName("resetState") // Prettier stack traces
95-
internal fun resetState(): Boolean {
95+
internal fun resetState(resumeMode: Int): Boolean {
9696
assert { parentHandle !== NonDisposableHandle }
9797
val state = _state.value
9898
assert { state !is NotCompleted }
@@ -102,6 +102,7 @@ internal open class CancellableContinuationImpl<in T>(
102102
}
103103
_decision.value = UNDECIDED
104104
_state.value = Active
105+
this.resumeMode = resumeMode
105106
return true
106107
}
107108

@@ -129,7 +130,7 @@ internal open class CancellableContinuationImpl<in T>(
129130

130131
private fun checkCompleted(): Boolean {
131132
val completed = isCompleted
132-
if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
133+
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
133134
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
134135
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
135136
if (!completed) {
@@ -158,7 +159,7 @@ internal open class CancellableContinuationImpl<in T>(
158159
* Attempt to postpone cancellation for reusable cancellable continuation
159160
*/
160161
private fun cancelLater(cause: Throwable): Boolean {
161-
if (resumeMode != MODE_ATOMIC_DEFAULT) return false
162+
if (!resumeMode.isReusableMode) return false
162163
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
163164
return dispatched.postponeCancellation(cause)
164165
}

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -543,11 +543,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
543543
@Suppress("UNCHECKED_CAST")
544544
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
545545
// slow-path does suspend
546-
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
546+
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_DEFAULT)
547547
}
548548

549549
@Suppress("UNCHECKED_CAST")
550-
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
550+
private suspend fun <R> receiveSuspend(
551+
receiveMode: Int, resumeMode: Int
552+
): R = suspendAtomicCancellableCoroutineReusable(resumeMode) sc@ { cont ->
551553
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
552554
while (true) {
553555
if (enqueueReceive(receive)) {
@@ -581,7 +583,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
581583
@Suppress("UNCHECKED_CAST")
582584
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
583585
// slow-path does suspend
584-
return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
586+
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_DEFAULT)
585587
}
586588

587589
@Suppress("UNCHECKED_CAST")
@@ -594,12 +596,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
594596
}
595597

596598
@Suppress("UNCHECKED_CAST")
597-
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
599+
public final override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed<E> {
598600
// fast path -- try poll non-blocking
599601
val result = pollInternal()
600602
if (result !== POLL_FAILED) return result.toResult()
601603
// slow-path does suspend
602-
return receiveSuspend(RECEIVE_RESULT)
604+
return receiveSuspend(RECEIVE_RESULT, if (atomic) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE_REUSABLE)
603605
}
604606

605607
@Suppress("UNCHECKED_CAST")

kotlinx-coroutines-core/common/src/channels/Channel.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ public interface ReceiveChannel<out E> {
251251
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
252252
* function is suspended, this function immediately resumes with a [CancellationException].
253253
*
254-
* *Cancellation of suspended `receive` is atomic*: when this function
255-
* throws a [CancellationException], it means that the element was not retrieved from this channel.
254+
* If [atomic] is set to `true` (by default) then *cancellation of suspended `receive` is atomic*:
255+
* when this function throws a [CancellationException], it means that the element was not retrieved from this channel.
256256
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
257257
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
258258
* was already resumed and the continuation was posted for execution to the thread's queue.
@@ -267,7 +267,10 @@ public interface ReceiveChannel<out E> {
267267
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
268268
*/
269269
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
270-
public suspend fun receiveOrClosed(): ValueOrClosed<E>
270+
public suspend fun receiveOrClosed(atomic: Boolean = true): ValueOrClosed<E>
271+
272+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility") // Since version 1.4.0
273+
public suspend fun receiveOrClosed(): ValueOrClosed<E> = receiveOrClosed(atomic = true)
271274

272275
/**
273276
* Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value

kotlinx-coroutines-core/common/src/flow/Channels.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2020
* the channel afterwards. If you need to iterate over the channel without consuming it,
2121
* a regular `for` loop should be used instead.
2222
*
23-
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
24-
* See [consumeEach][ReceiveChannel.consumeEach].
23+
* Note, that emitting values from a channel into a flow is not atomic. A value that was received from the
24+
* channel many not reach the flow collector if it was cancelled and will be lost.
25+
*
26+
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }` modulo
27+
* atomicity. See [consumeEach][ReceiveChannel.consumeEach].
2528
*/
2629
@ExperimentalCoroutinesApi // since version 1.3.0
2730
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
@@ -45,7 +48,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
4548
// L$1 <- channel
4649
// L$2 <- cause
4750
// L$3 <- this$run (actually equal to this)
48-
val result = run { channel.receiveOrClosed() }
51+
val result = run { channel.receiveOrClosed(atomic = false) }
4952
if (result.isClosed) {
5053
result.closeCause?.let { throw it }
5154
break // returns normally when result.closeCause == null

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,17 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
170170
*
171171
* For more explanation of context preservation please refer to [Flow] documentation.
172172
*
173-
* This operators retains a _sequential_ nature of flow if changing the context does not call for changing
173+
* This operator retains a _sequential_ nature of flow if changing the context does not call for changing
174174
* the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
175175
* flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
176176
* with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
177177
* between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
178178
* before or after `flowOn`, which requests buffering behavior and specifies channel size.
179179
*
180+
* Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
181+
* In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
182+
* was already emitted by the upstream flow.
183+
*
180184
* ### Operator fusion
181185
*
182186
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are

kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+11-6
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
99
import kotlin.jvm.*
1010

11-
@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
12-
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
13-
@PublishedApi internal const val MODE_UNDISPATCHED = 2 // when the thread is right, but need to mark it with current coroutine
11+
@PublishedApi
12+
internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
13+
@PublishedApi
14+
internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
1415

15-
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
16-
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
16+
internal const val MODE_CANCELLABLE_REUSABLE = 2 // same as MODE_CANCELLABLE but supports reused
17+
internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
18+
19+
internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
20+
internal val Int.isDispatchedMode get() = this != MODE_UNDISPATCHED
21+
internal val Int.isReusableMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE_REUSABLE
1722

1823
internal abstract class DispatchedTask<in T>(
1924
@JvmField public var resumeMode: Int
@@ -120,7 +125,7 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
120125
val result = if (exception != null) Result.failure(exception) else Result.success(state as T)
121126
when (useMode) {
122127
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
123-
MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
128+
MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> delegate.resumeCancellableWith(result)
124129
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
125130
else -> error("Invalid mode $useMode")
126131
}

kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private class ChannelViaBroadcast<E>(
3939

4040
override suspend fun receive(): E = sub.receive()
4141
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
42-
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
42+
override suspend fun receiveOrClosed(atomic: Boolean): ValueOrClosed<E> = sub.receiveOrClosed(atomic)
4343
override fun poll(): E? = sub.poll()
4444
override fun iterator(): ChannelIterator<E> = sub.iterator()
4545

kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,14 @@ class CatchTest : TestBase() {
134134
// flowOn with a different dispatcher introduces asynchrony so that all exceptions in the
135135
// upstream flows are handled before they go downstream
136136
.onEach { value ->
137-
expect(8)
138-
assertEquals("OK", value)
137+
expectUnreached() // already cancelled
139138
}
140139
.catch { e ->
141-
expect(9)
140+
expect(8)
142141
assertTrue(e is TestException)
143142
assertSame(d0, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
144143
}
145144
.collect()
146-
finish(10)
145+
finish(9)
147146
}
148147
}

kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt

+15
Original file line numberDiff line numberDiff line change
@@ -341,4 +341,19 @@ class FlowOnTest : TestBase() {
341341
assertEquals(expected, value)
342342
}
343343
}
344+
345+
@Test
346+
fun testCancelledFlowOn() = runTest {
347+
assertFailsWith<CancellationException> {
348+
coroutineScope {
349+
flow {
350+
emit(Unit) // emit to buffer
351+
cancel() // now cancel
352+
}.flowOn(wrapperDispatcher()).collect {
353+
// should not be reached, because cancelled before it runs
354+
expectUnreached()
355+
}
356+
}
357+
}
358+
}
344359
}

kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class ReusableCancellableContinuationTest : TestBase() {
1414

1515
@Test
1616
fun testReusable() = runTest {
17-
testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable)
17+
testContinuationsCount(10, 1) { suspendAtomicCancellableCoroutineReusable(block = it) }
1818
}
1919

2020
@Test

0 commit comments

Comments
 (0)