Skip to content

Commit c438a0e

Browse files
committed
Numbers benchmark that exposes some of Flow weaknesses
1 parent c42d338 commit c438a0e

File tree

2 files changed

+126
-2
lines changed

2 files changed

+126
-2
lines changed

benchmarks/build.gradle

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ task removeRedundantFiles(type: Delete) {
3434
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class"
3535
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class"
3636
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
37-
3837
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
3938
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class"
40-
4139
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble//SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class"
4240

41+
// Primes
42+
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class"
43+
delete "$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class"
4344
}
4445

4546
jmhRunBytecodeGenerator.dependsOn(removeRedundantFiles)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
6+
package benchmarks.flow.misc
7+
8+
import benchmarks.flow.scrabble.flow
9+
import io.reactivex.*
10+
import io.reactivex.functions.*
11+
import kotlinx.coroutines.*
12+
import kotlinx.coroutines.flow.*
13+
import org.openjdk.jmh.annotations.*
14+
import java.util.concurrent.*
15+
16+
/*
17+
* Results:
18+
*
19+
* // Throw FlowAborted overhead
20+
* Numbers.primes avgt 7 4106.837 ± 59.672 us/op
21+
* Numbers.primesRx avgt 7 2777.232 ± 85.357 us/op
22+
*
23+
* // On par
24+
* Numbers.transformations avgt 7 20.290 ± 1.367 us/op
25+
* Numbers.transformationsRx avgt 7 22.932 ± 1.863 us/op
26+
*
27+
* // Channels overhead
28+
* Numbers.zip avgt 7 470.737 ± 10.838 us/op
29+
* Numbers.zipRx avgt 7 104.811 ± 9.073 us/op
30+
*
31+
*/
32+
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
33+
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
34+
@Fork(value = 1)
35+
@BenchmarkMode(Mode.AverageTime)
36+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
37+
@State(Scope.Benchmark)
38+
open class Numbers {
39+
40+
companion object {
41+
private const val primes = 100
42+
private const val natural = 1000
43+
}
44+
45+
private fun numbers() = flow {
46+
for (i in 2L..Long.MAX_VALUE) emit(i)
47+
}
48+
49+
private fun primesFlow(): Flow<Long> = flow {
50+
var source = numbers()
51+
while (true) {
52+
val next = source.take(1).single()
53+
emit(next)
54+
source = source.filter { it % next != 0L }
55+
}
56+
}
57+
58+
private fun rxNumbers() =
59+
Flowable.generate(Callable { 1L }, BiFunction<Long, Emitter<Long>, Long> { state, emitter ->
60+
val newState = state + 1
61+
emitter.onNext(newState)
62+
newState
63+
})
64+
65+
private fun generateRxPrimes(): Flowable<Long> = Flowable.generate(Callable { rxNumbers() },
66+
BiFunction<Flowable<Long>, Emitter<Long>, Flowable<Long>> { state, emitter ->
67+
// Not the most fair comparison, but here we go
68+
val prime = state.firstElement().blockingGet()
69+
emitter.onNext(prime)
70+
state.filter { it % prime != 0L }
71+
})
72+
73+
@Benchmark
74+
fun primes() = runBlocking {
75+
primesFlow().take(primes).count()
76+
}
77+
78+
@Benchmark
79+
fun primesRx() = generateRxPrimes().take(primes.toLong()).count().blockingGet()
80+
81+
@Benchmark
82+
fun zip() = runBlocking {
83+
val numbers = numbers().take(natural)
84+
val first = numbers
85+
.filter { it % 2L != 0L }
86+
.map { it * it }
87+
val second = numbers
88+
.filter { it % 2L == 0L }
89+
.map { it * it }
90+
first.zip(second) { v1, v2 -> v1 + v2 }.filter { it % 3 == 0L }.count()
91+
}
92+
93+
@Benchmark
94+
fun zipRx() {
95+
val numbers = rxNumbers().take(natural.toLong())
96+
val first = numbers
97+
.filter { it % 2L != 0L }
98+
.map { it * it }
99+
val second = numbers
100+
.filter { it % 2L == 0L }
101+
.map { it * it }
102+
first.zipWith(second, BiFunction<Long, Long, Long> { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
103+
.blockingGet()
104+
}
105+
106+
@Benchmark
107+
fun transformations(): Int = runBlocking {
108+
numbers()
109+
.take(natural)
110+
.filter { it % 2L != 0L }
111+
.map { it * it }
112+
.filter { (it + 1) % 3 == 0L }.count()
113+
}
114+
115+
@Benchmark
116+
fun transformationsRx(): Long {
117+
return rxNumbers().take(natural.toLong())
118+
.filter { it % 2L != 0L }
119+
.map { it * it }
120+
.filter { (it + 1) % 3 == 0L }.count()
121+
.blockingGet()
122+
}
123+
}

0 commit comments

Comments
 (0)