Skip to content

Commit 5aff68a

Browse files
ndkovalelizarov
authored andcommitted
Add fast Semaphore.
In addition, the `SegmentQueue` data structure, which emulates an infinite array with fast removing from the middle, is introduced for storing suspended acquirers in semaphore/mutex/channel algorithms. Fixes #1088
1 parent aa3d1ae commit 5aff68a

File tree

9 files changed

+875
-0
lines changed

9 files changed

+875
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package benchmarks
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.channels.Channel
5+
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
6+
import kotlinx.coroutines.sync.Semaphore
7+
import kotlinx.coroutines.sync.withPermit
8+
import org.openjdk.jmh.annotations.*
9+
import java.util.concurrent.ForkJoinPool
10+
import java.util.concurrent.ThreadLocalRandom
11+
import java.util.concurrent.TimeUnit
12+
13+
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
14+
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
15+
@Fork(value = 1)
16+
@BenchmarkMode(Mode.AverageTime)
17+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
18+
@State(Scope.Benchmark)
19+
open class SemaphoreBenchmark {
20+
@Param
21+
private var _1_dispatcher: SemaphoreBenchDispatcherCreator = SemaphoreBenchDispatcherCreator.FORK_JOIN
22+
23+
@Param("0", "1000")
24+
private var _2_coroutines: Int = 0
25+
26+
@Param("1", "2", "4", "8", "32", "128", "100000")
27+
private var _3_maxPermits: Int = 0
28+
29+
@Param("1", "2", "4") // local machine
30+
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
31+
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
32+
private var _4_parallelism: Int = 0
33+
34+
private lateinit var dispatcher: CoroutineDispatcher
35+
private var coroutines = 0
36+
37+
@InternalCoroutinesApi
38+
@Setup
39+
fun setup() {
40+
dispatcher = _1_dispatcher.create(_4_parallelism)
41+
coroutines = if (_2_coroutines == 0) _4_parallelism else _2_coroutines
42+
}
43+
44+
@Benchmark
45+
fun semaphore() = runBlocking {
46+
val n = BATCH_SIZE / coroutines
47+
val semaphore = Semaphore(_3_maxPermits)
48+
val jobs = ArrayList<Job>(coroutines)
49+
repeat(coroutines) {
50+
jobs += GlobalScope.launch {
51+
repeat(n) {
52+
semaphore.withPermit {
53+
doWork(WORK_INSIDE)
54+
}
55+
doWork(WORK_OUTSIDE)
56+
}
57+
}
58+
}
59+
jobs.forEach { it.join() }
60+
}
61+
62+
@Benchmark
63+
fun channelAsSemaphore() = runBlocking {
64+
val n = BATCH_SIZE / coroutines
65+
val semaphore = Channel<Unit>(_3_maxPermits)
66+
val jobs = ArrayList<Job>(coroutines)
67+
repeat(coroutines) {
68+
jobs += GlobalScope.launch {
69+
repeat(n) {
70+
semaphore.send(Unit) // acquire
71+
doWork(WORK_INSIDE)
72+
semaphore.receive() // release
73+
doWork(WORK_OUTSIDE)
74+
}
75+
}
76+
}
77+
jobs.forEach { it.join() }
78+
}
79+
}
80+
81+
enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
82+
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
83+
EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
84+
}
85+
86+
private fun doWork(work: Int) {
87+
// We use geometric distribution here
88+
val p = 1.0 / work
89+
val r = ThreadLocalRandom.current()
90+
while (true) {
91+
if (r.nextDouble() < p) break
92+
}
93+
}
94+
95+
private const val WORK_INSIDE = 80
96+
private const val WORK_OUTSIDE = 40
97+
private const val BATCH_SIZE = 1000000

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+13
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,19 @@ public final class kotlinx/coroutines/sync/MutexKt {
10081008
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
10091009
}
10101010

