Skip to content

Commit 2f8744c

Browse files
committed
Introduce SegmentQueueSynchronizer abstraction for synchronization primitives and ReadWriteMutex
1 parent b2a7f97 commit 2f8744c

File tree

10 files changed

+2565
-216
lines changed

10 files changed

+2565
-216
lines changed

Diff for: benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ open class SemaphoreBenchmark {
4848
val semaphore = Semaphore(_3_maxPermits)
4949
val jobs = ArrayList<Job>(coroutines)
5050
repeat(coroutines) {
51-
jobs += GlobalScope.launch {
51+
jobs += GlobalScope.launch(dispatcher) {
5252
repeat(n) {
5353
semaphore.withPermit {
5454
doGeomDistrWork(WORK_INSIDE)
@@ -66,7 +66,7 @@ open class SemaphoreBenchmark {
6666
val semaphore = Channel<Unit>(_3_maxPermits)
6767
val jobs = ArrayList<Job>(coroutines)
6868
repeat(coroutines) {
69-
jobs += GlobalScope.launch {
69+
jobs += GlobalScope.launch(dispatcher) {
7070
repeat(n) {
7171
semaphore.send(Unit) // acquire
7272
doGeomDistrWork(WORK_INSIDE)
@@ -87,4 +87,4 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
8787

8888
private const val WORK_INSIDE = 50
8989
private const val WORK_OUTSIDE = 50
90-
private const val BATCH_SIZE = 100000
90+
private const val BATCH_SIZE = 1000000

Diff for: kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+12
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,18 @@ public final class kotlinx/coroutines/sync/MutexKt {
13291329
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
13301330
}
13311331

1332+
public abstract interface class kotlinx/coroutines/sync/ReadWriteMutex {
1333+
public abstract fun getWrite ()Lkotlinx/coroutines/sync/Mutex;
1334+
public abstract fun readLock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1335+
public abstract fun readUnlock ()V
1336+
}
1337+
1338+
public final class kotlinx/coroutines/sync/ReadWriteMutexKt {
1339+
public static final fun ReadWriteMutex ()Lkotlinx/coroutines/sync/ReadWriteMutex;
1340+
public static final fun read (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1341+
public static final fun write (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1342+
}
1343+
13321344
public abstract interface class kotlinx/coroutines/sync/Semaphore {
13331345
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13341346
public abstract fun getAvailablePermits ()I

Diff for: kotlinx-coroutines-core/build.gradle

+4-4
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
266266
testLogging.showStandardStreams = true
267267
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
268268
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
269-
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '1'
270-
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
269+
systemProperty 'kotlinx.coroutines.sqs.segmentSize', '1'
270+
systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10'
271271
systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', '2'
272272
systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
273273
}
@@ -302,8 +302,8 @@ static void configureJvmForLincheck(task, additional = false) {
302302
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
303303
// Adjust internal algorithmic parameters to increase the testing quality instead of performance.
304304
var segmentSize = additional ? '2' : '1'
305-
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', segmentSize
306-
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
305+
task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', segmentSize
306+
task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
307307
task.systemProperty 'kotlinx.coroutines.bufferedChannel.segmentSize', segmentSize
308308
task.systemProperty 'kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations', '1'
309309
}

Diff for: kotlinx-coroutines-core/common/src/Debug.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ internal expect val DEBUG: Boolean
88
internal expect val Any.hexAddress: String
99
internal expect val Any.classSimpleName: String
1010
internal expect fun assert(value: () -> Boolean)
11+
internal inline fun assertNot(crossinline value: () -> Boolean) = assert { !value() }
1112

1213
/**
1314
* Throwable which can be cloned during stacktrace recovery in a class-specific way.

Diff for: kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt

+669
Large diffs are not rendered by default.

Diff for: kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt

+612
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)