Skip to content

Flow.flattenMerge benchmark #1464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 26, 2019
2 changes: 2 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,6 @@ dependencies {
compile "org.openjdk.jmh:jmh-core:1.21"
compile 'com.typesafe.akka:akka-actor_2.12:2.5.0'
compile project(':kotlinx-coroutines-core')
// add jmh dependency on main
jmh sourceSets.main.runtimeClasspath
}
57 changes: 57 additions & 0 deletions benchmarks/scripts/generate_plots_flow_flatten_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import pandas as pd
import sys
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter

input_file = "build/reports/jmh/results.csv"
output_file = "flow-flatten-merge.svg"
elements = 100000
benchmark_name = "benchmarks.flow.FlowFlattenMergeBenchmark.flattenMerge"
csv_columns = ["Benchmark", "Score", "Score Error (99.9%)", "Unit", "Param: concurrency", "Param: flows"]
rename_columns = {"Benchmark": "benchmark", "Score" : "score", "Score Error (99.9%)" : "score_error", "Unit" : "unit",
"Param: concurrency" : "concurrency", "Param: flows" : "flows"}

markers = ['.', 'v', '^', '1', '2', '8', 'p', 'P', 'x', 'D', 'd', 's']
colours = ['red', 'gold', 'sienna', 'olivedrab', 'lightseagreen', 'navy', 'blue', 'm', 'crimson', 'yellow', 'orangered', 'slateblue', 'aqua', 'black', 'silver']

def next_colour():
i = 0
while True:
yield colours[i % len(colours)]
i += 1

def next_marker():
i = 0
while True:
yield markers[i % len(markers)]
i += 1

def draw(data, plt):
plt.xscale('log', basex=2)
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
plt.grid(linewidth='0.5', color='lightgray')
if data.unit.unique()[0] != "ops/s":
print("Unexpected time unit: " + data.unit.unique()[0])
sys.exit(1)
plt.ylabel("elements / ms")
plt.xlabel('concurrency')
plt.xticks(data.concurrency.unique())

colour_gen = next_colour()
marker_gen = next_marker()
for flows in data.flows.unique():
gen_colour = next(colour_gen)
gen_marker = next(marker_gen)
res = data[(data.flows == flows)]
# plt.plot(res.concurrency, res.score*elements/1000, label="flows={}".format(flows), color=gen_colour, marker=gen_marker)
plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
label="flows={}".format(flows), capsize=4, color=gen_colour)

data = pd.read_table(input_file, sep=",")
data = data[csv_columns].rename(columns=rename_columns)
data = data[(data.benchmark == benchmark_name)]
plt.figure(figsize=(8, 6.5))
draw(data, plt)
plt.legend(loc='upper center', borderpad=0, ncol=4, frameon=False, borderaxespad=4, prop={'size': 8})
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
plt.savefig(output_file, bbox_inches='tight')
19 changes: 5 additions & 14 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package benchmarks

import doGeomDistrWork
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -50,9 +50,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.withPermit {
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
}
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -68,9 +68,9 @@ open class SemaphoreBenchmark {
jobs += GlobalScope.launch {
repeat(n) {
semaphore.send(Unit) // acquire
doWork(WORK_INSIDE)
doGeomDistrWork(WORK_INSIDE)
semaphore.receive() // release
doWork(WORK_OUTSIDE)
doGeomDistrWork(WORK_OUTSIDE)
}
}
}
Expand All @@ -83,15 +83,6 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
}

private fun doWork(work: Int) {
// We use geometric distribution here
val p = 1.0 / work
val r = ThreadLocalRandom.current()
while (true) {
if (r.nextDouble() < p) break
}
}

private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
private const val BATCH_SIZE = 1000000
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks.flow

import doGeomDistrWork
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit

/**
* Benchmark to measure performance of [kotlinx.coroutines.flow.FlowKt.flattenMerge].
* In addition to that, it can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
*/
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
open class FlowFlattenMergeBenchmark {

/**
* Number of flows that are merged in this benchmark. Negative number means that number of flows
* will be computed as -([flows] * [concurrency]), positive number will be chosen as number of flows.
*/
@Param("-10", "-1", "100", "500")
private var flows: Int = 0

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var concurrency: Int = 0

private lateinit var flow: Flow<Flow<Int>>

@Setup
fun setup() {
val n = if (flows >= 0) flows else -(flows * concurrency)
val flowElementsToProcess = ELEMENTS / n

flow = (1..n).asFlow().map {
flow {
repeat(flowElementsToProcess) {
doGeomDistrWork(WORK)
emit(it)
}
}
}
}

@Benchmark
fun flattenMerge() = runBlocking(Dispatchers.Default) {
flow.flattenMerge(concurrency = concurrency).collect()
}
}

// If you change this variable please be sure that you change variable elements in the corresponding python script as well
private const val ELEMENTS = 100_000
private const val WORK = 100
14 changes: 14 additions & 0 deletions benchmarks/src/main/kotlin/BenchmarkUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import java.util.concurrent.ThreadLocalRandom

fun doGeomDistrWork(work: Int) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you verified that this function actually behaves like a geometric distribution when it is stressed on JVM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this function on different means, and created histograms of execution times. You can see the code here https://gist.github.com/fmixing/badbfc029e154486c8766ab9cc70a2a6
Geom distr benchmark

// We use geometric distribution here
val p = 1.0 / work
val r = ThreadLocalRandom.current()
while (true) {
if (r.nextDouble() < p) break
}
}