1
1
package benchmarks.flow
2
2
3
3
import kotlinx.coroutines.*
4
- import kotlinx.coroutines.channels.consumesAll
5
4
import kotlinx.coroutines.flow.*
6
5
import org.openjdk.jmh.annotations.*
7
- import org.openjdk.jmh.infra.Blackhole
8
6
import java.util.concurrent.TimeUnit
7
+ import kotlin.math.ceil
9
8
10
9
/* *
11
10
* This benchmark can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
@@ -18,6 +17,10 @@ import java.util.concurrent.TimeUnit
18
17
@Fork(1 )
19
18
open class FlattenMergeBenchmark {
20
19
20
+ /* *
21
+ * Number of flows that are merged in this benchmark. Negative number means that number of flows
22
+ * will be computed as -([flows] * [concurrency]), positive number will be chosen as number of flows.
23
+ */
21
24
@Param(" -10" , " -1" , " 100" , " 500" )
22
25
private var flows: Int = 0
23
26
@@ -30,16 +33,16 @@ open class FlattenMergeBenchmark {
30
33
31
34
@Setup
32
35
fun setup () {
33
- val flowsCount = if (flows < 0 ) {
36
+ val flowsNumber = if (flows < 0 ) {
34
37
- flows * concurrency
35
38
}
36
39
else {
37
40
flows
38
41
}
39
42
40
- flow = (1 .. flowsCount ).asFlow().map {
43
+ flow = (1 .. flowsNumber ).asFlow().map {
41
44
flow {
42
- repeat(ELEMENTS / flowsCount ) {
45
+ repeat(ceil( ELEMENTS / flowsNumber.toDouble()).toInt() ) {
43
46
emit(it)
44
47
}
45
48
}
@@ -52,4 +55,4 @@ open class FlattenMergeBenchmark {
52
55
}
53
56
}
54
57
55
- private const val ELEMENTS = 100
58
+ private const val ELEMENTS = 10_000
0 commit comments