Skip to content

Commit 8c364c9

Browse files
committed
Introduce SegmentQueueSynchronizer abstraction and add ReadWriteMutex
1 parent 9587590 commit 8c364c9

File tree

9 files changed

+2347
-179
lines changed

9 files changed

+2347
-179
lines changed

Diff for: benchmarks/scripts/generate_plots_semaphore_jvm.py

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# To run this script run the command 'python3 scripts/generate_plots_semaphore_jvm.py' in the /benchmarks folder
2+
3+
4+
import pandas as pd
5+
import sys
6+
import locale
7+
import matplotlib.pyplot as plt
8+
from matplotlib.ticker import FormatStrFormatter
9+
10+
input_file = "build/reports/jmh/results.csv"
11+
output_file = "out/semaphore_jvm.svg"
12+
# Please change the value of this variable according to the FlowFlattenMergeBenchmarkKt.ELEMENTS
13+
operations = 1000000
14+
csv_columns = ["Score", "Param: parallelism", "Param: maxPermits", "Param: algo"]
15+
rename_columns = {"Score": "score", "Param: parallelism" : "threads", "Param: maxPermits" : "permits", "Param: algo": "algo"}
16+
17+
markers = ['v', 'P', 'x', '8', 'd', '1', '2', '8', 'p']
18+
# markers = ['.', 'v', 'P', 'x', '8', 'd', '1', '2', '8', 'p']
19+
colours = ["darkorange", "seagreen", "red", "blueviolet", "sienna"]
20+
# colours = ["royalblue", "darkorange", "seagreen", "red", "blueviolet", "sienna"]
21+
22+
def next_colour():
23+
i = 0
24+
while True:
25+
yield colours[i % len(colours)]
26+
i += 1
27+
28+
def next_marker():
29+
i = 0
30+
while True:
31+
yield markers[i % len(markers)]
32+
i += 1
33+
34+
def draw(data, plt):
35+
plt.xscale('log', basex=2)
36+
plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f'))
37+
plt.grid(linewidth='0.5', color='lightgray')
38+
plt.ylabel("us / op")
39+
plt.xlabel('threads')
40+
plt.xticks(data.threads.unique())
41+
42+
colour_gen = next_colour()
43+
marker_gen = next_marker()
44+
for algo in data.algo.unique():
45+
gen_colour = next(colour_gen)
46+
gen_marker = next(marker_gen)
47+
res = data[(data.algo == algo)]
48+
plt.plot(res.threads, res.score * 1000 / operations, label="{}".format(algo), color=gen_colour, marker=gen_marker)
49+
# plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting',
50+
# label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2)
51+
52+
langlocale = locale.getdefaultlocale()[0]
53+
locale.setlocale(locale.LC_ALL, langlocale)
54+
dp = locale.localeconv()['decimal_point']
55+
if dp == ",":
56+
csv_columns.append("Score Error (99,9%)")
57+
rename_columns["Score Error (99,9%)"] = "score_error"
58+
elif dp == ".":
59+
csv_columns.append("Score Error (99.9%)")
60+
rename_columns["Score Error (99.9%)"] = "score_error"
61+
else:
62+
print("Unexpected locale delimeter: " + dp)
63+
sys.exit(1)
64+
data = pd.read_csv(input_file, sep=",", decimal=dp)
65+
data = data[csv_columns].rename(columns=rename_columns)
66+
data = data[(data.permits==8)]
67+
data = data[(data.algo!="Java ReentrantLock")]
68+
data = data[(data.algo!="Java Semaphore")]
69+
plt.rcParams.update({'font.size': 15})
70+
plt.figure(figsize=(12, 12))
71+
draw(data, plt)
72+
plt.legend(loc='lower center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15})
73+
plt.tight_layout(pad=12, w_pad=2, h_pad=1)
74+
plt.savefig(output_file, bbox_inches='tight')

Diff for: gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ junit_version=4.12
1212
atomicfu_version=0.14.4
1313
knit_version=0.2.2
1414
html_version=0.6.8
15-
lincheck_version=2.7.1
15+
lincheck_version=2.9-SNAPSHOT
1616
dokka_version=0.9.16-rdev-2-mpp-hacks
1717
byte_buddy_version=1.10.9
1818
reactor_version=3.2.5.RELEASE

Diff for: kotlinx-coroutines-core/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
201201
enableAssertions = true
202202
testLogging.showStandardStreams = true
203203
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
204-
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
205-
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
204+
systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
205+
systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10'
206206
}
207207

208208
task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {

Diff for: kotlinx-coroutines-core/common/src/Annotations.kt

+9
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,12 @@ public annotation class ObsoleteCoroutinesApi
6464
"so stable API could be provided instead"
6565
)
6666
public annotation class InternalCoroutinesApi
67+
68+
@MustBeDocumented
69+
@Retention(value = AnnotationRetention.BINARY)
70+
@RequiresOptIn(
71+
level = RequiresOptIn.Level.ERROR,
72+
message = "TODO"
73+
)
74+
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY)
75+
public annotation class HazardousConcurrentApi

Diff for: kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt

+606
Large diffs are not rendered by default.
+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.sync
6+
7+
import kotlinx.coroutines.*
8+
9+
public interface ReadWriteMutex {
10+
/**
11+
*
12+
*/
13+
@HazardousConcurrentApi
14+
public suspend fun readLock()
15+
16+
/**
17+
*
18+
*/
19+
@HazardousConcurrentApi
20+
public fun readUnlock()
21+
22+
/**
23+
*
24+
*/
25+
@HazardousConcurrentApi
26+
public suspend fun writeLock()
27+
28+
/**
29+
*
30+
*/
31+
@HazardousConcurrentApi
32+
public fun writeUnlock()
33+
}
34+
35+
/**
36+
*
37+
*/
38+
@OptIn(HazardousConcurrentApi::class)
39+
public suspend inline fun <T> ReadWriteMutex.withReadLock(action: () -> T): T {
40+
readLock()
41+
try {
42+
return action()
43+
} finally {
44+
readUnlock()
45+
}
46+
}
47+
48+
/**
49+
*
50+
*/
51+
@OptIn(HazardousConcurrentApi::class)
52+
public suspend inline fun <T> ReadWriteMutex.withWriteLock(action: () -> T): T {
53+
writeLock()
54+
try {
55+
return action()
56+
} finally {
57+
writeUnlock()
58+
}
59+
}

0 commit comments

Comments
 (0)