1
+ package benchmarks.flow
2
+
3
+ import kotlinx.coroutines.*
4
+ import kotlinx.coroutines.channels.consumesAll
5
+ import kotlinx.coroutines.flow.*
6
+ import org.openjdk.jmh.annotations.*
7
+ import org.openjdk.jmh.infra.Blackhole
8
+ import java.util.concurrent.TimeUnit
9
+
10
+ /* *
11
+ * This benchmark can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
12
+ */
13
+ @Warmup(iterations = 5 )
14
+ @Measurement(iterations = 10 )
15
+ @BenchmarkMode(Mode .Throughput )
16
+ @OutputTimeUnit(TimeUnit .MICROSECONDS )
17
+ @State(Scope .Benchmark )
18
+ @Fork(1 )
19
+ open class FlattenMergeBenchmark {
20
+
21
+ @Param(" -10" , " -1" , " 100" , " 500" )
22
+ private var flows: Int = 0
23
+
24
+ @Param(" 1" , " 2" , " 4" ) // local machine
25
+ // @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
26
+ // @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
27
+ private var concurrency: Int = 0
28
+
29
+ private lateinit var flow: Flow <Flow <Int >>
30
+
31
+ @Setup
32
+ fun setup () {
33
+ val flowsCount = if (flows < 0 ) {
34
+ - flows * concurrency
35
+ }
36
+ else {
37
+ flows
38
+ }
39
+
40
+ flow = (1 .. flowsCount).asFlow().map {
41
+ flow {
42
+ repeat(ELEMENTS / flowsCount) {
43
+ emit(it)
44
+ }
45
+ }
46
+ }
47
+ }
48
+
49
+ @Benchmark
50
+ fun flattenMerge () = runBlocking {
51
+ flow.flattenMerge(concurrency = concurrency).collect()
52
+ }
53
+ }
54
+
55
+ private const val ELEMENTS = 100
0 commit comments