Skip to content

Commit 3c15b6f

Browse files
ndkovalelizarov
authored andcommitted
Add Semaphore.
In addition, a `SegmentQueue` data structure, which emulates an infinite array with fast removing from the middle, is introduced for storing suspended acquirers in the semaphore algorithm. Fixes #1088
1 parent 6e81083 commit 3c15b6f

File tree

6 files changed

+772
-0
lines changed

6 files changed

+772
-0
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+13
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,19 @@ public final class kotlinx/coroutines/sync/MutexKt {
981981
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
982982
}
983983

984+
public abstract interface class kotlinx/coroutines/sync/Semaphore {
985+
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
986+
public abstract fun getAvailablePermits ()I
987+
public abstract fun release ()V
988+
public abstract fun tryAcquire ()Z
989+
}
990+
991+
public final class kotlinx/coroutines/sync/SemaphoreKt {
992+
public static final fun Semaphore (II)Lkotlinx/coroutines/sync/Semaphore;
993+
public static synthetic fun Semaphore$default (IIILjava/lang/Object;)Lkotlinx/coroutines/sync/Semaphore;
994+
public static final fun withSemaphore (Lkotlinx/coroutines/sync/Semaphore;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
995+
}
996+
984997
public final class kotlinx/coroutines/test/TestCoroutineContext : kotlin/coroutines/CoroutineContext {
985998
public fun <init> ()V
986999
public fun <init> (Ljava/lang/String;)V
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.atomicfu.AtomicRef
4+
import kotlinx.atomicfu.atomic
5+
6+
/**
7+
* Essentially, this segment queue is an infinite array of segments, which is represented as
8+
* a Michael-Scott queue of them. All segments are instances of [Segment] interface and
9+
* follow in natural order (see [Segment.id]) in the queue.
10+
*
11+
* In some data structures, like `Semaphore`, this queue is used for storing suspended continuations
12+
* and is always empty in uncontended scenarios. Therefore, there is no need in creating
13+
* the first segment in advance in this case. A special `createFirstSegmentLazily` is introduced
14+
* to create segments lazily, on the first [getSegment] invocation; it is set to `false` by default.
15+
*/
16+
internal abstract class SegmentQueue<S: Segment<S>>(createFirstSegmentLazily: Boolean = false) {
17+
private val _head: AtomicRef<S?>
18+
/**
19+
* Returns the first segment in the queue. All segments with lower [id]
20+
*/
21+
protected val first: S? get() = _head.value
22+
23+
private val _tail: AtomicRef<S?>
24+
protected val last: S? get() = _tail.value
25+
26+
init {
27+
val initialSegment = if (createFirstSegmentLazily) null else newSegment(0)
28+
_head = atomic(initialSegment)
29+
_tail = atomic(initialSegment)
30+
}
31+
32+
/**
33+
* The implementation should create an instance of segment [S] with the specified id
34+
* and initial reference to the previous one.
35+
*/
36+
abstract fun newSegment(id: Long, prev: S? = null): S
37+
38+
/**
39+
* Finds a segment with the specified [id] following by next references from the
40+
* [startFrom] segment. The typical use-case is reading [last] or [first], doing some
41+
* synchronization, and invoking [getSegment] or [getSegmentAndMoveFirst] correspondingly
42+
* to find the required segment.
43+
*/
44+
protected fun getSegment(startFrom: S?, id: Long): S? {
45+
// Try to create the first segment if [startFrom] is null.
46+
// This occurs if `createFirstSegmentLazily` was set to `true`.
47+
var startFrom = startFrom
48+
if (startFrom === null) {
49+
val firstSegment = newSegment(0)
50+
if (_head.compareAndSet(null, firstSegment))
51+
startFrom = firstSegment
52+
else {
53+
startFrom = first!!
54+
}
55+
}
56+
if (startFrom.id > id) return null
57+
// Go through `next` references and add new segments if needed,
58+
// similarly to the `push` in the Michael-Scott queue algorithm.
59+
// The only difference is that `CAS failure` means that the
60+
// required segment has already been added, so the algorithm just
61+
// uses it. This way, only one segment with each id can be in the queue.
62+
var cur: S = startFrom
63+
while (cur.id < id) {
64+
var curNext = cur.next.value
65+
if (curNext == null) {
66+
// Add a new segment.
67+
val newTail = newSegment(cur.id + 1, cur)
68+
curNext = if (cur.next.compareAndSet(null, newTail)) {
69+
if (cur.removed) {
70+
cur.remove()
71+
}
72+
moveTailForward(newTail)
73+
newTail
74+
} else {
75+
cur.next.value!!
76+
}
77+
}
78+
cur = curNext
79+
}
80+
if (cur.id != id) return null
81+
return cur
82+
}
83+
84+
/**
85+
* Invokes [getSegment] and replaces [first] with the result if its [id] is greater.
86+
*/
87+
protected fun getSegmentAndMoveFirst(startFrom: S?, id: Long): S? {
88+
if (startFrom !== null && startFrom.id == id) return startFrom
89+
val s = getSegment(startFrom, id) ?: return null
90+
moveHeadForward(s)
91+
return s
92+
}
93+
94+
/**
95+
* Updates [_head] to the specified segment
96+
* if its `id` is greater.
97+
*/
98+
private fun moveHeadForward(new: S) {
99+
while (true) {
100+
val cur = first!!
101+
if (cur.id > new.id) return
102+
if (_head.compareAndSet(cur, new)) {
103+
new.prev.value = null
104+
return
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Updates [_tail] to the specified segment
111+
* if its `id` is greater.
112+
*/
113+
private fun moveTailForward(new: S) {
114+
while (true) {
115+
val cur = last
116+
if (cur !== null && cur.id > new.id) return
117+
if (_tail.compareAndSet(cur, new)) return
118+
}
119+
}
120+
}
121+
122+
/**
123+
* Each segment in [SegmentQueue] has a unique id and is created by [SegmentQueue.newSegment].
124+
* Essentially, this is a node in the Michael-Scott queue algorithm, but with
125+
* maintaining [prev] pointer for efficient [remove] implementation.
126+
*/
127+
internal abstract class Segment<S: Segment<S>>(val id: Long, prev: S?) {
128+
// Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
129+
val next = atomic<S?>(null)
130+
// Pointer to the previous segment, updates in [remove] function.
131+
val prev = atomic<S?>(null)
132+
133+
/**
134+
* Returns `true` if this segment is logically removed from the queue.
135+
* The [remove] function should be called right after it becomes logically removed.
136+
*/
137+
abstract val removed: Boolean
138+
139+
init {
140+
this.prev.value = prev
141+
}
142+
143+
/**
144+
* Removes this segment physically from the segment queue. The segment should be
145+
* logically removed (so [removed] returns `true`) at the point of invocation.
146+
*/
147+
fun remove() {
148+
check(removed) { " The segment should be logically removed at first "}
149+
// Read `next` and `prev` pointers.
150+
val next = this.next.value ?: return // tail cannot be removed
151+
val prev = prev.value ?: return // head cannot be removed
152+
// Link `next` and `prev`.
153+
next.movePrevToLeft(prev)
154+
prev.movePrevNextToRight(next)
155+
// Check whether `prev` and `next` are still in the queue
156+
// and help with removing them if needed.
157+
if (prev.removed)
158+
prev.remove()
159+
if (next.removed)
160+
next.remove()
161+
}
162+
163+
/**
164+
* Updates [next] pointer to the specified segment if
165+
* the [id] of the specified segment is greater.
166+
*/
167+
private fun movePrevNextToRight(next: S) {
168+
while (true) {
169+
val curNext = this.next.value as S
170+
if (next.id <= curNext.id) return
171+
if (this.next.compareAndSet(curNext, next)) return
172+
}
173+
}
174+
175+
/**
176+
* Updates [prev] pointer to the specified segment if
177+
* the [id] of the specified segment is lower.
178+
*/
179+
private fun movePrevToLeft(prev: S) {
180+
while (true) {
181+
val curPrev = this.prev.value ?: return
182+
if (curPrev.id <= prev.id) return
183+
if (this.prev.compareAndSet(curPrev, prev)) return
184+
}
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package kotlinx.coroutines.sync
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.internal.*
6+
import kotlin.coroutines.resume
7+
import kotlin.jvm.JvmField
8+
import kotlin.math.max
9+
10+
/**
11+
* A counting semaphore for coroutines. It maintains a number of available permits.
12+
* Each [acquire] suspends if necessary until a permit is available, and then takes it.
13+
* Each [release] adds a permit, potentially releasing a suspended acquirer.
14+
*
15+
* Semaphore with `maxPermits = 1` is essentially a [Mutex].
16+
**/
17+
public interface Semaphore {
18+
/**
19+
* Returns the current number of available permits available in this semaphore.
20+
*/
21+
public val availablePermits: Int
22+
23+
/**
24+
* Acquires a permit from this semaphore, suspending until one is available.
25+
* All suspending acquirers are processed in first-in-first-out (FIFO) order.
26+
*
27+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
28+
* function is suspended, this function immediately resumes with [CancellationException].
29+
*
30+
* *Cancellation of suspended lock invocation is atomic* -- when this function
31+
* throws [CancellationException] it means that the mutex was not locked.
32+
*
33+
* Note, that this function does not check for cancellation when it is not suspended.
34+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
35+
*
36+
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
37+
*/
38+
public suspend fun acquire()
39+
40+
/**
41+
* Tries to acquire a permit from this semaphore without suspension.
42+
*
43+
* @return `true` if a permit was acquired, `false` otherwise.
44+
*/
45+
public fun tryAcquire(): Boolean
46+
47+
/**
48+
* Releases a permit, returning it into this semaphore. Resumes the first
49+
* suspending acquirer if there is one at the point of invocation.
50+
*/
51+
public fun release()
52+
}
53+
54+
/**
55+
* Creates new [Semaphore] instance.
56+
*/
57+
@Suppress("FunctionName")
58+
public fun Semaphore(maxPermits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(maxPermits, acquiredPermits)
59+
60+
/**
61+
* Executes the given [action] with acquiring a permit from this semaphore at the beginning
62+
* and releasing it after the [action] is completed.
63+
*
64+
* @return the return value of the [action].
65+
*/
66+
public suspend inline fun <T> Semaphore.withSemaphore(action: () -> T): T {
67+
acquire()
68+
try {
69+
return action()
70+
} finally {
71+
release()
72+
}
73+
}
74+
75+
private class SemaphoreImpl(@JvmField val maxPermits: Int, acquiredPermits: Int)
76+
: Semaphore, SegmentQueue<SemaphoreSegment>(createFirstSegmentLazily = true)
77+
{
78+
init {
79+
require(maxPermits > 0) { "Semaphore should have at least 1 permit"}
80+
require(acquiredPermits in 0..maxPermits) { "The number of acquired permits should be ≥ 0 and ≤ maxPermits" }
81+
}
82+
83+
override fun newSegment(id: Long, prev: SemaphoreSegment?)= SemaphoreSegment(id, prev)
84+
85+
/**
86+
* This counter indicates a number of available permits if it is non-negative,
87+
* or the size with minus sign otherwise. Note, that 32-bit counter is enough here
88+
* since the maximal number of available permits is [maxPermits] which is [Int],
89+
* and the maximum number of waiting acquirers cannot be greater than 2^31 in any
90+
* real application.
91+
*/
92+
private val _availablePermits = atomic(maxPermits)
93+
override val availablePermits: Int get() = max(_availablePermits.value, 0)
94+
95+
// The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
96+
// each segment contains a fixed number of slots. To determine a slot for each enqueue
97+
// and dequeue operation, we increment the corresponding counter at the beginning of the operation
98+
// and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
99+
// works with an individual cell.
100+
private val enqIdx = atomic(0L)
101+
private val deqIdx = atomic(0L)
102+
103+
override fun tryAcquire(): Boolean {
104+
_availablePermits.loop { p ->
105+
if (p <= 0) return false
106+
if (_availablePermits.compareAndSet(p, p - 1)) return true
107+
}
108+
}
109+
110+
override suspend fun acquire() {
111+
val p = _availablePermits.getAndDecrement()
112+
if (p > 0) return // permit acquired
113+
addToQueueAndSuspend()
114+
}
115+
116+
override fun release() {
117+
val p = _availablePermits.getAndUpdate { cur ->
118+
check(cur < maxPermits) { "The number of acquired permits cannot be greater than maxPermits" }
119+
cur + 1
120+
}
121+
if (p >= 0) return // no waiters
122+
resumeNextFromQueue()
123+
}
124+
125+
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
126+
val last = this.last
127+
val enqIdx = enqIdx.getAndIncrement()
128+
val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
129+
val i = (enqIdx % SEGMENT_SIZE).toInt()
130+
if (segment === null || segment[i].value === RESUMED || !segment[i].compareAndSet(null, cont)) {
131+
// already resumed
132+
cont.resume(Unit)
133+
return@sc
134+
}
135+
cont.invokeOnCancellation(handler = object : CancelHandler() {
136+
override fun invoke(cause: Throwable?) {
137+
segment.cancel(i)
138+
release()
139+
}
140+
}.asHandler)
141+
}
142+
143+
private fun resumeNextFromQueue() {
144+
val first = this.first
145+
val deqIdx = deqIdx.getAndIncrement()
146+
val segment = getSegmentAndMoveFirst(first, deqIdx / SEGMENT_SIZE) ?: return
147+
val i = (deqIdx % SEGMENT_SIZE).toInt()
148+
val cont = segment[i].getAndUpdate {
149+
if (it === CANCELLED) CANCELLED else RESUMED
150+
}
151+
if (cont === null) return // just resumed
152+
if (cont === CANCELLED) return // Cancelled continuation invokes `release`
153+
// and resumes next suspended acquirer if needed.
154+
cont as CancellableContinuation<Unit>
155+
cont.resume(Unit)
156+
}
157+
}
158+
159+
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
160+
private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
161+
162+
operator fun get(index: Int): AtomicRef<Any?> = acquirers[index]
163+
164+
private val cancelledSlots = atomic(0)
165+
override val removed get() = cancelledSlots.value == SEGMENT_SIZE
166+
167+
// Cleans the acquirer slot located by the specified index
168+
// and removes this segment physically if all slots are cleaned.
169+
fun cancel(index: Int) {
170+
// Clean the specified waiter
171+
acquirers[index].value = CANCELLED
172+
// Remove this segment if needed
173+
if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
174+
remove()
175+
}
176+
}
177+
178+
@SharedImmutable
179+
private val RESUMED = Symbol("RESUMED")
180+
@SharedImmutable
181+
private val CANCELLED = Symbol("CANCELLED")
182+
@SharedImmutable
183+
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 32)

0 commit comments

Comments
 (0)