-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Ensure that flowOn does not resume downstream after cancellation #1730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
24b84b0
to
f5dda94
Compare
|
||
@Test | ||
fun testCancelledFlowOn() = runTest { | ||
assertFailsWith<CancellationException> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about the original issue (#1265), I am concerned about the fact that other channel-related operators may expose unexpected behaviour as well (aka touching view after its destruction).
E.g. the similar test with combine
operator will execute its collect
block.
I'd say we should re-visit our channel-related operators, measure the performance impact of checking cancellation in collect
and either do it, or do it only in channel-related operators (and, maybe, provide a checkedCollect
or something like this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not about checking in collect
. It is about checking on resume after suspension. All our suspending function do check for cancellation on resume with exception of Mutex.lock
, Semaphore.acquire
, and Channel.send/receive
function that do not do so, we need to revisit operation that use those functions (flowOn
was one of them). In fact, we should really revisit atomicity of those underlying operations in the first place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to do it separately or as part of this PR?
E.g. the following code:
coroutineScope {
val flow = flow {
emit(Unit) // emit to buffer
cancel() // now cancel
}
flow.combine(flow) { u, _ -> u }.collect {
// should not be reached, because cancelled before it runs
expectUnreached()
}
}
will throw ISE(Should not be reached)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll fix it all as a part of this PR.
f5dda94
to
ca2fa6a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good modulo my comment about revisiting.
|
||
@Test | ||
fun testCancelledFlowOn() = runTest { | ||
assertFailsWith<CancellationException> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to do it separately or as part of this PR?
E.g. the following code:
coroutineScope {
val flow = flow {
emit(Unit) // emit to buffer
cancel() // now cancel
}
flow.combine(flow) { u, _ -> u }.collect {
// should not be reached, because cancelled before it runs
expectUnreached()
}
}
will throw ISE(Should not be reached)
ca2fa6a
to
07c5f7b
Compare
We need a more thorough solution as fixing all those issue one-by-one creates quite an ugly code. Here's a separate issue: #1813. Let's fix it as part of this PR, too. |
4a49830
to
aff8202
Compare
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels. This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation. Fixes #1265
…CancellableCoroutine * MODE_ATOMIC_DEFAULT split into MODE_ATOMIC (for dispatch) and MODE_ATOMIC_REUSABLE (for suspendCancellableCoroutineReusable only). Dispatch modes are orthogonal to additional REUSE capability now. * Better documentation for MODE_XXX constants. * suspendCancellableCoroutineReusable does not have a default mode anymore, so its use is more explicit. * Completely drop (inline) suspendAtomicCancellableCoroutine. Any kind of legacy code where this call might have been inlined still works because the constant value of MODE_ATOMIC = 0 is retained and carries its legacy meaning (no continuation reuse). * Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
07c5f7b
to
fd8692e
Compare
Aborting this PR in favor of #1937 that completely gets rid of "atomic cancellation", thus making |
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.
This change introduces an optional
atomic
parameter to an internalReceiveChannel.receiveOrClosed
method to control cancellation atomicity of this function and tweaksemitAll(ReceiveChannel)
implementation to abandon atomicity in favor of predictable cancellation.Fixes #1265