merge() and combine() detach emission from collection #3765
Comments
It is not a bug, but rather an intrinsic property of any concurrent operator. What we can indeed do better is document it thoroughly.
Could you please elaborate on where this expectation comes from? There is no such invariant by design; all we state is that all emissions are sequential, and that invariant isn't broken.
This is documented in … Collection and emission should not be concurrent, though, right? They should also be sequential, as they are supposed to run on the same … Why does it work with …
I don't mind that the different flows are collected concurrently inside …
Come to think of it, I find this …

```kotlin
val f = flow {
    println("f1: ${currentCoroutineContext()}")
    emit(Unit)
}.onEach {
    println("f2: ${currentCoroutineContext()}")
}
merge(f).onEach {
    println("f3: ${currentCoroutineContext()}")
}.collect {
    println("cl: ${currentCoroutineContext()}")
}
// Prints:
// f1: [CoroutineId(3), "coroutine#3":StandaloneCoroutine{Active}@556e8754, Dispatchers.Default]
// f2: [CoroutineId(3), "coroutine#3":StandaloneCoroutine{Active}@556e8754, Dispatchers.Default]
// f3: [CoroutineId(1), "coroutine#1":ScopeCoroutine{Active}@3b5d09e5, Dispatchers.Default]
// cl: [CoroutineId(1), "coroutine#1":ScopeCoroutine{Active}@3b5d09e5, Dispatchers.Default]
```
I think I understand that the … That being said, I created a generic solution for preventing unwanted buffering methods from... buffering. Note that if …

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex

// I guess this is a separate suggestion :-)
fun <T> Flow<T>.afterEach(block: suspend (T) -> Unit): Flow<T> = flow {
    collect {
        emit(it)
        block(it)
    }
}

/**
 * Returns the transformation of the receiver [Flow], blocking its emission on collection of the returned flow,
 * practically ignoring any buffering made inside the transformation.
 */
fun <T, R> Flow<T>.waitingFlow(
    bufferingTransform: Flow<T>.() -> Flow<R>
): Flow<R> = flow {
    val mutex = Mutex(true)
    emitAll(
        afterEach { mutex.lock() }
            .bufferingTransform()
            .afterEach { mutex.unlock() }
    )
}

// Signature for N flows, so it can be used with combine() or merge():
fun <T1, T2, R> waitingFlow(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    // ..., or vararg, etc.
    bufferingTransform: (Flow<T1>, Flow<T2>) -> Flow<R>
): Flow<R> = flow {
    val mutex = Array(2) { Mutex(true) }
    emitAll(
        bufferingTransform(
            flow.afterEach { mutex[0].lock() },
            flow2.afterEach { mutex[1].lock() },
        ).afterEach {
            mutex.forEach { it.unlock() }
        }
    )
}

// Basic (useless) example:
val f = (1..3).asFlow().onEach { println("f$it") }
f.waitingFlow { map { it * 2 }.conflate() }.collect { println("c$it"); delay(100) }
// Prints: f1, c2, f2, c4, f3, c6 (ignoring conflate()).

// Negating combine buffering example:
val fa = (1..3).asFlow().onEach { println("fa$it") }
val fb = (1..3).asFlow().onEach { println("fb$it"); delay(10) }
waitingFlow(fa, fb) { wfa, wfb ->
    combine(wfa, wfb) { va, vb -> va + vb }
}.collect { println("c$it"); delay(100) }
// Prints: fa1, fb1, c2, fa2, fb2, c4, fa3, fb3, c6 (faN and fbN might be reversed)
```
The above solution won't work as is for … I don't have a solution on hand, but I'm sure it's solvable.
Flow is conceptually an asynchronous primitive. What you are trying to achieve would be much better done with a different [synchronous] primitive that does not have any asynchronous operations such as buffer.
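For `merge` alone, a per-value handshake is one way to get the behavior the issue asks for without the shared-`Mutex` bookkeeping: each upstream sends its value together with a `CompletableDeferred` and stays suspended until the downstream completes it after `emit` returns. This is a swapped-in technique (an acknowledgement instead of a `Mutex`), `awaitingMerge` is a hypothetical name rather than a kotlinx.coroutines API, and it is only a sketch assuming kotlinx.coroutines 1.6 or later; it does not address `combine`.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not part of kotlinx.coroutines): merges the given flows,
// but each upstream emit() stays suspended until the downstream collector has
// finished processing that value.
fun <T> awaitingMerge(vararg flows: Flow<T>): Flow<T> = flow {
    coroutineScope {
        // Rendezvous channel (no buffer); every value carries its own acknowledgement.
        val channel = Channel<Pair<T, CompletableDeferred<Unit>>>()
        val producers = flows.map { upstream ->
            launch {
                upstream.collect { value ->
                    val ack = CompletableDeferred<Unit>()
                    channel.send(value to ack)
                    ack.await() // wait until the downstream is done with this value
                }
            }
        }
        // Close the channel once every upstream has completed.
        launch {
            producers.joinAll()
            channel.close()
        }
        // Emission happens in the flow's own coroutine, so it stays sequential.
        for ((value, ack) in channel) {
            emit(value)
            ack.complete(Unit) // release the upstream that produced this value
        }
    }
}

fun main() = runBlocking {
    val a = flowOf(1, 2).onEach { println("a$it") }
    val b = flowOf(10, 20).onEach { println("b$it") }
    awaitingMerge(a, b).collect { println("c$it"); delay(100) }
}
```

Because every value has its own acknowledgement, there is no lock state to get out of sync when only some of the upstreams emit.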
To be clear, the solution should not make …

```kotlin
val fa = (1..3).asFlow().onEach { println("fa$it") } // No delay
val fb = (1..3).asFlow().onEach { println("fb$it"); delay(10) }
waitingFlow(fa, fb) { wfa, wfb ->
    combine(wfa, wfb) { va, vb -> va + vb }
}.collect { println("c$it") } // No delay
// Prints: fa1, fb1, c2, fa2, c3, fa3, c4, fb2, c5, fb3, c6
```

Unfortunately the issue mentioned above re … Note that adding …

```kotlin
val f = (1..3).asFlow()
f.conflate().collect { println(it); delay(100) }
// Prints: 1, 3
// And not: 3 (like combine would today)
```

Given this equivalency (which is more deterministic), I still suggest that this should be the default.
I don't think being "conceptually an asynchronous primitive" conflicts with this issue. It is still asynchronous: still suspend functions that do not block the thread. Nobody expects …
Could you please quote the part that gives that impression? I intended to change that, but to me it explicitly states that it introduces concurrency and splits a sequential chain of operators into multiple coroutines.
Here is where we disagree, and that's what we are highlighting in the documentation, both in … Maybe something like #3274 could've been helpful as well; I suggest checking it out.
I'd appreciate reopening this for the sake of discussion, as I think there's more to discuss. Feel free to ignore the request, but either way, this is my response to your comment:
Documentation that states flows are sequential:
Documentation that states flows are not sequential:
The points that I'm trying to make are:
Describe the bug
Flows expect that `emit()` will suspend until the value has finished being collected, unless explicit buffering is used (e.g. by a call to `conflate()` or `buffer()`). Manipulating flows using `merge()` and `combine()` breaks that promise, acting as `buffer(UNLIMITED)` and a weird `conflate()` respectively.

`combine` is documented to conflate the flows until all flows emit at least once, but:

1. It breaks the above promise even after all flows have emitted, and
2. There's no reason to: if the caller wants to conflate the inputs, they can (with `conflate()`), and if they don't, it makes sense for the flows to be blocked by default until all flows have emitted (and thus collection of their emissions can begin).

`merge` has no excuse: all emissions are supposed to be collected, so there is no reason to break the above promise.

I think the issue is the unsynchronized usage of a `Channel` to communicate the input to the output, in both implementations. A possible simple solution would be to hold a `Mutex` on collecting from the input until after emitting to the output.

This is similar to issue #2603 (which has a potential solution similar to the one described in the previous paragraph), but it's less acceptable because these are manipulator functions rather than implementations of `Flow` (which can state whatever they want in the emission docs, like "`SharedFlow` doesn't wait" and "`StateFlow` conflates").

Provide a Reproducer
Regular flow:
Merge:
Combine:
Adding more emissions to the `merge` example shows that they all finish before the first collection is done, while doing the same for the `combine` example creates results that I can't explain.
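A minimal sketch in the spirit of the regular-flow and merge reproducers, assuming kotlinx.coroutines 1.6 or later on the JVM. `trace` is a hypothetical helper, not from the issue: it logs `eN` when the upstream emits `N` and `cN` when a deliberately slow collector handles it. With a plain flow each `emit` waits for the collector; with `merge` the upstream keeps emitting into `merge`'s internal channel while the collector is still busy, even though the collector itself still receives the values one at a time.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records "eN" when the upstream emits N and "cN" when the
// collector handles N. The collector is slow (delay), so any decoupling between
// emission and collection becomes visible in the order of the log.
fun trace(transform: (Flow<Int>) -> Flow<Int>): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val upstream = flowOf(1, 2, 3).onEach { log += "e$it" }
    transform(upstream).collect { log += "c$it"; delay(10) }
    log
}

fun main() {
    println(trace { it })        // [e1, c1, e2, c2, e3, c3]: each emit waits for collection
    println(trace { merge(it) }) // e3 is logged before c2: the upstream ran ahead
}
```

Everything runs on the single `runBlocking` thread here, so the shared `log` list needs no extra synchronization.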