-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Flow.shareIn and stateIn operators #2047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I love the
Just to confirm, opting in to preserve the cache between upstream collections (ala RxReplayingShare) is as simple as filtering |
@zach-klippenstein Filtering does not work, since |
Can you please elaborate a little bit more on error handling?
But what happens to collectors (launched in different scopes)? As far as I can understand they will hang there forever. Same goes for completion. P.S. |
@pacher Yes. The basic rule of shared flow is that subscriber never completes. We'll have it written very near the top in shared flow docs and repeat in So what does it mean in practice? It depends on the error-handling policy of the app:
|
@pacher During design and community review we have not found use-cases for sharing finite-size cold flows to subscribers whose number is not known in advance. We've found that there are some use-cases for replicating a finite-size cold flow into fixed number, known in advance, flows. This case is robust with respect to error propagation since it is syntactically scoped in nature and we plan to address it separately by a dedicated operator for flow replication that will be both completion and error propagating. |
@elizarov Thanks for clarifications, very helpful I do believe that materialization should be baked-in, maybe as another operator which uses It's nice to see that you are still thinking about P.S. I remember how proud Andrey looked in his keynote when he talked about kotlin being used in the banks. I can't imagine them being happy with "crashing the app" either ;-) |
Materializing error could be done along the lines of upstream.
catch {
emit( materializedError )
throw it
} This way it would be delivered both downstream and to the scope. |
📣 There is an important question on the design of sharing operators we need community help with. We need to figure out what should be the default behavior of The question only matters for an 👍 Immediately reset the cache to the initial value (if it was specified; clear if it was not) so that the next time subscribers appear they will not get stale value(s) from the previous upstream flow collection but will receive some kind of initial value (or an empty flow) to explicitly tell them to wait while upstream emits anything (establishes network connection and gets data, etc) 🚀 Keep the last value(s) emitted by the upstream so that the next time subscribers appear they will immediately get previously emitted last value(s) without having to wait until upstream flow emits anything. Let's do quick poll: What do you use in your code most often and want to see as a default? |
I haven't really used Rx in over 2 years since I've been using coroutines, and I can't remember what scenarios I preferred the
Downside: less performant than quickly emitting a cached value
Downside: unexpected weird buggy behavior. For example, with the network connection example, if you had a My personal opinion is that the |
But move |
@elizarov If you use To me, for For |
I'd have to use |
@elizarov for the poll, it might be useful to show an example of the |
This is a bit of a tricky question, 🚀 seems to represents the concept that this flow always maintains the most recent value of the state machine . But emitting a state that could be stale sounds pretty dangerous lol. Than this one 👍 seems safer but breaks that concept... I agree @elizarov is always better , as long devs have a way to opt in to 🚀 |
Good solution for this could maybe allow developer to receive close callback and allow him to transform data before going into "long-term cache"? Going from Zak's example, one could have |
I would vote to remove |
UPDATE: Based on your feedback and having discussed the issue in the team we've decided to make the following changes to this proposed design:
I'll update this issue shortly. We're also changing for buffer terminology that is related to operator-fusion. The shared flow design will be tweaked a bit, too. |
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. This is a DRAFT. Parts that are not done yet: * BufferOverflow strategy support in channels. * Sharing operators conflation with preceding buffer(...) operator and the corresponding tests. * Better functional tests for SharingStarted strategies. * Test cancellability of shared flows. * Add reactive operator migration hints. Fixes #2034 Fixes #2047
Draft PR with implementation 👉 #2069 |
I have a question about CoroutineScope passed into shareIn operator. Since SharedFlow can be collected from many different Scopes with variable lifecycle durations, does the Scope passed into shareIn operator must be longer lasting than all the collector scopes? For example, if I "shareIn" with Fragment lifecycle scope, but then collect with GlobalScope, that collector will be canceled as soon as my Fragment destroys, even though I'm collecting inside GlobalScope? Thanks. |
@gajicm93 It does not have to be. When the sharing scope is cancelled, the sharing corouitne stops. It means it no longer collects from the upstream, but it does not affect downstream subscribers. They can still be active, although they will not receive any further updates. Moreover, shared flow's replay cache is still preserved, so new subscribers can appear and will get a snapshot of a replay cache, too. |
Thank you, this is the key takeaway for me. So essentially I will want the shareIn scope to be "wider" than any of the collectors. |
What about providing an operator to collect to an existing suspend fun <T> Flow<T>.collectWhen(shared: MutableSharedFlow<T>, started: SharingStarted) This would be useful for cases where there are multiple upstreams, or the upstream can change, but we still want to have the benefit of using a Of course, there can be no buffer fusion here, since buffer configuration is immutable in |
@1zaman It looks to be too narrow and quite confusing to be provided out-of-the-box and you can always write this kind of
|
Agreed that it's probably a narrow use-case, and might not be warranted in the standard library. Thinking a bit more on the method definition, it might also be better to define it as an extension method on |
📣 UPDATE: Some final tweaks in the design and worked out PR #2069 with full implementation:
|
📣 UPDATE: PR is now done. Additional changes in the design:
|
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047
I can't wait to use this, I hope it comes out soon like how stateflow came out so quick following it's GitHub issue. |
Is this in the new coroutines 1.3.8 release that is part of the Kotlin 1.4RC? |
Any updates on when this might come out? |
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]>
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]>
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]> Co-authored-by: Travis Wyatt <[email protected]>
Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes #2034 Fixes #2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]> Co-authored-by: Travis Wyatt <[email protected]>
UPDATE: Last-minute design change. There will be no default value for |
I still find this confusing to understand. More explanation and examples would be great (in this issue) on:
The network request example was brought up which is exactly what I'm struggling with at the moment, but in a slightly different manner. Instead of just fetching data from a server once and then push it downstream, I want include refreshes.
Here's a quick example how it could look like in a fully hot scenario.
import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class Data(val num: Int, val expires: Instant)
fun CoroutineScope.fetchData(initial: Data): Flow<Data> =
flow {
var data = initial
log("I: $initial")
emit(initial)
while (true) {
delay(Duration.between(Instant.now(), data.expires).toMillis())
data = updateData(data).also { log("U: $it") }
emit(data)
}
}
.broadcastIn(this)
.asFlow()
fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))
suspend fun main(): Unit = coroutineScope {
val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
log("-- Flow created. No collectors. Starting in 5s…")
delay(5_000)
log("-- Adding 2 collectors")
coroutineScope {
launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
}
delay(5_000)
log("-- Adding 1 collector")
dataFlow.take(2).collect { log("3: $it") }
log("-- Done")
coroutineContext.cancelChildren()
}
Here's an improved example with a rudimentary import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
data class Data(val num: Int, val expires: Instant)
fun fetchData(initial: Data): Flow<Data> {
var data = initial
return flow {
log("I: $data")
emit(data)
while (true) {
delay(Duration.between(Instant.now(), data.expires).toMillis())
data = updateData(data).also { log("U: $it") }
emit(data)
}
}
}
fun <T> Flow<T>.shareIn(scope: CoroutineScope): Flow<T> {
val upstream = this
var broadcastChannel: BroadcastChannel<T>? = null
val mutex = Mutex()
var subscriberCount = 0
return flow {
val activeChannel = mutex.withLock {
subscriberCount += 1
broadcastChannel ?: upstream.broadcastIn(scope).also {
broadcastChannel = it
log("-- now hot")
}
}
try {
activeChannel.consumeEach { emit(it) }
} finally {
mutex.withLock {
subscriberCount -= 1
if (subscriberCount == 0) {
broadcastChannel?.cancel()
broadcastChannel = null
log("-- now cold")
}
}
}
}
}
fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))
suspend fun main(): Unit {
coroutineScope {
val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
.shareIn(this)
log("-- Flow created. No collectors. Starting in 5s…")
delay(5_000)
log("-- Adding 2 collectors")
coroutineScope {
launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
}
delay(5_000)
log("-- Adding 1 collector")
dataFlow.take(2).collect { log("3: $it") }
}
log("-- Done")
}
|
The It answers most of my questions above.
|
@fluidsonic It all boils down on whether you work with a state of something (and only need the latest most recent value of this state) or with a shared stream of events |
UPDATE: Very last-minute design change: The order of parameters in |
@elizarov ah, I've completely missed that a |
* Introduce SharedFlow and sharing operators Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]> Co-authored-by: Travis Wyatt <[email protected]>
* Introduce SharedFlow and sharing operators Summary of changes: * SharedFlow, MutableSharedFlow and its constructor. * StateFlow implements SharedFlow. * SharedFlow.onSubscription operator, clarified docs in other onXxx operators. * BufferOverflow strategy in kotlinx.coroutines.channels package. * shareIn and stateIn operators and SharingStarted strategies for them. * SharedFlow.flowOn error lint (up from StateFlow). * Precise cancellable() operator fusion. * Precise distinctUntilChanged() operator fusion. * StateFlow.compareAndSet function. * asStateFlow and asSharedFlow read-only view functions. * Consistently clarified docs on cold vs hot flows. * Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn. * Channel(...) constructor function has onBufferOverflow parameter. * buffer(...) operator has onBufferOverflow parameter. * shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators. * shareIn/stateIn fuse with upstream flowOn for more efficient execution. * conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities. * Added reactive operator migration hints. * WhileSubscribed with kotlin.time.Duration params Fixes Kotlin#2034 Fixes Kotlin#2047 Co-authored-by: Ibraheem Zaman <[email protected]> Co-authored-by: Thomas Vos <[email protected]> Co-authored-by: Travis Wyatt <[email protected]>
This issue supersedes #1261 and is based on the
SharedFlow
#2034 andStateFlow
#1973 framework. See #2034 for most of the conceptual details on shared flows.Introduction
A
MutableSharedFlow
provides convenient means to own a shared flow of values that other parts of code can subscribe to. However, it is often convenient to take an existing coldFlow
that is defined in some other piece of code and start sharing it by launching a coroutine that collects this upstream flow and emits into the shared flow.shareIn operator
The
shareIn
operator is introduced:It has the following parameters:
scope
- a coroutine scope in which the sharing coroutine is launched.started
- a strategy that controls when sharing is started and stopped.replay
- a number specifying how many values are replayed for each new subscriber (defaults to zero — no replay).Starting sharing
There are the following out-of-the-box implementations of
SharingStarted
:SharingStarted.Eagerly
- start sharing coroutine immediately.SharingStarted.Lazily
- start sharing after the first subscriber appears and keep on going forever.SharingStarted.WhileSubscribed()
- maintain sharing coroutine only while there is at least one subscriber (start when the first one appears, stop when the last one disappears). It keeps the replay cache forever when subscribers disappear by default.The
WhileSubscribed
strategy is a function with optional parameters:stopTimeoutMillis
- how long to wait (in ms) before stopping sharing after the number of subscribers becomes zero.replayExpirationMillis
- how long to wait (in ms) after stopping sharing before resetting the replay cache.All the above values and
WhileSubscribed
function are defined inSharingStarted
companion object. This way, additional values/functions can be defined as extensions. Variants ofWhileSubscribed
function that work with different representations of duration (java.time.Duration
,kotlin.time.Duration
,Long, TimeUnit
) will be defined by the library in the appropriate modules.stateIn operator
As
StateFlow
is a specialized version ofSharedFlow
,stateIn
operator is a specialized version ofshareIn
operator. It does not have areplay
parameter (it is always equal to 1 for the state flow) and has a requiredinitialValue
:When execution happens in suspending context and you want to compute and wait for the initial value of the state to arrive from the upstream flow, there is a suspending variant of
stateIn
without initial value and with the hard-codedsharingStarted = Eagerly
:Custom starting strategies
SharingStarted
is an interface that supports 3rd-party implementations, allowing any starting strategy to be plugged into the sharing operators:An implementation of
SharingStarted
provides a single functioncommand
that transforms thesubscriptionCount
of the shared flow into the flow of commands that control sharing coroutine and are represented withSharingCommand
enum:Error handling
Any error in the upstream flow cancels the sharing coroutine and resets the buffer of the shared flow. The error is delivered to the
scope
. If this behavior is not desirable, then error-handling operators (such asretry
andcatch
) should be applied to the upstream flow beforeshareIn
operator. If the upstream completes normally, then nothing happens.Conceptual implementation
The conceptual implementation of
shareIn
operator is simple:Operator fusion
The actual implementation of
shareIn
operator is more complex. It fuses with the immediately precedingflowOn
operators, directly launching the sharing coroutine in the corresponding context (without additional coroutine and channel to change the context).It also fuses with the immediately preceding
Flow.buffer
operators. It allows for explicit configuration of the buffer size, creating a shared flow that takes a configured buffer size into account:buffer(0).shareIn(scope, 0, started)
- creates shared flow withextraBufferCapacity = 0
. It overrides a default buffer size and thus configures full rendezvous between upstream emitter and subscribers (emitter is suspended until all subscribers process the value).buffer(b).shareIn(scope, started, r)
- creates shared flow withreplay = r
andextraBufferCapacity = b
.conflate().shareIn(scope, started, r)
- creates shared flow withreplay = r
,extraBufferCapacity = 1 when replay == 0
, andonBufferOverflow = DROP_OLDEST
.Implementation
Implementation is in PR #2069.
The text was updated successfully, but these errors were encountered: