Skip to content

Add combineLatest with multiple flows #1198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
Expand Down
99 changes: 99 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
* }
* ```
*/
@FlowPreview
public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
coroutineScope {
val firstChannel = asFairChannel(this@combineLatest)
Expand Down Expand Up @@ -75,6 +76,103 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
}
}

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
@FlowPreview
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
crossinline transform: suspend (T1, T2, T3) -> R
): Flow<R> = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
args[2] as T3
)
}

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
@FlowPreview
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
args[2] as T3,
args[3] as T4
)
}

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
@FlowPreview
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
other4: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
args[2] as T3,
args[3] as T4,
args[4] as T5
)
}

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
@FlowPreview
public inline fun <reified T, R> Flow<T>.combineLatest(vararg others: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R> =
combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) })

/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
@PublishedApi
internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory: () -> Array<T?>, transform: suspend (Array<T>) -> R): Flow<R> = flow {
coroutineScope {
val size = others.size + 1
val channels =
Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) }
val latestValues = arrayOfNulls<Any?>(size)
val isClosed = Array(size) { false }

// See flow.combineLatest(other) for explanation.
while (!isClosed.all { it }) {
select<Unit> {
for (i in 0 until size) {
onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
latestValues[i] = value
if (latestValues.all { it !== null }) {
val arguments = arrayFactory()
for (index in 0 until size) {
arguments[index] = NullSurrogate.unbox(latestValues[index])
}
emit(transform(arguments as Array<T>))
}
}
}
}
}
}
}

private inline fun SelectBuilder<Unit>.onReceive(
isClosed: Boolean,
Expand Down Expand Up @@ -111,6 +209,7 @@ private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = p
* }
* ```
*/
@FlowPreview
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
coroutineScope {
val first = asChannel(this@zip)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
import kotlin.test.*

/*
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
*/
class CombineLatestTest : TestBase() {
abstract class CombineLatestTestBase : TestBase() {

abstract fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

@Test
fun testCombineLatest() = runTest {
Expand Down Expand Up @@ -195,6 +198,13 @@ class CombineLatestTest : TestBase() {
assertFailsWith<TestException>(flow)
finish(2)
}
}

private suspend fun sum(s: String?, i: Int?) = s + i
class CombineLatestTest : CombineLatestTestBase() {
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineLatestOriginal(other, transform)
}

class CombineLatestVarargAdapterTest : CombineLatestTestBase() {
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
(this as Flow<*>).combineLatestOriginal(other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class CombineLatestVarargTest : TestBase() {

@Test
fun testThreeParameters() = runTest {
val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c ->
a + b + c
}

assertEquals("12null", flow.single())
}

@Test
fun testFourParameters() = runTest {
val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
a + b + c + d
}

assertEquals("123null", flow.single())
}

@Test
fun testFiveParameters() = runTest {
val flow =
flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
a + b + c + d + e
}

assertEquals("1234null", flow.single())
}

@Test
fun testVararg() = runTest {
val flow = flowOf("1").combineLatest(
flowOf(2),
flowOf("3"),
flowOf(4.toByte()),
flowOf("5"),
flowOf(null)
) { arr -> arr.joinToString("") }
assertEquals("12345null", flow.single())
}

@Test
fun testEmptyVararg() = runTest {
val list = flowOf(1, 2, 3).combineLatest { args: Array<Any?> -> args[0] }.toList()
assertEquals(listOf(1, 2, 3), list)
}

@Test
fun testNonNullableAny() = runTest {
val value = flowOf(1).combineLatest(flowOf(2)) { args: Array<Int> ->
@Suppress("USELESS_IS_CHECK")
assertTrue(args is Array<Int>)
args[0] + args[1]
}.single()
assertEquals(3, value)
}
}