Introduce SharedFlow #2034
Comments
I really like the personalization possibilities provided by this proposal! Would it be possible to have a custom conflation function? `typealias Conflation<T> = (old: T, new: T) -> T` I actually use a …

First impression: loved it, up until the point of error handling. As I understand it, your proposed solution is to introduce another container type holding … Simple example: data coming in from the network is shared via SharedFlow. One of the collectors writes it to a file on disk, so it needs to know when the flow completes in order to flush/close the file, and in case of a server error to move the incomplete file to some other location and email the admin about the incident. Other collectors might need to perform other actions on errors and completion, so one central handler on the upstream is not helpful. We are too used to the onError/onComplete combo from Rx.
@pacher We are considering the option of providing a ready-to-use wrapper and operators to materialize/dematerialize completion and errors for sharing, so that you don't have to write them yourself. The core design presented here is mostly aimed at sharing never-ending event flows. However, in order to design those we need to closely look at the use-cases of sharing limited-size flows: …

This is the most quoted use-case we've seen so far, and it does not seem to fold nicely into a sharing design at all. This use-case is all about replicating an incoming flow into a number of downstream flows. The key difference from sharing here is that you know the number of those flows in advance, and you know that all of them must receive every emitted value without loss. None of the problems of error handling we see with sharing apply to this use-case. It seems that a dedicated replication operation would be a better fit for this use-case, and this replication operation would naturally support completion and error propagation out of the box. See also my reply on #1261 (comment).
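The materialize/dematerialize wrappers mentioned above could look roughly like the following sketch. The `Event` type and the operator names are illustrative assumptions on my part, not the proposed API: upstream completion and errors are turned into plain values before sharing, so each collector can react to them independently.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical event wrapper; not part of kotlinx.coroutines.
sealed class Event<out T> {
    data class Value<out T>(val value: T) : Event<T>()
    data class Error(val cause: Throwable) : Event<Nothing>()
    object Done : Event<Nothing>()
}

// Turn completion and errors into emitted values, so the resulting flow
// can be shared without propagating failure to the shared flow itself.
fun <T> Flow<T>.materialize(): Flow<Event<T>> =
    map<T, Event<T>> { Event.Value(it) }
        .onCompletion { cause -> if (cause == null) emit(Event.Done) }
        .catch { cause -> emit(Event.Error(cause)) }

// Each collector can dematerialize independently, choosing its own
// error-handling strategy (rethrow, retry, log, etc).
fun <T> Flow<Event<T>>.dematerialize(): Flow<T> =
    takeWhile { it !is Event.Done }
        .map { event ->
            when (event) {
                is Event.Value -> event.value
                is Event.Error -> throw event.cause
                Event.Done -> error("unreachable: filtered by takeWhile")
            }
        }
```

Note the operator ordering in `materialize`: `onCompletion` emits `Done` only on normal completion, while `catch`, placed downstream of it, intercepts the upstream failure and emits `Error` instead of letting the exception propagate.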
This may not be ideal for the retrying use case; I may want to share the retry strategy for upstream and downstream errors, but have different retry strategies between different collectors of the same … The proposed solution, if I understand correctly, would then be to … That certainly works, but it's a somewhat convoluted solution. I'd prefer just writing a retry block downstream, without having to modify anything upstream. If there's no plan to address error handling in any other way, then at that point it would at least be useful to have a …

Could you elaborate on this a bit, please? A gist with a code example would be helpful, too. I don't understand how different collectors of a shared (single) upstream can have different retry strategies in any useful sense.

@elizarov Alright, in this case thumbs up! Maybe we should mention it in the description somehow, and maybe you can hold off on deprecating channels until all those goodies are here. As for the second part, I still believe that sharing needs a mechanism for error propagation, and I don't see how the number and timing of collectors (the difference from replicating) affects that. My use case: I want to open a websocket connection and expose a never-ending stream of incoming events/messages as …

Coming from Android, one simple example. Let's say my app has a shopping cart. I can get the number of items in the shopping cart and its contents in one call. Let's say I have two separate screens in the app: one that displays the whole shopping cart content, and one that just displays the number. On the screen where just the number is displayed, perhaps I'd want the retry strategy to be something like: try three times and then fail (and don't show the number; it's not as critical). On the screen with the contents of the shopping cart, I'd like the user to be able to use a retry button. The error that creates the necessity to retry could come from the shared upstream flow that made the request to the server, or it could be caused by some further processing downstream (for example, combining it with data from another REST API relevant only to that one single downstream, such as additional information about the products in the shopping cart that is not included in the original response with the shopping cart contents).
@pacher With …

This solution ensures that a failure to parse each individual message can be processed by clients independently (they get either …

@lukas1 It does not seem that you need a …

@elizarov Thanks, I am starting to see the bigger picture now. Another question: will there be an option for an unlimited buffer backed by a linked list, like in channels?

@elizarov If I use a regular cold … In principle I agree that in this case I could use a regular suspending function, but having the data represented as a flow gives me the opportunity to use other operators on it. Another possibility is that the data about the shopping cart could come from a source other than REST; it could be a websocket that sends more than one message. (The shopping cart example is not useful for this one, but in the past I have worked with a websocket that actually returned two messages over the socket for a single request.)

It seems weird to me that …

This behaves exactly like …

Will … be "active" (hot) while the subscriber count is >= 1, and shut down when the subscriber count is 0? And the …

#974 discusses renaming …
I believe so, although that particular implementation would probably live in …

```kotlin
fun <T> Flow<T>.refCountIn(scope: CoroutineScope): SharedFlow<T> {
    val upstream = this
    val sharedFlow = MutableSharedFlow<T>()
    scope.launch {
        sharedFlow.collectorsCount
            .filter { it <= 1 }
            .collectLatest { count ->
                if (count == 1) {
                    // This collection will be cancelled if collectorsCount drops back to 0.
                    upstream.collect(sharedFlow)
                }
            }
    }
    return sharedFlow
}
```

You could also add parameters to forward to …
@pacher You can specify …

What is the …

@ZakTaccardi Indeed. We are actually looking at a separate (lower-priority) design for how to retrofit this … The conceptual difference is that it only makes sense to call it conflation with a buffer size of 1. For larger sizes, neither the name nor the concept works correctly. To generalize it to a larger buffer size we need a more explicit name like …

@ZakTaccardi …

A bit of a nitpick: I think …
@Virgiel We could, but it is harder to fit into the general framework of the shared flow. Right now a slow collector can only "miss" an event that was dropped out of the buffer with … With custom conflation there is a possibility that some collector was fast enough to receive an old value before it was conflated, while others were slow and were affected by this custom conflation, receiving a merged combination of the old and new events instead. It is both harder to explain and harder to implement properly. What we can do for this use-case is to provide this kind of custom-conflation operator out of the box (…

Now, this solution cannot ensure that only slow collectors conflate events and that all slow collectors share the CPU effort to conflate at the same time. I'm not sure that getting this last bit of improvement in CPU consumption is really worth the added complexity in implementation. As a side note, the rationale for picking this set of configuration parameters for …
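A composed custom-conflation operator of the kind discussed above might look like the following sketch. The name `conflateWith` and the implementation strategy are my assumptions, not a committed API: it merges values that pile up while the collector is busy, instead of dropping them like plain `conflate()` does.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical operator: merge values the downstream collector did not
// take in time, instead of dropping them like plain conflate().
fun <T : Any> Flow<T>.conflateWith(merge: (old: T, new: T) -> T): Flow<T> =
    channelFlow {
        var pending: T? = null
        collect { value ->
            // Accumulate into the pending slot, merging with anything
            // the collector has not taken yet.
            pending = pending?.let { merge(it, value) } ?: value
            // Hand the merged value off if the collector is ready to
            // take it right now; otherwise keep accumulating.
            if (trySend(pending!!).isSuccess) pending = null
        }
        pending?.let { send(it) } // flush whatever is still accumulated
    }.buffer(Channel.RENDEZVOUS) // fuse to a rendezvous hand-off
```

Applied upstream of a sharing operator, all collectors would then observe the same merged values, which is exactly the trade-off described in the comment above (conflation work is not confined to slow collectors).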
Summary of changes:

* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes #2034
Fixes #2047

Co-authored-by: Ibraheem Zaman <[email protected]>
Co-authored-by: Thomas Vos <[email protected]>
Co-authored-by: Travis Wyatt <[email protected]>
If it is not too late and not too difficult, please consider returning the current …

@pacher Can you please elaborate on why you would need it? What's the use-case?

UPDATE: Last-minute design change - …

UPDATE: Very last-minute design change - …
A couple of questions/observations regarding the "rendezvous shared flow" (with zero replay and zero buffer):

For item 2, is there a way to get the previously documented behavior? It would make transitioning to …
Not really. Even if it is implemented this way, it is going to be an extremely fragile guarantee, and all Flow operators would have to contain a separate section in their documentation on whether they preserve or break this guarantee. It would be a very big footgun. Fundamentally, Flow is an asynchronous reactive framework. You could adapt it for use in cases where you need some kind of synchronous processing, but that would go against all the design principles on which Flow is based. Flow is fundamentally centered around the idea that your application is structured as one-way asynchronous pipelines with a single source of truth. Data producers should be in no way affected by how and when the consumers receive and process the respective values.
Hey @elizarov, do you maybe have an ETA for when the …

https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.4.1-native-mt

Thanks! It's also published here: https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/
So how does one create a StateFlow/MutableStateFlow/SharedFlow that supports duplicate emissions?

My library ReactiveState-Kotlin contains a …

This is what I've been using when I need to mimic StateFlow behavior (not its API) but allow duplicate emissions: … Not sure if it's 100% the same behavior, but it's close enough for my use case.
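One shape for such a state-like holder is sketched below. The class name `DuplicateStateFlow` is illustrative, but the underlying recipe (`MutableSharedFlow` with `replay = 1` and `DROP_OLDEST`) is the one the proposal itself gives for state-like flows without `distinctUntilChanged` behavior.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// A state-like holder that, unlike MutableStateFlow, re-delivers
// duplicate emissions (no distinctUntilChanged behavior).
class DuplicateStateFlow<T>(initial: T) {
    private val _flow = MutableSharedFlow<T>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    init { _flow.tryEmit(initial) }

    val flow: SharedFlow<T> = _flow.asSharedFlow()

    var value: T
        get() = _flow.replayCache.first() // always non-empty after init
        set(v) { _flow.tryEmit(v) }       // never suspends: DROP_OLDEST
}
```

New subscribers immediately receive the latest value via the replay cache, just like `StateFlow`, but setting the same value twice delivers it twice.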
Introduction
There is a need to have a `Flow` implementation that is hot (always active independently of collectors) and shares emitted values among all collectors that subscribe to it. Its design generalizes `StateFlow` beyond the narrow set of use-cases it supports (see #1973). Previously it was discussed in various issues under the tentative name of `EventFlow`, but having analyzed use-cases and worked out the draft design we believe that `SharedFlow` better describes what it is. It will replace all sorts of `BroadcastChannel` implementations and serves as a basis for the design of the `shareIn` operator. The simplest version of the sharing operator builds a combination of a `SharedFlow` that downstream collects from and an upstream flow that emits into this `SharedFlow`. See #2047 for details on sharing operators.

Use-cases
Design overview
The design has two interfaces, `SharedFlow`/`MutableSharedFlow`, just like `StateFlow`/`MutableStateFlow`.

`SharedFlow` is a regular `Flow` plus:

* `replayCache` - a snapshot of the current replay cache for non-reactive use (show dialog, etc).

`MutableSharedFlow` is a `FlowCollector` (an upstream flow can directly `emit` to it) plus:

* `tryEmit` - a non-suspending variant of `emit` (for regular non-suspending event listeners, named consistently with the upcoming `trySend`, see "SendChannel.offer should never throw" #974);
* `subscriptionCount` - exposes the current number of subscriptions (active collectors) as a `StateFlow`, which makes it easy to implement any kind of advanced upstream data connection policy if needed;
* `resetReplayCache` - resets the replay cache. It is useful for very transient data that should not be replayed to collectors when its upstream is not active.

Instances of `MutableSharedFlow` are created with the constructor function. It has the following parameters:

* `replay` - how many items are replayed to new subscribers (optional, defaults to zero);
* `extraBufferCapacity` - how many items are buffered in addition to replay, so that `emit` does not suspend while there is buffer space remaining (optional, defaults to zero);
* `onBufferOverflow` - configures the action on buffer overflow (optional, defaults to suspending the `emit` call; other actions are supported only when `replay > 0` or `extraBufferCapacity > 0`).

Buffer overflow
The case of a shared flow without a buffer (created with `MutableSharedFlow()`) is special. It is a rendezvous shared flow where each `emit` call suspends until all collectors process the emitted value. A `tryEmit` call on such a flow never succeeds and always returns `false`.

In a case with buffering, a call to `emit` may either put the value into the buffer and return immediately (`tryEmit` does the same and returns `true` in this case) or, if there are slow collectors that have not processed previously buffered values yet, the `emit` call will suspend (`tryEmit` does nothing and returns `false` in this case). The action on buffer overflow can be configured with the `onBufferOverflow` parameter, which essentially configures what should be sacrificed on overflow.

If anything other than the default `SUSPEND` is configured, then `emit` never suspends and `tryEmit` always returns `true`, handling the buffer overflow with either the `DROP_OLDEST` strategy (drops the oldest value in the buffer, so slow collectors will not see it, while making sure the latest value is kept) or the `DROP_LATEST` strategy (drops the newly emitted value, keeping the old buffered values).

Example usage in code
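The original example for this section is not preserved in this copy; the following sketch (class and property names are my own, illustrative choices) shows the constructor parameters and overflow behavior described above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Illustrative holder: the latest progress value is replayed to new
// subscribers; DROP_OLDEST makes emissions non-suspending, so tryEmit
// can be called from regular non-suspending callbacks.
class DownloadProgress {
    private val _progress = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val progress: SharedFlow<Int> = _progress.asSharedFlow()

    fun onProgress(percent: Int) {
        _progress.tryEmit(percent) // always succeeds: overflow drops the oldest
    }
}

fun main() = runBlocking {
    val d = DownloadProgress()
    d.onProgress(10)
    d.onProgress(42) // replaces 10 in the replay cache
    // A late subscriber still sees the latest value thanks to replay = 1.
    println(d.progress.first()) // prints 42
}
```

Note that with the default `SUSPEND` strategy instead of `DROP_OLDEST`, `tryEmit` would return `false` once the replay buffer is full, which is why a non-suspending overflow strategy is the natural fit for callback-driven sources.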
Error handling
The big elephant in the room of the `SharedFlow` design is error handling. The `BroadcastChannel` design and similar primitives in Rx propagate errors from upstream to downstream. We could do the same, but propagating errors immensely complicates the design and creates the following problems: is the error state cleared by `resetBuffer`, or is there a separate way to reset the error state? etc... Moreover, an error can sit inside a `SharedFlow` for unbounded time without being reported to any kind of error-handling code. This is not good for reliable applications.

What are the actual use-cases for error handling? What needs to happen when there is an error in the upstream? It looks like the following error-handling strategies observed in the wild account for the majority of error-handling use-cases:

Both of these strategies already have existing Flow APIs to retry and to catch errors, and they can be used on the upstream flow before emitting events to the `SharedFlow`. In essence, all kinds of completion and error states can always be materialized as emitted values in order to share them. This way, the sharing design becomes completely decoupled from, and orthogonal to, error handling.

SharedFlow never completes
As a side effect of this error-handling design decision, a `SharedFlow` never completes. A call to `collect { ... }` on a `SharedFlow` must be cancelled to terminate it. However, if completion is needed, then it can always be materialized by a special emitted value. A collector can apply the `takeWhile` operator to complete the resulting flow when this special value is encountered.

Deprecation of BroadcastChannel
`SharedFlow` is designed to completely replace all kinds of `BroadcastChannel` implementations. They will be supported but will be deprecated as soon as `SharedFlow` becomes stable:

* `ConflatedBroadcastChannel()` → `MutableStateFlow()`
* `BroadcastChannel(capacity)` → `MutableSharedFlow(replay = 0, extraBufferCapacity = capacity)`

`MutableSharedFlow` is not a drop-in replacement for `BroadcastChannel`. There are the following major functional differences that have to be addressed during replacement:

* `BroadcastChannel` is actually capable of directly representing an error state (and causes the error-handling problem described above, this being one of the reasons to deprecate it). So, code that was relying on this error-handling capability of `BroadcastChannel` will have to be changed to materialize those errors using the `catch` operator.
* `BroadcastChannel` supports various advanced `Channel` APIs, like the `select` expression. If needed on the downstream side, the resulting `SharedFlow` can be converted to a channel via the `produceIn` operator; if needed on the upstream side, the `channelFlow` builder can be used.

StateFlow is SharedFlow
Last, but not least, `StateFlow` (see #1973) is trivially retrofitted to implement `SharedFlow`, with `MutableStateFlow` implementing `MutableSharedFlow`:

* `replayCache` is a singleton list of the current `value`;
* `tryEmit` and `emit` could be useful for composition (one can emit another flow into a `StateFlow`);
* `subscriptionCount` could be useful for state flows, too;
* `resetReplayCache` is not supported.

With this change, `SharedFlow` consistently becomes the only hot flow in the library; easy to see and learn by its type. When something is a `SharedFlow`, it means that it is always live regardless of the presence of collectors, and all emissions are shared among collectors. `StateFlow` becomes a special-purpose, high-performance, and efficient implementation of `SharedFlow` for the narrow, but widely used, case of sharing a state. In fact, a shared flow behaves identically to a state flow when it is created with the following parameters and the `distinctUntilChanged` operator is applied to it. The above information will be added to the `StateFlow` documentation, thus giving easily actionable advice on what to do when you have a use-case that is almost like a `StateFlow`, but with some tweaks (more buffer space, support for duplicate emissions, no initial value, etc).

onSubscription operator
The following operator that is specific to shared flows is added: `onSubscription`. It is similar to `onStart` with one big difference: `onStart` calls the action before a subscription to the shared flow is established. It means that if the `onStart` action initiates an operation that emits values into the shared flow, there is no guarantee that those emissions will be received by this downstream collector. However, `onSubscription` calls the action after a subscription to the shared flow is established, guaranteeing the reception of all the values emitted after this moment (assuming they are not dropped on buffer overflow).

Implementation
Implementation is in PR #2069.
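The `onStart` vs. `onSubscription` difference described in the proposal can be sketched as follows (the setup is illustrative; `extraBufferCapacity` just keeps the inner `emit` from suspending):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 4)

    // onSubscription runs after this collector's subscription is
    // registered, so a value emitted from inside the action is
    // guaranteed to reach this collector. With onStart, the emission
    // could happen before the subscription exists and be lost.
    val result = shared
        .onSubscription {
            shared.emit("hello") // safe: we are already subscribed
        }
        .first()
    println(result) // prints "hello"
}
```

Since a `SharedFlow` never completes, the terminal `first()` here is what cancels the collection once the value arrives.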