[Flow] combineTransformLatest() #1484

Open
hrach opened this issue Aug 28, 2019 · 7 comments
@hrach
Contributor

hrach commented Aug 28, 2019

I'd like to propose introducing a new combineTransformLatest operator, which behaves like combineTransform but cancels the running transformer when a new value arrives.

Example: loading places onto a map. When the user moves the map, the previous fetch is cancelled; the loaded data also depends on a filter and on other Flows.

I've implemented this:
class Symbol(val symbol: String) {
	override fun toString(): String = symbol

	@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
	inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
}

val NULL = Symbol("NULL")

inline fun <reified T, R> combineTransformLatest(
	vararg flows: Flow<T>,
	@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
	return channelFlow {
		coroutineScope {
			val size = flows.size
			val channels = flows.map { flow ->
				this@coroutineScope.produce<Any> {
					flow.collect { value ->
						this@produce.send(value ?: NULL)
					}
				}
			}
			var job: Job? = null
			val latestValues = arrayOfNulls<Any?>(size)
			val isClosed = Array(size) { false }

			while (!isClosed.all { it }) {
				select<Unit> {
					for (i in 0 until size) {
						if (isClosed[i]) continue
						@Suppress("DEPRECATION")
						channels[i].onReceiveOrNull { receivedValue ->
							if (receivedValue == null) {
								isClosed[i] = true
							} else {
								latestValues[i] = receivedValue
								if (latestValues.all { it !== null }) {
									// Cancel the transformer still running for the previous combination.
									job?.apply {
										cancel(CancellationException())
										join()
									}
									job = launch {
										val arguments = arrayOfNulls<T>(size)
										for (index in 0 until size) {
											arguments[index] = NULL.unbox(latestValues[index])
										}
										flow<R> {
											@Suppress("UNCHECKED_CAST")
											this@flow.transform(arguments as Array<T>)
										}.collect {
											this@channelFlow.send(it)
										}
									}
								}
							}
						}
					}
				}
			}
		}
	}
}

I'm not totally sure whether using channelFlow as the wrapper is correct; it is probably possible to do without it, but only with access to internals.

I'm fine with having just this one signature (varargs plus a transformer), which, unlike combineLatest(), allows emitting nothing or emitting multiple times.

@elizarov elizarov added the flow label Sep 3, 2019
@elizarov
Contributor

elizarov commented Sep 3, 2019

Can you please explain a use case for such an operation? Example code that uses this operation would be extremely helpful, too.

@hrach
Contributor Author

hrach commented Sep 3, 2019

Generally, I'm combining user event flows (e.g. moving the map) with background sync jobs that deliver user data (the user's favoriteIds, ...). Then I fetch the data needed for the ViewModel and finally expose the flow as LiveData.

The example below needs combine's "map" and "latest" behavior; my other use cases need "transform" and would also benefit from "latest".

private val mapPlacesFlow: Flow<FeatureCollection> by lazy {
	combineTransformLatest<Any?, FeatureCollection>(
		mapTilesFlow,
		mapFilterFlow,
		mapActivePlaceIdFlow
			.filter { placeId ->
				// if place is already loaded & spread, then we do not run fetch again
				mapPlaces.value?.features()?.find { it.getStringProperty("placeId") == placeId } == null
			},
		session.trip.asFlow(),
		session.favoriteIds.asFlow(),
		session.userPlaceIds.asFlow()
	) { args ->
		@Suppress("UNCHECKED_CAST")
		loadPlaces(
			args[0] as MapState,
			args[1] as Filter,
			args[2] as String?,
			args[3] as Trip?,
			args[4] as Set<String>,
			args[5] as Session.UserPlaces
		)
	}
		.flowOn(Dispatchers.Default)
		.shareIn(viewModelScope)
}
val mapPlaces: LiveData<FeatureCollection> by lazy {
	mapPlacesFlow
		.asLiveData(viewModelScope)
}

private suspend fun FlowCollector<FeatureCollection>.loadPlaces(
	mapState: MapState,
	filter: Filter,
	selectedPlaceId: String?,
	trip: Trip?,
	favoriteIds: Set<String>,
	userPlaceIds: Session.UserPlaces
): Unit = coroutineScope {
	// ...
	emit(FeatureCollection...)
}

Hope this also helps with designing other APIs.

@ansman
Contributor

ansman commented Sep 4, 2020

Here is another use case:

val manualReload = ConflatedBroadcastChannel<Unit>().also { it.offer(Unit) }
val accessTokenFlow = ...

val stateFlow = accessTokenFlow.combineLatestTransform(manualReload.asFlow()) { accessToken, _ ->
  if (accessToken == null) {
    emit(State.LoggedOut)
  } else {
    emit(State.Loading)
    observeUser(accessToken).doStuff()
  }
}

In this simple example the reload could be pulled into the inner flow, but the use case remains the same. This would be very useful any time you want to transform into an infinite stream.

@PaulWoitaschek
Contributor

PaulWoitaschek commented Sep 26, 2020

We also need this.

Our use case is an Instagram-like story that auto-progresses.
There are two flows, one Flow<Data> and one flow that represents the current page index, backed by MutableStateFlow<Int>.
Now based on the amount of text inside a story, the transform function emits a view state with a progress in 0F..1F.
When the user presses the left side of the story, the current page index gets decremented and the story goes to the previous page.
Therefore I need to transform the latest value only.

It would be nice if this was just part of the library.
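
The story use case described above could be sketched roughly as follows, using only the existing combine and transformLatest operators. The `Story` and `StoryViewState` types, the `storyViewState` function, and the timing parameters are hypothetical names made up for illustration; the sketch covers only the per-page progress, not the automatic switch to the next page.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical models, not taken from the comment above.
data class Story(val pages: List<String>)
data class StoryViewState(val pageIndex: Int, val progress: Float)

@OptIn(ExperimentalCoroutinesApi::class)
fun storyViewState(
    stories: Flow<Story>,
    pageIndex: Flow<Int>,
    stepMillis: Long = 50,   // assumed delay between progress updates
    millisPerChar: Long = 40 // assumed reading speed
): Flow<StoryViewState> =
    combine(stories, pageIndex) { story, index -> story to index }
        .transformLatest<Pair<Story, Int>, StoryViewState> { (story, index) ->
            // A new story or page index cancels this block and restarts it.
            val text = story.pages.getOrNull(index) ?: return@transformLatest
            // Longer text gets more steps, so the progress bar fills more slowly.
            val steps = maxOf(1L, text.length * millisPerChar / stepMillis).toInt()
            for (step in 0..steps) {
                emit(StoryViewState(index, step.toFloat() / steps))
                delay(stepMillis)
            }
        }

fun main() = runBlocking {
    val story = Story(listOf("Short page", "A somewhat longer second page"))
    val states = storyViewState(flowOf(story), flowOf(0), stepMillis = 10, millisPerChar = 2).toList()
    println(states.first())
    println(states.last())
}
```

In a real screen the page index would presumably come from the MutableStateFlow<Int> mentioned above, so decrementing it would cancel the running progress loop and restart it for the previous page.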

@PaulWoitaschek
Contributor

I managed to roll my own combineTransformLatest which is based on the existing operators 🎉

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.transformLatest

inline fun <reified T, R> combineTransformLatest(
  vararg flows: Flow<T>,
  @BuilderInference noinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
  return combine(*flows) { it }
    .transformLatest(transform)
}

fun <T1, T2, R> combineTransformLatest(
  flow: Flow<T1>,
  flow2: Flow<T2>,
  @BuilderInference transform: suspend FlowCollector<R>.(T1, T2) -> Unit
): Flow<R> {
  return combineTransformLatest(flow, flow2) { args ->
    @Suppress("UNCHECKED_CAST")
    transform(
      args[0] as T1,
      args[1] as T2
    )
  }
}
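
To see the cancellation behavior of this approach, here is a minimal, self-contained sketch. It redefines a two-flow variant locally (same idea as above, but going through a Pair instead of an array; the parameter names are illustrative) and feeds it a transform that is slower than the upstream.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Local two-flow variant for this demo: combine first, then transformLatest.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T1, T2, R> combineTransformLatest(
    first: Flow<T1>,
    second: Flow<T2>,
    transform: suspend FlowCollector<R>.(T1, T2) -> Unit
): Flow<R> =
    combine(first, second) { a, b -> a to b }
        .transformLatest<Pair<T1, T2>, R> { (a, b) -> transform(a, b) }

fun main() = runBlocking {
    // The second id arrives while the transform for the first one is still suspended.
    val ids = flow<Int> { emit(1); delay(100); emit(2) }
    val label = flowOf("x")

    val out = combineTransformLatest<Int, String, String>(ids, label) { id, l ->
        emit("start $id$l")
        delay(300) // stands in for slow work such as a network fetch
        emit("end $id$l")
    }.toList()

    println(out) // [start 1x, start 2x, end 2x]
}
```

"end 1x" never appears because the first transform block is cancelled as soon as the combined value changes.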

@Shusek

Shusek commented Dec 24, 2021

In fact, combineTransform should be called combineTransformConcat, because any function that suspends inside the transform collector also suspends the combine, a bit like the flatMapConcat operator.
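
A small sketch of that point, assuming the same slow transform as one would use with a "latest" variant: with the existing combineTransform, the block for the first combination runs to completion before the next combination is processed. The `sequentialDemo` name is made up for this example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: a transform that is slower than the upstream.
fun sequentialDemo(): Flow<String> {
    val ids = flow<Int> { emit(1); delay(100); emit(2) }
    val label = flowOf("x")
    return combineTransform<Int, String, String>(ids, label) { id, l ->
        emit("start $id$l")
        delay(300) // the second id arrives during this delay, but nothing is cancelled
        emit("end $id$l")
    }
}

fun main() = runBlocking {
    println(sequentialDemo().toList()) // [start 1x, end 1x, start 2x, end 2x]
}
```

The analogy with flatMapConcat is loose: if several upstream values arrive while the transform is suspended, combineTransform may skip intermediate combinations and continue with the most recent values rather than processing each one.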

@Ic-ks

Ic-ks commented Aug 25, 2023

combineTransformLatest is especially handy if you want to run a polling loop, because the running loop is cancelled as soon as the input changes:

fun pollingPageContentFlow(
    isAuthorizedFlow: Flow<Boolean>,
    urlFlow: Flow<String>,
) = combineTransformLatest(isAuthorizedFlow, urlFlow) { isAuthorized, url ->
    while (currentCoroutineContext().isActive) {
        emit(if(isAuthorized) "Page Content $url" else "Not authorized")
        delay(2000)
    }
}
