Proposal: Flow pause cooperatively #2223

Open
Maartyl opened this issue Sep 1, 2020 · 24 comments

@Maartyl

Maartyl commented Sep 1, 2020

Basic idea

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}
  • Downstream may provide this to signal whether it is 'currently interested in emissions'. If CoroutinePausing is not in the context, downstream is automatically considered Unpaused.
  • Upstream may respect that, and observe it to avoid unnecessary work.

Value of this comes from libraries using it, but it is not imposed on anyone.

Motivation 1: SharedFlow

  • upstream is Unpaused when*: at least one collector is Unpaused (or, equivalently: Paused when: no collectors, or all Paused)
    • It is mostly orthogonal, and the key can just always be provided to the upstream flow, regardless of other strategies. (Although, some pausing strategies can just be equivalent to wrapping the upstream flow in some pausing-aware operator.)
    • (* Should also respect Key from context in which it runs. Probably as 'if context is paused, pause upstream even if there are Unpaused collectors'.)
  • Makes it possible to pause upstream without 'recollecting' it, which might be expensive or impossible.
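
The pause rule in the first bullet can be written down as a small pure function. This is only a sketch of the rule as stated: `upstreamPaused` and its parameters are hypothetical names for illustration, not part of kotlinx.coroutines or of the proposal's code.

```kotlin
// Hypothetical helper illustrating the rule above; not a kotlinx.coroutines API.
// collectorsPaused: one flag per current collector (true = that collector is paused).
// contextPaused: whether the context the SharedFlow itself runs in is paused.
fun upstreamPaused(collectorsPaused: List<Boolean>, contextPaused: Boolean = false): Boolean =
    // all {} is true for an empty list, so "no collectors" also counts as paused.
    contextPaused || collectorsPaused.all { it }

fun main() {
    println(upstreamPaused(emptyList()))                          // true: no collectors
    println(upstreamPaused(listOf(true, true)))                   // true: all paused
    println(upstreamPaused(listOf(true, false)))                  // false: one collector is interested
    println(upstreamPaused(listOf(false), contextPaused = true))  // true: context overrides collectors
}
```

The `contextPaused` term corresponds to the starred note: a paused sharing context pauses upstream even when some collectors are Unpaused.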

Motivation 2: Lifecycle (Android)

  • Key can be provided in LifecycleCoroutineScope.
  • Libraries (e.g. Room) would use this just like they do with LiveData now.
  • This was the (I think) last case where LiveData was needed over just using Flow everywhere.
  • (Obviously, not only Lifecycle. Can be tied to, say, a view being visible or not. It's very flexible and StateFlow<Boolean> is easy to provide.)

Benefits

  • Simple.
  • Lots of value for little work.
  • No change to most of codebase. (library or clients)
  • Direct communication between both ends of flow by default. (only 'intercepted' when needed: i.e. SharedFlow, or operators that add pausing, or even prevent pausing...)
  • No need to affect the 'processing' part of a flow: Directly pauses source, and thus nothing is emitted through the flow.
  • Significantly simplifies usage of SharedFlow, and potentially even internals.
  • Can be easily extended, if it turns out isPaused: StateFlow<Boolean> is not enough to communicate all 'intents' from downstream to upstream.
  • Not limited to Flow: Can be a general pausing mechanism for all coroutines. (hence my initial name above not being DownstreamPausing or HotFlowPausing)

Downsides

  • Requires cooperation. (but that is already the case for coroutine cancellation, etc.)
    • Possibly even a good thing, as 'pausing' in an arbitrary point of execution is probably a horrible idea.

Conceptual usage

(names are poor, ignore those)

suspend fun awaitUnpaused() {
    val p = coroutineContext[CoroutinePausing] ?: return
    if (!p.isPaused.value) return
    p.isPaused.takeWhile { it }.collect()
}

//to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.awaitingUnpaused() = flow {
    awaitUnpaused()
    collect {
        emit(it)
        //after emit, so an old value is not emitted later, once unpaused
        // instead immediately 'resumes' upstream
        awaitUnpaused()
    }
}

//to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.recollectWhenUnpaused(timeoutMillis: Long = 0) = flow<T> {
    //this is, I believe, the currently intended way for SharedFlow to handle upstream 'pausing'
    //it could be separated from SharedFlow
    TODO("stop collecting when paused; start collecting when unpaused")
}
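
To see how the pieces above fit together, here is a minimal end-to-end sketch. It repeats the interface so that it compiles on its own. `SimplePausing` and `runDemo` are hypothetical names introduced for this example, and `awaitUnpaused` is written with `first { !it }`, which should behave the same as the `takeWhile { it }.collect()` version above.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical implementation: whoever owns the MutableStateFlow decides when to pause.
class SimplePausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

// Suspends until the current context is unpaused; returns at once if no element is present.
suspend fun awaitUnpaused() {
    val p = coroutineContext[CoroutinePausing] ?: return
    p.isPaused.first { !it }
}

// Returns what the worker had produced while paused, and what it produced in total.
fun runDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val paused = MutableStateFlow(true)      // the scope starts out paused
    val ticks = mutableListOf<Int>()
    val worker = launch(SimplePausing(paused)) {
        repeat(3) { i ->
            awaitUnpaused()                  // cooperative pause point
            ticks += i
        }
    }
    yield()                                  // let the worker run up to its first pause point
    val whilePaused = ticks.toList()
    paused.value = false                     // downstream becomes interested again
    worker.join()
    whilePaused to ticks.toList()
}

fun main() {
    println(runDemo())
}
```

The worker never learns who paused it or why. It only reads the element from its own context, which is the decoupling the proposal relies on.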
@psteiger

+1 to this feature.

@elizarov
Contributor

Can you please elaborate on what you are trying to achieve? Can you provide a specific problem that you are running into with SharedFlow and that you are trying to solve?

@psteiger

psteiger commented Sep 17, 2020

@elizarov, not OP, but I think a use case is pausing the collection of flows when an Android LifecycleOwner (e.g. Activity) goes into the background (lifecycle in onPause() or onStop()). This is closely related to #2194. I'm not sure how subscriptionCount will work for SharedFlow, but I believe counters will not decrease if a collection job, launched in a LifecycleCoroutineScope with launchWhenStarted(), gets paused, so subscriptionCount in itself would not be enough to serve as a substitute for LiveData's onActive()/onInactive() as suggested in #2194.

@elizarov
Contributor

Why would this really be needed? Do you have any specific use-case in mind?

@Maartyl
Author

Maartyl commented Sep 17, 2020

@elizarov

Stopped Lifecycle is still collecting.

This is my main problem. (I am already using CoroutinePausing for this.)

  • Both flow.launchIn(lifecycleScope) and lifecycleScope.launchWhenStarted{flow.collect{}} will collect until the Lifecycle is destroyed. That includes all the time the (say) activity is stopped: it sits in the back stack, wastefully updating all its views.
  • LiveData solves this. It informs 'upstream' if it has any started observers.
  • Only sensible way to use Flow currently is: flow.asLiveData().observe(lfo){} which will automatically cancel collection after Lifecycle stopped (after timeout) and recollect when started again. (issues with recollecting below) *3
  • For this reason: one still has to use LiveData, even though I would prefer to use Flow everywhere. *1

SharedFlow recollecting

  • Not all flows can be recollected.
    • example: any flow that (somewhere inside) collects ReceiveChannel.consumeAsFlow().
    • example detail: a flow can be: consumeAsFlow().combine(expensiveHotFlow)...{expensive processing} where the expensive hot flow could benefit from knowing that it should not produce anything at the time. Even the 'expensive processing' could use it... (I have not needed this yet, but if I do, there is no way around it but to keep the flow running.)
  • Recollecting a flow might have unwanted side-effects (even if possible).
    • example I already encountered: recollecting Room query causes the (in this case expensive) query to run again, even though the database has not changed, and then the whole subsequent processing (also expensive) runs again. In this case it just wastes energy, but it happens every time user returns from sub-activity or a fragment is reStarted, etc. *2

Essentially: recollecting is something I can see one 'often' wanting to avoid.

Core point

By this 'interface' being in a common library, 3rd party libraries can use it to coordinate.

If SharedFlow implemented this, it would be even more useful. It doesn't strictly have to: One would always have to provide CoroutinePausing in context. It would just seem more 'correct' if it was integrated throughout the library.

(Pausing jobs was another request. Pausing jobs seems like a very complicated thing, and unsafe (and while I find pausing of coroutines useful too, I hope hard pausing of jobs will never be the chosen solution). This completely* solves that. - In addition to being effectively needed for unwasteful hot flows.)
(* as long as the code that needs to be paused cooperates)

(*1: After I started using CoroutinePausing, I am slowly refactoring away all LiveData, as it is no longer needed. For now, I still have to pass CoroutinePausing in the sharing context, though.) (Also, to not come off as deceitful: I am not using SharedFlow yet: I am using 1.3.9 atm. I am looking forward to SharedFlow, and think potentially integrating CoroutinePausing might even change the design a little before it's fully launched. - It seems like an important thing to have with hot flows.)

(*2: I am already wrapping all Room queries in .awaitingUnpaused() and it works well, but I imagine it would work better, and be more efficient, if it was implemented in Room directly.)

(*3 In practice, it's better to replace SharedFlow with LiveData directly, as it will have less overhead. (this spreads and pollutes all with LiveData 'nodes' in the flow 'graph') - But then: using the LiveData as flow will again keep it running continuously.)

@Maartyl
Author

Maartyl commented Sep 17, 2020

@psteiger Yes. This is the main point. subscriptionCount will not change when lifecycle is stopped.
Even if it did, a flow far upstream would not have access to it.
Even if it did, it would be tightly coupled with SharedFlow - all it cares about is 'if someone is interested in it running'.

(Hot) flows have no way to inform upstream of being Active/Inactive like LiveData has.


Even if this were not considered enough "value / cost" to be in kotlinx.coroutines
I consider it an absolute must-have, and will use it regardless. *1
I just think others could benefit from it too.
It would also be nicer for me, if it were integrated with core library functions, and with other libraries. *2

(*1: unless there is some better alternative I'm missing)

(*2: I have already written a simple alternative to SharedFlow (wrapping StateFlow) that I intend to use instead, until something like this is supported 'natively'.
Also, my implementations are definitely not as efficient as properly integrated solutions would have been.)

@fluidsonic

fluidsonic commented Oct 14, 2020

I've also just realized that pausing flows is kinda what I'm looking for. At least it looks like a viable solution.

Scenario

The following happens whenever my long Flow is being collected:

  1. Load lots of cached data from database (minutes)
  2. Periodically refresh data from server when cache is stale (seconds to minutes, against rate-limited API)
  3. Perform expensive transformations whenever new data is available (many minutes)
  4. Actually consume the final data (e.g. in my case print to console on demand) (quick)

1, 2 & 3 are basically states that are occasionally updated.

Actual behavior

If the entire Flow is no longer collected and then collected again (same Flow instance), the expensive chain starts all over.

Desired behavior

I'd like to avoid all expensive work in 1, 2 & 3 as nothing has changed since the Flow was previously collected. It's still the same Flow instance.

Thoughts

  • While this is about a single Flow instance there can be multiple collectors. The state however is "per Flow", not "per collector per Flow". That means some form of sharing must be involved after (1), (2) and (3).
  • distinctUntilChanged() doesn't help after the Flow was no longer collected as it will lose its previous value needed for comparison.
  • Making upstreams hot with stateIn() is not helpful because it wouldn't just keep the upstream's state live but also whatever work is performed. E.g. periodic data updates from the servers wouldn't stop.
  • stateIn() will replay state downstream at all levels. While it's desired for 3->4 it's not desired for 1->2 and 2->3 because they would unnecessarily have to perform expensive work again. Their input hasn't changed.
  • distinctUntilChanged() and stateIn() use equals to avoid redundant emissions downstream. That operation is also very expensive for large data sets. I need to keep lists with millions of elements up-to-date in memory.
  • Somehow forcing backpressure is also not useful as it would only pause upstream work after expensive work was performed, i.e. upon emission.
  • Using stateIn has the additional problem that an initial value needs to be provided upfront. However downstream should not receive anything until the entire Flow was run through at least once with actual data. Since that data is nullable, I'd have to resort to either using <Any> Flows and set a special marker as initial value or wrap every single item in another class and make that nullable.
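
The initial-value point in the last bullet can be illustrated by comparing `stateIn` with `shareIn(replay = 1)`, which needs no initial value. This is a side-by-side sketch, not a solution to the pausing problem discussed here: both variants still keep upstream work running. `runInitialValueDemo` and the sample flow are made up for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Returns: the placeholder exposed by stateIn, the replay cache of shareIn
// before any real data arrived, and the first real value seen through shareIn.
fun runInitialValueDemo(): Triple<String?, List<String>, String> = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    // Stand-in for a slow upstream that produces real data only after a while.
    val source = flow {
        delay(50)
        emit("real data")
    }

    // stateIn must be given a value up front, so the element type becomes nullable.
    val state: StateFlow<String?> = source.stateIn(scope, SharingStarted.Eagerly, null)
    // shareIn with replay = 1 caches the latest value but starts out empty.
    val shared: SharedFlow<String> = source.shareIn(scope, SharingStarted.Eagerly, replay = 1)

    val placeholder = state.value           // collectors would observe this placeholder first
    val replayBefore = shared.replayCache   // nothing to replay yet
    val firstReal = shared.first()          // suspends until the source has emitted
    scope.cancel()
    Triple(placeholder, replayBefore, firstReal)
}

fun main() {
    println(runInitialValueDemo())
}
```

With `shareIn`, a collector simply waits until the first real value exists, so no marker object or wrapper class is needed for that part of the problem.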

For me, controlling the state a Flow is in and whether or not a Flow is working on that state are two different things. I'd like to control them separately.

Not collecting -> Paused -> Collecting -> Paused -> Not collecting

No work and no state while not collected.
No work during Paused.
Has state and is working while being collected.

@fluidsonic

Here's a very stupid implementation to better explain the idea:
https://gist.github.com/fluidsonic/01702dbab744595a8dbdd41befe6829c

I basically pause the endless loop in the Flow builder while there are no subscribers instead of canceling it and starting over later on.

pausableFlow { … } creates a SharedFlow. It keeps the passed suspendable block hot while there are no subscribers. The block however can call joinPause() at any time to suspend itself if the Flow is paused. It will automatically resume once the Flow has subscribers again and is unpaused. It will also automatically join pauses before and after emissions.

More functionality could probably be built on top then, e.g. chaining pauses with upstream pausable Flows, or pausing by external means like a method in a PausableFlow.
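
The behaviour described in this comment could be sketched roughly as follows. This is not the code from the linked gist: only the names `pausableFlow` and `joinPause` come from the comment, while `PausableProducer`, `runPausableDemo` and the exact signatures are assumptions made for this example. It relies on `MutableSharedFlow.subscriptionCount` to detect whether anyone is collecting.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical receiver of the producer block.
class PausableProducer<T>(private val shared: MutableSharedFlow<T>) {
    // Suspends while there are no subscribers, i.e. while the flow counts as paused.
    suspend fun joinPause() {
        shared.subscriptionCount.first { it > 0 }
    }

    // Joins pauses before and after the emission, as described in the comment.
    suspend fun emit(value: T) {
        joinPause()
        shared.emit(value)
        joinPause()
    }
}

// Keeps the block alive in this scope; the block is suspended rather than cancelled
// when the last subscriber leaves.
fun <T> CoroutineScope.pausableFlow(block: suspend PausableProducer<T>.() -> Unit): SharedFlow<T> {
    val shared = MutableSharedFlow<T>()
    launch { PausableProducer(shared).block() }
    return shared.asSharedFlow()
}

// Returns how many work items were started before anyone subscribed,
// and the values received by the first subscriber.
fun runPausableDemo(): Pair<Int, List<Int>> = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    var started = 0
    val numbers = scope.pausableFlow<Int> {
        var next = 0
        while (true) {
            joinPause()        // do not start work nobody is waiting for
            started++
            emit(next++)
        }
    }
    yield()                    // give the producer a chance to run
    val startedWhilePaused = started
    val received = numbers.take(3).toList()
    scope.cancel()
    startedWhilePaused to received
}

fun main() {
    println(runPausableDemo())
}
```

As pointed out later in the thread, counting subscribers alone does not cover subscribers that are themselves paused, so a fuller version would need an 'unpaused subscriber' signal instead of `subscriptionCount`.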

@Maartyl
Author

Maartyl commented Oct 15, 2020

@fluidsonic

Flow turns cold all the way to upstream and then hot again

I'm not sure what that means. In my understanding, flow cannot change between cold and hot.

  • Cold : It recreates the result from the start, every time. Every collector will get all emits from the beginning, independently. Upstream waits for downstream (each call to emit) before continuing producing. Upstream will never 'change' (would produce emits) when not collected. (and usually, runs through a 'list' of things, and completes, like an iterator)
  • Hot : Collector does not receive all emits from the beginning, only since they started collecting (+ a few from replay, possibly). Upstream does not wait for downstream: it is some kind of 'live' (hot) thing, that changes on its own, and only emits events, when someone 'is subscribed' (=collects). Upstream will 'change' (would produce emits) whether it is being collected or not. (and usually does not complete, and represents subscription, until collector is cancelled)

... When a flow is hot, it cannot be cold anymore. I guess a flow can start sort of cold, and then become hot, but I don't see how a hot flow could become cold.

Are you using the words 'hot' and 'cold' to mean 'is being collected' and 'is not being collected' respectively ? That's not how they are used with flow. (as far as I know)

@Maartyl
Author

Maartyl commented Oct 15, 2020

@fluidsonic

Here's a very stupid implementation to better explain the idea:
https://gist.github.com/fluidsonic/01702dbab744595a8dbdd41befe6829c

From my understanding, You are proposing a sharedFlow, which instead of providing 'subscriptionCount' only provides 'subscriptionCount > 0' ...? (but through a special collector)

What are some benefits over my idea? To me, it just seems less generic, less orthogonal, and ... I don't see any advantage. (Sorry if rude. I don't mean to bash. I am legitimately interested in advantages of your proposal, because I think CoroutinePausing is very good, but I might be wrong, and would like to combine any good ideas if possible and useful.)

Your use-case seems to match what I described under "SharedFlow recollecting" and is one of the main reasons for this proposal, 'namely' "recollecting a flow might be expensive/impossible and it's better to keep collecting it* and just inform it, that nobody is interested in emissions at the moment (=paused)" (* be it from a terminal collector, or from a SharedFlow etc.)

Here is my idea behind pausing upstream (or any coroutine, really):

  • They are not forced to pause, that's important.
  • It is also for collectors that will 'stay collecting' even as they are paused. (that includes SharedFlow - which is effectively collecting the wrapped flow *1)
    • (but also a paused activity in android, or something you don't want to stop collecting, but only want to pause collecting temporarily)
  • All of upstream can use it, without explicitly passing a reference anywhere, that would be tightly coupled with it being used in a SharedFlow or pausingFlow.

For more, please read what I wrote in the posts above. I think you will like the idea. (If not, please tell me why.)

Here are some other points that I find in favor of CoroutinePausing:

  • Your solution will not work with all(any) existing operators: the PausableFlowCollector will not propagate through. They will however propagate CoroutineContext automatically. Even across ChannelFlow (so, paused downstream can be easily observed from asynchronous producers).
    • CoroutinePausing way assumes there are many operators between the final collector, and the producer: all of which will work exactly the same way as now, unchanged, even if pausing was introduced. (all, except those that could optionally take advantage of pausing mechanism; and shared flow, which would provide pausing similar to what you are proposing, but in this generic way)
  • Producer does not need to be tightly coupled with a PausableFlowCollector
    • The whole coroutine is simply aware, whether it is paused or not. (and the reason for being paused could even be different (in the future) from some "pausable Flow")
    • You would need to pass a reference to all the code that might need it, instead of just deriving the scope.
  • Most likely, it is much simpler to implement.
  • Only using subscriptionCount is not enough, as the subscriber can be paused too: One needs 'unpaused subscriber count'. (I tend to forget about this too, but it's crucial for big flow graphs)

*1: If my idea was integrated into the library, SharedFlow would already work as a pausingFlow (i.e. making the wrapped flow aware whether it has any (not paused again!) collectors) but unlike PausableFlowCollector or subscriptionCount which are only available directly to the code that is using the sharedFlow this will provide it in coroutineContext of the entire upstream. Any coroutine, anywhere in the producer can access it, and suspend itself until the context it lives in becomes unpaused. (simply by calling awaitUnpaused()) Even code in libraries, that has no idea about you using it in some sharedFlow it has no reference to.

I have already provided essentially all necessary implementation in the first post, except for changes to SharedFlow implementation and a few utility methods, like Flow.pausedWhen(isPaused:StateFlow<Boolean>) that would make upstream paused when either this isPaused is true, or the one provided from downstream context is - one needs unsafe flow, to implement this, as it changes context, though. There are obviously a few more, but none too complicated. (I can share what I'm using (in production, btw.) but it's still very rough.)

---- Your code using my implementation (example of what it would look like if it was part of the library (and names were not changed)): https://gist.github.com/Maartyl/c44ea10fe04d6fcf924abe36099888b8

You also write "It will also automatically join pauses before and after emissions." - I think this is not always a good idea, but if you want it, you can use awaitingUnpaused() above (but nobody is forced to use it), or alternative that even awaits before the emit (but I explained in the comment, why I think it's a bad idea to generally pause before the emit). It does not use finally, as I think a failure should propagate immediately regardless.

Regarding the difference in outputs: The flow starts paused, because there are no collectors yet. pausingStateIn works as shareIn(...,started=Eagerly) and only starts paused, instead of starting lazily. If it did start lazily, the flow would start unpaused.

@fluidsonic

fluidsonic commented Oct 15, 2020

@Maartyl the terminology doesn't seem to be 100% consistent.

This is for example from Cold flows, hot channels:

Terminal operators on a flow collect all values emitted by the flow, activating the flow code only for the duration of the corresponding operation. It makes the flow cold — it is not active before the call to terminal operation, not active after, releasing all resources before returning from the call.

According to that, a Flow becomes cold as soon as the last collector is gone.

Nevermind. I had to read it like five times to see the nuance here. 😅 It does refer to the entire Flow and doesn't mean that collect makes it cold.

Looking at the documentation of SharedFlow for example that doesn't seem to always be the case:

A hot Flow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. This is opposed to a regular Flow, such as defined by the flow { ... } function, which is cold and is started separately for each collector.

It is a little confusing in general.

In my case I do mean "is being collected" and "is not being collected".

@fluidsonic

fluidsonic commented Oct 15, 2020

@Maartyl regarding your other points:

From my understanding, You are proposing a sharedFlow, which instead of providing 'subscriptionCount' only provides 'subscriptionCount > 0' ...? (but through a special collector)

No. My Flow's implementation merely uses subscriptionCount to detect if the Flow is being collected at least once or not.

What are some benefits over my idea? To me, it just seems less generic, less orthogonal, and ... I don't see any advantage. (Sorry if rude. I don't mean to bash. I am legitimately interested in advantages of your proposal, because I think CoroutinePausing is very good, but I might be wrong, and would like to combine any good ideas if possible and useful.)

Doesn't sound rude and is a valid question.

Potential issues with being able to pause coroutines

I have a few issues with being able to pause coroutines in general:

  1. It's shared mutable state. It can be very hard to reason about when a coroutine was paused and where because isPaused would be accessible everywhere. That's a likely source of bugs. It's already difficult enough to figure out why and where a coroutine was canceled. Nevermind, I've misread that.
  2. Just because a lifecycle is considered paused doesn't mean that all coroutines in that lifecycle's scope should be paused. That would effectively be the case here. Coroutines would not have the ability to differentiate between "is paused because X" and "is paused because Y". What if two parent contexts pause based on different scenarios? How do they cooperate?
  3. All examples seem to be Flow-related. Pausing can be designed much better and easier for Flows than for coroutines in general. Such a solution or something similar could be provided for Flows at first and if useful still be extended to all coroutines later on.
  4. There are ways to model Android lifecycles in a more reliable way. I've had the same problem in an app I was writing. See below.

Alternatives for Android lifecycles

I've created multiple coroutine scopes to account for different lifecycle stages. I have a scope "resumed", a scope "started" and the standard scope "created".

  • As soon as the activity is paused, everything in the "resumed" scope is canceled.
  • Once it's stopped, everything in the "started" lifecycle is canceled.

That does require recollecting the Flow afterwards but in most cases is fine. For the remaining cases where I merely need a Flow to pause I've created another Flow that emits Android lifecycle updates (created <-> started <-> resumed … -> destroyed). By combining my upstream Flow with that Flow I can influence downstream work as needed. It's not as flexible and easy as properly pausing Flows but can serve as a building block to decide when to pause.

Pausing Flows

Here is my idea behind pausing upstream (or any coroutine, really):

  • They are not forced to pause, that's important.
  • It is also for collectors that will 'stay collecting' even as they are paused. (that includes SharedFlow - which is effectively collecting the wrapped flow *1)
    • (but also a paused activity in android, or something you don't want to stop collecting, but only want to pause collecting temporarily)
  • All of upstream can use it, without explicitly passing a reference anywhere, that would be tightly coupled with it being used in a SharedFlow or pausingFlow.

All three points should be covered by my suggestion.

  • It's cooperative through joinPause. As for automatically pausing when emitting we could remove that and only allow for explicit joining of pauses.
  • Collectors can also easily become pausable by flatMap-ing to a pausableFlow. If that's a common case there could also be a special operator like collectPausable.
  • For the same reason everywhere in the upstream you can use joinPause by flatMap-ing to a pausableFlow. It just requires the upstream communication of the paused state downstream as I've mentioned before. So it must be cooperative across the entire Flow.

Your solution will not work with all(any) existing operators: the PausableFlowCollector will not propagate through. They will however propagate CoroutineContext automatically. Even across ChannelFlow (so, paused downstream can be easily observed from asynchronous producers).

If pause events downstream are properly propagated upstream then it can be used using flatMap as mentioned above. Theoretically we could add an extension to FlowCollector to allow something like joinPause in any operator that uses it.

I'd avoid that though. If joining pauses is readily available for every single Flow or even in any suspending function then developers would assume that their code properly pauses if needed. However pausing only works if a downstream Flow actually makes use of it. Therefore I'd not make it too easy to use pausing functionality so that developers have to think twice whether it works in their case.

We could still allow Flows to join pauses anywhere along the stream through a function that internally goes through the collector or the coroutine context. It should just make clear that it's cooperative with the downstream. And still no need to make it a generic coroutines feature.

Producer does not need to be tightly coupled with a PausableFlowCollector

Why not? See above why I think that opt-in should be preferred.

You would need to pass a reference to all the code that might need it, instead of just deriving the scope.

Could you please give a use case here? I can't think of a scenario right now.

Most likely, it is much simpler to implement.

I'd argue the opposite :) Getting a feature right that's used across all coroutine functionality will likely need a lot of considerations. Starting with API, over how it interacts with all the other coroutine functionality, over how it behaves in many cases (like merging two pausable coroutine contexts when using withContext, launch, etc.) and so on. I see a lot of rabbit holes lurking there.

Only using subscriptionCount is not enough, as the subscriber can be paused too: One needs 'unpaused subscriber count'. (I tend to forget about this too, but it's crucial for big flow graphs)

Yes, you're right. That's why I've said that it's just a stupid implementation for explaining the idea and that more functionality can be built on top then, for example properly communicating downstream pauses upstream.

If my idea was integrated into the library, SharedFlow would already work as a pausingFlow (i.e. making the wrapped flow aware whether it has any (not paused again!) collectors) but unlike PausableFlowCollector or subscriptionCount which are only available directly to the code that is using the sharedFlow this will provide it in coroutineContext of the entire upstream. Any coroutine, anywhere in the producer can access it, and suspend itself until the context it lives in becomes unpaused. (simply by calling awaitUnpaused()) Even code in libraries, that has no idea about you using it in some sharedFlow it has no reference to.

I thought about that too. But making SharedFlow pausing by default or even only allow for pausing is not a good idea. What if I do want upstream collections to be dropped instead of paused?
I think I misunderstood you here. You merely write that a SharedFlow provides the ability to pause upstream. But how/where would I actually set isPaused? Where's the MutableStateFlow?

And as mentioned before, I consider it a good thing if pausing is opt-in rather than automagically there. Also, the likelihood that downstream supports/uses pausing is probably low.

I have already provided essentially all necessary implementation in the first post, except for changes to SharedFlow implementation and a few utility methods, like Flow.pausedWhen(isPaused:StateFlow<Boolean>) that would make upstream paused when either this isPaused is true, or the one provided from downstream context is - one needs unsafe flow, to implement this, as it changes context, though. There are obviously a few more, but none too complicated. (I can share what I'm using (in production, btw.) but it's still very rough.)

Yeah, something like pausedWhen or pausedWhile would definitely make sense.

You also write "It will also automatically join pauses before and after emissions." - I think this is not always a good idea, but if you want it, you can use awaitingUnpaused() above (but nobody is forced to use it), or alternative that even awaits before the emit (but I explained in the comment, why I think it's a bad idea to generally pause before the emit). It does not use finally, as I think a failure should propagate immediately regardless.

I'm open to not having emit automatically join pauses. I haven't thought deeply about what default behavior makes most sense: Join by default or don't join by default with opposite = opt-in.
I don't know what you refer to by failure propagation. My code joins pauses before emissions to avoid emissions while paused and joins pauses after emissions to avoid work when the Flow was paused as a result of the emission. emit can block so there can be quite some time between before and after.

Regarding the difference in outputs: The flow starts paused, because there are no collectors yet. pausingStateIn works as shareIn(...,started=Eagerly) and only starts paused, instead of starting lazily. If it did start lazily, the flow would start unpaused.

You're right. With my approach it should also start paused as the shared Flow defaults to Eagerly. I haven't considered that yet.

How would, with your suggestion, intermediate collectors pause upstream collectors? They cannot set isPaused of CoroutinePausing.

I wonder if @elizarov is already getting gray hair from this discussion 😁

@Maartyl
Author

Maartyl commented Oct 15, 2020

@fluidsonic

General point: CoroutinePausing is provided when building a CoroutineScope. It should propagate in a similar way a Job would.

Just because a lifecycle is considered paused doesn't mean that all coroutines in that lifecycle's scope should be paused.

This would be a very valid point, I thought, except, when a scope (lifecycle is just a special case) is paused, all 'child scopes' should also be paused by default. The same way cancellation works. I think it should pause everything under it.
However, just like coroutines can do withContext(NonCancellable), they could do withContext(NonPausing). (In my current impl, I'm calling that val pausingAlwaysFalse.)
As for a lifecycle, I think everything under a lifecycleScope should be paused when the lifecycle is stopped. If it should not be paused, it probably should not be a child of that scope. (Either it should use viewModelScope, or something even more removed from the LifecycleOwner)
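
A minimal sketch of what such an opt-out element might look like, assuming the interface from the first post. `NonPausing` is the name used in the comment above; `SimplePausing` and `runNonPausingDemo` are hypothetical helpers for this example, and none of this is an existing kotlinx.coroutines API.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical element that is never paused, similar in spirit to NonCancellable.
object NonPausing : CoroutinePausing {
    override val isPaused: StateFlow<Boolean> = MutableStateFlow(false).asStateFlow()
}

// Hypothetical element whose paused flag is controlled by whoever owns the flow.
class SimplePausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

suspend fun awaitUnpaused() {
    val p = coroutineContext[CoroutinePausing] ?: return
    p.isPaused.first { !it }
}

// The outer context stays paused for the whole run; only the inner block opts out.
fun runNonPausingDemo(): String = runBlocking(SimplePausing(MutableStateFlow(true))) {
    // A plain awaitUnpaused() here would suspend forever, because nothing unpauses the scope.
    withContext(NonPausing) {
        // NonPausing replaces the paused element for this block, so this returns at once.
        awaitUnpaused()
        "ran while the parent scope was paused"
    }
}

fun main() {
    println(runNonPausingDemo())  // prints: ran while the parent scope was paused
}
```

Because context elements with the same key replace each other, `withContext(NonPausing)` overrides the inherited element only for the enclosed block, much like `withContext(NonCancellable)` does for the Job.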

"is paused because X" and "is paused because Y".

I don't see how that would matter. It is running in a scope, that is paused.

What if two parent contexts pause based on different scenarios?

(like merging two pausable coroutine contexts when using withContext, launch, etc.)

Properly having 2 parent contexts (parent scopes) is not possible in Kotlin. This is a sad thing, but there is no support for it. I don't think we can, or even should, solve this for a single CoroutineContext.Element.

  • Either you can do what is normal, and with plus use the latter.
  • Or you can manually do mergePausing(CoroutinePausing, StateFlow<Boolean>) (actually a different signature, but this idea) which would create a CoroutinePausing that is paused, when either is. (I am already using this)
  • Other merge 'operators' are also possible (only paused when both are paused) etc.
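A rough sketch of such merge operators, working directly on StateFlow<Boolean> rather than on the context element (the comment notes that the real signature differs). The names `pausedWhenEither` and `pausedWhenBoth` and the explicit scope parameter are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Paused when either source is paused.
fun pausedWhenEither(
    a: StateFlow<Boolean>,
    b: StateFlow<Boolean>,
    scope: CoroutineScope
): StateFlow<Boolean> =
    combine(a, b) { x, y -> x || y }
        .stateIn(scope, SharingStarted.Eagerly, a.value || b.value)

// Paused only when both sources are paused.
fun pausedWhenBoth(
    a: StateFlow<Boolean>,
    b: StateFlow<Boolean>,
    scope: CoroutineScope
): StateFlow<Boolean> =
    combine(a, b) { x, y -> x && y }
        .stateIn(scope, SharingStarted.Eagerly, a.value && b.value)

fun demoMerge(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val lifecyclePaused = MutableStateFlow(false)
    val noSubscribers = MutableStateFlow(false)
    val merged = pausedWhenEither(lifecyclePaused, noSubscribers, scope)

    val seen = mutableListOf(merged.value)   // neither source is paused yet
    noSubscribers.value = true
    seen += merged.first { it }              // suspends until the merge reports paused
    noSubscribers.value = false
    seen += merged.first { !it }             // suspends until it reports unpaused again
    scope.cancel()
    seen
}
```

The merged state needs a scope to live in because stateIn launches a coroutine that keeps it up to date; wrapping the result in a CoroutinePausing element is left out here.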

(Having no default merging of parent contexts has worse problems than this: For example, it makes ContinuationInterceptor nearly unusable for anything but Dispatchers, as any common Dispatcher switch will override it.) (Jobs do not combine at all, etc.)

[All Flow related.] Such a solution or something similar could be provided for Flows at first and if useful still be extended to all coroutines later on.

That tends to be a very hard thing to do, and pausing is such a general concept, that I can see it popping up in multiple places independently.
I would definitely not want to end up with 4 different common pausing mechanisms in different places. It's probably better to take care and do the first one carefully, and orthogonal.

I am not completely against implementing pausing only for (hot) Flows. That is the main place where I need it too. But, I'm afraid, it will be the wrong long term decision.

There have been requests for pausing Jobs, for example, and this would probably be enough for the people who needed that. Android also already has a pausing Dispatcher (I think it's unusable, but it's another place where someone implemented unrelated, incompatible pausing).

I don't know what you refer to by failure propagation.

Exceptions.

avoid emissions while paused

That is not really a problem. Pausing is just a 'hint' anyway. It is worse to emit stale data, once the flow is unpaused. (in my opinion anyway)

[simple to implement] Getting a feature right that's used across all coroutine functionality will likely need a lot of considerations

It definitely needs a lot of consideration. It will still be 'easier' than writing an overload for every flow operator, so it has a variant, that can propagate pausing; or rewriting the whole Flow codebase.

properly communicating downstream pauses upstream

So it must be cooperative across the entire Flow.

I agree this is necessary. Your way seems to need rewriting huge amounts of code (maybe not?). I think either the FlowCollector interface would need to be extended (as you suggested), or... not sure, but either overloading all operators, or changing all existing ones. I don't think the provided basic idea can be built on well... and I don't think that a feature that requires rewriting the whole codebase has much chance of being integrated...

How do you propose flatMap would fix things? The main issue is that different parts of the flow are completely unrelated, and I cannot put something deep 'inside' the flow (upstream) to pause it there. Any pausing would just be local, at the very 'end' of the flow. (without some propagation). (What is collectPausable ? Collect with extra argument isPaused:StateFlow ?)

I agree, that val isPaused:StateFlow<Boolean> could be placed on FlowCollector instead of in the context, but I think it will be a lot less versatile, and a lot more work to implement and use. (apart from thinking things through)
It could be on FlowCollector as a @InternalCoroutinesApi. (but I think it's probably cleaner on context (i.e. runs in "paused context") and even cleaner as orthogonal)

Therefore I'd not make it too easy to use pausing functionality so that developers have to think twice whether it works in their case.

I don't know if I agree with this... I think it's quite obvious that a flow is paused only when... it's paused... (on the contrary, it would be confusing if flows were paused for no reason)
Pausing only works when something is set in the context, or 'in the scope' (which includes downstream). That seems perfectly sensible to me, and easy to learn. It's the same as cancelling (coroutines will not be cancelled if nothing cancels them, yet people use it freely).

You would need to pass a reference to all the code that might need it, instead of just deriving the scope.

Could you please give a use case here? I can't think of a scenario right now.

Originally, it seemed like you did not want any automatic propagation through the flow. If there is, then it's fine. (still extra work to get it from the flow to the place that does the expensive work, but works)

SharedFlow provides the ability to pause upstream. But how/where would I actually set isPaused? Where's the MutableStateFlow?

The MutableStateFlow would be private in the SharedFlow implementation, and then 'merged' with CoroutinePausing from context to shareIn.
(StateFlow, btw., should just pass through the normal context, as it is always considered 'unpaused' - no need to merge onto it)
Here is an example (untested, partial) implementation: https://gist.github.com/Maartyl/ee0d88b4f9ecc5f816deeaccc80bf4e2
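Independently of the linked gist, a simplified sketch of this idea might look as follows. It derives the paused state only from subscriptionCount and does not merge it with a CoroutinePausing already present in the scope; `pausingShareIn` and `PausingElement` are hypothetical names chosen for this example, not the gist's code or a library API.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

private class PausingElement(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

// Shares the receiver eagerly, but tells upstream (through the context)
// that it is paused whenever the shared flow has no subscribers.
fun <T> Flow<T>.pausingShareIn(scope: CoroutineScope, replay: Int = 0): SharedFlow<T> {
    val shared = MutableSharedFlow<T>(replay)
    // The mutable state stays private to this function; upstream only sees a StateFlow.
    val noSubscribers: StateFlow<Boolean> = shared.subscriptionCount
        .map { it == 0 }
        .stateIn(scope, SharingStarted.Eagerly, true) // starts paused: no collectors yet
    scope.launch(PausingElement(noSubscribers)) {
        this@pausingShareIn.collect { shared.emit(it) }
    }
    return shared.asSharedFlow()
}

fun demoPausingShareIn(): List<Int> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    // A pausing-aware upstream: it waits while its collector's context is paused.
    val ticker = flow {
        var n = 0
        while (true) {
            currentCoroutineContext()[CoroutinePausing]?.isPaused?.first { !it }
            emit(n++)
            delay(10)
        }
    }
    val shared = ticker.pausingShareIn(scope)
    delay(50)                                  // no subscribers yet, so nothing is produced
    val received = shared.take(3).toList()     // subscribing unpauses upstream
    scope.cancel()
    received
}
```

The upstream coroutine is started once and never cancelled while the scope lives; only the paused signal changes as subscribers come and go.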

And as mentioned before, I consider it a good thing if pausing is opt-in rather than automagically there.

It is only available, not forced. I agree that forcing pausing is a horrible idea. Code that does not use it will never be affected by it, unless it's using some function that does want to be aware of pausing, and I believe it should not be hidden from functions that they are running in a paused context.

How would, with your suggestion, intermediate collectors pause upstream collectors? They cannot set isPaused of CoroutinePausing.

They will change context of the upstream flow. (e.g. (incorrect, but the idea)) https://gist.github.com/Maartyl/b40a07e6cb265dd9037a8d5745ebc0bd

likeliness that downstream supports/uses pausing is probably low.

For now, but people will realize its usefulness over time. It can be provided by libraries, and downstream can be another SharedFlow too, etc.

I wonder if @elizarov is already getting gray hair from this discussion 😁

... Why? :D

((

I thought about that too. But making SharedFlow pausing by default or even only allow for pausing is not a good idea. What if I do want upstream collections to be dropped instead of paused?

'Dropped' ? Do you mean, cancelled, and recollected? I'm not saying SharedFlow should not support that at all, but, even if it only supported passing CoroutinePausing to wrapped flow, you can implement recollecting with recollectWhenUnpaused in my first post.
Pausing by default will never hurt anything, as pausing-aware code should be paused, when nobody is subscribed to the hot flow (SharedFlow).
))
(//response to android stuff will be in another comment)

@Maartyl
Author

Maartyl commented Oct 16, 2020

difference between "is paused because X" and "is paused because Y".

This is an interesting idea... Can anyone think of a situation, where it would be useful, and actually change when code should or should not be paused?

My thoughts: The coroutine (or upstream, if limited to Flow) is already running in some context, the function called by someone for some reason. (It is not called by 2 'callers' - it would be 2 different 'contexts' and 'calls') - If it is a Flow collected in 2 places, they are either independent (cold) or it's a SharedFlow, where each is a different subscriber, and this coroutine must run when at least one subscriber is unpaused, regardless of 'reason'...

@Maartyl
Author

Maartyl commented Oct 16, 2020

(this comment is about my beliefs and opinions)

Having two features, both good on their own, but that combine well, tends to be a sign of good design.

Flow already has this beautiful property, that downstream provides execution context for upstream.

It makes sense to "pause execution context". (to me, at least) - And it would combine very nicely, because Flow is already made for this.

(very immature thought, probably not a good idea) A "paused execution context" can even awaitUnpaused() automatically in some places, that are 'sure' to be safe, similar to how cancellation works today. (Main problems I have thought about are: code under a Mutex; and suspension points, that are expected to {not suspend / be fast} at least in some context known by the programmer (e.g. await() is probably not safe, but yield() likely is))
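As a sketch only, an explicit (not automatic) awaitUnpaused() could be written as a suspending function that reads the element from the current context, which also works outside of Flow. `Pausing` is a hypothetical wrapper class; the function body is an assumption about how such a helper might behave.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical wrapper used to put a paused state into a context.
class Pausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

// Suspends while the current context is paused; returns immediately
// if it is unpaused or if no CoroutinePausing element is present.
suspend fun awaitUnpaused() {
    currentCoroutineContext()[CoroutinePausing]?.isPaused?.first { !it }
}

fun demoAwaitUnpaused(): List<String> = runBlocking {
    val paused = MutableStateFlow(true)
    val log = mutableListOf<String>()
    val worker = launch(Pausing(paused)) {
        repeat(2) { step ->
            awaitUnpaused()          // cooperative pause point between units of work
            log += "step $step"
        }
    }
    yield()                          // let the worker run until it suspends on the pause
    log += "resuming"
    paused.value = false
    worker.join()
    log                              // [resuming, step 0, step 1]
}
```

Like cancellation, this is cooperative: code that never calls awaitUnpaused() is never suspended by it.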

Making the CoroutineContext.Key private and specific to the flow library, and only allow interacting with it through some flow API ... I'm not saying it's bad by any means, but it looks less beautiful... I'm definitely not the right person to have the final say on what design will end up being the best. :D
Changing the API is similar, but probably even more work than that.

This is definitely something that needs to be thought through properly. I'm not saying it will be easy to introduce, or that it should be easy. It must be considered in all scenarios. (and, sadly, I do not have enough insight) BUT I believe it's best to first think a lot about design, and then usually implement only something quite simple and abstract, that ends up solving a lot by combining well with the rest of the features. Coroutines and Flow are definitely beautiful features with such design. (Flow is literally just a higher order function, with some rules on how the 'callback' will be invoked, and all this is built on it. :D)

(btw. I would not want to tie pausing with Jobs. They are both in context, but probably should not be 'one thing' bound together. I mean, isPaused being a property on a Job. It feels like 'complecting' two things, that don't need to be, even though I sometimes make comments about their similarities (e.g. cancelling before))

@psteiger

psteiger commented Nov 16, 2020

As I commented in #2194, to achieve the use case of Motivation 2: Lifecycle (Android), what I ended up doing is this: instead of relying on pausable coroutines such as the ones launched with LifecycleCoroutineScope.launchWhenStarted {}, I use coroutines that get cancelled in onStop() and recreated in onStart(), to leverage SharedFlow.subscriptionCount (or SharedFlows launched with shareIn(CoroutineScope, SharingStarted.WhileSubscribed(), replay)):

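import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Collects `flow` only while the lifecycle is at least STARTED:
// collection starts in onStart() and is cancelled in onStop().
class Observer<T>(
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {

    var job: Job? = null
        private set

    override fun onStart(owner: LifecycleOwner) {
        job?.cancel() // defensive: never keep two collections alive
        job = owner.lifecycleScope.launch {
            flow.collect {
                collector(it)
            }
        }
    }

    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }

    init {
        lifecycleOwner.lifecycle.addObserver(this)
    }
}

// Collects the flow with `collector` while the lifecycle is started.
fun <T> Flow<T>.observe(
    lifecycleOwner: LifecycleOwner,
    collector: suspend (T) -> Unit
) = Observer(lifecycleOwner, this, collector)

// Collects the flow only for its side effects while the lifecycle is started.
fun <T> Flow<T>.observeIn(
    lifecycleOwner: LifecycleOwner
) = Observer(lifecycleOwner, this, {})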

Then I launch such coroutine with:

sharedFlow.observe(lifecycleOwner) {
    // ...
}

In my opinion, it would be nice, however, to have a built-in, more general, non-Android-specific concept of pausable coroutines that integrates well with the whole coroutines library, as proposed in this issue.

@tom-pratt

@psteiger this solution makes good sense to me. It seems like this is more often what you want than the existing asLiveData() implementation. Are you aware of any edge cases where your solution is not desirable or causes unnecessary work? Does it cancel and restart on rotation? Does it work with a SingleLiveEvent type of SharedFlow (replay=0) without potentially missing events if they are emitted exactly while activity is being recreated? I was wondering if it's worth implementing something like observeWhileCreated (wrapper around asLiveData) and observeWhileStarted (your implementation).

Others: why is pausing preferable over cancelling and recreating? It feels like shareIn + MutableSharedFlow/subscriptionCount + psteiger's start/stop collection cover all the use cases I can think of.

@psteiger

psteiger commented Jan 1, 2021

Hi @tom-pratt ,

Are you aware of any edge cases where your solution is not desirable or causes unnecessary work?

No edge cases that I know of so far.

Does it cancel and restart on rotation?

That depends solely on the SharingStarted parameter used in shareIn(). A delay, if any, can be configured there.

Does it work with a SingleLiveEvent type of SharedFlow (replay=0) without potentially missing events if they are emitted exactly while activity is being recreated?

Hum... that's a tough one. I think the first collector of a shared flow with replay 0 is guaranteed, by design, to get all emitted values. Only subsequent collectors can miss events. So, if a shared flow with replay 0 emits some value during activity recreation and there is no collector, the first collector to (re)appear will receive it. But don't quote me on that, I'd need to do some testing to be 100% sure no events would be lost.

I was wondering if it's worth implementing something like observeWhileCreated (wrapper around asLiveData) and observeWhileStarted (your implementation).

I think having built-in extension functions would be nice as this is such a common use case on Android.

Others: why is pausing preferable over cancelling and recreating? It feels like shareIn + MutableSharedFlow/subscriptionCount + psteigers start/stop collection cover all the use cases i can think of.

@Maartyl
Author

Maartyl commented Jan 4, 2021

@tom-pratt , everyone

If a SharedFlow is observed (@psteiger method) from multiple places, it is possible, that only one will get the event, and not the rest.

  • With replay=0, it is even possible that some of them will not see the latest value.
    ... In fact, not just possible: I think it will necessarily happen always, if SharedFlow is updated while the Lifecycle is Stopped, only the first one to start collecting will get the value.
    (I did not test this, though)
  • This is not as big a problem, though. One can say it's necessary to have replay=1 for leaf SharedFlows, and in View bindings you generally only need the latest value. (It can be a problem if you need something like the difference from the last value, however. But either the diff should be from what is shown (stored in the view), or it should be in the ViewModel if it has value as data, not just visually.)
  • I can imagine more problems could arise, if the SharedFlow were not observed directly, but through operators (on top of being more expensive to recollect)

Does it cancel and restart on rotation?

If you mean configuration change, then any solution will necessarily have to recollect, as the bound view is destroyed and a new one is created. - Only the SharedFlow inside ViewModel will survive. (as it replaces LiveData, that had to be stored on ViewModel as well, even for asLiveData())

Configuration change (=rotation) (Activity being recreated) is less related. (The lifecycle will be destroyed, and new collect will run in a new scope anyway.)

  • More important would be:
    • User clicks 'task switcher' (by accident, or whatever - happens to me often) and then returns to the App.
    • Activity gets temporarily overlaid by another. (Can be you, showing another on top of this one)
    • User leaves from activity, and returns before the runtime kills it.
    • It's not just Activity:
      • A FragmentAdapter (RecyclerView) item moves from being visible.
      • Fragment is temporarily hidden/replaced (but not yet destroyed)
      • One can design custom 'light fragments', and provide a 'lifecycle' to them as well. - I do that. (They do not even have a full Lifecycle: only associated CoroutinePausing)

Pausing, however, can deal with configuration change (=rotation) better!
Without timeouts, the whole SharedFlow 'graph' inside ViewModel will be cancelled and recreated on a configuration change. This will trigger recomputing everything, even if some replay is available (as the underlying flow was cancelled, and must be recollected now).

  • That is definitely not desired, so one must use timeouts. They have their own problems, on top of complicating things, by assuring timeouts in the right places, and not everywhere, leading to compounding (very long to reach deep in the graph) timeouts.
  • This is about Motivation 1 - recollecting , but it shouldn't be underestimated.
  • Pausing solves this, by not destroying the whole pipeline, and recreating it, but keeping the pipeline alive, and only directly informing the deepest parts of the graph, to not emit for a while. - No work is being done, apart from propagating the paused state, and the parts that need it reacting to it (usually only something cheap)
  • Using only timeout, on the other hand, provides no information to the 'flow graph' that it is 'paused', even though the information is provided by the Lifecycle.

I agree that CoroutinePausing is mostly a systemic "performance optimization" which also provides a nice 'API' for some problems (but those could of course be solved in another way, but that is true about everything, and all libraries)

  • the Lifecycle Stopped state exists only as a performance optimization. - Theoretically, you could keep the ~Activity updated until it is destroyed.

I agree it's not outright needed for Android, but I consider the observe=recollect solution to be more of a workaround, than a proper solution.

  • It is essentially what the google flowLiveData/ Flow.asLiveData/... already does, with extra timeout, and ability to be used as LiveData, and not just a direct observe (as it needs to be stored in ViewModel).
  • This direct Flow.observe lowers the overhead, but it still doesn't match the 'contract' perfectly.
  • Strictly speaking: Without Pausing, SharedFlow is not a full replacement of LiveData. (I mean, there could be a different way to implement it, but I don't think it would be as simple and nice as just pausing the Coroutine tree of the flow)

As for using recollecting, I have a similar solution (more similar to what @fluidsonic posted):
Lifecycle.launchRestarting(atLeast:Lifecycle.State){} that works similarly to what @psteiger posted, but for any suspend block, not just a Flow.collect, and for any minimal State, not just Start. (also takes a timeout and an identifying instance of my logger for unhandled exceptions, etc.)

  • I am using it, for some use-cases, but still don't like it as much as CoroutinePausing, especially for Flows and pausing.
  • (The timeout here turns out to not be necessary if one has a good overall design. (All timeouts should be inside ViewModel, not one in each 'leaf observer' or SharedFlow (ideally, no Lifecycle-related timeouts would be needed).) Alternatively, timeouts could only be in leaf 'observers' and nowhere in ViewModel, but that has its own issues.)

Examples where I think CoroutinePausing is useful

Room

Imagine the following (slightly extreme, to be obvious) scenario: You have an expensive query, that takes 20 seconds to compute. - Then, you have a large pipeline, that depends on that result, and computes many other things, collected from many places. - Computing that takes another 40 seconds.

  • You are observing the query results in Room. (Returns Flow or LiveData)

Scenario 1

Activity only becomes Stopped for a second (user steps outside and returns / another activity is shown over it / ...)

  • Using LiveData:
    • Room query was run once, the first time.
    • LiveData propagates active=false
    • Room will take note of that
    • State returns to Started, LiveData propagates active=true
    • Room will note that, but since nothing has changed, will not run the query again.
    • Nothing else was run, no views were updated.
  • Using plain SharedFlow (without timeout)
    • Room query was run once, the first time. (same)
    • Flow is cascade-cancelled, all the way to Room.
    • Room will forget about the query-observing state.
    • State returns to Started, all Flows will be recreated
    • All views are updated (using values from replay) (looks unchanged)
    • Room will have to rerun the query, as it is being asked for it anew.
    • All pipeline processing runs again
    • All views are updated again after one minute (using values computed this time) (looks unchanged)
    • (You can see why this is horrible, and makes timeouts necessary. (Keep in mind, LiveData had no timeouts, and it's generally a code smell to use timeouts where they are not necessary (e.g. communication with outside the process)))
  • Using SharedFlow with CoroutinePausing
    • Essentially the same as the LiveData case above.
    • If it is a part of a common library, Room can even directly observe the paused state, acting exactly the same.
    • In my program, I delay asking for next emit, until Coroutine is unpaused. It works surprisingly well, but nowhere near as well, as if Room was aware of the pausing mechanism.
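The "delay asking for the next emit until unpaused" approach from the last bullet could be sketched as a small operator like the one below. `emitOnlyWhenUnpaused` and `Pausing` are hypothetical names, and this is an illustration under those assumptions rather than the code actually used with Room.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical wrapper used to put a paused state into a context.
class Pausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

// Holds each upstream value back while the collector's context is paused.
// Upstream stays suspended in its emit call, so it is not asked for more values.
fun <T> Flow<T>.emitOnlyWhenUnpaused(): Flow<T> = flow {
    val isPaused = currentCoroutineContext()[CoroutinePausing]?.isPaused
    this@emitOnlyWhenUnpaused.collect { value ->
        isPaused?.first { !it }      // no element in context means never paused
        emit(value)
    }
}

fun demoEmitGate(): Pair<List<Int>, List<Int>> = runBlocking {
    val paused = MutableStateFlow(true)
    val received = mutableListOf<Int>()
    val job = launch(Pausing(paused)) {
        flowOf(1, 2, 3).emitOnlyWhenUnpaused().collect { received += it }
    }
    yield()                                   // collector runs and suspends at the gate
    val whilePaused = received.toList()       // nothing was delivered while paused
    paused.value = false
    job.join()
    whilePaused to received.toList()
}
```

A gate like this only applies backpressure: the value held at the gate may be stale by the time the flow is unpaused, which is why a source that observes the paused state itself would do better.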

To see why a timeout is a problem (on top of complicating the design), imagine you have a source that emits a lot.

  • If you have lots of processing depending on that, it will all run, until the timeout is reached.
  • This can even annoy the Android Runtime, and it will destroy your activity, because it's using a lot of CPU, while Stopped. - Possibly even killing your whole process sooner.
  • Overall, this is the reason the Stopped state exists at all.

Scenario 2

Activity is rotated, forcing configuration change, and destroying and recreating the Activity.

... I don't have the time to write the whole detail, but you can imagine.

  • The difference will be that all views will need to be rebound and get latest values.
  • With a good design (and Pausing), though: All of pipeline in ViewModel stayed 'alive' and just paused. Nothing was destroyed, nothing needs to be restarted and recomputed. Just the emitting far upstream was paused, so the rest of the pipeline did no work.
  • Plain SharedFlow will lose all collectors, be cancelled, force rerun of the query, and all processing...
  • This case is reasonably well handled by just having timeouts with SharedFlow. (but it doesn't solve design complications introduced by dealing with timeouts everywhere)

Animations

Another place where I needed pausing was animations* (reacting to user action). - In the end, we removed most animations for budget reasons, but I think it would have worked well. - They would be controlled by coroutines, not even Flow. (*Not animations per se: stuff was moving around in non-trivial ways without the user directly causing it, but sometimes it had to be paused. - Calling it animations is close enough, and I can imagine it would be useful for other kinds of animations as well.)

I cannot fully vouch for this, though, as I did not end up using it. (I am using pausing with Room)

Finally

  • I predict this will eventually exist (either in this language, or another) and people will like it.
  • It needs to be in a common library, for different libraries to cooperate.
  • It can have other uses too: Pausing downloads, pausing animations, pausing a server ~Job for some reason (needs to focus on other work / network/outside state... (importantly, abstract, so it can work with all)), pausing a game (if it was written using coroutines), (I'm sure there are many better examples I didn't think of), I know there were requests for pausing Jobs - this would most likely solve most of such needs...
    • I think it's important that it's not limited to only Flow.
      • (another reason for this is, that many suspending functions might not even know they run as part of a Flow)

Most importantly, I think the implementation is simple enough, that it definitely would be worthwhile. (but of course, there might be some crucial insight I'm missing.) - I think only the following is needed:

  • implemented in SharedFlow - (simple, only possible negative: small overhead for people who do not use it - but it could even largely replace the current SharingStarted mechanics of shareIn - or be integrated, so the cost is lowered, if at all noticeable)
  • implemented operator(s) to add pausing (probably quite simple, only has* to sanitize Context like SafeFlow)
    -- (cannot just replace context element (like flowOn): always must merge with downstream context, so either can pause upstream)
  • consider, if 'this being possible', would have any negative impact on any other part of the system (It doesn't seem to me it would - But this needs to be done by someone with high insight into the whole library.)

(((* regarding sanitizing Context, it might even be fine to 'leak' the merged CoroutinePausing downstream. It will always 'encompass' the downstream-outside pausing, and can only be more paused. - Which might even be useful, considering it is running inside callback of the flow: That block could be seen as paused too... - It would probably be weird to have this sole exception, though. - There could probably be problems with this, that I haven't thought about as well.)))

@cbeyls
Contributor

cbeyls commented Sep 2, 2021

I just discovered this proposal recently, after writing an article about issues and limitations when using Kotlin Flow on Android.
I invite you to read it, I think it's a good introduction to the issues this proposal would help solving on Android.

@qwwdfsad qwwdfsad added the flow label Sep 9, 2021
@sonphan12

There is so much useful information in this discussion. If this proposal is somehow approved, I hope there will be a detailed article about it.

@kapaseker

kapaseker commented Dec 15, 2021

I actually need this feature, just like:
Activity paused -> flow pause collecting
Activity resumed -> flow resume collecting

EDIT:
so sorry, "launch when start" fits my needs.
But I also want to control flow's pausing and resuming.

@kworth

kworth commented Mar 1, 2023

I just discovered this proposal recently, after writing an article about issues and limitations when using Kotlin Flow on Android. I invite you to read it, I think it's a good introduction to the issues this proposal would help solving on Android.

@cbeyls where do things now stand:

  1. Considering your part 2 (2022) https://bladecoder.medium.com/smarter-shared-kotlin-flows-d6b75fc66754
  2. Considering it's now 2023 (and therefore anything that has come after your part 2)

@matejdro

matejdro commented Apr 24, 2025

For us, we are resolving this issue by introducing the concept of "user presence" to the flow. When the flow is collected, a special object is inserted into flow's coroutineContext that exposes isUserPresentFlow(): Flow<Boolean>. This flow is true when user is present (app is open and resumed), but false if app is put into the background.

The concept is very similar to the "pausing" concept above, but we think it has better semantic meaning (it's not clear what "pause" is and what should downstream do with it).

Then some downstream flow can decide to collect this and change its behavior when user is present or not (for example, GPS location is turned off when app leaves the foreground), while the main flow is still being collected. That way we get the most benefits of both alternative approaches (re-collecting on every start/stop event or not recollecting at all).
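A sketch of what such a presence element and a flow reacting to it might look like. Only the idea of an element exposing a Flow<Boolean> comes from the comment; `UserPresence`, `whileUserPresent` and the restart-on-presence behavior are assumptions for illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical context element carrying the "user is present" signal.
class UserPresence(val isUserPresent: StateFlow<Boolean>) :
    AbstractCoroutineContextElement(UserPresence) {
    companion object Key : CoroutineContext.Key<UserPresence>
}

// Runs the receiver only while the user is present. The outer collection
// stays alive the whole time; only this inner source is stopped and restarted.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.whileUserPresent(): Flow<T> = flow {
    val presence = currentCoroutineContext()[UserPresence]?.isUserPresent
    if (presence == null) {
        emitAll(this@whileUserPresent)        // no signal available: behave as usual
    } else {
        emitAll(presence.flatMapLatest { present ->
            if (present) this@whileUserPresent else emptyFlow()
        })
    }
}

fun demoUserPresence(): List<String> = runBlocking {
    val present = MutableStateFlow(false)
    val log = MutableStateFlow(emptyList<String>())
    // Stand-in for an expensive source such as GPS location updates.
    val gps = flow<String> {
        log.update { it + "gps on" }
        try {
            emit("fix")
            awaitCancellation()
        } finally {
            log.update { it + "gps off" }
        }
    }
    val job = launch(UserPresence(present)) {
        gps.whileUserPresent().collect { value -> log.update { it + "got $value" } }
    }
    present.value = true
    log.first { "got fix" in it }             // wait until the source has emitted
    present.value = false
    log.first { "gps off" in it }             // wait until the source was stopped
    job.cancel()
    log.value
}
```

The design choice here is that only the expensive leaf reacts to the signal, while everything between it and the final collector keeps its state.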
