Skip to content

Flow.flattenMerge benchmark #1464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 26, 2019
4 changes: 2 additions & 2 deletions benchmarks/scripts/generate_plots_flow_flatten_merge.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# To run this script run the command 'python3 scripts/generate_plots_flow_flatten_merge.py' in the benchmarks/ folder or
# 'python3 generate_plots_flow_flatten_merge.py' in the benchmarks/scripts/ folder
# To run this script run the command 'python3 scripts/generate_plots_flow_flatten_merge.py' in the benchmarks/ folder


import pandas as pd
import sys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
package benchmarks.flow

import benchmarks.common.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.*

/**
* Benchmark to measure performance of [kotlinx.coroutines.flow.FlowKt.flattenMerge].
* In addition to that, it can be considered as a macro benchmark for the [kotlinx.coroutines.sync.Semaphore]
*/
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 10, time = 1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
Expand All @@ -26,18 +25,17 @@ open class FlowFlattenMergeBenchmark {
* Number of flows that are merged in this benchmark. Negative number means that number of flows
* will be computed as -([flows] * [concurrency]), positive number will be chosen as number of flows.
*/
@Param("-10", "-1", "100", "500")
private var flows: Int = 0
@Param
private var flows: Flows = Flows.`10xConcurrency flows`

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
@Param("1", "2", "4")
private var concurrency: Int = 0

private lateinit var flow: Flow<Flow<Int>>

@Setup
fun setup() {
val n = if (flows >= 0) flows else -(flows * concurrency)
val n = flows.getFlows(concurrency)
val flowElementsToProcess = ELEMENTS / n

flow = (1..n).asFlow().map {
Expand All @@ -56,6 +54,23 @@ open class FlowFlattenMergeBenchmark {
}
}

enum class Flows {
`10xConcurrency flows` {
override fun getFlows(concurrency: Int): Int = 10 * concurrency
},
`1xConcurrency flows` {
override fun getFlows(concurrency: Int): Int = concurrency
},
`100 flows` {
override fun getFlows(concurrency: Int): Int = 100
},
`500 flows` {
override fun getFlows(concurrency: Int): Int = 500
};

abstract fun getFlows(concurrency : Int): Int
}

// If you change this variable please be sure that you change variable elements in the generate_plots_flow_flatten_merge.py
// python script as well
private const val ELEMENTS = 100_000
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.common

import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.*

fun doGeomDistrWork(work: Int) {
// We use geometric distribution here
Expand Down