Skip to content

Commit 67df6e6

Browse files
committed
[WIP] Improve performance of comine(Iterable<Flow>) by reusing lambdas for onReceive
Addresses #2296
1 parent 20341f2 commit 67df6e6

File tree

2 files changed

+57
-8
lines changed

2 files changed

+57
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import org.openjdk.jmh.annotations.*
10+
import java.util.concurrent.*
11+
12+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
13+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
14+
@Fork(value = 1)
15+
@BenchmarkMode(Mode.AverageTime)
16+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
17+
@State(Scope.Benchmark)
18+
open class CombineBenchmark {
19+
20+
@Benchmark
21+
fun measure10() = measure(10)
22+
23+
@Benchmark
24+
fun measure100() = measure(100)
25+
26+
@Benchmark
27+
fun measure1000() = measure(1000)
28+
29+
fun measure(size: Int) = runBlocking {
30+
val flowList = (1..size).map { flowOf(it) }
31+
val listFlow = combine(flowList) { it.toList() }
32+
33+
listFlow.collect {
34+
}
35+
}
36+
}

Diff for: kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+21-8
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,18 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
5656
val isClosed = Array(size) { false }
5757
var nonClosed = size
5858
var remainingNulls = size
59-
// See flow.combine(other) for explanation.
60-
while (nonClosed != 0) {
61-
select<Unit> {
62-
for (i in 0 until size) {
63-
onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
64-
if (latestValues[i] == null) --remainingNulls
65-
latestValues[i] = value
66-
if (remainingNulls != 0) return@onReceive
59+
// See flow.combine(other) for explanation of the logic
60+
// Reuse receive blocks to avoid allocations on each iteration
61+
val onReceiveBlocks = Array<suspend (Any?) -> Unit>(size) { i ->
62+
{ value ->
63+
if (value === null) {
64+
isClosed[i] = true;
65+
--nonClosed
66+
}
67+
else {
68+
if (latestValues[i] == null) --remainingNulls
69+
latestValues[i] = value
70+
if (remainingNulls == 0) {
6771
val arguments = arrayFactory()
6872
for (index in 0 until size) {
6973
arguments[index] = NULL.unbox(latestValues[index])
@@ -73,6 +77,15 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
7377
}
7478
}
7579
}
80+
81+
while (nonClosed != 0) {
82+
select<Unit> {
83+
for (i in 0 until size) {
84+
if (isClosed[i]) continue
85+
channels[i].onReceiveOrNull(onReceiveBlocks[i])
86+
}
87+
}
88+
}
7689
}
7790

7891
private inline fun SelectBuilder<Unit>.onReceive(

0 commit comments

Comments
 (0)