Skip to content

Most of the times sharedFlow doesn't emit the initial value #2338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
CoreFloDev opened this issue Oct 26, 2020 · 14 comments
Open

Most of the times sharedFlow doesn't emit the initial value #2338

CoreFloDev opened this issue Oct 26, 2020 · 14 comments
Labels

Comments

@CoreFloDev
Copy link

Hi,

I have been testing sharedFlow over the weekend and I got a strange behaviour with it.
I have got some code like that:

class MainScreen(
    private val incrementCounterUseCase: IncrementCounterUseCase,
    private val loadMovieListUseCase: LoadMovieListUseCase
) : Screen<MainInput, MainOutput>() {

    override fun output(): Flow<MainOutput> = input.receiveAsFlow()
        .let(inputToAction())
        .let { stream ->
            val upstream = stream.shareIn(scope, SharingStarted.Lazily)

            listOf(
                upstream.filterIsInstance<Action.InitialAction>().let(loadMovieListUseCase()),
                upstream.filterIsInstance<Action.IncrementNumber>().let(incrementCounterUseCase())
            )
                .merge()
        }

    companion object {
        fun inputToAction() = FlowTransformer<MainInput, Action> { flow ->
            flow.map { input ->
                when (input) {
                    MainInput.Click -> Action.IncrementNumber
                    MainInput.RetryClicked -> Action.InitialAction
                }
            }.onStart {
                emit(Action.InitialAction)
            }
        }
    }
}

If I debug using doOnEach, I was able to see that the Action.InitialAction is emitted every time. However after the shareIn, the stream is not always setup and the function loadMovieListUseCase doesn't receive that InitialAction. And the strange part, it is that the next event on the stream, always respond properly. I have tried both SharingStarted.Lazily and Eagerly, it seems that eagerly make it worth.

Maybe I am missing something, note that I have also tried with coroutine 1.4.0 and it didn't fixed the issue.

Thank you for your time!

@CoreFloDev
Copy link
Author

I have noticed that if I add a delay(100) on the onStart it works fine everytime

@diousk
Copy link

diousk commented Oct 28, 2020

Have you tried onSubscription mentioned in #2034 ?

It is similar to onStart with one big difference. onStart calls the action before a subscription to the shared flow is established. It means that if onStart action initiates an operation that emits the values into the shared flow, there is no guarantee that those emissions will be received by this downstream collector. However, onSubscription calls the action after a subscription to the shared flow is established, guaranteeing the reception of all the emitted values after this moment (assuming they are not dropped on buffer overflow).

Is that the case?

@CoreFloDev
Copy link
Author

Hi @diousk

Thanks for your reply. I had a look at it when you mentioned onSubscription. I think that operator would work but it seems that it isn't available upstream on the flow class.

@elizarov
Copy link
Contributor

The onSubscription operator is supported only on a SharedFlow by design, since only shared flows have a concept of "subscription".

Does it help?

@CoreFloDev
Copy link
Author

Hi @elizarov

yes I got that thanks. But that's still an issue for me as that onStart/OnSubscribe action if made on the upstream. I think it would help if the first value received by a sharedFlow was waiting for it initialisation before consuming it. Would that be possible to do?

Thanks!

@qwwdfsad qwwdfsad added the flow label Nov 12, 2020
@elizarov
Copy link
Contributor

@CoreFloDev Can you, please, explain in more detail what are you trying to achieve. Please, give some more background. What kind of events you are sending? How many consumers of those events your application architecture has? Maybe you are looking for a Channel to transfer your data, not a shared flow?

@CoreFloDev
Copy link
Author

Sure, I am creating a classic map reduce application also called redux pattern.

the overall architecture looks like this:
Screenshot from 2020-11-24 23-00-19

I am also using the action mapper as a kicker to start the system. To do so, it send an initialAction event that is consumed by all the useCases that needs to execute at the application startup (usually a network request to fetch some data).

Also by digging more into it and with the help of a friend, we figured out that the system was working if we used the sharedflow like this:

val upstream = stream.shareIn(scope, SharingStarted.Eagerly, 1)

It seems that in that configuration, the way it is used make that it never do the replay, I am not really sure why.

Does that help? I can also share the entire project but that could be a bit too much of code.

Thanks!

@twyatt
Copy link
Contributor

twyatt commented Nov 24, 2020

@CoreFloDev are you possibly running into a similar behavior I was initially confused by as well? #2069 (comment)

@elizarov
Copy link
Contributor

@CoreFloDev Can you, please, share some more snippets of code -- a full example of the code for one use-case would be great.

@CoreFloDev
Copy link
Author

Sure the code lives there: https://github.com/CoreFloDev/flowarch/blob/main/app/src/main/java/com/example/flow_arch/main/arch/MainScreen.kt, I have make the repository public as it is only a prototype for a migration from rxjava to flow. The overall structure takes the shape of an existing rxjava implementation.

The usecases implementation are available here:
https://github.com/CoreFloDev/flowarch/tree/main/app/src/main/java/com/example/flow_arch/main/usecases

Let me know if that helps, thanks!

@elizarov
Copy link
Contributor

elizarov commented Dec 2, 2020

In your particular architecture the easiest solution I see if to inject InitialAction downstream from shareIn operator, not upstream of it. You can also change replay to zero:

stream.shareIn(scope, SharingStarted.Eagerly).onStart { emit(Action.InitialAction) }

This way, every subscriber is guaranteed to get InitialAction first, followed by further (shared) actions.

@CoreFloDev
Copy link
Author

That's similar to the onSubscribe solution, I guess that's all the possible solution at the moment. Maybe I was thinking to build a new type of SharingStarted that would support onStart from upstream, do you think that's achievable? I am not very familiar with coroutine internals.

@elizarov
Copy link
Contributor

elizarov commented Dec 4, 2020

In general -- no. The design of the flow is very simple and it does not support the concept of "subscription" in general case. For an arbitrary flow you cannot tell when the "subscription" had happened, so that's why onSubscribe operator is only supported by special, shared flows.

@CoreFloDev
Copy link
Author

CoreFloDev commented Sep 27, 2021

Hi,

My friend Pawel also got that issue, he found a way around my using a merge like this:

        fun inputToAction() = FlowTransformer<MainInput, Action> { flow ->
            merge(flowOf(Action.InitialAction),
                flow.map { input ->
                    when (input) {
                        MainInput.Click -> Action.IncrementNumber
                        MainInput.RetryClicked -> Action.InitialAction
                    }
                })
        }

Just posting the solution for future reference in case someone get the same issue
Thanks!

Erratum: Nan doesn't work still

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

5 participants