Skip to content

Commit 340d501

Browse files
committed
Optimize combine operator
* Avoid linear complexity for emits * Reduce bytecode size
1 parent adb6164 commit 340d501

File tree

1 file changed

+20
-21
lines changed
  • kotlinx-coroutines-core/common/src/flow/internal

1 file changed

+20
-21
lines changed

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+20-21
Original file line numberDiff line numberDiff line change
@@ -49,28 +49,26 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
4949
flows: Array<out Flow<T>>,
5050
arrayFactory: () -> Array<T?>,
5151
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
52-
) {
53-
coroutineScope {
54-
val size = flows.size
55-
val channels =
56-
Array(size) { asFairChannel(flows[it]) }
57-
val latestValues = arrayOfNulls<Any?>(size)
58-
val isClosed = Array(size) { false }
59-
60-
// See flow.combine(other) for explanation.
61-
while (!isClosed.all { it }) {
62-
select<Unit> {
63-
for (i in 0 until size) {
64-
onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
65-
latestValues[i] = value
66-
if (latestValues.all { it !== null }) {
67-
val arguments = arrayFactory()
68-
for (index in 0 until size) {
69-
arguments[index] = NULL.unbox(latestValues[index])
70-
}
71-
transform(arguments as Array<T>)
72-
}
52+
): Unit = coroutineScope {
53+
val size = flows.size
54+
val channels = Array(size) { asFairChannel(flows[it]) }
55+
val latestValues = arrayOfNulls<Any?>(size)
56+
val isClosed = Array(size) { false }
57+
var nonClosed = size
58+
var remainingNulls = size
59+
// See flow.combine(other) for explanation.
60+
while (nonClosed != 0) {
61+
select<Unit> {
62+
for (i in 0 until size) {
63+
onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
64+
if (latestValues[i] == null) --remainingNulls
65+
latestValues[i] = value
66+
if (remainingNulls != 0) return@onReceive
67+
val arguments = arrayFactory()
68+
for (index in 0 until size) {
69+
arguments[index] = NULL.unbox(latestValues[index])
7370
}
71+
transform(arguments as Array<T>)
7472
}
7573
}
7674
}
@@ -84,6 +82,7 @@ private inline fun SelectBuilder<Unit>.onReceive(
8482
noinline onReceive: suspend (value: Any) -> Unit
8583
) {
8684
if (isClosed) return
85+
@Suppress("DEPRECATION")
8786
channel.onReceiveOrNull {
8887
// TODO onReceiveOrClosed when boxing issues are fixed
8988
if (it === null) onClosed()

0 commit comments

Comments
 (0)