-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Flow collect does not honor cancellation #1177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
It is not a bug, but intended behavior. Why do you expect this snippet not to throw?
In your snippet, one thread is doing collect inside the
It doesn't really help either, because it is "check and act" race:
The main source of confusion here comes from the fact that cancellation is concurrent and most of the familiar check-and-act patterns do not work. Additionally, |
Yes right, my example was apparently wrong. I tried to boil down my actual problem to a minimal self contained example. My actual use case is actually pretty simple: I try to boil it down once again. I use a import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
@FlowPreview
fun main() {
val flow = flow {
withContext(Dispatchers.IO) {
}
emit(Unit)
}
val mainThreadContext = newSingleThreadContext("main-thread")
runBlocking {
repeat(100) {
val job = launch(mainThreadContext) {
flow.collect {
if (!isActive) {
throw IllegalStateException("DOH")
}
}
}
launch(mainThreadContext) {
job.cancel()
}
}
}
} |
It's a race if different threads are involved, as @qwwdfsad pointed out. However it's common in Android apps to consume a stream from the main thread, and also cancel it from the main thread (there's concurrency, but no parallelism). Coming from RxJava, this Flow behavior is surprising because when everything is on a single thread like this, a subscriber is guaranteed to stop getting emissions as soon as it is unsubscribed. In this case, a single |
@PaulWoitaschek thanks for the reproducer!
I agree with that. When there is no parallelism and one has a (deterministic) sequential order of events, it is unexpected to observe a cancelled coroutine resumed normally. Most of the time cancellation works as expected in sequential scenarios, For the ones interested in why did it happen, some implementation details: |
…able). * Reasoning about cancellation is simplified in sequential scenarios, if 'cancel' was invoked before withContext return it will throw an exception, thus "isActive == false" cannot be observed in sequential scenarios after cancellation * withContext now complies its own documentation Fixes #1177
* It allows to apply the same reasoning about cancellation as in dispatched coroutines * Makes cancellation modes consistent Related to #1177
I would expect the following code not to throw an exception because the reason I'm cancelling it is that it does not emit further values.
What helps here is to call
ensureActive()
as the first statement in the collect block.Is this a bug or is it necessary to always call
ensureActive
on each collect?For my code it is crucial that it does not execute the body of collect after I cancelled the job.
The text was updated successfully, but these errors were encountered: