Skip to content

Commit c9f25fc

Browse files
committed
Introducing zip and combineLatest Flow operators
* Introduce NullSurrogate.unbox to simplify juggling with nulls * Introduce sendFair internal method to make combineLatest less unfair and surprising
1 parent d3cc25f commit c9f25fc

File tree

9 files changed

+605
-7
lines changed

9 files changed

+605
-7
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
793793
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
794794
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
795795
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
796+
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
796797
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
797798
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
798799
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -843,6 +844,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
843844
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
844845
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
845846
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
847+
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
846848
}
847849

848850
public final class kotlinx/coroutines/flow/MigrationKt {

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+8
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
177177
return sendSuspend(element)
178178
}
179179

180+
internal suspend fun sendFair(element: E) {
181+
if (offer(element)) {
182+
yield() // Works only on fast path to properly work in sequential use-cases
183+
return
184+
}
185+
return sendSuspend(element)
186+
}
187+
180188
public final override fun offer(element: E): Boolean {
181189
val result = offerInternal(element)
182190
return when {

kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt

+8-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,11 @@
44

55
package kotlinx.coroutines.flow.internal
66

7-
internal object NullSurrogate
7+
import kotlin.jvm.*
8+
9+
internal object NullSurrogate {
10+
11+
@Suppress("NULL_FOR_NONNULL_TYPE")
12+
@JvmStatic
13+
internal fun <T> unbox(value: Any?): T = if (value === NullSurrogate) null else value as T
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:JvmMultifileClass
6+
@file:JvmName("FlowKt")
7+
@file:Suppress("UNCHECKED_CAST")
8+
9+
package kotlinx.coroutines.flow
10+
11+
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.channels.*
13+
import kotlinx.coroutines.flow.internal.*
14+
import kotlinx.coroutines.selects.*
15+
import kotlin.jvm.*
16+
import kotlinx.coroutines.flow.unsafeFlow as flow
17+
18+
/**
19+
* Returns a [Flow] whose values are generated with [transform] function by combining
20+
* the most recently emitted values by each flow.
21+
*
22+
* It can be demonstrated with the following example:
23+
* ```
24+
* val flow = flowOf(1, 2).delayEach(10)
25+
* val flow2 = flowOf("a", "b", "c").delayEach(15)
26+
* flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
27+
* println(it) // Will print "1a 2a 2b 2c"
28+
* }
29+
* ```
30+
*/
31+
public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
32+
coroutineScope {
33+
val firstChannel = asFairChannel(this@combineLatest)
34+
val secondChannel = asFairChannel(other)
35+
var firstValue: Any? = null
36+
var secondValue: Any? = null
37+
var firstIsClosed = false
38+
var secondIsClosed = false
39+
40+
/*
41+
* Fun fact, this select **semantically** equivalent of the following:
42+
* ```
43+
* selectWhile<Unit> {
44+
* channel.onReceive {
45+
* emitCombined(...)
46+
* }
47+
* channel2.onReceive {
48+
* emitCombined(...)
49+
* }
50+
* }
51+
* ```
52+
* but we are waiting for `channels` branch to get merged where we will change semantics of the select
53+
* to ignore finished clauses.
54+
*
55+
* Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
56+
* on top of previous select.
57+
*/
58+
while (!firstIsClosed || !secondIsClosed) {
59+
select<Unit> {
60+
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
61+
firstValue = value
62+
if (secondValue !== null) {
63+
emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
64+
}
65+
}
66+
67+
onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
68+
secondValue = value
69+
if (firstValue !== null) {
70+
emit(transform(NullSurrogate.unbox(firstValue), NullSurrogate.unbox(secondValue)))
71+
}
72+
}
73+
}
74+
}
75+
}
76+
}
77+
78+
79+
private inline fun SelectBuilder<Unit>.onReceive(
80+
isClosed: Boolean,
81+
channel: Channel<Any>,
82+
crossinline onClosed: () -> Unit,
83+
noinline onReceive: suspend (value: Any) -> Unit
84+
) {
85+
if (isClosed) return
86+
channel.onReceiveOrNull {
87+
if (it === null) onClosed()
88+
else onReceive(it)
89+
}
90+
}
91+
92+
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
93+
private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any> {
94+
val channel = RendezvousChannel<Any>() // Explicit type
95+
launch {
96+
try {
97+
flow.collect { value ->
98+
channel.sendFair(value ?: NullSurrogate)
99+
}
100+
} finally {
101+
channel.close()
102+
}
103+
}
104+
return channel
105+
}
106+
107+
108+
/**
109+
* Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
110+
* The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
111+
*
112+
* It can be demonstrated with the following example:
113+
* ```
114+
* val flow = flowOf(1, 2, 3).delayEach(10)
115+
* val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
116+
* flow.zip(flow2) { i, s -> i.toString() + s }.collect {
117+
* println(it) // Will print "1a 2b 3c"
118+
* }
119+
* ```
120+
*/
121+
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
122+
coroutineScope {
123+
val first = asChannel(this@zip)
124+
val second = asChannel(other)
125+
/*
126+
* This approach only works with rendezvous channel and is required to enforce correctness
127+
* in the following scenario:
128+
* ```
129+
* val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
130+
* val f2 = flowOf(1)
131+
* f1.zip(f2) { ... }
132+
* ```
133+
*
134+
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
135+
*/
136+
(second as SendChannel<*>).invokeOnClose { first.cancel() }
137+
138+
val otherIterator = second.iterator()
139+
try {
140+
first.consumeEach { value ->
141+
if (!otherIterator.hasNext()) {
142+
return@consumeEach
143+
}
144+
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
145+
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
146+
}
147+
} finally {
148+
second.cancel()
149+
}
150+
}
151+
}
152+
153+
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
154+
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
155+
flow.collect { value ->
156+
channel.send(value ?: NullSurrogate)
157+
}
158+
}

0 commit comments

Comments
 (0)