-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Proposal: Flow pause cooperatively #2223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
+1 to this feature. |
Can you please, elaborate on what you are trying to achieve? Can you provide a specific problem that you are running with SharedFlow into and that you are trying to solve? |
@elizarov , not OP, but I think a use case is pausing the collection of flows when an Android LifecycleOwner (e.g. Activity) goes into the background (lifecycle in |
Why this would be really needed? Do you have any specific use-case in mind? |
Stopped Lifecycle is still collecting.This is my main problem. (I am already using CoroutinePausing for this.)
SharedFlow recollecting
Essentially: recollecting is something, I can see to be something one 'often' wants to avoid. Core pointBy this 'interface' being in a common library, 3rd party libraries can use it to coordinate. If SharedFlow implemented this, it would be even more useful. It doesn't strictly have to: One would always have to provide CoroutinePausing in context. It would just seem more 'correct' if it was integrated throughout the library. (Pausing jobs was another request. Pausing jobs seems like a very complicated thing, and unsafe (and while I find pausing of coroutines useful too, I hope hard pausing of jobs will never be the chosen solution). This completely* solves that. - In addition to being effectively needed for unwasteful hot flows.) (*1: After I started using CoroutinePausing, I am slowly refactoring away all LiveData, as it is no longer needed. For now, I still have to pass CoroutinePausing in the sharing context, though.) (Also, to not come of as deceitful: I am not using SharedFlow yet: I am using 1.3.9 atm. I am looking forward to SharedFlow, and think potentially integrating CoroutinePausing might even change the design a little before it's fully launched. - It seems like an important thing to have with hot flows.) (*2: I am already wrapping all Room queries in (*3 In practice, it's better to replace SharedFlow with LiveData directly, as it will have less overhead. (this spreads and pollutes all with LiveData 'nodes' in the flow 'graph') - But then: using the LiveData as flow will again keep it running continuously.) |
@psteiger Yes. This is the main point. (Hot) flows have no way to inform upstream of being Active/Inactive like LiveData has. Even if this were not considered enough "value / cost" to be in (*1: unless there is some better alternative I'm missing) (*2: I have already written a simple alternative to SharedFlow (wrapping StateFlow) that I intend to use instead, until something like this is supported 'natively' . |
I've also just realized that pausing flows is kinda what I'm looking for. At least it looks like a viable solution. ScenarioThe following happens whenever my long Flow
1, 2 & 3 are basically states that are occasionally updated. Actual behaviorIf the entire Flow Desired behaviorI'd like to avoid all expensive work in 1, 2 & 3 as nothing has changed since the Flow Thoughts
For me, controlling the state a Flow is in and whether or not a Flow is working on that state are two different things. I'd like to control them separately. Not collecting -> Paused -> Collecting -> Paused -> Not collecting No work and no state |
Here's a very stupid implementation to better explain the idea: I basically pause the endless loop in the Flow builder while there are no subscribers instead of canceling it and starting over later on.
More functionality could probably be built on top then. E.g. that chaining pauses with upstream pausable Flows or pausing by external means like a method in a |
I'm not sure what that means. In my understanding, flow cannot change between cold and hot.
... When a flow is hot, it cannot be cold anymore. I guess a flow can start sort of cold, and then become hot, but I don't see how a hot flow could become cold. Are you using the words 'hot' and 'cold' to mean 'is being collected' and 'is not being collected' respectively ? That's not how they are used with flow. (as far as I know) |
From my understanding, You are proposing a What are some benefits over my idea? To me, it just seems less generic, less orthogonal, and ... I don't see any advantage. (Sorry if rude. I don't mean to bash. I am legitimately interested in advantages of your proposal, because I think Your use-case seems to match what I described under "SharedFlow recollecting" and is one of the main reasons for this proposal, 'namely' "recollecting a flow might be expensive/impossible and it's better to keep collecting it* and just inform it, that nobody is interested in emissions at the moment (=paused)" (* be it from a terminal collector, or from a Here is my idea behind pausing upstream (or any coroutine, really):
For more, please read what I wrote in the posts above. I think you will like the idea. (If not, please tell me why.) Here are some other points that I find in favor of
*1: If my idea was integrated into the library, I have already provided essentially all necessary implementation in the first post, except for changes to ---- Your code using my implementation (example of how it would look like if it was part of the library (and names were not changed)): https://gist.github.com/Maartyl/c44ea10fe04d6fcf924abe36099888b8 You also write "It will also automatically join pauses before and after emissions." - I think this is not always a good idea, but if you want it, you can use Regarding the difference in outputs: The flow starts paused, because there are no collectors yet. |
@Maartyl This is for example from Cold flows, hot channels:
Nevermind. I had to read it like five times to see the nuance here. 😅 It does refer to the entire Flow and doesn't mean that Looking at the documentation of
It is a little confusing in general. In my case I do mean "is being collected" and "is not being collected". |
@Maartyl regarding your other points:
No. My Flow's implementation merely uses
Doesn't sound rude and is a valid question. Potential issues with being able to pause coroutinesI have a few issues with being able to pause coroutines in general:
Alternatives for Android lifecyclesI've created multiple coroutine scopes to account for different lifecycle stages. I have a scope "resumed", a scope "started" and the standard scope "created".
That does require recollecting the Flow afterwords but in most cases is fine. For the remaining cases where I merely need a Flow to pause I've created another Flow that emits Android lifecycle updates (created <-> started <-> resumed … -> destroyed). By combining my upstream Flow with that Flow I can influence downstream work as needed. It's not as flexible and easy as properly pausing Flows but can serve as a building block to decide when to pause. Pausing Flows
All three points should be covered by my suggestion.
If pause events downstream are properly propagated upstream then it can be used using I'd avoid that though. If joining pauses is readily available for every single Flow or even in any suspending function then developers would assume that their code properly pauses if needed. However pausing only works if a downstream Flow actually makes use of it. Therefor I'd not make it too easy to use pausing functionality so that developers have to think twice whether it works in their case. We could still allow Flows to join pauses anywhere along the stream through a function that internally goes through the collector or the coroutine context. It should just make clear that it's cooperative with the downstream. And still no need to make it a generic coroutines feature.
Could you please give a use case here? I can't think of a scenario right now.
I'd argue the opposite :) Getting a feature right that's used across all coroutine functionality will likely need a lot of considerations. Starting with API, over how it interacts with all the other coroutine functionality, over how it behaves in many cases (like merging two pausable coroutine contexts when using
Yes, you're right. That's why I've said that it's just a stupid implementation for explaining the idea and that more functionality can be built on top then, for example properly communicating downstream pauses upstream.
And as mentioned before, I consider it a good thing if pausing is opt-in rather than automagically there. Also, the likeliness that downstream supports/uses pausing is probably low.
Yeah, something like
I'm open to not have
You're right. With my approach it should also start paused as the shared Flow defaults to How would, with your suggestion, intermediate collectors pause upstream collectors? They cannot set I wonder if @elizarov is already getting gray hair from this discussion 😁 |
General point:
This would be a very valid point, I thought, except, when a scope (lifecycle is just a special case) is paused, all 'child scopes' should also be paused by default. The same way cancellation works. I think it should pause everything under it.
I don't see how that would matter. It is running in a scope, that is paused.
Properly having 2 parent contexts (parent scopes) is not possible in Kotlin. This is a sad thing, but there is no support for it. I don't think we can, or even should, solve this for a single
(Having no default merging of parent contexts has worse problems than this: For example, it makes
That tends to be a very hard thing to do, and pausing is such a general concept, that I can see it popping up in multiple places independently. I am not completely against implementing pausing only for (hot) There have been requests for pausing
Exceptions.
That is not really a problem. Pausing is just a 'hint' anyway. It is worse to emit stale data, once the flow is unpaused. (in my opinion anyway)
It definitely needs a lot of consideration. It will still be 'easier' than writing an overload for every flow operator, so it has a variant, that can propagate pausing; or rewriting the whole
I agree this is necessary. Your way seems to need to rewriting huge amounts of code. (maybe not?) I think, either the How do you propose I agree, that
I don't know if I agree with this... I think it's quite obvious that a flow is paused only when... it's paused... (on the contrary, it would be confusing if flows were paused for no reason)
Originally, it seemed like you did not want any automatic propagation through the flow. If there is, then it's fine. (still extra work to get it from the flow to the place that does the expensive work, but works)
The
It is only available, not forced. I agree that forcing pausing is a horrible idea. Code that does not use it, will never be affected by it, unless it's using some function, that does want to be aware of pausing, and I believe it should not be hidden from functions, they are running in a paused context.
They will change context of the upstream flow. (e.g. (incorrect, but the idea)) https://gist.github.com/Maartyl/b40a07e6cb265dd9037a8d5745ebc0bd
For now, but people will realize it's usefulness over time. It can be provided by libraries, and downstream can be other
... Why? :D ((
'Dropped' ? Do you mean, cancelled, and recollected? I'm not saying |
This is an interesting idea... Can anyone think of a situation, where it would be useful, and actually change when code should or should not be paused? My thoughts: The coroutine (or upstream, if limited to |
(this comment is about my beliefs and opinions) Having two features, both good on their own, but that combine well, tends to be a sign of good design.
It makes sense to "pause execution context". (to me, at least) - And it would combine very nicely, because (very immature thought, probably not a good idea) A "paused execution context" can even Making the This is definitely something that needs to be thought through properly. I'm not saying it will be easy to introduce, or that it should be easy. It must be considered in all scenarios. (and, sadly, I do not have enough insight) BUT I believe it's best to first think a lot about design, and then usually implement only something quite simple and abstract, that ends up solving a lot by combining well with the rest of the features. Coroutines and (btw. I would not want to tie pausing with |
As I commented in #2194, to achieve the use case of Motivation 2: Lifecycle (Android), what I ended up doing is, instead of relying on pausable coroutines such as the ones launched with class Observer<T>(
lifecycleOwner: LifecycleOwner,
private val flow: Flow<T>,
private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {
var job: Job? = null
override fun onStart(owner: LifecycleOwner) {
job = owner.lifecycleScope.launch {
flow.collect {
collector(it)
}
}
}
override fun onStop(owner: LifecycleOwner) {
job?.cancel()
job = null
}
init {
lifecycleOwner.lifecycle.addObserver(this)
}
}
inline fun <reified T> Flow<T>.observe(
lifecycleOwner: LifecycleOwner,
noinline collector: suspend (T) -> Unit
) = Observer(lifecycleOwner, this, collector)
inline fun <reified T> Flow<T>.observeIn(
lifecycleOwner: LifecycleOwner
) = Observer(lifecycleOwner, this, {}) Then I launch such coroutine with: sharedFlow.observe(lifecycleOwner) {
// ...
} In my opinion, it would be nice, however, to have a built-in, more generalistic, non-Android-specific concept of pausable coroutines that integrates well with all the coroutines library, as proposed in this issue. |
@psteiger this solution makes good sense to me. It seems like this is more often what you want than the existing Others: why is pausing preferable over cancelling and recreating? It feels like shareIn + MutableSharedFlow/subscriptionCount + psteigers start/stop collection cover all the use cases i can think of. |
Hi @tom-pratt ,
No edge cases that I know of so far.
That depends solely on the SharingStarted parameter used in
Hum... that's a tough one. I think the first collector of a shared flow with replay 0 is guaranteed, by design, to get all emitted values. Only subsequent collectors can miss events. So, if a shared flow with replay 0 emits some value during activity recreation and there is no collector, the first collector to (re)appear will receive it. But don't quote me on that, I'd need to do some testing to be 100% sure no events would be lost.
I think having built-in extension functions would be nice as this is such a common use case on Android.
|
@tom-pratt , everyone If a
If you mean configuration change, then any solution will necessarily have to recollect, as the bound view is destroyed and new created. - Only the Configuration change (=rotation) (Activity being recreated) is less related. (The lifecycle will be destroyed, and new collect will run in a new scope anyway.)
Pausing, however, can deal with configuration change (=rotation) better!
I agree that
I agree it's not outright needed for Android, but I consider the
As for using recollecting, I have a similar solution (more similar to what @fluidsonic posted):
Examples where I think
|
I just discovered this proposal recently, after writing an article about issues and limitations when using Kotlin Flow on Android. |
There is so much useful information in this discussion. If this proposal is somehow approved, I hope there will be a detailed article about it. |
I actually need this feature, just like: EDIT: |
@cbeyls where do things now stand:
|
For us, we are resolving this issue by introducing the concept of "user presence" to the flow. When the flow is collected, a special object is inserted into flow's The concept is very similar to the "pausing" concept above, but we think it has better semantic meaning (it's not clear what "pause" is and what should downstream do with it). Then some downstream flow can decide to collect this and change its behavior when user is present or not (for example, GPS location is turned off when app leaves the foreground), while the main flow is still being collected. That way we get the most benefits of both alternative approaches (re-collecting on every start/stop event or not recollecting at all). |
Basic idea
Value of this comes from libraries using it, but it is not imposed on anyone.
Motivation 1: SharedFlow
Motivation 2: Lifecycle (Android)
LifecycleCoroutineScope
.StateFlow<Boolean>
is easy to provide.)Benefits
isPaused: StateFlow<Boolean>
is not enough to communicate all 'intents' from downstream to upstream.Downsides
Conceptual usage
(names are poor, ignore those)
The text was updated successfully, but these errors were encountered: