Skip to content

Commit 19f7ebb

Browse files
committed
Add combineLatest with multiple flow parameters
* combineLatest(Iterable<Flow<T>>) is not added deliberately, use-case is unclear * All specific overloads are marked as inline to reduce binary compatibility pressure Fixes #1193
1 parent 218dc97 commit 19f7ebb

File tree

4 files changed

+183
-2
lines changed

4 files changed

+183
-2
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+4
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
799799
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
800800
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
801801
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
802+
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
803+
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
804+
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
805+
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
802806
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
803807
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
804808
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+99
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
2828
* }
2929
* ```
3030
*/
31+
@FlowPreview
3132
public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
3233
coroutineScope {
3334
val firstChannel = asFairChannel(this@combineLatest)
@@ -75,6 +76,103 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
7576
}
7677
}
7778

79+
/**
80+
* Returns a [Flow] whose values are generated with [transform] function by combining
81+
* the most recently emitted values by each flow.
82+
*/
83+
@FlowPreview
84+
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
85+
other: Flow<T2>,
86+
other2: Flow<T3>,
87+
crossinline transform: suspend (T1, T2, T3) -> R
88+
): Flow<R> = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
89+
transform(
90+
args[0] as T1,
91+
args[1] as T2,
92+
args[2] as T3
93+
)
94+
}
95+
96+
/**
97+
* Returns a [Flow] whose values are generated with [transform] function by combining
98+
* the most recently emitted values by each flow.
99+
*/
100+
@FlowPreview
101+
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
102+
other: Flow<T2>,
103+
other2: Flow<T3>,
104+
other3: Flow<T4>,
105+
crossinline transform: suspend (T1, T2, T3, T4) -> R
106+
): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
107+
transform(
108+
args[0] as T1,
109+
args[1] as T2,
110+
args[2] as T3,
111+
args[3] as T4
112+
)
113+
}
114+
115+
/**
116+
* Returns a [Flow] whose values are generated with [transform] function by combining
117+
* the most recently emitted values by each flow.
118+
*/
119+
@FlowPreview
120+
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
121+
other: Flow<T2>,
122+
other2: Flow<T3>,
123+
other3: Flow<T4>,
124+
other4: Flow<T5>,
125+
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
126+
): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
127+
transform(
128+
args[0] as T1,
129+
args[1] as T2,
130+
args[2] as T3,
131+
args[3] as T4,
132+
args[4] as T5
133+
)
134+
}
135+
136+
/**
137+
* Returns a [Flow] whose values are generated with [transform] function by combining
138+
* the most recently emitted values by each flow.
139+
*/
140+
@FlowPreview
141+
public inline fun <reified T, R> Flow<T>.combineLatest(vararg others: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R> =
142+
combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) })
143+
144+
/**
145+
* Returns a [Flow] whose values are generated with [transform] function by combining
146+
* the most recently emitted values by each flow.
147+
*/
148+
@PublishedApi
149+
internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory: () -> Array<T?>, transform: suspend (Array<T>) -> R): Flow<R> = flow {
150+
coroutineScope {
151+
val size = others.size + 1
152+
val channels =
153+
Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) }
154+
val latestValues = arrayOfNulls<Any?>(size)
155+
val isClosed = Array(size) { false }
156+
157+
// See flow.combineLatest(other) for explanation.
158+
while (!isClosed.all { it }) {
159+
select<Unit> {
160+
for (i in 0 until size) {
161+
onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
162+
latestValues[i] = value
163+
if (latestValues.all { it !== null }) {
164+
val arguments = arrayFactory()
165+
for (index in 0 until size) {
166+
arguments[index] = NullSurrogate.unbox(latestValues[index])
167+
}
168+
emit(transform(arguments as Array<T>))
169+
}
170+
}
171+
}
172+
}
173+
}
174+
}
175+
}
78176

79177
private inline fun SelectBuilder<Unit>.onReceive(
80178
isClosed: Boolean,
@@ -111,6 +209,7 @@ private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = p
111209
* }
112210
* ```
113211
*/
212+
@FlowPreview
114213
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
115214
coroutineScope {
116215
val first = asChannel(this@zip)

kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
package kotlinx.coroutines.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
89
import kotlin.test.*
910

1011
/*
1112
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
1213
*/
13-
class CombineLatestTest : TestBase() {
14+
abstract class CombineLatestTestBase : TestBase() {
15+
16+
abstract fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
1417

1518
@Test
1619
fun testCombineLatest() = runTest {
@@ -195,6 +198,13 @@ class CombineLatestTest : TestBase() {
195198
assertFailsWith<TestException>(flow)
196199
finish(2)
197200
}
201+
}
198202

199-
private suspend fun sum(s: String?, i: Int?) = s + i
203+
class CombineLatestTest : CombineLatestTestBase() {
204+
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineLatestOriginal(other, transform)
200205
}
206+
207+
class CombineLatestVarargAdapterTest : CombineLatestTestBase() {
208+
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
209+
(this as Flow<*>).combineLatestOriginal(other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
210+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.operators
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlin.test.*
10+
11+
class CombineLatestVarargTest : TestBase() {
12+
13+
@Test
14+
fun testThreeParameters() = runTest {
15+
val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c ->
16+
a + b + c
17+
}
18+
19+
assertEquals("12null", flow.single())
20+
}
21+
22+
@Test
23+
fun testFourParameters() = runTest {
24+
val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
25+
a + b + c + d
26+
}
27+
28+
assertEquals("123null", flow.single())
29+
}
30+
31+
@Test
32+
fun testFiveParameters() = runTest {
33+
val flow =
34+
flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
35+
a + b + c + d + e
36+
}
37+
38+
assertEquals("1234null", flow.single())
39+
}
40+
41+
@Test
42+
fun testVararg() = runTest {
43+
val flow = flowOf("1").combineLatest(
44+
flowOf(2),
45+
flowOf("3"),
46+
flowOf(4.toByte()),
47+
flowOf("5"),
48+
flowOf(null)
49+
) { arr -> arr.joinToString("") }
50+
assertEquals("12345null", flow.single())
51+
}
52+
53+
@Test
54+
fun testEmptyVararg() = runTest {
55+
val list = flowOf(1, 2, 3).combineLatest { args: Array<Any?> -> args[0] }.toList()
56+
assertEquals(listOf(1, 2, 3), list)
57+
}
58+
59+
@Test
60+
fun testNonNullableAny() = runTest {
61+
val value = flowOf(1).combineLatest(flowOf(2)) { args: Array<Int> ->
62+
@Suppress("USELESS_IS_CHECK")
63+
assertTrue(args is Array<Int>)
64+
args[0] + args[1]
65+
}.single()
66+
assertEquals(3, value)
67+
}
68+
}

0 commit comments

Comments
 (0)