Skip to content

Commit 22f8626

Browse files
authored
Introduce fast and scalable channels (#3103)
1 parent e946cd7 commit 22f8626

File tree

74 files changed

+4190
-2780
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+4190
-2780
lines changed

.idea/dictionaries/shared.xml

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/src/jmh/kotlin/benchmarks/ChannelProducerConsumerBenchmark.kt

+30-25
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44

55
package benchmarks
66

7+
import benchmarks.common.*
78
import kotlinx.coroutines.*
89
import kotlinx.coroutines.channels.Channel
10+
import kotlinx.coroutines.scheduling.*
911
import kotlinx.coroutines.selects.select
1012
import org.openjdk.jmh.annotations.*
11-
import org.openjdk.jmh.infra.Blackhole
1213
import java.lang.Integer.max
13-
import java.util.concurrent.ForkJoinPool
1414
import java.util.concurrent.Phaser
15-
import java.util.concurrent.ThreadLocalRandom
1615
import java.util.concurrent.TimeUnit
1716

1817

@@ -26,14 +25,14 @@ import java.util.concurrent.TimeUnit
2625
* Please, be patient, this benchmark takes quite a lot of time to complete.
2726
*/
2827
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
29-
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
30-
@Fork(value = 3)
31-
@BenchmarkMode(Mode.AverageTime)
28+
@Measurement(iterations = 20, time = 500, timeUnit = TimeUnit.MICROSECONDS)
29+
@Fork(value = 1)
30+
@BenchmarkMode(Mode.Throughput)
3231
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3332
@State(Scope.Benchmark)
3433
open class ChannelProducerConsumerBenchmark {
3534
@Param
36-
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
35+
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.DEFAULT
3736

3837
@Param
3938
private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
@@ -44,12 +43,13 @@ open class ChannelProducerConsumerBenchmark {
4443
@Param("false", "true")
4544
private var _3_withSelect: Boolean = false
4645

47-
@Param("1", "2", "4") // local machine
48-
// @Param("1", "2", "4", "8", "12") // local machine
49-
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
50-
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
46+
@Param("1", "2", "4", "8", "16") // local machine
47+
// @Param("1", "2", "4", "8", "16", "32", "64", "128") // Server
5148
private var _4_parallelism: Int = 0
5249

50+
@Param("50")
51+
private var _5_workSize: Int = 0
52+
5353
private lateinit var dispatcher: CoroutineDispatcher
5454
private lateinit var channel: Channel<Int>
5555

@@ -61,13 +61,21 @@ open class ChannelProducerConsumerBenchmark {
6161
}
6262

6363
@Benchmark
64-
fun spmc() {
64+
fun mcsp() {
6565
if (_2_coroutines != 0) return
6666
val producers = max(1, _4_parallelism - 1)
6767
val consumers = 1
6868
run(producers, consumers)
6969
}
7070

71+
@Benchmark
72+
fun spmc() {
73+
if (_2_coroutines != 0) return
74+
val producers = 1
75+
val consumers = max(1, _4_parallelism - 1)
76+
run(producers, consumers)
77+
}
78+
7179
@Benchmark
7280
fun mpmc() {
7381
val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
@@ -76,7 +84,7 @@ open class ChannelProducerConsumerBenchmark {
7684
}
7785

7886
private fun run(producers: Int, consumers: Int) {
79-
val n = APPROX_BATCH_SIZE / producers * producers
87+
val n = (APPROX_BATCH_SIZE / producers * producers) / consumers * consumers
8088
val phaser = Phaser(producers + consumers + 1)
8189
// Run producers
8290
repeat(producers) {
@@ -111,7 +119,7 @@ open class ChannelProducerConsumerBenchmark {
111119
} else {
112120
channel.send(element)
113121
}
114-
doWork()
122+
doWork(_5_workSize)
115123
}
116124

117125
private suspend fun consume(dummy: Channel<Int>?) {
@@ -123,28 +131,25 @@ open class ChannelProducerConsumerBenchmark {
123131
} else {
124132
channel.receive()
125133
}
126-
doWork()
134+
doWork(_5_workSize)
127135
}
128136
}
129137

130138
enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
131-
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
139+
//FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
140+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
141+
DEFAULT({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
132142
}
133143

134144
enum class ChannelCreator(private val capacity: Int) {
135145
RENDEZVOUS(Channel.RENDEZVOUS),
136-
// BUFFERED_1(1),
137-
BUFFERED_2(2),
138-
// BUFFERED_4(4),
139-
BUFFERED_32(32),
140-
BUFFERED_128(128),
146+
BUFFERED_16(16),
147+
BUFFERED_64(64),
141148
BUFFERED_UNLIMITED(Channel.UNLIMITED);
142149

143150
fun create(): Channel<Int> = Channel(capacity)
144151
}
145152

146-
private fun doWork(): Unit = Blackhole.consumeCPU(ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX))
153+
private fun doWork(workSize: Int): Unit = doGeomDistrWork(workSize)
147154

148-
private const val WORK_MIN = 50L
149-
private const val WORK_MAX = 100L
150-
private const val APPROX_BATCH_SIZE = 100000
155+
private const val APPROX_BATCH_SIZE = 100_000

benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+10-9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package benchmarks
77
import benchmarks.common.*
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.scheduling.*
1011
import kotlinx.coroutines.sync.*
1112
import org.openjdk.jmh.annotations.*
1213
import java.util.concurrent.*
@@ -19,17 +20,16 @@ import java.util.concurrent.*
1920
@State(Scope.Benchmark)
2021
open class SemaphoreBenchmark {
2122
@Param
22-
private var _1_dispatcher: SemaphoreBenchDispatcherCreator = SemaphoreBenchDispatcherCreator.FORK_JOIN
23+
private var _1_dispatcher: SemaphoreBenchDispatcherCreator = SemaphoreBenchDispatcherCreator.DEFAULT
2324

2425
@Param("0", "1000")
2526
private var _2_coroutines: Int = 0
2627

2728
@Param("1", "2", "4", "8", "32", "128", "100000")
2829
private var _3_maxPermits: Int = 0
2930

30-
@Param("1", "2", "4") // local machine
31-
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
32-
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
31+
@Param("1", "2", "4", "8", "16") // local machine
32+
// @Param("1", "2", "4", "8", "16", "32", "64", "128") // Server
3333
private var _4_parallelism: Int = 0
3434

3535
private lateinit var dispatcher: CoroutineDispatcher
@@ -80,10 +80,11 @@ open class SemaphoreBenchmark {
8080
}
8181

8282
enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
83-
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
84-
EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
83+
// FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
84+
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
85+
DEFAULT({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
8586
}
8687

87-
private const val WORK_INSIDE = 80
88-
private const val WORK_OUTSIDE = 40
89-
private const val BATCH_SIZE = 1000000
88+
private const val WORK_INSIDE = 50
89+
private const val WORK_OUTSIDE = 50
90+
private const val BATCH_SIZE = 100000

gradle.properties

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ junit5_version=5.7.0
1313
atomicfu_version=0.19.0
1414
knit_version=0.4.0
1515
html_version=0.7.2
16-
lincheck_version=2.14.1
16+
lincheck_version=2.16
1717
dokka_version=1.7.20
1818
byte_buddy_version=1.10.9
1919
reactor_version=3.4.1
@@ -59,4 +59,6 @@ org.gradle.jvmargs=-Xmx3g
5959
kotlin.mpp.enableCompatibilityMetadataVariant=true
6060
kotlin.mpp.stability.nowarn=true
6161
kotlinx.atomicfu.enableJvmIrTransformation=true
62-
kotlinx.atomicfu.enableJsIrTransformation=true
62+
# When the flag below is set to `true`, AtomicFU cannot process
63+
# usages of `moveForward` in `ConcurrentLinkedList.kt` correctly.
64+
kotlinx.atomicfu.enableJsIrTransformation=false

gradle/test-mocha-js.gradle

+4-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ task testMochaNode(type: NodeTask, dependsOn: [compileTestJsLegacy, installDepen
2929

3030
def jsLegacyTestTask = project.tasks.findByName('jsLegacyTest') ? jsLegacyTest : jsTest
3131

32-
jsLegacyTestTask.dependsOn testMochaNode
32+
// TODO
33+
//jsLegacyTestTask.dependsOn testMochaNode
3334

3435
// -- Testing with Mocha under headless Chrome
3536

@@ -100,5 +101,5 @@ task testMochaJsdom(type: NodeTask, dependsOn: [compileTestJsLegacy, installDepe
100101
if (project.hasProperty("teamcity")) args.addAll(['--reporter', 'mocha-teamcity-reporter'])
101102
}
102103

103-
jsLegacyTestTask.dependsOn testMochaJsdom
104-
104+
// TODO
105+
//jsLegacyTestTask.dependsOn testMochaJsdom

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5151
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
5252
}
5353

54-
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
54+
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/channels/Waiter {
5555
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5656
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
5757
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V

kotlinx-coroutines-core/build.gradle

+25-4
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,11 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
265265
enableAssertions = true
266266
testLogging.showStandardStreams = true
267267
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
268-
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
268+
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
269+
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '1'
269270
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
271+
systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', '2'
272+
systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
270273
}
271274

272275
task jvmLincheckTest(type: Test, dependsOn: compileTestKotlinJvm) {
@@ -278,17 +281,35 @@ task jvmLincheckTest(type: Test, dependsOn: compileTestKotlinJvm) {
278281
configureJvmForLincheck(jvmLincheckTest)
279282
}
280283

281-
static void configureJvmForLincheck(task) {
284+
// Additional Lincheck tests with `segmentSize = 2`.
285+
// Some bugs cannot be revealed when storing one request per segment,
286+
// and some are hard to detect when storing multiple requests.
287+
task jvmLincheckTestAdditional(type: Test, dependsOn: compileTestKotlinJvm) {
288+
classpath = files { jvmTest.classpath }
289+
testClassesDirs = files { jvmTest.testClassesDirs }
290+
include '**/RendezvousChannelLincheckTest*'
291+
include '**/Buffered1ChannelLincheckTest*'
292+
include '**/Semaphore*LincheckTest*'
293+
enableAssertions = true
294+
testLogging.showStandardStreams = true
295+
configureJvmForLincheck(jvmLincheckTestAdditional, true)
296+
}
297+
298+
static void configureJvmForLincheck(task, additional = false) {
282299
task.minHeapSize = '1g'
283300
task.maxHeapSize = '4g' // we may need more space for building an interleaving tree in the model checking mode
284301
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
285302
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
286-
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
303+
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
304+
var segmentSize = additional ? '2' : '1'
305+
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', segmentSize
287306
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
307+
task.systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', segmentSize
308+
task.systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
288309
}
289310

290311
// Always check additional test sets
291-
task moreTest(dependsOn: [jvmStressTest, jvmLincheckTest])
312+
task moreTest(dependsOn: [jvmStressTest, jvmLincheckTest, jvmLincheckTestAdditional])
292313
check.dependsOn moreTest
293314

294315
tasks.jvmLincheckTest {

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

-14
Original file line numberDiff line numberDiff line change
@@ -358,13 +358,6 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
358358
?: return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
359359
}
360360

361-
/**
362-
* Removes the specified [node] on cancellation. This function assumes that this node is already
363-
* removed on successful resume and does not try to remove it if the continuation is cancelled during dispatch.
364-
*/
365-
internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode) =
366-
invokeOnCancellation(handler = RemoveOnCancel(node).asHandler)
367-
368361
/**
369362
* Disposes the specified [handle] when this continuation is cancelled.
370363
*
@@ -379,13 +372,6 @@ internal fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinke
379372
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
380373
invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)
381374

382-
// --------------- implementation details ---------------
383-
384-
private class RemoveOnCancel(private val node: LockFreeLinkedListNode) : BeforeResumeCancelHandler() {
385-
override fun invoke(cause: Throwable?) { node.remove() }
386-
override fun toString() = "RemoveOnCancel[$node]"
387-
}
388-
389375
private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
390376
override fun invoke(cause: Throwable?) = handle.dispose()
391377
override fun toString(): String = "DisposeOnCancel[$handle]"

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.channels.Waiter
89
import kotlinx.coroutines.internal.*
910
import kotlin.coroutines.*
1011
import kotlin.coroutines.intrinsics.*
@@ -24,7 +25,7 @@ internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
2425
internal open class CancellableContinuationImpl<in T>(
2526
final override val delegate: Continuation<T>,
2627
resumeMode: Int
27-
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
28+
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter {
2829
init {
2930
assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
3031
}

0 commit comments

Comments
 (0)