Skip to content

Commit 1e26cde

Browse files
committed
Fixed according to the code review
1 parent 243beea commit 1e26cde

File tree

5 files changed

+38
-38
lines changed

5 files changed

+38
-38
lines changed

benchmarks/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,6 @@ dependencies {
7474
compile "org.openjdk.jmh:jmh-core:1.21"
7575
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
7676
compile project(':kotlinx-coroutines-core')
77+
// add jmh dependency on main
78+
jmh sourceSets.main.runtimeClasspath
7779
}

benchmarks/plots-generators/generate-flatten-merge-flow-plots.py renamed to benchmarks/scripts/generate_plots_flow_flatten_merge.py

+12-14
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from matplotlib.ticker import FormatStrFormatter
44

55
inputFile = "jmh-result.csv"
6-
outputFile = "flatten-merge-plots.svg"
7-
elements = 10000
8-
benchmarkName="benchmarks.flow.FlattenMergeBenchmark.flattenMerge"
6+
outputFile = "flow-flatten-merge.svg"
7+
elements = 100000
8+
benchmarkName="benchmarks.flow.FlowFlattenMergeBenchmark.flattenMerge"
99

1010
markers = ['.', 'v', '^', '1', '2', '8', 'p', 'P', 'x', 'D', 'd', 's']
1111
colours = ['black', 'silver', 'red', 'gold', 'sienna', 'olivedrab', 'lightseagreen', 'navy', 'blue', 'm', 'crimson', 'yellow', 'orangered', 'slateblue', 'aqua']
@@ -29,7 +29,7 @@ def draw(data, plt):
2929
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
3030
plt.grid(linewidth='0.5', color='lightgray')
3131
plt.ylabel(data.unit.unique()[0])
32-
plt.xlabel('parallelism')
32+
plt.xlabel('concurrency')
3333
plt.xticks(data.concurrency.unique())
3434

3535
colourGen = next_colour()
@@ -39,13 +39,11 @@ def draw(data, plt):
3939
genMarker = next(markerGen)
4040
res = data[(data.flows == flows)]
4141
plt.plot(res.concurrency, res.score*elements, label="flows={}".format(flows), color=genColour, marker=genMarker)
42-
43-
def genFile():
44-
data = pd.read_table(inputFile, sep=",", skiprows=1, names=["benchmark","mode","threads","samples","score","scoreError","unit","concurrency","flows"])
45-
plt.figure(figsize=(20, 20))
46-
draw(data, plt)
47-
plt.legend(loc='upper center', borderpad=0, ncol=4, frameon=False, borderaxespad=4, prop={'size': 8})
48-
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
49-
plt.savefig(outputFile, bbox_inches='tight')
50-
51-
genFile()
42+
plt.errorbar(x=res.concurrency, y=res.score*elements, yerr=res.scoreError*elements, solid_capstyle='projecting', capsize=5)
43+
44+
data = pd.read_table(inputFile, sep=",", skiprows=1, names=["benchmark","mode","threads","samples","score","scoreError","unit","concurrency","flows"])
45+
plt.figure(figsize=(20, 20))
46+
draw(data, plt)
47+
plt.legend(loc='upper center', borderpad=0, ncol=4, frameon=False, borderaxespad=4, prop={'size': 8})
48+
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
49+
plt.savefig(outputFile, bbox_inches='tight')

benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package benchmarks
22

3+
import doWork
34
import kotlinx.coroutines.*
45
import kotlinx.coroutines.channels.Channel
56
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
67
import kotlinx.coroutines.sync.Semaphore
78
import kotlinx.coroutines.sync.withPermit
89
import org.openjdk.jmh.annotations.*
910
import java.util.concurrent.ForkJoinPool
10-
import java.util.concurrent.ThreadLocalRandom
1111
import java.util.concurrent.TimeUnit
1212

1313
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@@ -83,15 +83,7 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
8383
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
8484
}
8585

86-
fun doWork(work: Int) {
87-
// We use geometric distribution here
88-
val p = 1.0 / work
89-
val r = ThreadLocalRandom.current()
90-
while (true) {
91-
if (r.nextDouble() < p) break
92-
}
93-
}
94-
9586
private const val WORK_INSIDE = 80
9687
private const val WORK_OUTSIDE = 40
88+
// If you change this variable please be sure that you change variable elements in the corresponding python script as well
9789
private const val BATCH_SIZE = 1000000

benchmarks/src/jmh/kotlin/benchmarks/flow/FlattenMergeBenchmark.kt renamed to benchmarks/src/jmh/kotlin/benchmarks/flow/FlowFlattenMergeBenchmark.kt

+8-14
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,18 @@
11
package benchmarks.flow
22

3-
import benchmarks.doWork
4-
import kotlinx.coroutines.*
3+
import doWork
54
import kotlinx.coroutines.flow.*
5+
import kotlinx.coroutines.runBlocking
66
import org.openjdk.jmh.annotations.*
7-
import java.util.concurrent.ThreadLocalRandom
87
import java.util.concurrent.TimeUnit
9-
import kotlin.math.ceil
108

11-
/**
12-
* This benchmark can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
13-
*/
14-
@Warmup(iterations = 5)
9+
@Warmup(iterations = 3)
1510
@Measurement(iterations = 10)
1611
@BenchmarkMode(Mode.Throughput)
1712
@OutputTimeUnit(TimeUnit.MICROSECONDS)
1813
@State(Scope.Benchmark)
1914
@Fork(1)
20-
open class FlattenMergeBenchmark {
15+
open class FlowFlattenMergeBenchmark {
2116

2217
/**
2318
* Number of flows that are merged in this benchmark. Negative number means that number of flows
@@ -27,20 +22,19 @@ open class FlattenMergeBenchmark {
2722
private var flows: Int = 0
2823

2924
@Param("1", "2", "4") // local machine
30-
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
3125
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
3226
private var concurrency: Int = 0
3327

3428
private lateinit var flow: Flow<Flow<Int>>
3529

3630
@Setup
3731
fun setup() {
38-
val flowsNumber = if (flows >= 0) flows else -(flows * concurrency)
39-
val flowElementsToProcess = ceil(ELEMENTS / flowsNumber.toDouble())
32+
val n = if (flows >= 0) flows else -(flows * concurrency)
33+
val flowElementsToProcess = ELEMENTS / n
4034

41-
flow = (1..flowsNumber).asFlow().map {
35+
flow = (1..n).asFlow().map {
4236
flow {
43-
repeat(flowElementsToProcess.toInt()) {
37+
repeat(flowElementsToProcess) {
4438
doWork(WORK)
4539
emit(it)
4640
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import java.util.concurrent.ThreadLocalRandom
6+
7+
fun doWork(work: Int) {
8+
// We use geometric distribution here
9+
val p = 1.0 / work
10+
val r = ThreadLocalRandom.current()
11+
while (true) {
12+
if (r.nextDouble() < p) break
13+
}
14+
}

0 commit comments

Comments
 (0)