Skip to content

Commit fe15b6d

Browse files
fmixingqwwdfsad
authored andcommitted
Flow.flattenMerge benchmark (#1464)
1 parent 4f24a7a commit fe15b6d

File tree

5 files changed

+162
-14
lines changed

5 files changed

+162
-14
lines changed

benchmarks/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,6 @@ dependencies {
7474
compile "org.openjdk.jmh:jmh-core:1.21"
7575
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
7676
compile project(':kotlinx-coroutines-core')
77+
// add jmh dependency on main
78+
jmh sourceSets.main.runtimeClasspath
7779
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# To run this script run the command 'python3 scripts/generate_plots_flow_flatten_merge.py' in the /benchmarks folder
2+
3+
4+
import pandas as pd
5+
import sys
6+
import locale
7+
import matplotlib.pyplot as plt
8+
from matplotlib.ticker import FormatStrFormatter
9+
10+
input_file = "build/reports/jmh/results.csv"
11+
output_file = "out/flow-flatten-merge.svg"
12+
# Please change the value of this variable according to the FlowFlattenMergeBenchmarkKt.ELEMENTS
13+
elements = 100000
14+
benchmark_name = "benchmarks.flow.FlowFlattenMergeBenchmark.flattenMerge"
15+
csv_columns = ["Benchmark", "Score", "Unit", "Param: concurrency", "Param: flowsNumberStrategy"]
16+
rename_columns = {"Benchmark": "benchmark", "Score" : "score", "Unit" : "unit",
17+
"Param: concurrency" : "concurrency", "Param: flowsNumberStrategy" : "flows"}
18+
19+
markers = ['.', 'v', '^', '1', '2', '8', 'p', 'P', 'x', 'D', 'd', 's']
20+
colours = ['red', 'gold', 'sienna', 'olivedrab', 'lightseagreen', 'navy', 'blue', 'm', 'crimson', 'yellow', 'orangered', 'slateblue', 'aqua', 'black', 'silver']
21+
22+
def next_colour():
23+
i = 0
24+
while True:
25+
yield colours[i % len(colours)]
26+
i += 1
27+
28+
def next_marker():
29+
i = 0
30+
while True:
31+
yield markers[i % len(markers)]
32+
i += 1
33+
34+
def draw(data, plt):
35+
plt.xscale('log', basex=2)
36+
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
37+
plt.grid(linewidth='0.5', color='lightgray')
38+
if data.unit.unique()[0] != "ops/s":
39+
print("Unexpected time unit: " + data.unit.unique()[0])
40+
sys.exit(1)
41+
plt.ylabel("elements / ms")
42+
plt.xlabel('concurrency')
43+
plt.xticks(data.concurrency.unique())
44+
45+
colour_gen = next_colour()
46+
marker_gen = next_marker()
47+
for flows in data.flows.unique():
48+
gen_colour = next(colour_gen)
49+
gen_marker = next(marker_gen)
50+
res = data[(data.flows == flows)]
51+
# plt.plot(res.concurrency, res.score*elements/1000, label="flows={}".format(flows), color=gen_colour, marker=gen_marker)
52+
plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
53+
label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2)
54+
55+
langlocale = locale.getdefaultlocale()[0]
56+
locale.setlocale(locale.LC_ALL, langlocale)
57+
dp = locale.localeconv()['decimal_point']
58+
if dp == ",":
59+
csv_columns.append("Score Error (99,9%)")
60+
rename_columns["Score Error (99,9%)"] = "score_error"
61+
elif dp == ".":
62+
csv_columns.append("Score Error (99.9%)")
63+
rename_columns["Score Error (99.9%)"] = "score_error"
64+
else:
65+
print("Unexpected locale delimeter: " + dp)
66+
sys.exit(1)
67+
data = pd.read_csv(input_file, sep=",", decimal=dp)
68+
data = data[csv_columns].rename(columns=rename_columns)
69+
data = data[(data.benchmark == benchmark_name)]
70+
plt.rcParams.update({'font.size': 15})
71+
plt.figure(figsize=(12.5, 10))
72+
draw(data, plt)
73+
plt.legend(loc='upper center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15})
74+
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
75+
plt.savefig(output_file, bbox_inches='tight')

benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+5-14
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package benchmarks
22

3+
import benchmarks.common.*
34
import kotlinx.coroutines.*
45
import kotlinx.coroutines.channels.Channel
56
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
67
import kotlinx.coroutines.sync.Semaphore
78
import kotlinx.coroutines.sync.withPermit
89
import org.openjdk.jmh.annotations.*
910
import java.util.concurrent.ForkJoinPool
10-
import java.util.concurrent.ThreadLocalRandom
1111
import java.util.concurrent.TimeUnit
1212

1313
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@@ -50,9 +50,9 @@ open class SemaphoreBenchmark {
5050
jobs += GlobalScope.launch {
5151
repeat(n) {
5252
semaphore.withPermit {
53-
doWork(WORK_INSIDE)
53+
doGeomDistrWork(WORK_INSIDE)
5454
}
55-
doWork(WORK_OUTSIDE)
55+
doGeomDistrWork(WORK_OUTSIDE)
5656
}
5757
}
5858
}
@@ -68,9 +68,9 @@ open class SemaphoreBenchmark {
6868
jobs += GlobalScope.launch {
6969
repeat(n) {
7070
semaphore.send(Unit) // acquire
71-
doWork(WORK_INSIDE)
71+
doGeomDistrWork(WORK_INSIDE)
7272
semaphore.receive() // release
73-
doWork(WORK_OUTSIDE)
73+
doGeomDistrWork(WORK_OUTSIDE)
7474
}
7575
}
7676
}
@@ -83,15 +83,6 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
8383
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
8484
}
8585

86-
private fun doWork(work: Int) {
87-
// We use geometric distribution here
88-
val p = 1.0 / work
89-
val r = ThreadLocalRandom.current()
90-
while (true) {
91-
if (r.nextDouble() < p) break
92-
}
93-
}
94-
9586
private const val WORK_INSIDE = 80
9687
private const val WORK_OUTSIDE = 40
9788
private const val BATCH_SIZE = 1000000
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.flow
6+
7+
import benchmarks.common.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
import org.openjdk.jmh.annotations.*
11+
import java.util.concurrent.*
12+
13+
/**
14+
* Benchmark to measure performance of [kotlinx.coroutines.flow.FlowKt.flattenMerge].
15+
* In addition to that, it can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
16+
*/
17+
@Warmup(iterations = 5, time = 1)
18+
@Measurement(iterations = 5, time = 1)
19+
@BenchmarkMode(Mode.Throughput)
20+
@OutputTimeUnit(TimeUnit.SECONDS)
21+
@State(Scope.Benchmark)
22+
@Fork(1)
23+
open class FlowFlattenMergeBenchmark {
24+
@Param
25+
private var flowsNumberStrategy: FlowsNumberStrategy = FlowsNumberStrategy.`10xConcurrency flows`
26+
27+
@Param("1", "2", "4", "8")
28+
private var concurrency: Int = 0
29+
30+
private lateinit var flow: Flow<Flow<Int>>
31+
32+
@Setup
33+
fun setup() {
34+
val n = flowsNumberStrategy.get(concurrency)
35+
val flowElementsToProcess = ELEMENTS / n
36+
37+
flow = (1..n).asFlow().map {
38+
flow {
39+
repeat(flowElementsToProcess) {
40+
doGeomDistrWork(WORK)
41+
emit(it)
42+
}
43+
}
44+
}
45+
}
46+
47+
@Benchmark
48+
fun flattenMerge() = runBlocking(Dispatchers.Default) {
49+
flow.flattenMerge(concurrency = concurrency).collect()
50+
}
51+
}
52+
53+
enum class FlowsNumberStrategy(val get: (concurrency: Int) -> Int) {
54+
`10xConcurrency flows`({ concurrency -> concurrency * 10 }),
55+
`1xConcurrency flows`({ it }),
56+
`100 flows`({ 100 }),
57+
`500 flows`({ 500 })
58+
}
59+
60+
// If you change this variable please be sure that you change variable elements in the generate_plots_flow_flatten_merge.py
61+
// python script as well
62+
private const val ELEMENTS = 100_000
63+
private const val WORK = 100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package benchmarks.common
6+
7+
import java.util.concurrent.*
8+
9+
fun doGeomDistrWork(work: Int) {
10+
// We use geometric distribution here. We also checked on macbook pro 13" (2017) that the resulting work times
11+
// are distributed geometrically, see https://github.com/Kotlin/kotlinx.coroutines/pull/1464#discussion_r355705325
12+
val p = 1.0 / work
13+
val r = ThreadLocalRandom.current()
14+
while (true) {
15+
if (r.nextDouble() < p) break
16+
}
17+
}

0 commit comments

Comments
 (0)