Skip to content

Introduce fast and scalable channels #3103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/dictionaries/shared.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

package benchmarks

import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.selects.select
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import java.lang.Integer.max
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Phaser
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit


Expand All @@ -26,14 +25,14 @@ import java.util.concurrent.TimeUnit
* Please, be patient, this benchmark takes quite a lot of time to complete.
*/
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 3)
@BenchmarkMode(Mode.AverageTime)
@Measurement(iterations = 20, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ChannelProducerConsumerBenchmark {
@Param
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.DEFAULT

@Param
private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
Expand All @@ -44,12 +43,13 @@ open class ChannelProducerConsumerBenchmark {
@Param("false", "true")
private var _3_withSelect: Boolean = false

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "12") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
@Param("1", "2", "4", "8", "16") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128") // Server
private var _4_parallelism: Int = 0

@Param("50")
private var _5_workSize: Int = 0

private lateinit var dispatcher: CoroutineDispatcher
private lateinit var channel: Channel<Int>

Expand All @@ -61,13 +61,21 @@ open class ChannelProducerConsumerBenchmark {
}

@Benchmark
fun spmc() {
fun mcsp() {
if (_2_coroutines != 0) return
val producers = max(1, _4_parallelism - 1)
val consumers = 1
run(producers, consumers)
}

@Benchmark
fun spmc() {
if (_2_coroutines != 0) return
val producers = 1
val consumers = max(1, _4_parallelism - 1)
run(producers, consumers)
}

@Benchmark
fun mpmc() {
val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
Expand All @@ -76,7 +84,7 @@ open class ChannelProducerConsumerBenchmark {
}

private fun run(producers: Int, consumers: Int) {
val n = APPROX_BATCH_SIZE / producers * producers
val n = (APPROX_BATCH_SIZE / producers * producers) / consumers * consumers
val phaser = Phaser(producers + consumers + 1)
// Run producers
repeat(producers) {
Expand Down Expand Up @@ -111,7 +119,7 @@ open class ChannelProducerConsumerBenchmark {
} else {
channel.send(element)
}
doWork()
doWork(_5_workSize)
}

private suspend fun consume(dummy: Channel<Int>?) {
Expand All @@ -123,28 +131,25 @@ open class ChannelProducerConsumerBenchmark {
} else {
channel.receive()
}
doWork()
doWork(_5_workSize)
}
}

enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
//FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
DEFAULT({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
}

enum class ChannelCreator(private val capacity: Int) {
RENDEZVOUS(Channel.RENDEZVOUS),
// BUFFERED_1(1),
BUFFERED_2(2),
// BUFFERED_4(4),
BUFFERED_32(32),
BUFFERED_128(128),
BUFFERED_16(16),
BUFFERED_64(64),
BUFFERED_UNLIMITED(Channel.UNLIMITED);

fun create(): Channel<Int> = Channel(capacity)
}

private fun doWork(): Unit = Blackhole.consumeCPU(ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX))
private fun doWork(workSize: Int): Unit = doGeomDistrWork(workSize)

private const val WORK_MIN = 50L
private const val WORK_MAX = 100L
private const val APPROX_BATCH_SIZE = 100000
private const val APPROX_BATCH_SIZE = 100_000
19 changes: 10 additions & 9 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package benchmarks
import benchmarks.common.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.sync.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
Expand All @@ -19,17 +20,16 @@ import java.util.concurrent.*
@State(Scope.Benchmark)
open class SemaphoreBenchmark {
@Param
private var _1_dispatcher: SemaphoreBenchDispatcherCreator = SemaphoreBenchDispatcherCreator.FORK_JOIN
private var _1_dispatcher: SemaphoreBenchDispatcherCreator = SemaphoreBenchDispatcherCreator.DEFAULT

@Param("0", "1000")
private var _2_coroutines: Int = 0

@Param("1", "2", "4", "8", "32", "128", "100000")
private var _3_maxPermits: Int = 0

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
@Param("1", "2", "4", "8", "16") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128") // Server
private var _4_parallelism: Int = 0

private lateinit var dispatcher: CoroutineDispatcher
Expand Down Expand Up @@ -80,10 +80,11 @@ open class SemaphoreBenchmark {
}

enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
// FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
DEFAULT({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
}

private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
private const val BATCH_SIZE = 1000000
private const val WORK_INSIDE = 50
private const val WORK_OUTSIDE = 50
private const val BATCH_SIZE = 100000
6 changes: 4 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ junit5_version=5.7.0
atomicfu_version=0.19.0
knit_version=0.4.0
html_version=0.7.2
lincheck_version=2.14.1
lincheck_version=2.16
dokka_version=1.7.20
byte_buddy_version=1.10.9
reactor_version=3.4.1
Expand Down Expand Up @@ -59,4 +59,6 @@ org.gradle.jvmargs=-Xmx3g
kotlin.mpp.enableCompatibilityMetadataVariant=true
kotlin.mpp.stability.nowarn=true
kotlinx.atomicfu.enableJvmIrTransformation=true
kotlinx.atomicfu.enableJsIrTransformation=true
# When the flag below is set to `true`, AtomicFU cannot process
# usages of `moveForward` in `ConcurrentLinkedList.kt` correctly.
kotlinx.atomicfu.enableJsIrTransformation=false
7 changes: 4 additions & 3 deletions gradle/test-mocha-js.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ task testMochaNode(type: NodeTask, dependsOn: [compileTestJsLegacy, installDepen

def jsLegacyTestTask = project.tasks.findByName('jsLegacyTest') ? jsLegacyTest : jsTest

jsLegacyTestTask.dependsOn testMochaNode
// TODO
//jsLegacyTestTask.dependsOn testMochaNode

// -- Testing with Mocha under headless Chrome

Expand Down Expand Up @@ -100,5 +101,5 @@ task testMochaJsdom(type: NodeTask, dependsOn: [compileTestJsLegacy, installDepe
if (project.hasProperty("teamcity")) args.addAll(['--reporter', 'mocha-teamcity-reporter'])
}

jsLegacyTestTask.dependsOn testMochaJsdom

// TODO
//jsLegacyTestTask.dependsOn testMochaJsdom
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/channels/Waiter {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
Expand Down
29 changes: 25 additions & 4 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,11 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
enableAssertions = true
testLogging.showStandardStreams = true
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '1'
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', '2'
systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
}

task jvmLincheckTest(type: Test, dependsOn: compileTestKotlinJvm) {
Expand All @@ -278,17 +281,35 @@ task jvmLincheckTest(type: Test, dependsOn: compileTestKotlinJvm) {
configureJvmForLincheck(jvmLincheckTest)
}

static void configureJvmForLincheck(task) {
// Additional Lincheck tests with `segmentSize = 2`.
// Some bugs cannot be revealed when storing one request per segment,
// and some are hard to detect when storing multiple requests.
task jvmLincheckTestAdditional(type: Test, dependsOn: compileTestKotlinJvm) {
classpath = files { jvmTest.classpath }
testClassesDirs = files { jvmTest.testClassesDirs }
include '**/RendezvousChannelLincheckTest*'
include '**/Buffered1ChannelLincheckTest*'
include '**/Semaphore*LincheckTest*'
enableAssertions = true
testLogging.showStandardStreams = true
configureJvmForLincheck(jvmLincheckTestAdditional, true)
}

static void configureJvmForLincheck(task, additional = false) {
task.minHeapSize = '1g'
task.maxHeapSize = '4g' // we may need more space for building an interleaving tree in the model checking mode
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
var segmentSize = additional ? '2' : '1'
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', segmentSize
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
task.systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', segmentSize
task.systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
}

// Always check additional test sets
task moreTest(dependsOn: [jvmStressTest, jvmLincheckTest])
task moreTest(dependsOn: [jvmStressTest, jvmLincheckTest, jvmLincheckTestAdditional])
check.dependsOn moreTest

tasks.jvmLincheckTest {
Expand Down
14 changes: 0 additions & 14 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,6 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
?: return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
}

/**
* Removes the specified [node] on cancellation. This function assumes that this node is already
* removed on successful resume and does not try to remove it if the continuation is cancelled during dispatch.
*/
internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) =
invokeOnCancellation(handler = RemoveOnCancel(node).asHandler)

/**
* Disposes the specified [handle] when this continuation is cancelled.
*
Expand All @@ -379,13 +372,6 @@ internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinke
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)

// --------------- implementation details ---------------

private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
override fun invoke(cause: Throwable?) { node.remove() }
override fun toString() = "RemoveOnCancel[$node]"
}

private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
override fun invoke(cause: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCancel[$handle]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.Waiter
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
Expand All @@ -24,7 +25,7 @@ internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
internal open class CancellableContinuationImpl<in T>(
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter {
init {
assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
}
Expand Down
Loading