Skip to content

Commit 6be7f54

Browse files
committed
Add documentation + small fixes
1 parent 800e4d3 commit 6be7f54

File tree

4 files changed

+179
-198
lines changed

4 files changed

+179
-198
lines changed

kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt

+66-45
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,62 @@ package kotlinx.coroutines.internal
33
import kotlinx.atomicfu.AtomicRef
44
import kotlinx.atomicfu.atomic
55

6-
internal abstract class SegmentQueue<S: Segment<S>>(createFirstSegment: Boolean = true) {
6+
/**
7+
* Essentially, this segment queue is an infinite array of segments, which is represented as
8+
* a Michael-Scott queue of them. All segments are instances of [Segment] interface and
9+
* follow in natural order (see [Segment.id]) in the queue.
10+
*
11+
* In some data structures, like `Semaphore`, this queue is used for storing suspended continuations
12+
* and is always empty in uncontended scenarios. Therefore, there is no need in creating
13+
* the first segment in advance in this case. A special `createFirstSegmentLazily` is introduced
14+
* to create segments lazily, on the first [getSegment] invocation; it is set to `false` by default.
15+
*/
16+
internal abstract class SegmentQueue<S: Segment<S>>(createFirstSegmentLazily: Boolean = false) {
717
private val _head: AtomicRef<S?>
8-
protected val head: S? get() = _head.value
18+
/**
19+
* Returns the first segment in the queue. All segments with lower [id]
20+
*/
21+
protected val first: S? get() = _head.value
922

1023
private val _tail: AtomicRef<S?>
11-
protected val tail: S? get() = _tail.value
24+
protected val last: S? get() = _tail.value
1225

1326
init {
14-
val initialSegment = if (createFirstSegment) newSegment(0) else null
27+
val initialSegment = if (createFirstSegmentLazily) null else newSegment(0)
1528
_head = atomic(initialSegment)
1629
_tail = atomic(initialSegment)
1730
}
1831

32+
/**
33+
* The implementation should create an instance of segment [S] with the specified id
34+
* and initial reference to the previous one.
35+
*/
1936
abstract fun newSegment(id: Long, prev: S? = null): S
2037

38+
/**
39+
* Finds a segment with the specified [id] following by next references from the
40+
* [startFrom] segment. The typical use-case is reading [last] or [first], doing some
41+
* synchronization, and invoking [getSegment] or [getSegmentAndMoveFirst] correspondingly
42+
* to find the required segment.
43+
*/
2144
protected fun getSegment(startFrom: S?, id: Long): S? {
45+
// Try to create the first segment if [startFrom] is null.
46+
// This occurs if `createFirstSegmentLazily` was set to `true`.
2247
var startFrom = startFrom
2348
if (startFrom === null) {
2449
val firstSegment = newSegment(0)
2550
if (_head.compareAndSet(null, firstSegment))
2651
startFrom = firstSegment
2752
else {
28-
startFrom = head!!
53+
startFrom = first!!
2954
}
3055
}
3156
if (startFrom.id > id) return null
32-
// This method goes through `next` references and
33-
// adds new segments if needed, similarly to the `push` in
34-
// the Michael-Scott queue algorithm.
57+
// Go through `next` references and add new segments if needed,
58+
// similarly to the `push` in the Michael-Scott queue algorithm.
59+
// The only difference is that `CAS failure` means that the
60+
// required segment has already been added, so the algorithm just
61+
// uses it. This way, only one segment with each id can be in the queue.
3562
var cur: S = startFrom
3663
while (cur.id < id) {
3764
var curNext = cur.next.value
@@ -54,20 +81,23 @@ internal abstract class SegmentQueue<S: Segment<S>>(createFirstSegment: Boolean
5481
return cur
5582
}
5683

57-
protected fun getSegmentAndMoveHeadForward(startFrom: S?, id: Long): S? {
84+
/**
85+
* Invokes [getSegment] and replaces [first] with the result if its [id] is greater.
86+
*/
87+
protected fun getSegmentAndMoveFirst(startFrom: S?, id: Long): S? {
5888
if (startFrom !== null && startFrom.id == id) return startFrom
5989
val s = getSegment(startFrom, id) ?: return null
6090
moveHeadForward(s)
6191
return s
6292
}
6393

6494
/**
65-
* Updates [head] to the specified segment
95+
* Updates [_head] to the specified segment
6696
* if its `id` is greater.
6797
*/
6898
private fun moveHeadForward(new: S) {
6999
while (true) {
70-
val cur = head!!
100+
val cur = first!!
71101
if (cur.id > new.id) return
72102
if (_head.compareAndSet(cur, new)) {
73103
new.prev.value = null
@@ -77,71 +107,62 @@ internal abstract class SegmentQueue<S: Segment<S>>(createFirstSegment: Boolean
77107
}
78108

79109
/**
80-
* Updates [tail] to the specified segment
110+
* Updates [_tail] to the specified segment
81111
* if its `id` is greater.
82112
*/
83113
private fun moveTailForward(new: S) {
84114
while (true) {
85-
val cur = tail
115+
val cur = last
86116
if (cur !== null && cur.id > new.id) return
87117
if (_tail.compareAndSet(cur, new)) return
88118
}
89119
}
90120
}
91121

122+
/**
123+
* Each segment in [SegmentQueue] has a unique id and is created by [SegmentQueue.newSegment].
124+
* Essentially, this is a node in the Michael-Scott queue algorithm, but with
125+
* maintaining [prev] pointer for efficient [remove] implementation.
126+
*/
92127
internal abstract class Segment<S: Segment<S>>(val id: Long, prev: S?) {
93-
// Pointer to the next segments, updates
94-
// similarly to the Michael-Scott queue algorithm.
95-
val next = atomic<S?>(null) // null (not set) | Segment | CLOSED
96-
// Pointer to the previous non-empty segment (can be null!),
97-
// updates lazily (see `remove()` function).
128+
// Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
129+
val next = atomic<S?>(null)
130+
// Pointer to the previous segment, updates in [remove] function.
98131
val prev = atomic<S?>(null)
99132

133+
/**
134+
* Returns `true` if this segment is logically removed from the queue.
135+
* The [remove] function should be called right after it becomes logically removed.
136+
*/
100137
abstract val removed: Boolean
101138

102139
init {
103140
this.prev.value = prev
104141
}
105142

106143
/**
107-
* Removes this node from the waiting queue and cleans all references to it.
144+
* Removes this segment physically from the segment queue. The segment should be
145+
* logically removed (so [removed] returns `true`) at the point of invocation.
108146
*/
109147
fun remove() {
110148
check(removed) { " The segment should be logically removed at first "}
111-
val next = this.next.value ?: return // tail can't be removed
112-
// Find the first non-removed node (tail is always non-removed)
149+
// Read `next` and `prev` pointers.
150+
val next = this.next.value ?: return // tail cannot be removed
113151
val prev = prev.value ?: return // head cannot be removed
152+
// Link `next` and `prev`.
114153
next.movePrevToLeft(prev)
115154
prev.movePrevNextToRight(next)
155+
// Check whether `prev` and `next` are still in the queue
156+
// and help with removing them if needed.
116157
if (prev.removed)
117158
prev.remove()
118159
if (next.removed)
119160
next.remove()
120-
121-
// while (next.removed) {
122-
// next = next.next.value ?: return
123-
// }
124-
// // Find the first non-removed `prev` and remove this node
125-
// var prev = prev.value
126-
// while (true) {
127-
// if (prev === null) {
128-
// next.prev.value = null
129-
// return
130-
// }
131-
// if (prev.removed) {
132-
// prev = prev.prev.value
133-
// continue
134-
// }
135-
// next.movePrevToLeft(prev)
136-
// prev.movePrevNextToRight(next)
137-
// if (next.removed || !prev.removed) return
138-
// prev = prev.prev.value
139-
// }
140161
}
141162

142163
/**
143-
* Update [Segment.next] pointer to the specified one if
144-
* the `id` of the specified segment is greater.
164+
* Updates [next] pointer to the specified segment if
165+
* the [id] of the specified segment is greater.
145166
*/
146167
private fun movePrevNextToRight(next: S) {
147168
while (true) {
@@ -152,8 +173,8 @@ internal abstract class Segment<S: Segment<S>>(val id: Long, prev: S?) {
152173
}
153174

154175
/**
155-
* Update [Segment.prev] pointer to the specified segment if
156-
* its `id` is lower.
176+
* Updates [prev] pointer to the specified segment if
177+
* the [id] of the specified segment is lower.
157178
*/
158179
private fun movePrevToLeft(prev: S) {
159180
while (true) {

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+37-32
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kotlinx.coroutines.sync
33
import kotlinx.atomicfu.*
44
import kotlinx.coroutines.*
55
import kotlinx.coroutines.internal.*
6-
import kotlinx.coroutines.selects.select
76
import kotlin.coroutines.resume
87
import kotlin.jvm.JvmField
98
import kotlin.math.max
@@ -74,24 +73,30 @@ public suspend inline fun <T> Semaphore.withSemaphore(action: () -> T): T {
7473
}
7574

7675
private class SemaphoreImpl(@JvmField val maxPermits: Int, acquiredPermits: Int)
77-
: Semaphore, SegmentQueue<SemaphoreSegment>(createFirstSegment = false)
76+
: Semaphore, SegmentQueue<SemaphoreSegment>(createFirstSegmentLazily = true)
7877
{
7978
init {
8079
require(maxPermits > 0) { "Semaphore should have at least 1 permit"}
81-
require(acquiredPermits in 0..maxPermits) { "TODO" }
80+
require(acquiredPermits in 0..maxPermits) { "The number of acquired permits should be ≥ 0 and ≤ maxPermits" }
8281
}
8382

8483
override fun newSegment(id: Long, prev: SemaphoreSegment?)= SemaphoreSegment(id, prev)
8584

85+
/**
86+
* This counter indicates a number of available permits if it is non-negative,
87+
* or the size with minus sign otherwise. Note, that 32-bit counter is enough here
88+
* since the maximal number of available permits is [maxPermits] which is [Int],
89+
* and the maximum number of waiting acquirers cannot be greater than 2^31 in any
90+
* real application.
91+
*/
8692
private val _availablePermits = atomic(maxPermits)
87-
override val availablePermits: Int get() = max(_availablePermits.value.toInt(), 0)
88-
89-
// Queue of waiting `acquire` requests. The queue is represented via enqueue and dequeue
90-
// indices (`enqIdx` and `deqIdx` respectively) and infinite array. Each enqueue and dequeue
91-
// operations increment the corresponding counter and goes to the corresponding unique cell.
92-
// Due to the fact that we do not need to really send anything through this queue, we can make
93-
// this queue wait-free. In order to do this,
93+
override val availablePermits: Int get() = max(_availablePermits.value, 0)
9494

95+
// The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
96+
// each segment contains a fixed number of slots. To determine a slot for each enqueue
97+
// and dequeue operation, we increment the corresponding counter at the beginning of the operation
98+
// and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
99+
// works with an individual cell.
95100
private val enqIdx = atomic(0L)
96101
private val deqIdx = atomic(0L)
97102

@@ -110,69 +115,69 @@ private class SemaphoreImpl(@JvmField val maxPermits: Int, acquiredPermits: Int)
110115

111116
override fun release() {
112117
val p = _availablePermits.getAndUpdate { cur ->
113-
check(cur < maxPermits) { "Cannot TODO" }
118+
check(cur < maxPermits) { "The number of acquired permits cannot be greater than maxPermits" }
114119
cur + 1
115120
}
116121
if (p >= 0) return // no waiters
117122
resumeNextFromQueue()
118123
}
119124

120125
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
121-
val tail = this.tail
126+
val last = this.last
122127
val enqIdx = enqIdx.getAndIncrement()
123-
val segment = getSegment(tail, enqIdx / SEGMENT_SIZE)
128+
val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
124129
val i = (enqIdx % SEGMENT_SIZE).toInt()
125130
if (segment === null || segment[i].value === RESUMED || !segment[i].compareAndSet(null, cont)) {
131+
// already resumed
126132
cont.resume(Unit)
127133
return@sc
128134
}
129135
cont.invokeOnCancellation(handler = object : CancelHandler() {
130136
override fun invoke(cause: Throwable?) {
131-
segment.clean(i)
137+
segment.cancel(i)
132138
release()
133139
}
134140
}.asHandler)
135141
}
136142

137143
private fun resumeNextFromQueue() {
138-
val head = this.head
144+
val first = this.first
139145
val deqIdx = deqIdx.getAndIncrement()
140-
val segment = getSegmentAndMoveHeadForward(head, deqIdx / SEGMENT_SIZE) ?: return
146+
val segment = getSegmentAndMoveFirst(first, deqIdx / SEGMENT_SIZE) ?: return
141147
val i = (deqIdx % SEGMENT_SIZE).toInt()
142148
val cont = segment[i].getAndUpdate {
143-
if (it === CLEANED) it else RESUMED
149+
if (it === CANCELLED) CANCELLED else RESUMED
144150
}
145-
if (cont === CLEANED) return
151+
if (cont === null) return // just resumed
152+
if (cont === CANCELLED) return // Cancelled continuation invokes `release`
153+
// and resumes next suspended acquirer if needed.
146154
cont as CancellableContinuation<Unit>
147155
cont.resume(Unit)
148156
}
149157
}
150158

151159
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
152-
// == Waiters Array ==
153-
private val waiters = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
160+
private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
154161

155-
operator fun get(index: Int): AtomicRef<Any?> = waiters[index]
162+
operator fun get(index: Int): AtomicRef<Any?> = acquirers[index]
156163

157-
private val cleaned = atomic(0)
158-
override val removed get() = cleaned.value == SEGMENT_SIZE
164+
private val cancelledSlots = atomic(0)
165+
override val removed get() = cancelledSlots.value == SEGMENT_SIZE
159166

160-
/**
161-
* Cleans the waiter located by the specified index in this segment.
162-
*/
163-
fun clean(index: Int) {
164-
// Clean the specified waiter and
165-
// check if all node items are cleaned.
166-
waiters[index].value = CLEANED
167+
// Cleans the acquirer slot located by the specified index
168+
// and removes this segment physically if all slots are cleaned.
169+
fun cancel(index: Int) {
170+
// Clean the specified waiter
171+
acquirers[index].value = CANCELLED
167172
// Remove this segment if needed
168-
if (cleaned.incrementAndGet() == SEGMENT_SIZE)
173+
if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
169174
remove()
170175
}
171176
}
172177

173178
@SharedImmutable
174179
private val RESUMED = Symbol("RESUMED")
175180
@SharedImmutable
176-
private val CLEANED = Symbol("CLEANED")
181+
private val CANCELLED = Symbol("CANCELLED")
177182
@SharedImmutable
178183
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 32)

0 commit comments

Comments
 (0)