Skip to content

Commit b5e4718

Browse files
committed
~ Easier to understand code & style
1 parent 2103151 commit b5e4718

File tree

1 file changed

+64
-51
lines changed

1 file changed

+64
-51
lines changed

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+64-51
Original file line numberDiff line numberDiff line change
@@ -84,40 +84,41 @@ public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
8484
}
8585

8686
private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
87+
/*
88+
The queue of waiting acquirers is essentially an infinite array based on the list of segments
89+
(see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
90+
and dequeue operation, we increment the corresponding counter at the beginning of the operation
91+
and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
92+
works with an individual cell. We use the corresponding segment pointers to find the required ones.
93+
94+
Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
95+
can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
96+
so that the state `PERMIT` represents different logical states.
97+
98+
+------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then
99+
| NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets
100+
+------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes.
101+
| |
102+
| | `acquire` request is cancelled and the continuation is
103+
| `release` comes | replaced with a special `CANCEL` token to avoid memory leaks
104+
| to the slot before V
105+
| `acquire` and puts +-----------+ `release` has +--------+
106+
| a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
107+
| slot, waiting for +-----------+ failed +--------+
108+
| `acquire` after
109+
| that.
110+
|
111+
| `acquire` gets +-------+
112+
| +-----------------> | TAKEN | (ELIMINATION HAPPENED)
113+
V | the permit +-------+
114+
+--------+ |
115+
| PERMIT | -<
116+
+--------+ |
117+
| `release` has waited a bounded time, +--------+
118+
+---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
119+
but `acquire` has not come +--------+
120+
*/
87121

88-
// The queue of waiting acquirers is essentially an infinite array based on the list of segments
89-
// (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
90-
// and dequeue operation, we increment the corresponding counter at the beginning of the operation
91-
// and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
92-
// works with an individual cell. We use the corresponding segment pointers to find the required ones.
93-
//
94-
// Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
95-
// can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for perfomance reasons
96-
// so that the state `PERMIT` represents different logical states.
97-
//
98-
// +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then
99-
// | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets
100-
// +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes.
101-
// | |
102-
// | | `acquire` request is cancelled and the continuation is
103-
// | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks
104-
// | to the slot before V
105-
// | `acquire` and puts +-----------+ `release` has +--------+
106-
// | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
107-
// | slot, waiting for +-----------+ been failed +--------+
108-
// | `acquire` after
109-
// | that.
110-
// |
111-
// | `acquire` gets +-------+
112-
// | +-----------------> | TAKEN | (ELIMINATION HAPPENED)
113-
// V | the permit +-------+
114-
// +--------+ |
115-
// | PERMIT | -<
116-
// +--------+ |
117-
// | `release` has waited a bounded time, +--------+
118-
// +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
119-
// but `acquire` has not come +--------+
120-
//
121122
private val head: AtomicRef<SemaphoreSegment>
122123
private val deqIdx = atomic(0L)
123124
private val tail: AtomicRef<SemaphoreSegment>
@@ -159,7 +160,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
159160

160161
private suspend fun acquireSlowPath() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
161162
while (true) {
162-
if (addToQueueAndSuspend(cont)) return@sc
163+
if (addAcquireToQueue(cont)) return@sc
163164
val p = _availablePermits.getAndDecrement()
164165
if (p > 0) { // permit acquired
165166
cont.resume(Unit)
@@ -182,20 +183,29 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
182183
/**
183184
* Returns `false` if the received permit cannot be used and the calling operation should restart.
184185
*/
185-
private fun addToQueueAndSuspend(cont: CancellableContinuation<Unit>): Boolean {
186+
private fun addAcquireToQueue(cont: CancellableContinuation<Unit>): Boolean {
186187
val curTail = this.tail.value
187188
val enqIdx = enqIdx.getAndIncrement()
188189
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
189190
createNewSegment = ::createSegment).segment // cannot be closed
190191
val i = (enqIdx % SEGMENT_SIZE).toInt()
191-
if (segment.get(i) === PERMIT || !segment.cas(i, null, cont)) {
192-
// The permit is already in the queue, try to grab it
193-
val acquired = segment.cas(i, PERMIT, TAKEN)
194-
if (acquired) cont.resume(Unit)
195-
return acquired
192+
while (true) { // cas loop on cell state
193+
val cellState = segment.get(i)
194+
when {
195+
cellState === null -> // the cell if empty, try to install continuation
196+
if (segment.cas(i, null, cont)) { // fast path -- installed continuation successfully
197+
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
198+
return true
199+
}
200+
cellState === PERMIT -> // the cell already has permit from tryResumeNextFromQueue, try to grab it
201+
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
202+
cont.resume(Unit)
203+
return true
204+
}
205+
cellState === BROKEN -> return false // broken cell, need to retry on a different cell
206+
else -> error("Invalid state $cellState") // this cannot happen
207+
}
196208
}
197-
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
198-
return true
199209
}
200210

201211
@Suppress("UNCHECKED_CAST")
@@ -208,17 +218,20 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
208218
segment.cleanPrev()
209219
if (segment.id > id) return false
210220
val i = (deqIdx % SEGMENT_SIZE).toInt()
211-
val cont = segment.getAndSet(i, PERMIT)
212-
if (cont === CANCELLED) return false
213-
if (cont === null) {
214-
// Wait until an opposite operation comes for a bounded time
215-
repeat(MAX_SPIN_CYCLES) {
216-
if (segment.get(i) === TAKEN) return true
221+
val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
222+
when {
223+
cellState === null -> {
224+
// Acquire has not touched this cell yet, wait until it comes for a bounded time
225+
// The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
226+
repeat(MAX_SPIN_CYCLES) {
227+
if (segment.get(i) === TAKEN) return true
228+
}
229+
// Try to break the slot in order not to wait
230+
return !segment.cas(i, PERMIT, BROKEN)
217231
}
218-
// Try to break the slot in order not to wait
219-
return !segment.cas(i, PERMIT, BROKEN)
232+
cellState === CANCELLED -> return false // the acquire was already cancelled
233+
else -> return (cellState as CancellableContinuation<Unit>).tryResume()
220234
}
221-
return (cont as CancellableContinuation<Unit>).tryResume()
222235
}
223236
}
224237

0 commit comments

Comments
 (0)