diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 16ba32a385..13e24be9f9 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -799,6 +799,10 @@ public final class kotlinx/coroutines/flow/FlowKt { public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow; + public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow; + public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow; + public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index ea46a09833..40061abd36 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow * } * ``` */ +@FlowPreview public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow { coroutineScope { val firstChannel = asFairChannel(this@combineLatest) @@ -75,6 +76,103 @@ public fun Flow.combineLatest(other: Flow, transform: suspen } } +/** + * Returns a [Flow] whose values are generated with [transform] function by combining + * the most recently emitted values by each flow. + */ +@FlowPreview +public inline fun Flow.combineLatest( + other: Flow, + other2: Flow, + crossinline transform: suspend (T1, T2, T3) -> R +): Flow = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2, + args[2] as T3 + ) +} + +/** + * Returns a [Flow] whose values are generated with [transform] function by combining + * the most recently emitted values by each flow. + */ +@FlowPreview +public inline fun Flow.combineLatest( + other: Flow, + other2: Flow, + other3: Flow, + crossinline transform: suspend (T1, T2, T3, T4) -> R +): Flow = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2, + args[2] as T3, + args[3] as T4 + ) +} + +/** + * Returns a [Flow] whose values are generated with [transform] function by combining + * the most recently emitted values by each flow. + */ +@FlowPreview +public inline fun Flow.combineLatest( + other: Flow, + other2: Flow, + other3: Flow, + other4: Flow, + crossinline transform: suspend (T1, T2, T3, T4, T5) -> R +): Flow = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> -> + transform( + args[0] as T1, + args[1] as T2, + args[2] as T3, + args[3] as T4, + args[4] as T5 + ) +} + +/** + * Returns a [Flow] whose values are generated with [transform] function by combining + * the most recently emitted values by each flow. + */ +@FlowPreview +public inline fun Flow.combineLatest(vararg others: Flow, crossinline transform: suspend (Array) -> R): Flow = + combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) }) + +/** + * Returns a [Flow] whose values are generated with [transform] function by combining + * the most recently emitted values by each flow. + */ +@PublishedApi +internal fun Flow.combineLatest(vararg others: Flow, arrayFactory: () -> Array, transform: suspend (Array) -> R): Flow = flow { + coroutineScope { + val size = others.size + 1 + val channels = + Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) } + val latestValues = arrayOfNulls(size) + val isClosed = Array(size) { false } + + // See flow.combineLatest(other) for explanation. + while (!isClosed.all { it }) { + select { + for (i in 0 until size) { + onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value -> + latestValues[i] = value + if (latestValues.all { it !== null }) { + val arguments = arrayFactory() + for (index in 0 until size) { + arguments[index] = NullSurrogate.unbox(latestValues[index]) + } + emit(transform(arguments as Array)) + } + } + } + } + } + } +} private inline fun SelectBuilder.onReceive( isClosed: Boolean, @@ -111,6 +209,7 @@ private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = p * } * ``` */ +@FlowPreview public fun Flow.zip(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow { coroutineScope { val first = asChannel(this@zip) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt index 25bb75be57..5e50a95365 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt @@ -5,12 +5,15 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal import kotlin.test.* /* * Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed */ -class CombineLatestTest : TestBase() { +abstract class CombineLatestTestBase : TestBase() { + + abstract fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow @Test fun testCombineLatest() = runTest { @@ -195,6 +198,13 @@ class CombineLatestTest : TestBase() { assertFailsWith(flow) finish(2) } +} - private suspend fun sum(s: String?, i: Int?) = s + i +class CombineLatestTest : CombineLatestTestBase() { + override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = combineLatestOriginal(other, transform) } + +class CombineLatestVarargAdapterTest : CombineLatestTestBase() { + override fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = + (this as Flow<*>).combineLatestOriginal(other) { args: Array -> transform(args[0] as T1, args[1] as T2) } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt new file mode 100644 index 0000000000..37726fad27 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class CombineLatestVarargTest : TestBase() { + + @Test + fun testThreeParameters() = runTest { + val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c -> + a + b + c + } + + assertEquals("12null", flow.single()) + } + + @Test + fun testFourParameters() = runTest { + val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d -> + a + b + c + d + } + + assertEquals("123null", flow.single()) + } + + @Test + fun testFiveParameters() = runTest { + val flow = + flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e -> + a + b + c + d + e + } + + assertEquals("1234null", flow.single()) + } + + @Test + fun testVararg() = runTest { + val flow = flowOf("1").combineLatest( + flowOf(2), + flowOf("3"), + flowOf(4.toByte()), + flowOf("5"), + flowOf(null) + ) { arr -> arr.joinToString("") } + assertEquals("12345null", flow.single()) + } + + @Test + fun testEmptyVararg() = runTest { + val list = flowOf(1, 2, 3).combineLatest { args: Array -> args[0] }.toList() + assertEquals(listOf(1, 2, 3), list) + } + + @Test + fun testNonNullableAny() = runTest { + val value = flowOf(1).combineLatest(flowOf(2)) { args: Array -> + @Suppress("USELESS_IS_CHECK") + assertTrue(args is Array) + args[0] + args[1] + }.single() + assertEquals(3, value) + } +}