Skip to content

Merge doesn't emit the last item before error #2743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
BraisGabin opened this issue Jun 3, 2021 · 1 comment
Closed

Merge doesn't emit the last item before error #2743

BraisGabin opened this issue Jun 3, 2021 · 1 comment

Comments

@BraisGabin
Copy link

When using merge, if I have a flow that instead of terminate it fails and I never get the last emitted value. Example:

merge(
    flowOf(1, 2)
        .onCompletion { cause -> if (cause == null) throw MyException() }
        .onEach { println("onEach $it") },
)
.collect { println("collect $it") }

the output is:

onEach 1
collect 1
onEach 2

There is a missing collect 2.

I checked that using .flattenMap and .flatMapMerge have the same issue.

My use case is that I want to implement a takeUntil using a coroutine:

fun <T> Flow<T>.takeUntil(predicate: suspend () -> Unit): Flow<T> {
    val upper = this
    return merge(
        upper.onCompletion { cause -> if (cause == null) throw MyException() },
        flow {
            predicate()
            throw MyException()
        }
    )
        .catch {
            if (it !is MyException) {
                throw it
            }
        }
}
@qwwdfsad
Copy link
Collaborator

qwwdfsad commented Jun 8, 2021

This behaviour is expected, as merge (and the corresponding flat* operators) is concurrent operator and merges all the upstreams concurrently (the level of concurrency is determined by the dispatcher). Any exception effectively cancels both upstreams and downstream, throwing away all the in-flight elements, so the exception is delivered as soon as possible. Exceptions for a regular control flow are not recommended both in regular code and flows.

My use case is that I want to implement a takeUntil using a coroutine:

Here is the correct implementation #1850 (comment)

@qwwdfsad qwwdfsad closed this as completed Jun 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants