Skip to content

Canceling flatMapMerge not canceling the source? #1392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
akarnokd opened this issue Jul 27, 2019 · 2 comments
Closed

Canceling flatMapMerge not canceling the source? #1392

akarnokd opened this issue Jul 27, 2019 · 2 comments
Milestone

Comments

@akarnokd
Copy link

In the following code, the source Flow is not aborted as I'd expect:

@Test
fun flatMapCancel() = runBlocking {
    flow {
        try {
            println("-> 1")
            emit(1)
            println("-> 2")
            emit(2)
            println("-> 3")
            emit(3)
            println("-> 4")
            emit(4)
            println("-> 5")
        } catch (ex: Throwable) {
            ex.printStackTrace()
        }
    }
    .flatMapMerge(2) { v -> flow { emit(v) } }
    .take(2)
    .collect { println (it) }
}

prints:

-> 1
-> 2
-> 3
1
2
-> 4
-> 5

To check, setting concurrency to 1, thus using flatMapConcat underneath does abort.

Is this an expected property of flatMapMerge?

@qwwdfsad
Copy link
Collaborator

qwwdfsad commented Jul 27, 2019

Is this an expected property of flatMapMerge?

Yes and no. In coroutines, cancellation is asynchronous (not in your case, but in general) and cooperative: either it is checked during suspension by our primitives or manually with isActive or ensureActive primitives. But probably we can do better and check cancellation on some code paths.

qwwdfsad added a commit that referenced this issue Jul 30, 2019
  * Implementation detail (launch on each value) is leaking into upstream behaviour
  * The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel)

Fixes #1392
@qwwdfsad
Copy link
Collaborator

Returned to this issue again, turns out some implementation detail is leaking into upstream as well.
Now the next emit after successful collection will throw CE.
Will be fixed in 1.3.0.

@elizarov elizarov added this to the 1.3 milestone Aug 2, 2019
@elizarov elizarov removed the question label Aug 2, 2019
qwwdfsad added a commit that referenced this issue Aug 5, 2019
  * Implementation detail (launch on each value) is leaking into upstream behaviour
  * The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel)

Fixes #1392
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants