Skip to content

Flow.flattenMerge benchmark #1464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 26, 2019
2 changes: 2 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,6 @@ dependencies {
compile "org.openjdk.jmh:jmh-core:1.21"
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
compile project(':kotlinx-coroutines-core')
// add jmh dependency on main
jmh sourceSets.main.runtimeClasspath
}
75 changes: 75 additions & 0 deletions benchmarks/scripts/generate_plots_flow_flatten_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# To run this script run the command 'python3 scripts/generate_plots_flow_flatten_merge.py' in the /benchmarks folder


import locale
import os
import sys

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import FormatStrFormatter

# JMH results in CSV form; the path is relative to the /benchmarks folder the script is run from.
input_file = "build/reports/jmh/results.csv"
# Destination of the generated plot (SVG).
output_file = "out/flow-flatten-merge.svg"
# Number of elements processed by one benchmark operation.
# Keep this value equal to ELEMENTS in FlowFlattenMergeBenchmark.kt (FlowFlattenMergeBenchmarkKt.ELEMENTS).
elements = 100000
# Only rows whose "Benchmark" column equals this fully qualified name are plotted.
benchmark_name = "benchmarks.flow.FlowFlattenMergeBenchmark.flattenMerge"
# Columns taken from the JMH CSV; the locale-dependent "Score Error" column is appended at run time.
csv_columns = ["Benchmark", "Score", "Unit", "Param: concurrency", "Param: flowsNumberStrategy"]
# Maps the JMH column headers to the short attribute-friendly names used by the rest of the script.
rename_columns = {"Benchmark": "benchmark", "Score" : "score", "Unit" : "unit",
"Param: concurrency" : "concurrency", "Param: flowsNumberStrategy" : "flows"}

# Matplotlib marker codes handed out in order by next_marker().
markers = ['.', 'v', '^', '1', '2', '8', 'p', 'P', 'x', 'D', 'd', 's']
# Matplotlib colour names handed out in order by next_colour().
colours = ['red', 'gold', 'sienna', 'olivedrab', 'lightseagreen', 'navy', 'blue', 'm', 'crimson', 'yellow', 'orangered', 'slateblue', 'aqua', 'black', 'silver']

def next_colour():
    """Endless generator over the module-level ``colours`` list.

    Yields the colours in list order and starts again from the first entry
    once the end of the list has been reached.
    """
    position = 0
    while True:
        # Wrap before indexing so the cursor always stays inside the list.
        position %= len(colours)
        yield colours[position]
        position += 1

def next_marker():
    """Endless generator over the module-level ``markers`` list.

    Yields the marker codes in list order and starts again from the first
    entry once the end of the list has been reached.
    """
    cursor = 0
    while True:
        # Wrap before indexing so the cursor always stays inside the list.
        cursor %= len(markers)
        yield markers[cursor]
        cursor += 1

def draw(data, plt):
    """Plot throughput against concurrency, one error-bar series per ``flows`` value.

    Parameters:
        data: pandas DataFrame with the columns ``unit``, ``concurrency``, ``flows``,
            ``score`` and ``score_error``.
        plt: the ``matplotlib.pyplot`` module (or a compatible object) to draw on.

    Exits the process with status 1 when ``data`` has no rows or when any row
    reports a unit other than "ops/s". The input is validated before anything
    is drawn.
    """
    if data.empty:
        # Without this check the unit lookup below would fail with an IndexError.
        print("No benchmark results to plot")
        sys.exit(1)
    for unit in data.unit.unique():
        if unit != "ops/s":
            print("Unexpected time unit: " + str(unit))
            sys.exit(1)

    # Matplotlib 3.3 renamed 'basex' to 'base' and 3.5 removed the old spelling;
    # older releases reject the new one, so fall back to 'basex' for them.
    try:
        plt.xscale('log', base=2)
    except (TypeError, ValueError):
        plt.xscale('log', basex=2)
    plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
    plt.grid(linewidth='0.5', color='lightgray')
    plt.ylabel("elements / ms")
    plt.xlabel('concurrency')
    plt.xticks(data.concurrency.unique())

    colour_gen = next_colour()
    marker_gen = next_marker()
    for flows in data.flows.unique():
        gen_colour = next(colour_gen)
        # NOTE(review): the marker is generated but not passed to errorbar(), so series
        # differ by colour only; kept to preserve the current plot appearance.
        gen_marker = next(marker_gen)
        res = data[(data.flows == flows)]
        # score is in ops/s; multiplying by `elements` and dividing by 1000 gives elements/ms.
        plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
                     label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2)

# Activate the user's default locale. An empty string asks setlocale() to pick it up from the
# environment, which replaces the deprecated locale.getdefaultlocale() and also works on systems
# where only the encoding-qualified locale (e.g. "en_US.UTF-8") is installed.
locale.setlocale(locale.LC_ALL, '')
dp = locale.localeconv()['decimal_point']
# The header of the score-error column contains a locale-formatted number, so pick the
# spelling that matches the decimal separator of the active locale.
if dp == ",":
    csv_columns.append("Score Error (99,9%)")
    rename_columns["Score Error (99,9%)"] = "score_error"
elif dp == ".":
    csv_columns.append("Score Error (99.9%)")
    rename_columns["Score Error (99.9%)"] = "score_error"
else:
    print("Unexpected locale delimeter: " + dp)
    sys.exit(1)
data = pd.read_csv(input_file, sep=",", decimal=dp)
data = data[csv_columns].rename(columns=rename_columns)
# Keep only the rows produced by the flattenMerge benchmark.
data = data[(data.benchmark == benchmark_name)]
plt.rcParams.update({'font.size': 15})
plt.figure(figsize=(12.5, 10))
draw(data, plt)
plt.legend(loc='upper center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15})
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
# savefig() does not create missing directories, so make sure the target folder exists.
output_dir = os.path.dirname(output_file)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
plt.savefig(output_file, bbox_inches='tight')
19 changes: 5 additions & 14 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package benchmarks

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -50,9 +50,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.withPermit {
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
}
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -68,9 +68,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.send(Unit) // acquire
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
semaphore.receive() // release
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -83,15 +83,6 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
}

/**
 * Spins on the current thread for a random number of iterations.
 *
 * Every iteration draws a random double and stops with probability `1 / work`, so the
 * number of iterations follows a geometric distribution.
 *
 * @param work reciprocal of the per-iteration stop probability; must not be negative.
 * @throws IllegalArgumentException if [work] is negative.
 */
private fun doWork(work: Int) {
    // A negative value would make p negative, so the loop below would never terminate.
    require(work >= 0) { "work must be non-negative, but was $work" }
    // We use geometric distribution here
    val p = 1.0 / work
    val r = ThreadLocalRandom.current()
    while (true) {
        if (r.nextDouble() < p) break
    }
}

// Amount of work requested while the semaphore permit is held.
private const val WORK_INSIDE = 80
// Amount of work requested after the permit has been released.
private const val WORK_OUTSIDE = 40
// NOTE(review): the usage is not visible in this excerpt; presumably the number of
// operations per benchmark invocation. Confirm against the benchmark body.
private const val BATCH_SIZE = 1000000
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

/**
 * JMH throughput benchmark for [kotlinx.coroutines.flow.FlowKt.flattenMerge].
 * It can additionally be treated as a macro benchmark for [kotlinx.coroutines.sync.Semaphore].
 *
 * Each invocation merges a flow of inner flows; the number of inner flows is chosen by
 * [flowsNumberStrategy] from the [concurrency] parameter, and [ELEMENTS] is divided between them.
 */
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
open class FlowFlattenMergeBenchmark {
    @Param
    private var flowsNumberStrategy: FlowsNumberStrategy = FlowsNumberStrategy.`10xConcurrency flows`

    @Param("1", "2", "4", "8")
    private var concurrency: Int = 0

    private lateinit var flow: Flow<Flow<Int>>

    /**
     * Builds the flow of flows for the current parameter combination.
     */
    @Setup
    fun setup() {
        val flowsCount = flowsNumberStrategy.get(concurrency)
        // Integer division: every inner flow emits the same number of elements.
        val elementsPerFlow = ELEMENTS / flowsCount

        flow = (1..flowsCount).asFlow().map {
            flow {
                repeat(elementsPerFlow) { index ->
                    // Do some work before each emission.
                    doGeomDistrWork(WORK)
                    emit(index)
                }
            }
        }
    }

    /**
     * Merges all inner flows with the configured concurrency and collects every element.
     */
    @Benchmark
    fun flattenMerge() {
        runBlocking(Dispatchers.Default) {
            val merged = flow.flattenMerge(concurrency = concurrency)
            merged.collect()
        }
    }
}

/**
 * Strategies for choosing how many inner flows the benchmark creates.
 *
 * @property get computes the number of flows from the given concurrency level.
 */
enum class FlowsNumberStrategy(val get: (concurrency: Int) -> Int) {
    /** Ten flows per unit of concurrency. */
    `10xConcurrency flows`({ it * 10 }),

    /** Exactly as many flows as the concurrency level. */
    `1xConcurrency flows`({ concurrency -> concurrency }),

    /** Always 100 flows, whatever the concurrency level is. */
    `100 flows`({ _ -> 100 }),

    /** Always 500 flows, whatever the concurrency level is. */
    `500 flows`({ _ -> 500 })
}

// Total number of elements per benchmark invocation; setup() divides it between the inner flows.
// If you change this value, change the `elements` variable in the
// generate_plots_flow_flatten_merge.py Python script to match.
private const val ELEMENTS = 100_000
// Amount of work passed to doGeomDistrWork before every emitted element.
private const val WORK = 100
17 changes: 17 additions & 0 deletions benchmarks/src/main/kotlin/benchmarks/common/BenchmarkUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.common

import java.util.concurrent.*

/**
 * Spins on the current thread for a random number of iterations.
 *
 * Every iteration draws a random double and stops with probability `1 / work`, so the
 * number of iterations follows a geometric distribution.
 *
 * @param work reciprocal of the per-iteration stop probability; must not be negative.
 * @throws IllegalArgumentException if [work] is negative.
 */
fun doGeomDistrWork(work: Int) {
    // A negative value would make p negative, so the loop below would never terminate.
    require(work >= 0) { "work must be non-negative, but was $work" }
    // We use geometric distribution here. We also checked on macbook pro 13" (2017) that the resulting work times
    // are distributed geometrically, see https://github.com/Kotlin/kotlinx.coroutines/pull/1464#discussion_r355705325
    val p = 1.0 / work
    val r = ThreadLocalRandom.current()
    while (true) {
        if (r.nextDouble() < p) break
    }
}