1011+
public abstract interface class kotlinx/coroutines/sync/Semaphore {
1012+
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1013+
public abstract fun getAvailablePermits ()I
1014+
public abstract fun release ()V
1015+
public abstract fun tryAcquire ()Z
1016+
}
1017+
1018+
public final class kotlinx/coroutines/sync/SemaphoreKt {
1019+
public static final fun Semaphore (II)Lkotlinx/coroutines/sync/Semaphore;
1020+
public static synthetic fun Semaphore$default (IIILjava/lang/Object;)Lkotlinx/coroutines/sync/Semaphore;
1021+
public static final fun withPermit (Lkotlinx/coroutines/sync/Semaphore;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1022+
}
1023+
10111024
public final class kotlinx/coroutines/test/TestCoroutineContext : kotlin/coroutines/CoroutineContext {
10121025
public fun <init> ()V
10131026
public fun <init> (Ljava/lang/String;)V
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.atomicfu.AtomicRef
4+
import kotlinx.atomicfu.atomic
5+
import kotlinx.atomicfu.loop
6+
7+
/**
8+
* Essentially, this segment queue is an infinite array of segments, which is represented as
9+
* a Michael-Scott queue of them. All segments are instances of [Segment] class and
10+
* follow in natural order (see [Segment.id]) in the queue.
11+
*/
12+
internal abstract class SegmentQueue<S: Segment<S>>() {
13+
private val _head: AtomicRef<S>
14+
/**
15+
* Returns the first segment in the queue.
16+
*/
17+
protected val head: S get() = _head.value
18+
19+
private val _tail: AtomicRef<S>
20+
/**
21+
* Returns the last segment in the queue.
22+
*/
23+
protected val tail: S get() = _tail.value
24+
25+
init {
26+
val initialSegment = newSegment(0)
27+
_head = atomic(initialSegment)
28+
_tail = atomic(initialSegment)
29+
}
30+
31+
/**
32+
* The implementation should create an instance of segment [S] with the specified id
33+
* and initial reference to the previous one.
34+
*/
35+
abstract fun newSegment(id: Long, prev: S? = null): S
36+
37+
/**
38+
* Finds a segment with the specified [id] following by next references from the
39+
* [startFrom] segment. The typical use-case is reading [tail] or [head], doing some
40+
* synchronization, and invoking [getSegment] or [getSegmentAndMoveHead] correspondingly
41+
* to find the required segment.
42+
*/
43+
protected fun getSegment(startFrom: S, id: Long): S? {
44+
// Go through `next` references and add new segments if needed,
45+
// similarly to the `push` in the Michael-Scott queue algorithm.
46+
// The only difference is that `CAS failure` means that the
47+
// required segment has already been added, so the algorithm just
48+
// uses it. This way, only one segment with each id can be in the queue.
49+
var cur: S = startFrom
50+
while (cur.id < id) {
51+
var curNext = cur.next
52+
if (curNext == null) {
53+
// Add a new segment.
54+
val newTail = newSegment(cur.id + 1, cur)
55+
curNext = if (cur.casNext(null, newTail)) {
56+
if (cur.removed) {
57+
cur.remove()
58+
}
59+
moveTailForward(newTail)
60+
newTail
61+
} else {
62+
cur.next!!
63+
}
64+
}
65+
cur = curNext
66+
}
67+
if (cur.id != id) return null
68+
return cur
69+
}
70+
71+
/**
72+
* Invokes [getSegment] and replaces [head] with the result if its [id] is greater.
73+
*/
74+
protected fun getSegmentAndMoveHead(startFrom: S, id: Long): S? {
75+
@Suppress("LeakingThis")
76+
if (startFrom.id == id) return startFrom
77+
val s = getSegment(startFrom, id) ?: return null
78+
moveHeadForward(s)
79+
return s
80+
}
81+
82+
/**
83+
* Updates [head] to the specified segment
84+
* if its `id` is greater.
85+
*/
86+
private fun moveHeadForward(new: S) {
87+
_head.loop { curHead ->
88+
if (curHead.id > new.id) return
89+
if (_head.compareAndSet(curHead, new)) {
90+
new.prev.value = null
91+
return
92+
}
93+
}
94+
}
95+
96+
/**
97+
* Updates [tail] to the specified segment
98+
* if its `id` is greater.
99+
*/
100+
private fun moveTailForward(new: S) {
101+
_tail.loop { curTail ->
102+
if (curTail.id > new.id) return
103+
if (_tail.compareAndSet(curTail, new)) return
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Each segment in [SegmentQueue] has a unique id and is created by [SegmentQueue.newSegment].
110+
* Essentially, this is a node in the Michael-Scott queue algorithm, but with
111+
* maintaining [prev] pointer for efficient [remove] implementation.
112+
*/
113+
internal abstract class Segment<S: Segment<S>>(val id: Long, prev: S?) {
114+
// Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
115+
private val _next = atomic<S?>(null)
116+
val next: S? get() = _next.value
117+
fun casNext(expected: S?, value: S?): Boolean = _next.compareAndSet(expected, value)
118+
// Pointer to the previous segment, updates in [remove] function.
119+
val prev = atomic<S?>(null)
120+
121+
/**
122+
* Returns `true` if this segment is logically removed from the queue.
123+
* The [remove] function should be called right after it becomes logically removed.
124+
*/
125+
abstract val removed: Boolean
126+
127+
init {
128+
this.prev.value = prev
129+
}
130+
131+
/**
132+
* Removes this segment physically from the segment queue. The segment should be
133+
* logically removed (so [removed] returns `true`) at the point of invocation.
134+
*/
135+
fun remove() {
136+
check(removed) { " The segment should be logically removed at first "}
137+
// Read `next` and `prev` pointers.
138+
var next = this._next.value ?: return // tail cannot be removed
139+
var prev = prev.value ?: return // head cannot be removed
140+
// Link `next` and `prev`.
141+
prev.moveNextToRight(next)
142+
while (prev.removed) {
143+
prev = prev.prev.value ?: break
144+
prev.moveNextToRight(next)
145+
}
146+
next.movePrevToLeft(prev)
147+
while (next.removed) {
148+
next = next.next ?: break
149+
next.movePrevToLeft(prev)
150+
}
151+
}
152+
153+
/**
154+
* Updates [next] pointer to the specified segment if
155+
* the [id] of the specified segment is greater.
156+
*/
157+
private fun moveNextToRight(next: S) {
158+
while (true) {
159+
val curNext = this._next.value as S
160+
if (next.id <= curNext.id) return
161+
if (this._next.compareAndSet(curNext, next)) return
162+
}
163+
}
164+
165+
/**
166+
* Updates [prev] pointer to the specified segment if
167+
* the [id] of the specified segment is lower.
168+
*/
169+
private fun movePrevToLeft(prev: S) {
170+
while (true) {
171+
val curPrev = this.prev.value ?: return
172+
if (curPrev.id <= prev.id) return
173+
if (this.prev.compareAndSet(curPrev, prev)) return
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)