SharedFlow.emit() doesn't wait for a subscriber to complete collecting #2603
Thanks. There's already PR with a fix for the next release: #2437 |
Sorry for commenting in a closed thread, @elizarov, but I see the PR is just a tweak to the docs. Is there a way to make the SharedFlow suspend until all the subscribers have completed collecting, as we (mis)understood before? For me it would be a very good and easy way of synchronizing the emission of values through several flows. |
Hi, @serandel
This code will print desired sequential emit-collect output. Of course, we need to use unbuffered shared flow to get suspend behavior. |
Thanks, @j2esu! I really like your solution for SharedFlow, but unfortunately it messes with StateFlow. :/ Right now I'm playing with a decorator like this, but I'm not 100% sure it's free of race conditions.
|
I also have to think about managing flows with replay... |
@serandel As for StateFlow, it seems like a dangerous idea to rely on collection completion, because StateFlow is conflated by design, so slow collectors can skip some emissions. |
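To make the conflation point concrete, here is a minimal sketch (the function name `conflationDemo` is made up for illustration). A collector that is still busy with the initial value only sees the most recent value once it comes back; the intermediate assignments are never delivered to it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo function: returns the values a slow collector actually observed.
fun conflationDemo(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // UNDISPATCHED: the collector runs immediately up to its first suspension,
    // so it has already received the initial value 0 when launch() returns.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.take(2).collect {
            seen += it
            delay(100) // simulate slow processing
        }
    }
    // These assignments never suspend; each one overwrites the previous value.
    for (i in 1..5) state.value = i
    collector.join()
    seen // only the initial value and the latest value were observed
}
```

In this setup the collector ends up with `[0, 5]`, so anything that relies on every state being processed to completion cannot be built on a plain StateFlow collector.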
In my use case (Kotlin gamedev, please don't judge :D ) I don't have a problem with slow collectors. In fact, I use the suspending emissions to synchronize game states and game events, so animations and sounds can drive the transitions between states in my state machines. It works awesome and I really would like to have it built into the flows instead of having my silly decorators, which make my types NOT proper Flows, so I lose the possibility of using the standard operators. |
1.6.3, |
I would love this to be reopened. The issue about some people wanting to avoid waiting for slow collectors and some other people (me) just wanting to do exactly that, for synchronization, sounds to me like a possible option to specify when creating the SharedFlow. |
I don't think this being an option on a SharedFlow is a good idea. It is stated everywhere that SharedFlow is, semantically, a hot stream of values, that is, it executes independently of the behavior of its subscribers. If we added an option to change that, this would go against the whole mental model of SharedFlow. Likewise, with StateFlow, which is also a hot flow, doing I think you should open a separate issue instead, for a |
Well, if you look at the PR #2437, the documentation initially said that, as rendezvous, the emission was suspending until all collectors processed the event. So I don't think it's breaking the mental model, really, when even the team thought it worked like that at first. ;) SharedFlow (and, in consequence, StateFlow) is hot in the sense (IMHO) that its generation of values is independent of how many collectors you have, instead of every collector executing a different instance of the emitting code in parallel. Whether it has rendezvous behaviour or not doesn't affect this in my POV. Nevertheless, having a separate SharedFlow implementation instead of an option in the existing one, disabled by default, is perfectly fine as well. Shall I open that ticket? |
I think you are right. After all, we have https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-sharing-started/-companion/-while-subscribed.html, which stops the operation altogether if no subscribers are present, which is much more intrusive than just waiting for every subscriber to receive the value. |
IMHO either documentation or implementation has to be changed since they don't match each other at the moment. This is misleading and may significantly increase time of debugging. |
If emitter does not suspend waiting for collectors, then I don't understand why |
in IJ I was going to replace our listener mechanism (lots of weird code, see
It seemed like a win-win until I discovered this issue. Are there any plans to revisit this in the foreseeable future? |
A possible solution in many cases is performing the collection in

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Received $it")
            Thread.sleep(1000)
            println("Finished collecting $it")
        }
    }
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Second collector: received $it")
            Thread.sleep(1000)
            println("Second collector: finished collecting $it")
        }
    }
    launch(Dispatchers.Default) {
        repeat(10) {
            println("Emitting $it")
            sharedFlow.emit(it)
            println("Emitted $it")
        }
    }
}

This way, the collection procedure is called from inside
However, this is not a general solution:
Could the interested parties share whether the limitations of collecting in |
As I was mentioning, collectors are expected to
|
For my use case, |
All flows (including |
Hack in question: Kotlin/kotlinx.coroutines#2603 (comment) It was intended to ensure that the emitted value was collected, but it doesn't work with `collectLatest`. Thankfully, `collectLatest` is not needed here, because logs are dumped once in 12/24 hours. This change also ensures all coroutines work in correct scopes by getting rid of `application.coroutineScope` and `project.coroutineScope` usages. GitOrigin-RevId: 0f93cb3a59a722ee98936a93fcc1cb301e50a9f3
Following @elizarov's comment, if someone needs

class EventBus<T> {
    private val context = newSingleThreadContext("EventBus")
    private val scope = CoroutineScope(context)
    private val _events = MutableSharedFlow<T>()

    suspend fun emit(event: T) = withContext(context) {
        _events.emit(event)
    }

    fun subscribe(block: (event: T) -> Unit) = _events
        .onEach { block(it) }
        .launchIn(scope)
}

// Example
val eventBus = EventBus<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")
val job1 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }
eventBus.emit("c")
println("c sent")
val job2 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }
eventBus.emit("d")
println("d sent")
val job3 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }
eventBus.emit("e")
println("e sent")
job1.cancel()
eventBus.emit("f")
println("f sent")
job2.cancel()
eventBus.emit("g")
println("g sent")
job3.cancel()

It will output
Without using it

val eventBus = MutableSharedFlow<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")
val job1 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)
eventBus.emit("c")
println("c sent")
val job2 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)
eventBus.emit("d")
println("d sent")
val job3 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)
eventBus.emit("e")
println("e sent")
job1.cancel()
eventBus.emit("f")
println("f sent")
job2.cancel()
eventBus.emit("g")
println("g sent")
job3.cancel()

And will produce (notice that
|
Really good idea, @tristancaron. Love it! |
Thinking a bit more about this, what if I wanted all consumers to run in parallel and just block the producer, but no consumer should block any other? Can you think of anything simpler than the use of Channel I pasted before in this thread? |
@serandel it sounds like your use case is documented here: https://kotlinlang.org/docs/channels.html#fan-out |
Not really, because I don't want the messages to be processed by one of the consumers, as in the "Fan-out" section, but to be processed by all consumers in parallel. |
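For the "every consumer handles every message, in parallel, and the producer waits for all of them" requirement, one possible shape is sketched below. It is only an illustration: `ParallelNotifier`, `subscribe` and `parallelDemo` are hypothetical names, it is not a Flow (so the standard operators are not available), and it is not the Channel-based code mentioned earlier in this thread. It relies on `coroutineScope` returning only after all of its children have completed.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper, not part of kotlinx.coroutines.
class ParallelNotifier<T> {
    private val handlers = CopyOnWriteArrayList<suspend (T) -> Unit>()

    fun subscribe(handler: suspend (T) -> Unit) {
        handlers += handler
    }

    // Starts every handler in its own child coroutine, then suspends
    // until all of them have finished with this value.
    suspend fun emit(value: T): Unit = coroutineScope {
        for (handler in handlers) launch { handler(value) }
    }
}

// Hypothetical demo: returns the order in which handlers finished.
fun parallelDemo(): List<String> = runBlocking {
    val finished = mutableListOf<String>()
    val notifier = ParallelNotifier<Int>()
    notifier.subscribe { delay(200); finished += "slow $it" }
    notifier.subscribe { delay(10); finished += "fast $it" }
    notifier.emit(1) // returns only after both handlers are done
    finished
}
```

The fast handler finishes first even though it was registered second, so handlers do not block each other, while `emit` still returns only after the slow one is done. Note that with plain `coroutineScope` a failing handler cancels its siblings and the emitter; whether that is acceptable depends on the use case.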
@serandel Made an error while coding. Using

    suspend fun subscribe(block: suspend (event: T) -> Unit) = suspendCoroutine<Unit> {
        _events
            .onSubscription { it.resume(Unit) }
            .onEach { block(it) }
            .launchIn(scope)
    }
} |
@tristancaron you're a genius |
Gave it a go without using a
It worked in a small print test, but I feel like something's wrong.

class WaitingSharedFlow<T>() : Flow<T> {
    private val outputs = ConcurrentHashMap<SendChannel<T>, Mutex>()

    override suspend fun collect(collector: FlowCollector<T>) {
        val channel = Channel<T>()
        val mutex = Mutex(true)
        outputs[channel] = mutex
        try {
            while (true) {
                collector.emit(channel.receive())
                mutex.unlock()
            }
        } finally {
            outputs -= channel.also { it.close() }
        }
    }

    suspend fun emit(value: T) = coroutineScope {
        for ((channel, mutex) in outputs) {
            launch {
                try {
                    channel.send(value)
                } catch (_: ClosedSendChannelException) {
                    return@launch
                }
                mutex.lock()
            }
        }
    }
} |
Here's a simple playground that shows some issues:

fun main(): Unit = runBlocking {
    val sharedFlow = WaitingSharedFlow<Int>()
    repeat(3) { collector ->
        launch(start = CoroutineStart.UNDISPATCHED) {
            sharedFlow.buffer(1).collect {
                println("$collector: received $it")
                delay(1000)
                println("$collector: finished receiving $it")
            }
        }
    }
    repeat(3) {
        println("Emitting $it")
        sharedFlow.emit(it)
        yield()
        println("Emitted $it")
    }
}

As you can see, a single
A less obvious example is

fun main(): Unit = runBlocking {
    val sharedFlow = WaitingSharedFlow<Int>()
    val infiniteFlow = flow {
        var i = 0
        while (true) {
            emit(i++)
        }
    }
    repeat(3) { collector ->
        launch(start = CoroutineStart.UNDISPATCHED) {
            sharedFlow.combine(infiniteFlow) { a, _ -> a }.collect {
                println("$collector: received $it")
                delay(1000)
                println("$collector: finished receiving $it")
            }
        }
    }
    repeat(3) {
        println("Emitting $it")
        sharedFlow.emit(it)
        yield()
        println("Emitted $it")
    }
} |
Re

// try with and without buffer(1)
fun main(): Unit = runBlocking(Dispatchers.Default) {
    val f = flow {
        for (i in 1..10) {
            println("before: $i")
            emit(i)
            println("after: $i")
        }
    }
    f.buffer(1).collect { println("collect: $it") }
}

Re

// Even after both flows start emitting, collection is not between the before/after.
fun main(): Unit = runBlocking(Dispatchers.Default) {
    val f1 = flow {
        for (i in 1..10) {
            println("f1 before: $i")
            emit(i)
            println("f1 after: $i")
        }
    }
    val f2 = flow {
        for (i in 1..10) {
            println("f2 before: $i")
            emit(i)
            println("f2 after: $i")
        }
    }
    combine(listOf(f1, f2)) { it.toList() }
        .collect { println("collect: $it") }
}

I think my implementation stands (and I think Kotlin owners should consider something similar for |
This should work best for the mentioned case. Obviously, before any asynchronous flow transformation operators (aka Channel send receive)
|
… config screen fix #6 The Kotlin Flow emit doesn't wait for subscribers to finish collecting. Kotlin/kotlinx.coroutines#2603 So, switching to `onEach` so that the loading is complete when emitted
@odedniv, why do you think the solution of @firmeldir is incorrect? Thanks. |
It looks like
|
Actually the The worst problem is that |
Another implementation with

class WaitingSharedFlow<T>() : Flow<T> {
    private val allChannels = mutableSetOf<Channels<T>>()

    override suspend fun collect(collector: FlowCollector<T>) {
        val channels = Channels<T>()
        synchronized(allChannels) {
            allChannels += channels
        }
        try {
            while (true) {
                collector.emit(channels.data.receive())
                channels.done.send(Unit)
            }
        } finally {
            synchronized(allChannels) {
                allChannels -= channels.also { it.close() }
            }
        }
    }

    suspend fun emit(value: T) = coroutineScope {
        synchronized(allChannels) {} // Ensuring memory barrier with collectors.
        for (channels in allChannels) {
            launch {
                try {
                    channels.data.send(value)
                } catch (_: ClosedSendChannelException) {
                    return@launch
                }
                try {
                    channels.done.receive()
                } catch (_: ClosedReceiveChannelException) {}
            }
        }
    }

    private data class Channels<T>(val data: Channel<T> = Channel(), val done: Channel<Unit> = Channel()) {
        fun close() {
            data.close()
            done.close()
        }
    }
}

There are quite a lot of people here. Is there any chance of an official API for this request? |
I can send a PR if the maintainers agree to having this. |
@odedniv Liked your final solution, probably the cleanest way to approach this. Based my final solution on yours with small changes:
Let me know what you think and if you found any issues with your original implementation since you wrote it.

internal interface SyncFlow<T> : Flow<T>

internal interface MutableSyncFlow<T> : SyncFlow<T> {
    suspend fun emit(value: T)
}

internal class ChannelsMutableSyncFlow<T> : MutableSyncFlow<T> {
    private var channels = Collections.synchronizedList(mutableListOf<SyncChannel<T>>())

    override suspend fun collect(collector: FlowCollector<T>) {
        createChannel().use { channel ->
            while (true) {
                channel.receive { collector.emit(it) }
            }
        }
    }

    private fun createChannel() =
        SyncChannel(onClose = { channels.remove(it) }).also { channels.add(it) }

    override suspend fun emit(value: T) = coroutineScope {
        synchronized(channels) { channels.forEach { launch { it.send(value) } } }
    }

    private class SyncChannel<T>(
        private val data: Channel<T> = Channel(),
        private val ack: Channel<Unit> = Channel(),
        private val onClose: (SyncChannel<T>) -> Unit
    ) : Closeable {
        suspend fun send(value: T) {
            try {
                data.send(value)
                ack.receive()
            } catch (_: ClosedReceiveChannelException) {} catch (_: ClosedSendChannelException) {}
        }

        suspend fun receive(block: suspend (T) -> Unit) {
            try {
                block(data.receive())
                ack.send(Unit)
            } catch (_: ClosedReceiveChannelException) {} catch (_: ClosedSendChannelException) {}
        }

        override fun close() {
            data.close()
            ack.close()
            onClose(this)
        }
    }
} |
Looks nice. I really hope someone from the kotlinx.coroutines team will look at these and point out any unexpected issues... this is hard to get right. |
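For anyone who wants to poke at the data/ack handshake in isolation, below is a condensed, self-contained sketch of the same idea together with a tiny harness. It is not either of the implementations posted above: `AckBroadcast`, `events`, `emitAndAwait` and `ackDemo` are hypothetical names, and the flow is exposed through the `flow { }` builder rather than by implementing `Flow` directly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArraySet

// Hypothetical, condensed variant of the data/ack channel idea.
class AckBroadcast<T> {
    private class Sub<E>(
        val data: Channel<E> = Channel(),      // rendezvous: value hand-off
        val ack: Channel<Unit> = Channel()     // rendezvous: "done" signal
    )

    private val subs = CopyOnWriteArraySet<Sub<T>>()

    val events: Flow<T> = flow {
        val sub = Sub<T>()
        subs += sub
        try {
            for (value in sub.data) {
                emit(value)        // runs the downstream collector
                sub.ack.send(Unit) // tells the emitter this collector is done
            }
        } finally {
            subs -= sub
            sub.data.close()
            sub.ack.close()
        }
    }

    // Suspends until every currently registered collector has acknowledged the value.
    suspend fun emitAndAwait(value: T): Unit = coroutineScope {
        for (sub in subs) launch {
            try {
                sub.data.send(value)
                sub.ack.receive()
            } catch (e: ClosedSendChannelException) {
                // collector went away before receiving the value
            } catch (e: ClosedReceiveChannelException) {
                // collector went away before acknowledging
            }
        }
    }
}

// Hypothetical harness: two collectors of different speed, one emission.
fun ackDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val bus = AckBroadcast<Int>()
    val jobs = List(2) { id ->
        launch(start = CoroutineStart.UNDISPATCHED) {
            bus.events.collect {
                delay(50L * (id + 1))
                log += "collector $id finished $it"
            }
        }
    }
    bus.emitAndAwait(1)
    log += "emitted 1"
    jobs.forEach { it.cancel() }
    log
}
```

In this harness "emitted 1" is logged only after both collectors have finished. As discussed earlier in the thread, the guarantee only covers what runs before the acknowledgement is sent, so putting `buffer()` or another asynchronous operator between `events` and the terminal `collect` decouples the emitter again.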
Roamed the Internet in search of a solution to this problem. The latest implementation from @hrofeh works well! For myself, I’ve changed it this way.
|
@xRamazaNx Thanks for verifying. |
SharedFlow doc says:
So, with the following code
we could expect the following output:
But the actual output is:
Seems like the flow has an extra buffer of size 1 and doesn't suspend on the first emit() call. Is it a bug, or am I missing something?
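The behaviour in question can be observed with a small sketch like the one below. This is an illustrative reconstruction, not the reporter's snippet; `emitOrderDemo` and the log strings are made up. It uses a default `MutableSharedFlow` (no replay, no extra buffer) and a collector that takes 100 ms per value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo function: records the interleaving of emitter and collector.
fun emitOrderDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val flow = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    // UNDISPATCHED so the collector is subscribed before the first emit().
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.take(2).collect {
            log += "received $it"
            delay(100) // simulate slow processing
            log += "finished $it"
        }
    }
    for (i in 1..2) {
        log += "emitting $i"
        flow.emit(i) // resumes once the value is handed over, not once it is processed
        log += "emitted $i"
    }
    collector.join()
    log
}
```

In the returned log, "emitted 1" appears before "finished 1": `emit()` suspends until the subscriber has received the value, not until the subscriber has finished processing it. The second `emit()` is the one that visibly waits, because the collector is still busy with the first value.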