SharedFlow.emit() doesn't wait for a subscriber to complete collecting #2603

Open
j2esu opened this issue Mar 23, 2021 · 39 comments

@j2esu

j2esu commented Mar 23, 2021

SharedFlow doc says:

For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers process each event

So, with the following code

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = MutableSharedFlow<Int>()
    GlobalScope.launch {
        delay(1000)
        repeat(3) {
            println("emit $it")
            numbers.emit(it)
        }
    }
    GlobalScope.launch {
        numbers.collect {
            delay(1000)
            println("$it collected")
        }
    }.join()
}

we could expect the following output:

emit 0
0 collected
emit 1
1 collected
emit 2
2 collected

But the actual output is:

emit 0
emit 1
0 collected
emit 2
1 collected
2 collected

It seems like the flow has an extra buffer of size 1 and doesn't suspend on the first emit() call. Is it a bug, or am I missing something?

@elizarov
Contributor

Thanks. There's already a PR with a fix for the next release: #2437

@serandel

Sorry for commenting in a closed thread, @elizarov, but I see the PR is just a tweak to the docs. Is there a way to make the SharedFlow suspend until all the subscribers have completed collecting, as we (mis)understood before? For me it would be a very good and easy way of synchronizing the emission of values through several flows.

@j2esu
Author

j2esu commented Mar 28, 2021

Hi, @serandel
Hope that @elizarov has a more elegant solution, but for now I have some thoughts about that. We need some sort of "sync" event in our shared flow. It can be null, a specific value or a specific type. Then, we can modify the example above:

fun main() = runBlocking {
    val numbers = MutableSharedFlow<Int?>() // UPD: allow null as sync event
    GlobalScope.launch {
        delay(1000)
        repeat(3) {
            println("emit $it")
            numbers.emit(it)
            numbers.emit(null) // UPD: suspend until subscribers receive sync event
            // here we have a guarantee that business event collected, cause sync event received
        }
    }
    GlobalScope.launch {
        numbers.filterNotNull() // UPD: just skip sync event by filtering
            .collect {
                delay(1000)
                println("$it collected")
            }
    }.join()
}

This code will print the desired sequential emit-collect output. Of course, we need to use an unbuffered shared flow to get the suspending behavior.

@serandel

Thanks, @j2esu!

I really like your solution for SharedFlow but unfortunately it messes up with StateFlow. :/

Right now I'm playing with a decorator like this, but I'm not 100% sure I'm race conditions-free.

class RendezvousSharedFlow<T>(private val flow: MutableSharedFlow<T>) {
    private val rendezvous = Channel<Unit>()

    suspend fun emit(value: T) {
        flow.emit(value)

        repeat(flow.subscriptionCount.value) {
            rendezvous.receive()
        }
    }

    suspend fun collect(collector: suspend (T) -> Unit) {
        flow.collect {
            collector(it)
            rendezvous.send(Unit)
        }
    }
}

@serandel

I also have to think about managing flows with replay...

@j2esu
Author

j2esu commented Mar 28, 2021

@serandel As for StateFlow, it seems like a dangerous idea to rely on collection completion, cause StateFlow is conflated by design, so some emissions can be skipped by slow collectors.
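
A minimal sketch of that conflation, assuming a single-threaded runBlocking and a hypothetical helper named conflationDemo (neither comes from this thread). A collector that needs 100 ms per value is offered three quick updates and should only observe the initial value and the latest one:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: returns the values a slow collector actually observed.
fun conflationDemo(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch {
        state.collect {
            seen += it
            delay(100) // simulate slow processing
        }
    }
    yield() // let the collector start and observe the initial value
    state.value = 1 // these assignments never suspend...
    state.value = 2 // ...and each one overwrites the previous value
    state.value = 3
    delay(500) // give the collector time to catch up
    collector.cancelAndJoin()
    seen
}

fun main() {
    println(conflationDemo()) // [0, 3]
}
```

The intermediate values 1 and 2 are never delivered, so there is nothing for an emitter to wait on for them.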

@serandel

serandel commented May 18, 2021

In my use case (Kotlin gamedev, please don't judge :D ) I don't have a problem with slow collectors. In fact, I use the suspending emissions to synchronize game states and game events, so animations and sounds can drive the transitions between states in my state machines.

It works awesome and I really would like to have it built-in the flows instead of having my silly decorators, which make my types NOT proper Flows, so I lose the possibility of using the standard operators.

@allco

allco commented Jul 7, 2022

1.6.3,
the problem is still there :-/
Perhaps it makes sense to reopen this issue?

@serandel

serandel commented Jul 7, 2022

I would love this to be reopened.

The issue about some people wanting to avoid waiting for slow collectors and some other people (me) just wanting to do exactly that, for synchronization, sounds to me like a possible option to specify when creating the SharedFlow.

@dkhalanskyjb
Collaborator

I don't think this being an option on a SharedFlow is a good idea. It is stated everywhere that SharedFlow is, semantically, a hot stream of values, that is, it executes independently of the behavior of its subscribers. If we added an option to change that, this would go against the whole mental model of SharedFlow. Likewise, with StateFlow, which is also a hot flow, doing flow.value = x behaves in a fire-and-forget manner. What you're asking for doesn't fit the model of SharedFlow at all, in my opinion.

I think you should open a separate issue instead, for a Flow that broadcasts values to multiple subscribers and awaits their consumption before proceeding.

@serandel

serandel commented Jul 8, 2022

Well, if you look at the PR #2437, the documentation initially said that, as rendezvous, the emission was suspending until all collectors processed the event. So I don't think it's breaking the mental model, really, when even the team thought it worked like that at first. ;)

SharedFlow (and, in consequence, StateFlow) is hot in the sense (IMHO) that its generation of values is independent of how many collectors you have, instead of every collector executing a different instance of the emitting code in parallel. Whether it has rendezvous behaviour or not doesn't affect this in my POV. Nevertheless, having a separate SharedFlow implementation instead of an option in the existing one, disabled by default, is perfectly fine as well.

Shall I open that ticket?

@dkhalanskyjb
Collaborator

I think you are right. After all, we have https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-sharing-started/-companion/-while-subscribed.html, which stops the operation altogether if no subscribers are present, which is much more intrusive than just waiting for every subscriber to receive the value.
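
For readers unfamiliar with WhileSubscribed, here is a small sketch of the behaviour referred to above. It assumes a hypothetical helper named whileSubscribedDemo and an arbitrary counting upstream; it only illustrates that subscribers already influence when a shared flow's upstream runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: returns a log showing when the upstream actually starts.
fun whileSubscribedDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Separate scope so the sharing coroutine can be cancelled at the end.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val upstream = flow {
        log += "upstream started"
        var i = 0
        while (true) {
            emit(i++)
            delay(50)
        }
    }
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed())
    delay(100) // nobody is subscribed yet, so the upstream should stay idle
    log += "subscribing"
    val values = shared.take(3).toList()
    log += "collected ${values.size} values"
    sharingScope.cancel()
    log
}

fun main() {
    println(whileSubscribedDemo())
}
```

In this sketch "subscribing" is expected to be logged before "upstream started", because the upstream is only collected once the first subscriber appears.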

@allco

allco commented Jul 10, 2022

IMHO either documentation or implementation has to be changed since they don't match each other at the moment. This is misleading and may significantly increase time of debugging.

@dovchinnikov
Contributor

If emitter does not suspend waiting for collectors, then I don't understand why SharedFlow.emit has suspend modifier.
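
One way to see where emit can suspend is to compare it with tryEmit, which returns false exactly when emit would have had to suspend. The SharedFlow documentation says that, for a flow created with MutableSharedFlow() and no parameters, emit suspends until all subscribers receive the value, and tryEmit succeeds only when there are no subscribers. The sketch below assumes a hypothetical helper named tryEmitDemo:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: (tryEmit without subscribers, tryEmit with one subscriber).
fun tryEmitDemo(): Pair<Boolean, Boolean> = runBlocking {
    val events = MutableSharedFlow<Int>() // no replay, no extra buffer
    val withoutSubscribers = events.tryEmit(1) // value is dropped, nothing to wait for
    // UNDISPATCHED runs the collector up to its first suspension, so it is subscribed here.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { events.collect { } }
    val withSubscriber = events.tryEmit(2) // emit() would have to suspend for the hand-off
    subscriber.cancelAndJoin()
    withoutSubscribers to withSubscriber
}

fun main() {
    println(tryEmitDemo()) // (true, false)
}
```

So the suspension covers the hand-off of the value to each subscriber (or waiting for buffer space), not the subscriber's processing of it.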

@dovchinnikov
Contributor

In IJ I was going to replace our listener mechanism (lots of weird code, see com.intellij.util.messages.impl.MessageBusImpl) with a single MutableSharedFlow() because

  • it's possible to suspend in collectors;
  • a collector does not leak naturally, it's removed when its coroutine is cancelled.

It seemed like a win-win until I discovered this issue. Are there any plans to revisit this in the foreseeable future?

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Jan 24, 2023

A possible solution in many cases is performing the collection in Dispatchers.Unconfined.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Received $it")
            Thread.sleep(1000)
            println("Finished collecting $it")
        }
    }
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Second collector: received $it")
            Thread.sleep(1000)
            println("Second collector: finished collecting $it")
        }
    }
    launch(Dispatchers.Default) {
        repeat(10) {
            println("Emitting $it")
            sharedFlow.emit(it)
            println("Emitted $it")
        }
    }
}

This way, the collection procedure is called from inside sharedFlow.emit(it), so emit never terminates until the callback is finished.

However, this is not a general solution:

  1. If emissions themselves happen from Dispatchers.Unconfined (including anywhere up the call stack!), this won't work.
  2. It is only guaranteed that the immediate execution will happen until the first suspension. If there's a delay or even just a yield in your collect procedure, emit will not wait for the whole callback to finish.
  3. Collecting in Dispatchers.Unconfined gives a stronger guarantee than "emit waits for the callbacks": the callbacks are executed by it. This means that if there are several callbacks, they will be executed sequentially and never in parallel, which may or may not be exactly what you want.
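
Limitation 2 can be sketched as follows, assuming a hypothetical helper named unconfinedDemo and an emitter running directly in runBlocking. The collector suspends in delay, and one would expect "emitted 1" to be logged before "finished 1":

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: returns the order in which the events were logged.
fun unconfinedDemo(): List<String> = runBlocking {
    val events = MutableSharedFlow<Int>()
    // Thread-safe list: after delay() the collector may resume on another thread.
    val log = CopyOnWriteArrayList<String>()
    val collector = launch(Dispatchers.Unconfined) {
        events.collect {
            log += "received $it"
            delay(200) // first suspension: from here on emit() no longer waits
            log += "finished $it"
        }
    }
    events.emit(1)
    log += "emitted 1"
    delay(500) // let the collector finish before cancelling it
    collector.cancelAndJoin()
    log.toList()
}

fun main() {
    println(unconfinedDemo())
}
```

Replacing delay(200) with a blocking Thread.sleep(200) should move "finished 1" ahead of "emitted 1", which is the behaviour the first example in this comment relies on.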

Could the interested parties share whether the limitations of collecting in Dispatchers.Unconfined affect you and how?

@dovchinnikov
Contributor

As I was mentioning, collectors are expected to suspend. Also, I don't expect immediate execution; I only expect emit to suspend until all collectors have run. Also, Unconfined is not confined (:)), and the collectors are required to run on the EDT.

val events = MutableSharedFlow<String>() 
cs.launch { // inside some coroutine
  val data = prepareData()
  events.emit("prepared")
  // the following line should be invoked after all collectors are completed with "prepared" event,
  // as if the code is "sequential yet suspending" like all coroutines
  handle(data)
  events.emit("handled")
}

@serandel

For my use case, Dispatchers.Unconfined wouldn't work, as some of my collectors have to be run in an OpenGL main thread. I assume that by point 2 if I do a launch(Dispatchers.Main) to run my effects the Flow will assume the collection is finished.

@elizarov
Contributor

All flows (including SharedFlow) are asynchronous by design and include, out of the box, lots of asynchronous flow transformation operators. They do not fit, on the conceptual level, the needs of a synchronous event processing system, where you must wait for the previous event to be fully processed before moving on to the next one. For a synchronous event processing system, it would be a mistake to even include all the operators that Flow has, so synchronous event processing should not even implement the Flow interface in its primitives and must be designed from scratch with its own primitives and operators as needed in a particular domain.
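
As a rough illustration of what such a from-scratch primitive could look like, here is a minimal sketch. The class name SyncEvents and its two methods are hypothetical, not an existing or proposed kotlinx.coroutines API; the class deliberately does not implement Flow, so no asynchronous operator can be inserted between emitter and handler.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical primitive, deliberately not a Flow: emit() runs every handler to completion.
class SyncEvents<T> {
    private val mutex = Mutex()
    private val handlers = mutableListOf<suspend (T) -> Unit>()

    suspend fun subscribe(handler: suspend (T) -> Unit) {
        mutex.withLock { handlers.add(handler) }
    }

    suspend fun emit(event: T) {
        // Copy under the lock so handlers can be added while an event is processed.
        val snapshot = mutex.withLock { handlers.toList() }
        for (handler in snapshot) handler(event) // sequential, in the emitter's coroutine
    }
}

fun syncEventsDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val events = SyncEvents<String>()
    events.subscribe { event ->
        delay(100) // handlers may suspend
        log += "handled $event"
    }
    events.emit("prepared")
    log += "emit returned"
    log
}

fun main() {
    println(syncEventsDemo()) // [handled prepared, emit returned]
}
```

Note that in this sketch the handlers run in the emitter's coroutine context, so a handler that needs a particular thread would have to switch to it itself with withContext.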

intellij-monorepo-bot pushed a commit to JetBrains/intellij-community that referenced this issue Feb 1, 2023
Hack in question: Kotlin/kotlinx.coroutines#2603 (comment)
It was intended to ensure that the emitted value was collected, but it doesn't work with `collectLatest`.
Thankfully, `collectLatest` is not needed here, because logs are dumped once in 12/24 hours.

This change also ensures all coroutines work in correct scopes by getting rid
of `application.coroutineScope` and `project.coroutineScope` usages.

GitOrigin-RevId: 0f93cb3a59a722ee98936a93fcc1cb301e50a9f3
@tristancaron

Following @elizarov's comment, if someone needs emit to really wait for all subscribers:

class EventBus<T> {
    private val context = newSingleThreadContext("EventBus")
    private val scope = CoroutineScope(context)

    private val _events = MutableSharedFlow<T>()

    suspend fun emit(event: T) = withContext(context) {
        _events.emit(event)
    }

    fun subscribe(block: (event: T) -> Unit) = _events
        .onEach { block(it) }
        .launchIn(scope)
}

// Example
val eventBus = EventBus<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")

val job1 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("c")
println("c sent")

val job2 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("d")
println("d sent")

val job3 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("e")
println("e sent")

job1.cancel()

eventBus.emit("f")
println("f sent")

job2.cancel()

eventBus.emit("g")
println("g sent")

job3.cancel()

It will output

a sent
b sent
Received event: c
c sent
Received event: d
Received event: d
d sent
Received event: e
Received event: e
Received event: e
e sent
Received event: f
Received event: f
f sent
Received event: g
g sent

Without newSingleThreadContext it would be

val eventBus = MutableSharedFlow<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")

val job1 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("c")
println("c sent")

val job2 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("d")
println("d sent")

val job3 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("e")
println("e sent")

job1.cancel()

eventBus.emit("f")
println("f sent")

job2.cancel()

eventBus.emit("g")
println("g sent")

job3.cancel()

And will produce (notice that c is never consumed)

a sent
b sent
c sent
d sent
Received event: d
Received event: e
Received event: e
Received event: d
Received event: e
e sent
Received event: f
Received event: f
f sent
Received event: g
g sent

@serandel

Really good idea, @tristancaron. Love it!

@serandel

Thinking a bit more about this, what if I wanted all consumers to run in parallel and just block the producer, but no consumer should block any other? Can you think of anything simpler than the use of Channel I pasted before in this thread?

@tristancaron

@serandel it sounds like your use case is documented here: https://kotlinlang.org/docs/channels.html#fan-out

@serandel

Not really, because I don't want each message to be processed by one of the consumers, as in the "Fan-out" section, but to be processed by all consumers in parallel.
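
The difference can be sketched by counting deliveries. The two helper functions below are hypothetical and assume a single-threaded runBlocking; four elements sent to two consumers yield four deliveries with a Channel (fan-out) and eight with a MutableSharedFlow (broadcast):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Fan-out: each element is delivered to exactly one of the consumers.
fun fanOutDeliveries(): Int = runBlocking {
    val channel = Channel<Int>()
    var deliveries = 0
    val consumers = List(2) { launch { for (x in channel) deliveries++ } }
    repeat(4) { channel.send(it) }
    channel.close() // ends the consumers' for loops
    consumers.joinAll()
    deliveries
}

// Broadcast: each element is delivered to every subscriber.
fun broadcastDeliveries(): Int = runBlocking {
    val events = MutableSharedFlow<Int>()
    var deliveries = 0
    val subscribers = List(2) {
        // UNDISPATCHED so both collectors are subscribed before the first emit.
        launch(start = CoroutineStart.UNDISPATCHED) { events.collect { deliveries++ } }
    }
    repeat(4) { events.emit(it) }
    yield() // give subscribers a chance to finish the last element
    subscribers.forEach { it.cancelAndJoin() }
    deliveries
}

fun main() {
    println(fanOutDeliveries())    // 4
    println(broadcastDeliveries()) // 8
}
```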

@tristancaron

@serandel Made an error while coding. Using val eventBus = MutableSharedFlow<String>() works just fine for parallel as well. My issue was that emit was called before launchIn actually started. Wrapping my code with the following code (just to confirm my theory) worked. And everything ran as expected.

    suspend fun subscribe(block: suspend (event: T) -> Unit) = suspendCoroutine<Unit> { continuation ->
        _events
            .onSubscription { continuation.resume(Unit) } // resume the caller once subscribed
            .onEach { block(it) }
            .launchIn(scope)
    }
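
The same idea, sketched as a standalone extension that also returns the Job. The names subscribeIn and subscriptionDemo are hypothetical, and a CompletableDeferred is swapped in for suspendCoroutine; the demo compares it with a plain launchIn, where the first emit can be lost because the collector has not subscribed yet.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: launches a collector and suspends until it is really subscribed.
suspend fun <T> SharedFlow<T>.subscribeIn(
    scope: CoroutineScope,
    block: suspend (T) -> Unit
): Job {
    val subscribed = CompletableDeferred<Unit>()
    val job = onSubscription { subscribed.complete(Unit) }
        .onEach { block(it) }
        .launchIn(scope)
    subscribed.await()
    return job
}

// Hypothetical demo helper: returns what the subscriber received.
fun subscriptionDemo(awaitSubscription: Boolean): List<String> = runBlocking {
    val events = MutableSharedFlow<String>()
    val received = mutableListOf<String>()
    val job = if (awaitSubscription) {
        events.subscribeIn(this) { received += it }
    } else {
        events.onEach { received += it }.launchIn(this) // not subscribed yet
    }
    events.emit("c")
    yield()
    job.cancelAndJoin()
    received
}

fun main() {
    println(subscriptionDemo(awaitSubscription = false)) // []
    println(subscriptionDemo(awaitSubscription = true))  // [c]
}
```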

@Atwa

Atwa commented Mar 31, 2023

@tristancaron you're a genius

@odedniv
Contributor

odedniv commented May 27, 2023

Gave it a go without using a CoroutineScope, can someone tell me what's wrong with it?

It worked in a small print test, but I feel like something's wrong.

class WaitingSharedFlow<T>() : Flow<T> {
    private val outputs = ConcurrentHashMap<SendChannel<T>, Mutex>()
    
    override suspend fun collect(collector: FlowCollector<T>) {
        val channel = Channel<T>()
        val mutex = Mutex(true)
        outputs[channel] = mutex
        
        try {
            while (true) {
                collector.emit(channel.receive())
                mutex.unlock()
            }
        } finally {
            outputs -= channel.also { it.close() }
        }
        
    }
    
    suspend fun emit(value: T) = coroutineScope {
        for ((channel, mutex) in outputs) {
            launch {
                try {
                    channel.send(value)
                } catch (_: ClosedSendChannelException) {
                    return@launch
                }
                mutex.lock()
            }
        }
    }
}

@dkhalanskyjb
Collaborator

Here's a simple playground that shows some issues:

fun main(): Unit = runBlocking {
    val sharedFlow = WaitingSharedFlow<Int>()
    repeat(3) { collector ->
        launch(start = CoroutineStart.UNDISPATCHED) {
            sharedFlow.buffer(1).collect {
                println("$collector: received $it")
                delay(1000)
                println("$collector: finished receiving $it")
            }
        }
    }
    repeat(3) {
        println("Emitting $it")
        sharedFlow.emit(it)
        yield()
        println("Emitted $it")
    }
}

As you can see, a single buffer call will break the guarantee.

A less obvious example is combine:

fun main(): Unit = runBlocking {
    val sharedFlow = WaitingSharedFlow<Int>()
    val infiniteFlow = flow {
        var i = 0
        while (true) {
            emit(i++)
        }
    }
    repeat(3) { collector ->
        launch(start = CoroutineStart.UNDISPATCHED) {
            sharedFlow.combine(infiniteFlow){ a, _ -> a }.collect {
                println("$collector: received $it")
                delay(1000)
                println("$collector: finished receiving $it")
            }
        }
    }
    repeat(3) {
        println("Emitting $it")
        sharedFlow.emit(it)
        yield()
        println("Emitted $it")
    }
}

@odedniv
Contributor

odedniv commented May 27, 2023

Re buffer: The whole point of buffer() is to break that guarantee by buffering the collection. Its documentation states that the emitter will not wait for the collector. Nobody will say that emit() in flow { emit(...) } is bugged because when buffered it will not wait for the "final" collection even though it's meant to. This can be seen with:

// try with and without buffer(1)
fun main(): Unit = runBlocking(Dispatchers.Default) {
    val f = flow {
        for (i in 1..10) {
            println("before: $i")
            emit(i)
            println("after:  $i")
        }
    }
    f.buffer(1).collect { println("collect: $it") }
}

Re combine: The above is partially true there because combine's contract states that not all emissions will immediately reach the collectors because they are conflated (kind of buffering) until all combined flows emit at least once. That being said, the reason I asked "what's wrong with this implementation" is that I had an odd experience of it with combine. Now I think that combine's implementation wrongly "detaches" emissions from collections, even if all flows have emitted at least once. This can be seen with flow { emit(...) }, whose contract states that emit() should wait for collection:

// Even after both flows start emitting, collection is not between the before/after.
fun main(): Unit = runBlocking(Dispatchers.Default) {
    val f1 = flow {
        for (i in 1..10) {
            println("f1 before: $i")
            emit(i)
            println("f1 after:  $i")
        }
    }
    val f2 = flow {
        for (i in 1..10) {
            println("f2 before: $i")
            emit(i)
            println("f2 after:  $i")
        }
    }
    combine(listOf(f1, f2)) { it.toList() }
        .collect { println("collect:  $it") }
}

I think my implementation stands (and I think Kotlin owners should consider something similar for SharedFlow), and that there's a separate bug to file for combine.

@firmeldir

firmeldir commented May 30, 2023

This should work best for the mentioned case. Obviously, before any asynchronous flow transformation operators (aka Channel send receive)

class MutableSharedWaitFlow<T> : FlowCollector<T>, Flow<T> {

    private val collectorsMutex = Mutex()
    private val collectors = mutableSetOf<FlowCollector<T>>()

    override suspend fun emit(value: T) {
        val emitted = collectorsMutex.withLock { collectors }
        coroutineScope {
            emitted
                .map { collector ->
                    launch {
                        try {
                            collector.emit(value)
                        } catch (e: CancellationException) {
                            collectorsMutex.withLock { collectors.remove(collector) }
                        }
                    }
                }
                .joinAll()
        }
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        collectorsMutex.withLock { collectors.add(collector) }
    }
}

SettingDust added a commit to SettingDust/ModSets that referenced this issue Aug 1, 2023
… config screen

fix #6
The kotlin flow emit won't waiting for subscribers collected. Kotlin/kotlinx.coroutines#2603
So, switching to `onEach` so that the loading is complete when emitted
@NorbertSandor

This should work best for the mentioned case. Obviously, before any asynchronous flow transformation operators (aka Channel send receive)

class MutableSharedWaitFlow<T> : FlowCollector<T>, Flow<T> {
...

@odedniv, why do you think the solution of @firmeldir is incorrect? Thanks.

@odedniv
Contributor

odedniv commented Dec 15, 2023

It looks like collect() returns immediately, which is incorrect. Sorry I just emoji'd instead of replying, dunno why I did that!

emit() seems to be waiting correctly without a mutex, making that part nicer than my solution. Maybe those 2 concepts can be merged.

@odedniv
Contributor

odedniv commented Dec 15, 2023

Actually the collect() returning immediately is a simple problem - since it's a SharedFlow we can just block forever there.

The worst problem is that collector.emit() is done in the wrong CoroutineContext - it should be done in the collect()'s context.

@odedniv
Contributor

odedniv commented Apr 18, 2024

Another implementation, with a Channel for waiting instead of a Mutex, and without ConcurrentHashMap, which I think isn't available in:

class WaitingSharedFlow<T>() : Flow<T> {
    private val allChannels = mutableSetOf<Channels<T>>()
    
    override suspend fun collect(collector: FlowCollector<T>) {
        val channels = Channels<T>()
        synchronized(allChannels) {
            allChannels += channels
        }
        
        try {
            while (true) {
                collector.emit(channels.data.receive())
                channels.done.send(Unit)
            }
        } finally {
            synchronized(allChannels) {
                allChannels -= channels.also { it.close() }
            }
        }
        
    }
    
    suspend fun emit(value: T) = coroutineScope {
        synchronized(allChannels) {} // Ensuring memory barrier with collectors.
        for (channels in allChannels) {
            launch {
                try {
                    channels.data.send(value)
                } catch (_: ClosedSendChannelException) {
                    return@launch
                }
                try {
                    channels.done.receive()
                } catch (_: ClosedReceiveChannelException) {}
            }
        }
    }
    
    private data class Channels<T>(val data: Channel<T> = Channel(), val done: Channel<Unit> = Channel()) {
        fun close() {
            data.close()
            done.close()
        }
    }
}

There's quite a lot of people here, any chance there will be an official API for this request?

@odedniv
Contributor

odedniv commented Apr 18, 2024

I can send a PR if the maintainers agree to having this.

@hrofeh

hrofeh commented Jun 26, 2024

@odedniv Liked your final solution, probably the cleanest way to approach this.
Would be nice if we got an official Kotlin API though.

Based my final solution on yours with small changes:

  1. Using Collections.synchronizedList instead of synchronizing manually, fits here since we don't sync on complex operations (also didn't see an issue with leaving channel close out of sync block).
  2. Made "data" & "done" channels private, SyncChannel exposes the needed api.
  3. Based on change 1, using Closeable in SyncChannel and Kotlin's use() function instead of try/finally.

Let me know what you think and if you found any issues with your original implementation since you wrote it.

internal interface SyncFlow<T> : Flow<T>
internal interface MutableSyncFlow<T> : SyncFlow<T> {
    suspend fun emit(value: T)
}
internal class ChannelsMutableSyncFlow<T> : MutableSyncFlow<T> {

    private var channels = Collections.synchronizedList(mutableListOf<SyncChannel<T>>())

    override suspend fun collect(collector: FlowCollector<T>) {
        createChannel().use { channel ->
            while (true) {
                channel.receive { collector.emit(it) }
            }
        }
    }

    private fun createChannel() =
        SyncChannel(onClose = { channels.remove(it) }).also { channels.add(it) }

    override suspend fun emit(value: T) = coroutineScope {
        synchronized(channels) { channels.forEach { launch { it.send(value) } } }
    }

    private class SyncChannel<T>(
        private val data: Channel<T> = Channel(),
        private val ack: Channel<Unit> = Channel(),
        private val onClose: (SyncChannel<T>) -> Unit
    ) : Closeable {

        suspend fun send(value: T) {
            try {
                data.send(value)
                ack.receive()
            } catch (_: ClosedReceiveChannelException) {} catch (_: ClosedSendChannelException) {}
        }

        suspend fun receive(block: suspend (T) -> Unit) {
            try {
                block(data.receive())
                ack.send(Unit)
            } catch (_: ClosedReceiveChannelException) {} catch (_: ClosedSendChannelException) {}
        }

        override fun close() {
            data.close()
            ack.close()
            onClose(this)
        }
    }
}

@odedniv
Contributor

odedniv commented Jun 28, 2024

Looks nice, I really hope someone from the kotlinx.coroutines team would look at these and find any unexpected issues... this is hard to get right.

@xRamazaNx

I roamed the Internet in search of a solution to this problem. The latest implementation from @hrofeh works well!
But I’m concerned about using while(true) in collect.

For myself, I’ve changed it this way.

override suspend fun collect(collector: FlowCollector<T>) = coroutineScope {
    createChannel().use { channel ->
        while (isActive) {
            channel.receive { collector.emit(it) }
        }
        channel.close()
    }
}

@hrofeh

hrofeh commented Jul 21, 2024

@xRamazaNx Thanks for verifying.
Regarding the "isActive", you don't really need it since the CancellationException thrown when we are waiting for the channel will abort the while loop.
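
A minimal sketch of that point, assuming a hypothetical helper named cancellationDemo: a `while (true)` loop around a suspending receive() is left as soon as the collecting coroutine is cancelled, and the cleanup still runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo helper: returns true if the loop was left through cancellation.
fun cancellationDemo(): Boolean = runBlocking {
    val channel = Channel<Int>()
    var cleanedUp = false
    val collector = launch {
        try {
            while (true) {
                channel.receive() // suspends; throws CancellationException on cancel
            }
        } finally {
            cleanedUp = true // plays the role of use { } closing the channel
        }
    }
    yield() // let the collector reach receive()
    collector.cancelAndJoin()
    cleanedUp
}

fun main() {
    println(cancellationDemo()) // true
}
```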
