Unexpected tryEmit behaviour #2387

Closed
jmp0x0000 opened this issue Nov 13, 2020 · 17 comments

Comments

@jmp0x0000

tryEmit doesn't attempt to emit a value once the first subscriber has joined; it just returns false.
Setting replay or extraBufferCapacity > 0, or replacing tryEmit with emit, resolves the issue.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    // plus the @Test import of the test framework in use

    @Test
    fun tryEmitExample() = runBlocking {
        // Default configuration: replay = 0, extraBufferCapacity = 0
        val sharedFlow = MutableSharedFlow<Long>()

        // The subscriber joins after 300 ms
        val asyncReceiver = async {
            delay(300)
            sharedFlow.collect {
                println("Received on 1 $it")
            }
            println("Done")
        }

        repeat(4) {
            delay(100)
            val res = sharedFlow.tryEmit(System.currentTimeMillis())
            println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value} try: $res")
        }
        asyncReceiver.cancel()
    }

Output:

    Emitted 1605303026489 Subscribers: 0 try: true
    Emitted 1605303026592 Subscribers: 0 try: true
    Emitted 1605303026693 Subscribers: 1 try: false
    Emitted 1605303026799 Subscribers: 1 try: false

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    // plus the @Test import of the test framework in use

    @Test
    fun emitExample() = runBlocking {
        // Default configuration: replay = 0, extraBufferCapacity = 0
        val sharedFlow = MutableSharedFlow<Long>()

        // The subscriber joins after 300 ms
        val asyncReceiver = async {
            delay(300)
            sharedFlow.collect {
                println("Received on 1 $it")
            }
            println("Done")
        }

        repeat(4) {
            delay(100)
            sharedFlow.emit(System.currentTimeMillis())
            println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value}")
        }
        asyncReceiver.cancel()
    }

Output:

    Emitted 1605303080955 Subscribers: 0
    Emitted 1605303081061 Subscribers: 0
    Received on 1 1605303081162
    Emitted 1605303081166 Subscribers: 1
    Received on 1 1605303081267
    Emitted 1605303081267 Subscribers: 1

@Dominaezzz
Contributor

Have a look at #2346

@jmp0x0000
Author

It's not a bug, it's a feature... :)
This kind of implementation feels counterintuitive to me.

@elizarov
Contributor

tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store the emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend if any of the subscribers are not ready yet.
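
To make the buffer argument concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper name tryEmitWithSubscriber is hypothetical and only serves this illustration; it reports what tryEmit returns while one subscriber is already collecting, with and without extra buffer capacity.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): returns the result of
// tryEmit while exactly one subscriber is already collecting.
fun tryEmitWithSubscriber(extraBufferCapacity: Int): Boolean = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = extraBufferCapacity)

    // UNDISPATCHED runs the collector up to its first suspension point,
    // so it is subscribed before tryEmit is called.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { }
    }

    val accepted = flow.tryEmit(1)
    collector.cancel()
    accepted
}

fun main() {
    println(tryEmitWithSubscriber(extraBufferCapacity = 0)) // false: nowhere to store the value
    println(tryEmitWithSubscriber(extraBufferCapacity = 1)) // true: the value fits in the buffer
}
```

The first call matches the "try: false" lines reported at the top of this issue; the second corresponds to the extraBufferCapacity > 0 workaround.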

@lukas1

lukas1 commented May 29, 2021

@elizarov I would like to renew discussion about this one.

First of all, the documentation about the default implementation of MutableSharedFlow() is positioned above SharedFlow. To me, this is completely unexpected. The way I usually look at documentation is to cmd-click the type I am working with and read the documentation there. So I'd click on the invocation of MutableSharedFlow and read what it has to offer. That explanation is not present there at all, so I completely overlooked it until I found this issue (which I had been specifically searching for).

I'd at least suggest either moving that piece of documentation above the function MutableSharedFlow or, if nothing else, adding a link from the documentation of MutableSharedFlow to the documentation of SharedFlow, explaining that some concepts regarding extraBufferCapacity and the default values are described in SharedFlow's documentation. (I'd probably vote for the former solution.)

Second, the default implementation of MutableSharedFlow is unexpected to me. Probably nobody wants that implementation. I can't imagine there are many developers who would expect that creating a flow like this: val testFlow = MutableSharedFlow<String>() means that tryEmit() will fail to deliver the event to any subscriber. Even if developers expected that, they would rarely want it as the default behavior.

So I'd suggest removing the default values at least for the replay and extraBufferCapacity parameters and requiring developers to provide values of their own, to match their needs. Default values make sense if a sensible default is chosen, but personally I don't believe replay=0, extraBufferCapacity=0 is a sensible default. With my suggestions, everybody will have to think about what values to choose, and that way they'll avoid bugs in advance. (Somebody just starting to learn the coroutines API is guaranteed to use the default implementation of MutableSharedFlow and expect tryEmit to work; they won't think twice about it.)

What do you think about these suggestions?

Also, what do other devs think?

@Dominaezzz
Contributor

Just my 2 cents.

I'm fairly used to Ctrl+Clicking up the inheritance hierarchy to find docs but maybe that's me. Some links could be nice I guess.

When I first used mutable shared flow, I 100% expected that behaviour, at some point I went to check the docs to confirm that it was like this. The fact that tryEmit has try in the name should warrant suspicion as to when it can fail, encouraging you to read the docs.
I was using it to replace the typical list of listeners pattern. If the list is empty, no one is listening, so no one gets the event and the event is dropped.

I personally prefer all the arguments to be zero by default: if I didn't ask for buffer capacity, don't give me any, seems fair. I hate the fact that the WhileSubscribed constructor has a non-zero default, which to me was completely unexpected. I didn't ask for any replay cache timeouts, why am I getting one? It should be zero. Luckily I read the docs and explicitly set it now.

I feel the only people who might get confused with flow default behaviour, are those coming from some other reactive library and not carefully reading docs/making assumptions. But idk about this. There haven't been too many complaints about it.

@lukas1

lukas1 commented May 29, 2021

> I feel the only people who might get confused with flow default behaviour, are those coming from some other reactive library and not carefully reading docs/making assumptions. But idk about this.

I think people who are new to coroutines and Kotlin Flow in general would be confused about this as well. I myself was surprised when I first encountered it (and I'm not even new to reactive frameworks or coroutines) and so was a colleague of mine. I think this is confusing to anyone with little experience.

> I personally prefer if all the arguments are zero by default, if I didn't ask for buffer capacity, don't give me any, seems fair. I hate the fact that in the WhileSubscribed constructor, it has non zero default

You're describing a different situation. I am not suggesting a non-zero default. I'm suggesting forcing developers to select the values themselves, so that they have to think about it.

In my projects a sensible default might be MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

This might not serve everyone, so I don't want this to be the default, but at least for me, I NEVER want MutableSharedFlow(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND). It would render tryEmit completely useless, which I don't want probably 100% of the time. Should anybody find it necessary or useful, they will still be able to do so; they'll just specify the buffer capacity explicitly, which ensures they understand the implications of their decision.
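
For readers who want to try the configuration suggested above, here is a small sketch, assuming kotlinx.coroutines is available. The helper name tryEmitBurst is hypothetical. It emits a burst of values with tryEmit while a subscriber is attached but has not yet had a chance to run, and collects the return values.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): emits `count` values
// back to back with tryEmit and returns what each call reported.
fun tryEmitBurst(count: Int): List<Boolean> = runBlocking {
    val events = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Subscribe before emitting; the collector cannot run between the tryEmit
    // calls below because nothing suspends in this single-threaded runBlocking.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { }
    }

    val results = (1..count).map { events.tryEmit(it) }
    collector.cancel()
    results
}

fun main() {
    println(tryEmitBurst(3)) // [true, true, true]
}
```

With DROP_OLDEST the buffer never rejects a value; an older buffered value is discarded instead, so a slow subscriber is expected to see only the most recent events.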

@elizarov
Contributor

elizarov commented Aug 5, 2021

The above discussion and proposal definitely have merit, but I would like to point out one key observation. The whole conundrum centers around tryEmit. If not for tryEmit, the MutableSharedFlow defaults would look reasonable.

@MatrixDev

> I feel the only people who might get confused with flow default behaviour, are those coming from some other reactive library

I have used coroutines, flows and channels a lot, and still this issue with tryEmit really caught me off guard. It is very unintuitive, especially as it is a direct replacement for broadcast channels, which AFAIK don't have such problems.

PS: I'm using MutableSharedFlow extensively in my project to send 'latest-important' events from non-suspend function to the suspend collectors.

@Dominaezzz
Contributor

Looking at this again I guess one could argue that tryEmit should at least try to see if all subscribers are ready to receive without suspending.

There's a general lack of "rendezvous" when it comes to SharedFlows. See #2818.

@MatrixDev

One more solution (I don't like it but still) might be to just crash tryEmit when extraBufferCapacity = 0. IMHO these two together make no sense and developers should be informed about it.

It is far too easy to just forget to set extraBufferCapacity, and finding this when fixing a bug in the real world might get time-consuming.

@lukas1

lukas1 commented Nov 7, 2021

> It is far too easy to just forget to set extraBufferCapacity, and finding this when fixing a bug in the real world might get time-consuming.

This!

@wuseal

wuseal commented Nov 22, 2021

Same problem here, even though I have used coroutines for several years 😂
I even thought that tryEmit was equivalent to launch { emit(value) } in the default MutableSharedFlow()
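
The difference between the two can be checked with a short sketch, assuming kotlinx.coroutines is available. The helper name compareTryEmitAndLaunchedEmit is hypothetical. On a default MutableSharedFlow with one active subscriber, it first calls tryEmit and then launches a coroutine that calls emit, and records what the subscriber received.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): returns the tryEmit
// result together with the values the subscriber actually received.
fun compareTryEmitAndLaunchedEmit(): Pair<Boolean, List<Int>> = runBlocking {
    val flow = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    val received = mutableListOf<Int>()

    // Subscribe before emitting anything.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received += it }
    }

    // Non-suspending attempt: there is no buffer, so the value is rejected.
    val tryEmitResult = flow.tryEmit(1)

    // Suspending emit in its own coroutine: waits until the subscriber takes the value.
    launch { flow.emit(2) }.join()

    collector.cancel()
    tryEmitResult to received.toList()
}

fun main() {
    println(compareTryEmitAndLaunchedEmit()) // (false, [2])
}
```

So with the default configuration the two are not interchangeable: the launched emit hands the value over to the subscriber, while tryEmit gives up immediately.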

@LouisCAD
Contributor

Got bitten by this, I wish the default value for extraBufferCapacity was 1, and was overridden to 0 for the shareIn operator.

@duquewu

duquewu commented Jan 19, 2022

Got bitten by this too.

I prefer tryEmit because I want to emit something without a coroutine scope and find out whether the emission succeeded or not.

When the default MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) meets tryEmit, tryEmit does not work as I expected.

I noticed that tryEmitLocked, which tryEmit calls, contains the logic below, and that is where the problem lies.

- if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
    when (onBufferOverflow) {
-     BufferOverflow.SUSPEND -> return false // will suspend
      BufferOverflow.DROP_LATEST -> return true // just drop incoming
      BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
    }
  }

Since bufferCapacity defaults to 0 and onBufferOverflow defaults to BufferOverflow.SUSPEND when I create a shared flow with MutableSharedFlow<Int>(), tryEmitLocked returns before emitting anything, so my collector never receives the item I tried to emit.
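
The effect of that check can be illustrated with a deliberately simplified model in plain Kotlin. This is not the library's code: the names OverflowModel and modelTryEmit are hypothetical, and the model assumes the slowest subscriber has not consumed anything yet (so the minCollectorIndex <= replayIndex part of the condition holds). The no-subscriber shortcut reflects the "Subscribers: 0 try: true" lines in the original report.

```kotlin
// Hypothetical stand-in for BufferOverflow, used only by this model.
enum class OverflowModel { SUSPEND, DROP_LATEST, DROP_OLDEST }

// Simplified model of the value tryEmit returns. Assumption: if there are
// subscribers, the slowest one has not consumed any buffered value yet.
fun modelTryEmit(
    subscribers: Int,
    bufferSize: Int,
    bufferCapacity: Int,
    overflow: OverflowModel
): Boolean {
    // With no subscribers the call reports success (see the output in the original report).
    if (subscribers == 0) return true

    val bufferFull = bufferSize >= bufferCapacity
    if (!bufferFull) return true // the value is enqueued

    return when (overflow) {
        OverflowModel.SUSPEND -> false     // emit would suspend here; tryEmit cannot
        OverflowModel.DROP_LATEST -> true  // the incoming value is dropped
        OverflowModel.DROP_OLDEST -> true  // the oldest value is dropped, the new one is enqueued
    }
}

fun main() {
    // Default MutableSharedFlow(): capacity 0 counts as "full" as soon as a subscriber exists.
    println(modelTryEmit(subscribers = 1, bufferSize = 0, bufferCapacity = 0, overflow = OverflowModel.SUSPEND)) // false
    println(modelTryEmit(subscribers = 0, bufferSize = 0, bufferCapacity = 0, overflow = OverflowModel.SUSPEND)) // true
    println(modelTryEmit(subscribers = 1, bufferSize = 0, bufferCapacity = 1, overflow = OverflowModel.SUSPEND)) // true
}
```

The first line of output shows why the default configuration always rejects: with bufferCapacity = 0 the condition bufferSize >= bufferCapacity is true even for an empty buffer.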

@duquewu

duquewu commented Jan 19, 2022

Maybe the extraBufferCapacity should be 1 in MutableSharedFlow like below?

    public fun <T> MutableSharedFlow(
        replay: Int = 0,
        extraBufferCapacity: Int = 1,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): MutableSharedFlow<T>

@MatrixDev

@geek5nan, yes, it should be. But it should be 1 by default, otherwise people run into unexpected errors like these.

@DjakaTechnology

I just got bitten by this and spent a lot of time figuring out why tryEmit() doesn't work. I think it should be documented properly, or tryEmit() should throw an exception when called with an improper setup, because it is basically unusable.

Nodrex added a commit to Nodrex/kotlinx.coroutines that referenced this issue Oct 20, 2022
Coroutines and Flows are amazing I love them and many many thanks for them 🤩🤩🤩🤩
You guys in JetBrains rock 🤘🤘🤘 🚀🚀🚀
I have a small suggestion:
According to this big discussion: 
Kotlin#2387 
and a lot of confusion, including myself, default value of `extraBufferCapacity` should be 1 instead of 0 and I think this parameter also needs better explanation
9 participants