[Flow] combineTransformLatest() #1484
Comments
Can you please explain a use case for such an operation? Example code that uses this operation would be extremely helpful, too.
Generally, I'm connecting user event flows (moving the map) with background sync jobs for the user's data (the user's favoriteIds, ...). Then I fetch the data needed for the view model and finally expose the flow as LiveData. The example below needs combine's "map" and "latest" behavior; my other use cases need "transform" and would also benefit from "latest".
private val mapPlacesFlow: Flow<FeatureCollection> by lazy {
combineTransformLatest<Any?, FeatureCollection>(
mapTilesFlow,
mapFilterFlow,
mapActivePlaceIdFlow
.filter { placeId ->
// if place is already loaded & spread, then we do not run fetch again
mapPlaces.value?.features()?.find { it.getStringProperty("placeId") == placeId } == null
},
session.trip.asFlow(),
session.favoriteIds.asFlow(),
session.userPlaceIds.asFlow()
) { args ->
@Suppress("UNCHECKED_CAST")
loadPlaces(
args[0] as MapState,
args[1] as Filter,
args[2] as String?,
args[3] as Trip?,
args[4] as Set<String>,
args[5] as Session.UserPlaces
)
}
.flowOn(Dispatchers.Default)
.shareIn(viewModelScope)
}
val mapPlaces: LiveData<FeatureCollection> by lazy {
mapPlacesFlow
.asLiveData(viewModelScope)
}
private suspend fun FlowCollector<FeatureCollection>.loadPlaces(
mapState: MapState,
filter: Filter,
selectedPlaceId: String?,
trip: Trip?,
favoriteIds: Set<String>,
userPlaceIds: Session.UserPlaces
): Unit = coroutineScope {
// ...
emit(FeatureCollection...)
}
Hope this also helps with designing other APIs.
Here is another use case:
val manualReload = ConflatedBroadcastChannel<Unit>().also { it.offer(Unit) }
val accessTokenFlow = ...
val stateFlow = accessTokenFlow.combineLatestTransform(manualReload.asFlow()) { accessToken, _ ->
if (accessToken == null) {
emit(State.LoggedOut)
} else {
emit(State.Loading)
observeUser(accessToken).doStuff()
}
}
In this simple example the reload could be pulled into the inner flow, but the use case remains the same. Any time you want to transform into an infinite stream, this would be very useful.
We also need this. Our use case is an Instagram-like story that auto-progresses. It would be nice if this were just part of the library.
I managed to roll my own combineTransformLatest, which is based on the existing operators 🎉
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.transformLatest
inline fun <reified T, R> combineTransformLatest(
vararg flows: Flow<T>,
@BuilderInference noinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
return combine(*flows) { it }
.transformLatest(transform)
}
fun <T1, T2, R> combineTransformLatest(
flow: Flow<T1>,
flow2: Flow<T2>,
@BuilderInference transform: suspend FlowCollector<R>.(T1, T2) -> Unit
): Flow<R> {
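// Explicit type arguments select the vararg overload above instead of recursing into this one.
return combineTransformLatest<Any?, R>(flow, flow2) { args ->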
@Suppress("UNCHECKED_CAST")
transform(
args[0] as T1,
args[1] as T2
)
}
} |
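To make the "latest" behavior of such a helper concrete, here is a minimal sketch assuming kotlinx.coroutines is on the classpath. The two-flow helper is a simplified variant of the one above (it pairs the values instead of using an array), and the `labels` function, its input flows, and the delay values are hypothetical, chosen only so that the second value arrives while the first transform block is still running.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simplified two-flow variant (an assumption, not the library's API):
// combine the latest values, then restart the transform block on every new pair.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T1, T2, R> combineTransformLatest(
    first: Flow<T1>,
    second: Flow<T2>,
    transform: suspend FlowCollector<R>.(T1, T2) -> Unit
): Flow<R> =
    combine(first, second) { a, b -> a to b }
        .transformLatest { (a, b) -> this.transform(a, b) }

// Hypothetical inputs: "b" arrives 100 ms after "a", while the 300 ms "fetch" for "a" is still running.
fun labels(): Flow<String> {
    val queries = flow {
        emit("a")
        delay(100)
        emit("b")
    }
    val filters = flowOf("all")
    return combineTransformLatest(queries, filters) { query, filter ->
        emit("loading $query/$filter")
        delay(300) // simulated slow fetch; cancelled if a newer pair arrives first
        emit("loaded $query/$filter")
    }
}

fun main() = runBlocking {
    // Prints: [loading a/all, loading b/all, loaded b/all]
    println(labels().toList())
}
```

Note that "loaded a/all" never appears: the block for the first pair is cancelled during its delay once the pair containing "b" is produced.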
In fact, |
Especially if you want to run a polling loop,
|
I'd like to propose introducing a new combineTransformLatest operator, which behaves like combineTransform but cancels the transformer when a new value arrives.
Example: loading places onto a map; the user moves the map, which cancels the previous fetch, and the loaded data also depends on the filter or other Flows.
I've implemented this
I'm not totally sure whether using channelFlow as a wrapper is correct; it is probably possible to do without it, but only with access to internals.
I'm OK with having just this signature (varargs with a transformer), which, unlike combineLatest(), allows emitting nothing or emitting multiple times.
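The difference between a mapping lambda and a transformer can be sketched with the existing `combine` and `combineTransform` operators from kotlinx.coroutines. This only illustrates the "emit nothing or emit multiple times" point; it does not cancel the previous block, which is what the proposed operator would add. The `tokens` and `reloads` flows and the string values are hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical inputs for illustration: a nullable access token and a reload trigger.
fun tokens(): Flow<String?> = flow {
    emit(null)
    delay(50)
    emit("t1")
}

fun reloads(): Flow<Unit> = flowOf(Unit)

// combine: the lambda returns exactly one value per combination of inputs.
fun viaCombine(): Flow<String> =
    combine(tokens(), reloads()) { token, _ -> token ?: "LoggedOut" }

// combineTransform: the lambda may emit zero, one, or many values per combination.
fun viaCombineTransform(): Flow<String> =
    combineTransform(tokens(), reloads()) { token, _ ->
        if (token == null) return@combineTransform // emit nothing for a missing token
        emit("Loading")
        emit("User($token)")
    }

fun main() = runBlocking {
    println(viaCombine().toList())          // [LoggedOut, t1]
    println(viaCombineTransform().toList()) // [Loading, User(t1)]
}
```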