Add start parameter to launchIn extension for Flow #1326

Open
LouisCAD opened this issue Jul 7, 2019 · 10 comments

@LouisCAD
Contributor

LouisCAD commented Jul 7, 2019

Usage would look like so:

someFlow.launchIn(lifecycleScope, start = CoroutineStart.UNDISPATCHED)

My current use case is to have the UI set up synchronously when flow collection starts. When collecting the flow has side effects on the UI, this avoids an extra, unneeded layout pass in an Android app.
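To make the request concrete, here is a minimal sketch of what such an overload could look like and how it changes ordering. `launchInWithStart` and `startOrder` are hypothetical names, not part of kotlinx.coroutines, and `runBlocking` stands in for `lifecycleScope` so the snippet runs outside Android.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not a library API): like launchIn, but forwards `start` to launch.
fun <T> Flow<T>.launchInWithStart(
    scope: CoroutineScope,
    start: CoroutineStart = CoroutineStart.DEFAULT
): Job = scope.launch(start = start) { collect() }

// Records whether the first value is collected before or after the launching call returns.
fun startOrder(start: CoroutineStart): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = flowOf(1)
        .onEach { log += "collected $it" }
        .launchInWithStart(this, start)
    log += "after launch"
    job.join()
    log
}

fun main() {
    println(startOrder(CoroutineStart.DEFAULT))      // [after launch, collected 1]
    println(startOrder(CoroutineStart.UNDISPATCHED)) // [collected 1, after launch]
}
```

With `UNDISPATCHED`, the already-available value is collected before the launching call returns, which is the behaviour asked for here. The flow in this sketch never suspends; a flow that suspends before its first emission would not deliver a value synchronously.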

@elizarov
Contributor

elizarov commented Jul 8, 2019

Can you give slightly more context for your example, please (some more code around it)? Where is the code that is trying to collect the flow? And what kind of flow are you collecting? I'm trying to grasp why it is important for it to be UNDISPATCHED in your case.

elizarov added the flow label Jul 8, 2019
@fvasco
Contributor

fvasco commented Jul 8, 2019

This is my use case:

import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch

suspend fun main() {
    coroutineScope {
        val eventBus = BroadcastChannel<Int>(5)
        val subscription = eventBus.asFlow()
        launch(/* start = CoroutineStart.UNDISPATCHED */) {
            subscription.collect { println(it) }
        }

        repeat(10) { eventBus.send(it) }
        eventBus.close()
    }
}

I used the same trick: an active subscription should not lose messages.

@LouisCAD
Contributor Author

LouisCAD commented Jul 9, 2019

@elizarov, is the following example meaningful to you?

someFlow.onEach { // Start of collection of someFlow enables UI component.
    ui.updateUi(it) // If value is already available, we might benefit from
    // updating UI ASAP, not waiting for an extra dispatch.
}.launchIn(this) // We would prefer it to be undispatched to avoid 2 layout passes.

@elizarov
Contributor

Would simply using Dispatchers.Main.immediate have the same effect (globally)?

@LouisCAD
Contributor Author

It wouldn't exactly, as it would avoid the dispatch for every incoming value, whereas UNDISPATCHED only avoids it for the first one, up to the first suspension point.

Using Dispatchers.Main.immediate might be an alternative, but I read that you discouraged its use, as it might cause deadlocks when suspending code always takes non-suspending paths.

Now that I'm thinking about it though, I wonder if this is plausible: even dispatched, collecting a flow leads to a suspending function being called, which always goes through a dispatch via the ContinuationInterceptor, making synchronous collection of the first value impossible, right?
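The "first one until the first suspension point" distinction above can be seen in a small sketch. It assumes `runBlocking` as a stand-in for a dispatcher that always queues resumptions; `resumeOrder` is a hypothetical name used only for illustration.

```kotlin
import kotlinx.coroutines.*

// Shows that UNDISPATCHED only skips the initial dispatch:
// after the first suspension, the coroutine resumes through the dispatcher.
fun resumeOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        log += "before suspension" // runs inline, inside the launch call
        yield()                    // first suspension point
        log += "after suspension"  // resumed via the runBlocking event loop
    }
    log += "caller continues"
    job.join()
    log
}

fun main() {
    println(resumeOrder()) // [before suspension, caller continues, after suspension]
}
```

An immediate dispatcher, by contrast, may also skip the dispatch on later resumptions whenever the code is already on the right thread, which is the difference being pointed out in this comment.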

@elizarov
Contributor

Both start = UNDISPATCHED and immediate dispatchers present similar dangers and provide similar optimization opportunities with respect to avoiding an extra redispatch onto the event queue. That is why I'm asking.

Indeed, start = UNDISPATCHED is somewhat more controllable, more explicit and, thus, maybe the less dangerous of the two, so it might be warranted to support it for the cases where you understand what's going on and want a bit of extra performance.

@fvasco
Contributor

fvasco commented Jul 15, 2019

Hi @elizarov,
my use case is slightly different from @LouisCAD's; I posted a small demo above.

I use a BroadcastChannel as an event bus. I subscribed before any event was sent but, in the example above, all events were lost.

Frankly, it is not really clear to me whether asFlow fits my use case.

@fvasco
Contributor

fvasco commented Jul 15, 2019

#1340 can fix my use case

@steve-the-edwards
Contributor

@qwwdfsad Is the behaviour for

A:

someFlow.onEach {
  updateValue()
}.launchIn(scope + Dispatchers.Unconfined)

equivalent to

B:

scope.launch(start = CoroutineStart.UNDISPATCHED, context = Dispatchers.Unconfined) {
    someFlow.collect {
        updateValue()
    }
}

I would have thought so, given the intention of Dispatchers.Unconfined. In this case, updateValue() is thread-safe, but we want it to happen synchronously as this collection is set up.

I have a bug in a test (only on the test environment) where A does not happen synchronously but B does.

There are other complications involved (updateValue() is setting a Compose State value) but I'm not sure that's the culprit here yet.

@qwwdfsad
Collaborator

See also #3679 and #3681

5 participants