Skip to content

Commit 44c4c56

Browse files
committed
combineLatest rework
* Operator renamed to combine * Introduced combineTransform operator with custom transformer * Decouple API and implementation details to improve user experience from IDE * combine(Iterable<Flow>) and combineTransform(Iterable<Flow>) are introduced Fixes #1224 Fixes #1262
1 parent 0172998 commit 44c4c56

File tree

8 files changed

+622
-237
lines changed

8 files changed

+622
-237
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+19-2
Original file line numberDiff line numberDiff line change
@@ -847,12 +847,22 @@ public final class kotlinx/coroutines/flow/FlowKt {
847847
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
848848
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
849849
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
850+
public static final synthetic fun combine (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
851+
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
852+
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
853+
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
854+
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
855+
public static final synthetic fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
850856
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
851857
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
852858
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
853859
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
854-
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
855-
public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
860+
public static final synthetic fun combineTransform (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
861+
public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
862+
public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
863+
public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
864+
public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function7;)Lkotlinx/coroutines/flow/Flow;
865+
public static final synthetic fun combineTransform ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
856866
public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
857867
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
858868
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -887,13 +897,16 @@ public final class kotlinx/coroutines/flow/FlowKt {
887897
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
888898
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
889899
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
900+
public static final fun flowCombine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
901+
public static final fun flowCombineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
890902
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
891903
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
892904
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
893905
public static final fun flowViaChannel (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
894906
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
895907
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
896908
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
909+
public static final fun flowZip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
897910
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
898911
public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
899912
public static final fun getDEFAULT_CONCURRENCY ()I
@@ -964,6 +977,10 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
964977
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
965978
}
966979

980+
public final class kotlinx/coroutines/flow/internal/CombineKt {
981+
public static final fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
982+
}
983+
967984
public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
968985
public static final fun checkIndexOverflow (I)I
969986
}

kotlinx-coroutines-core/common/src/flow/Migration.kt

+44
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,50 @@ public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
364364
)
365365
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
366366

367+
@Deprecated(
368+
level = DeprecationLevel.ERROR,
369+
message = "Flow analogue of 'combineLatest' is 'combine'",
370+
replaceWith = ReplaceWith("this.combine(other, transform)")
371+
)
372+
public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
373+
combine(this, other, transform)
374+
375+
@Deprecated(
376+
level = DeprecationLevel.ERROR,
377+
message = "Flow analogue of 'combineLatest' is 'combine'",
378+
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
379+
)
380+
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
381+
other: Flow<T2>,
382+
other2: Flow<T3>,
383+
crossinline transform: suspend (T1, T2, T3) -> R
384+
) = combine(this, other, other2, transform)
385+
386+
@Deprecated(
387+
level = DeprecationLevel.ERROR,
388+
message = "Flow analogue of 'combineLatest' is 'combine'",
389+
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
390+
)
391+
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
392+
other: Flow<T2>,
393+
other2: Flow<T3>,
394+
other3: Flow<T4>,
395+
crossinline transform: suspend (T1, T2, T3, T4) -> R
396+
) = combine(this, other, other2, other3, transform)
397+
398+
@Deprecated(
399+
level = DeprecationLevel.ERROR,
400+
message = "Flow analogue of 'combineLatest' is 'combine'",
401+
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
402+
)
403+
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
404+
other: Flow<T2>,
405+
other2: Flow<T3>,
406+
other3: Flow<T4>,
407+
other4: Flow<T5>,
408+
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
409+
): Flow<R> = combine(this, other, other2, other3, other4, transform)
410+
367411
/**
368412
* Delays the emission of values from this flow for the given [timeMillis].
369413
* Use `onStart { delay(timeMillis) }`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5+
6+
package kotlinx.coroutines.flow.internal
7+
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.*
11+
import kotlinx.coroutines.internal.Symbol
12+
import kotlinx.coroutines.selects.*
13+
14+
internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
15+
16+
internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
17+
first: Flow<T1>, second: Flow<T2>,
18+
crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
19+
) {
20+
coroutineScope {
21+
val firstChannel = asFairChannel(first)
22+
val secondChannel = asFairChannel(second)
23+
var firstValue: Any? = null
24+
var secondValue: Any? = null
25+
var firstIsClosed = false
26+
var secondIsClosed = false
27+
while (!firstIsClosed || !secondIsClosed) {
28+
select<Unit> {
29+
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
30+
firstValue = value
31+
if (secondValue !== null) {
32+
transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
33+
}
34+
}
35+
36+
onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
37+
secondValue = value
38+
if (firstValue !== null) {
39+
transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
40+
}
41+
}
42+
}
43+
}
44+
}
45+
}
46+
47+
@PublishedApi
48+
internal fun <T, R> combine(
49+
vararg flows: Flow<T>,
50+
arrayFactory: () -> Array<T?>,
51+
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
52+
): Flow<R> = flow {
53+
coroutineScope {
54+
val size = flows.size
55+
val channels =
56+
Array(size) { asFairChannel(flows[it]) }
57+
val latestValues = arrayOfNulls<Any?>(size)
58+
val isClosed = Array(size) { false }
59+
60+
// See flow.combine(other) for explanation.
61+
while (!isClosed.all { it }) {
62+
select<Unit> {
63+
for (i in 0 until size) {
64+
onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
65+
latestValues[i] = value
66+
if (latestValues.all { it !== null }) {
67+
val arguments = arrayFactory()
68+
for (index in 0 until size) {
69+
arguments[index] = NULL.unbox(latestValues[index])
70+
}
71+
transform(arguments as Array<T>)
72+
}
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}
79+
80+
private inline fun SelectBuilder<Unit>.onReceive(
81+
isClosed: Boolean,
82+
channel: ReceiveChannel<Any>,
83+
crossinline onClosed: () -> Unit,
84+
noinline onReceive: suspend (value: Any) -> Unit
85+
) {
86+
if (isClosed) return
87+
channel.onReceiveOrNull {
88+
// TODO onReceiveOrClosed when boxing issues are fixed
89+
if (it === null) onClosed()
90+
else onReceive(it)
91+
}
92+
}
93+
94+
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
95+
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
96+
val channel = channel as ChannelCoroutine<Any>
97+
flow.collect { value ->
98+
return@collect channel.sendFair(value ?: NULL)
99+
}
100+
}
101+
102+
internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
103+
coroutineScope {
104+
val first = asChannel(flow)
105+
val second = asChannel(flow2)
106+
/*
107+
* This approach only works with rendezvous channel and is required to enforce correctness
108+
* in the following scenario:
109+
* ```
110+
* val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
111+
* val f2 = flowOf(1)
112+
* f1.zip(f2) { ... }
113+
* ```
114+
*
115+
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
116+
*/
117+
(second as SendChannel<*>).invokeOnClose {
118+
if (!first.isClosedForReceive) first.cancel(AbortFlowException())
119+
}
120+
121+
val otherIterator = second.iterator()
122+
try {
123+
first.consumeEach { value ->
124+
if (!otherIterator.hasNext()) {
125+
return@consumeEach
126+
}
127+
emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
128+
}
129+
} catch (e: AbortFlowException) {
130+
// complete
131+
} finally {
132+
if (!second.isClosedForReceive) second.cancel(AbortFlowException())
133+
}
134+
}
135+
}
136+
137+
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
138+
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
139+
flow.collect { value ->
140+
return@collect channel.send(value ?: NULL)
141+
}
142+
}

kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import kotlin.jvm.*
2424
* generic function that may transform emitted element, skip it or emit it multiple times.
2525
*
2626
* This operator can be used as a building block for other operators, for example:
27-
*
2827
* ```
2928
* fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
3029
* if (value % 2 == 0) { // Emit only even values, but twice

0 commit comments

Comments
 (0)