Skip to content

Commit bb167b3

Browse files
committed
Optimize Flow.combine
* Get rid of two code paths * Get rid of accidental O(N^2) where N is the number of flows * Get rid of select that hits performance hard Fixes #2296
1 parent ec9d084 commit bb167b3

File tree

13 files changed

+232
-148
lines changed

13 files changed

+232
-148
lines changed

Diff for: benchmarks/build.gradle.kts

-26
Original file line numberDiff line numberDiff line change
@@ -31,33 +31,7 @@ tasks.named<KotlinCompile>("compileJmhKotlin") {
3131
}
3232
}
3333

34-
/*
35-
* Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
36-
* and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
37-
*/
38-
val removeRedundantFiles by tasks.registering(Delete::class) {
39-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
40-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class")
41-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class")
42-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class")
43-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
44-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
45-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
46-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class")
47-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class")
48-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
49-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
50-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
51-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
5234

53-
// Primes
54-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class")
55-
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class")
56-
}
57-
58-
tasks.named("jmhRunBytecodeGenerator") {
59-
dependsOn(removeRedundantFiles)
60-
}
6135

6236
// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
6337
// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"

Diff for: benchmarks/src/jmh/kotlin/benchmarks/flow/CombineBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt

+11-13
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,23 @@ import java.util.concurrent.*
1212
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
1313
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
1414
@Fork(value = 1)
15-
@BenchmarkMode(Mode.AverageTime)
15+
@BenchmarkMode(Mode.Throughput)
1616
@OutputTimeUnit(TimeUnit.MILLISECONDS)
1717
@State(Scope.Benchmark)
18-
open class CombineBenchmark {
18+
open class CombineFlowsBenchmarVolatilek {
1919

20-
@Benchmark
21-
fun measure10() = measure(10)
20+
@Param("10", "100", "1000")
21+
private var size = 10
2222

2323
@Benchmark
24-
fun measure100() = measure(100)
24+
fun combine() = runBlocking {
25+
combine((1 until size).map { flowOf(it) }) { a -> a}.collect()
26+
}
2527

2628
@Benchmark
27-
fun measure1000() = measure(1000)
28-
29-
fun measure(size: Int) = runBlocking {
30-
val flowList = (1..size).map { flowOf(it) }
31-
val listFlow = combine(flowList) { it.toList() }
32-
33-
listFlow.collect {
34-
}
29+
fun combineTransform() = runBlocking {
30+
val list = (1 until size).map { flowOf(it) }.toList()
31+
combineTransform((1 until size).map { flowOf(it) }) { emit(it) }.collect()
3532
}
3633
}
34+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.flow.internal.*
10+
import org.openjdk.jmh.annotations.*
11+
import java.util.concurrent.*
12+
13+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
14+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
15+
@Fork(value = 1)
16+
@BenchmarkMode(Mode.Throughput)
17+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
18+
@State(Scope.Benchmark)
19+
open class CombineTwoFlowsBenchmark {
20+
21+
@Param("100", "100000", "1000000")
22+
private var size = 100000
23+
24+
@Benchmark
25+
fun combinePlain() = runBlocking {
26+
val flow = (1 until size.toLong()).asFlow()
27+
flow.combine(flow) { a, b -> a + b }.collect()
28+
}
29+
30+
@Benchmark
31+
fun combineTransform() = runBlocking {
32+
val flow = (1 until size.toLong()).asFlow()
33+
flow.combineTransform(flow) { a, b -> emit(a + b) }.collect()
34+
}
35+
36+
@Benchmark
37+
fun combineVararg() = runBlocking {
38+
val flow = (1 until size.toLong()).asFlow()
39+
combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect()
40+
}
41+
42+
@Benchmark
43+
fun combineTransformVararg() = runBlocking {
44+
val flow = (1 until size.toLong()).asFlow()
45+
combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect()
46+
}
47+
}

Diff for: kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

-8
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,6 @@ internal abstract class AbstractSendChannel<E>(
137137
return sendSuspend(element)
138138
}
139139

140-
internal suspend fun sendFair(element: E) {
141-
if (offerInternal(element) === OFFER_SUCCESS) {
142-
yield() // Works only on fast path to properly work in sequential use-cases
143-
return
144-
}
145-
return sendSuspend(element)
146-
}
147-
148140
public final override fun offer(element: E): Boolean {
149141
val result = offerInternal(element)
150142
return when {

Diff for: kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

-5
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,4 @@ internal open class ChannelCoroutine<E>(
3434
_channel.cancel(exception) // cancel the channel
3535
cancelCoroutine(exception) // cancel the job
3636
}
37-
38-
@Suppress("UNCHECKED_CAST")
39-
suspend fun sendFair(element: E) {
40-
(_channel as AbstractSendChannel<E>).sendFair(element)
41-
}
4237
}

Diff for: kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+42-85
Original file line numberDiff line numberDiff line change
@@ -9,107 +9,51 @@ import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.flow.*
1111
import kotlinx.coroutines.internal.*
12-
import kotlinx.coroutines.selects.*
1312
import kotlin.coroutines.*
1413
import kotlin.coroutines.intrinsics.*
1514

16-
internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
17-
18-
internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
19-
first: Flow<T1>, second: Flow<T2>,
20-
transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
21-
) {
22-
coroutineScope {
23-
val firstChannel = asFairChannel(first)
24-
val secondChannel = asFairChannel(second)
25-
var firstValue: Any? = null
26-
var secondValue: Any? = null
27-
var firstIsClosed = false
28-
var secondIsClosed = false
29-
while (!firstIsClosed || !secondIsClosed) {
30-
select<Unit> {
31-
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
32-
firstValue = value
33-
if (secondValue !== null) {
34-
transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
35-
}
36-
}
37-
38-
onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
39-
secondValue = value
40-
if (firstValue !== null) {
41-
transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
42-
}
43-
}
44-
}
45-
}
46-
}
47-
}
48-
4915
@PublishedApi
5016
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
5117
flows: Array<out Flow<T>>,
5218
arrayFactory: () -> Array<T?>,
5319
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
54-
): Unit = coroutineScope {
20+
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
5521
val size = flows.size
56-
val channels = Array(size) { asFairChannel(flows[it]) }
57-
val latestValues = arrayOfNulls<Any?>(size)
22+
val latestValues = Array<Any?>(size) { NULL }
5823
val isClosed = Array(size) { false }
59-
var nonClosed = size
60-
var remainingNulls = size
61-
// See flow.combine(other) for explanation of the logic
62-
// Reuse receive blocks to avoid allocations on each iteration
63-
val onReceiveBlocks = Array<suspend (Any?) -> Unit>(size) { i ->
64-
{ value ->
65-
if (value === null) {
66-
isClosed[i] = true;
67-
--nonClosed
68-
}
69-
else {
70-
if (latestValues[i] == null) --remainingNulls
71-
latestValues[i] = value
72-
if (remainingNulls == 0) {
73-
val arguments = arrayFactory()
74-
for (index in 0 until size) {
75-
arguments[index] = NULL.unbox(latestValues[index])
24+
val resultChannel = Channel<Array<T>>(Channel.CONFLATED)
25+
val nonClosed = LocalAtomicInt(size)
26+
val remainingAbsentValues = LocalAtomicInt(size)
27+
for (i in 0 until size) {
28+
// Coroutine per flow that keeps track of its value and sends result to downstream
29+
launch {
30+
try {
31+
flows[i].collect { value ->
32+
val previous = latestValues[i]
33+
latestValues[i] = value
34+
if (previous === NULL) remainingAbsentValues.decrementAndGet()
35+
if (remainingAbsentValues.value == 0) {
36+
val results = arrayFactory()
37+
for (index in 0 until size) {
38+
results[index] = NULL.unbox(latestValues[index])
39+
}
40+
// NB: here actually "stale" array can overwrite a fresh one and break linearizability
41+
resultChannel.send(results as Array<T>)
7642
}
77-
transform(arguments as Array<T>)
43+
yield() // Emulate fairness for backward compatibility
44+
}
45+
} finally {
46+
isClosed[i] = true
47+
// Close the channel when there is no more flows
48+
if (nonClosed.decrementAndGet() == 0) {
49+
resultChannel.close()
7850
}
7951
}
8052
}
8153
}
8254

83-
while (nonClosed != 0) {
84-
select<Unit> {
85-
for (i in 0 until size) {
86-
if (isClosed[i]) continue
87-
channels[i].onReceiveOrNull(onReceiveBlocks[i])
88-
}
89-
}
90-
}
91-
}
92-
93-
private inline fun SelectBuilder<Unit>.onReceive(
94-
isClosed: Boolean,
95-
channel: ReceiveChannel<Any>,
96-
crossinline onClosed: () -> Unit,
97-
noinline onReceive: suspend (value: Any) -> Unit
98-
) {
99-
if (isClosed) return
100-
@Suppress("DEPRECATION")
101-
channel.onReceiveOrNull {
102-
// TODO onReceiveOrClosed when boxing issues are fixed
103-
if (it === null) onClosed()
104-
else onReceive(it)
105-
}
106-
}
107-
108-
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
109-
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
110-
val channel = channel as ChannelCoroutine<Any>
111-
flow.collect { value ->
112-
return@collect channel.sendFair(value ?: NULL)
55+
resultChannel.consumeEach {
56+
transform(it)
11357
}
11458
}
11559

@@ -131,12 +75,25 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
13175
val collectJob = Job()
13276
val scopeJob = currentCoroutineContext()[Job]!!
13377
(second as SendChannel<*>).invokeOnClose {
78+
// Optimization to avoid AFE allocation when the other flow is done
13479
if (!collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
13580
}
13681

13782
val newContext = coroutineContext + scopeJob
13883
val cnt = threadContextElements(newContext)
13984
try {
85+
/*
86+
* Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
87+
* hierarchy:
88+
* -Outer coroutineScope that owns the whole zip process
89+
* - First flow is collected by the child of coroutineScope, collectJob.
90+
* So it can be safely cancelled as soon as the second flow is done
91+
* - **But** the downstream MUST NOT be cancelled when the second flow is done,
92+
* so we emit to downstream from coroutineScope job.
93+
* Typically, such hierarchy requires coroutine for collector that communicates
94+
* with coroutines scope via a channel, but it's way too expensive, so
95+
* we are using this trick instead.
96+
*/
14097
withContextUndispatched( coroutineContext + collectJob) {
14198
flow.collect { value ->
14299
val otherValue = second.receiveOrNull() ?: return@collect

Diff for: kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+6-7
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
3131
*/
3232
@JvmName("flowCombine")
3333
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
34-
combineTransformInternal(this@combine, flow) { a, b ->
35-
emit(transform(a, b))
36-
}
34+
combineInternal(arrayOf(this@combine, flow), { arrayOfNulls(2) }, { emit(transform(it[0] as T1, it[1] as T2)) })
3735
}
3836

3937
/**
@@ -75,10 +73,11 @@ public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspe
7573
public fun <T1, T2, R> Flow<T1>.combineTransform(
7674
flow: Flow<T2>,
7775
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
78-
): Flow<R> = safeFlow {
79-
combineTransformInternal(this@combineTransform, flow) { a, b ->
80-
transform(a, b)
81-
}
76+
): Flow<R> = combineTransform(this, flow) { args: Array<*> ->
77+
transform(
78+
args[0] as T1,
79+
args[1] as T2
80+
)
8281
}
8382

8483
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
/*
8+
* These are atomics that are used as local variables
9+
* where atomicfu doesn't support its tranformations.
10+
*
11+
* Have `Local` prefix to avoid AFU clashes during star-imports
12+
*/
13+
14+
// In fact, used as @Volatile
15+
internal expect class LocalAtomicRef<T>(value: T) {
16+
fun get(): T
17+
fun set(value: T)
18+
}
19+
20+
internal inline var LocalAtomicRef<Any?>.value
21+
get() = get()
22+
set(value) = set(value)
23+
24+
internal expect class LocalAtomicInt(value: Int) {
25+
fun get(): Int
26+
fun set(value: Int)
27+
fun decrementAndGet(): Int
28+
}
29+
30+
internal inline var LocalAtomicInt.value
31+
get() = get()
32+
set(value) = set(value)

Diff for: kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -211,16 +211,16 @@ abstract class CombineTestBase : TestBase() {
211211
hang { expect(3) }
212212
}
213213

214-
val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) }
214+
val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expectUnreached() }
215215
assertFailsWith<CancellationException>(flow)
216-
finish(4)
216+
finish(2)
217217
}
218218

219219
@Test
220220
fun testCancellationExceptionDownstream() = runTest {
221221
val f1 = flow {
222222
emit(1)
223-
expect(2)
223+
expect(1)
224224
hang { expect(5) }
225225
}
226226
val f2 = flow {
@@ -230,7 +230,7 @@ abstract class CombineTestBase : TestBase() {
230230
}
231231

232232
val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach {
233-
expect(1)
233+
expect(2)
234234
yield()
235235
expect(4)
236236
throw CancellationException("")

0 commit comments

Comments
 (0)