1
1
package kotlinx.coroutines.sync
2
2
3
- import kotlinx.atomicfu.atomic
4
- import kotlinx.atomicfu.atomicArrayOfNulls
5
- import kotlinx.atomicfu.getAndUpdate
6
- import kotlinx.atomicfu.loop
3
+ import kotlinx.atomicfu.*
7
4
import kotlinx.coroutines.*
8
5
import kotlinx.coroutines.internal.*
9
- import kotlin.coroutines.resume
10
- import kotlin.math.max
6
+ import kotlin.coroutines.*
7
+ import kotlin.jvm.*
8
+ import kotlin.math.*
11
9
12
10
/* *
13
- * A counting semaphore for coroutines. It maintains a number of available permits.
14
- * Each [acquire] suspends if necessary until a permit is available, and then takes it .
11
+ * A counting semaphore for coroutines that logically maintains a number of available permits.
12
+ * Each [acquire] takes a single permit or suspends until it is available .
15
13
* Each [release] adds a permit, potentially releasing a suspended acquirer.
14
+ * Semaphore is fair and maintains a FIFO order of acquirers.
16
15
*
16
+ * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource.
17
17
* Semaphore with `permits = 1` is essentially a [Mutex].
18
18
**/
19
19
public interface Semaphore {
@@ -29,11 +29,12 @@ public interface Semaphore {
29
29
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
30
30
* function is suspended, this function immediately resumes with [CancellationException].
31
31
*
32
- * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function
32
+ * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
33
33
* throws [CancellationException] it means that the semaphore was not acquired.
34
34
*
35
- * Note, that this function does not check for cancellation when it is not suspended.
36
- * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
35
+ * Note, that this function does not check for cancellation when it does not suspend.
36
+ * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
37
+ * check for cancellation in tight loops if needed.
37
38
*
38
39
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
39
40
*/
@@ -49,8 +50,7 @@ public interface Semaphore {
49
50
/* *
50
51
* Releases a permit, returning it into this semaphore. Resumes the first
51
52
* suspending acquirer if there is one at the point of invocation.
52
- * Throws [IllegalStateException] if there is no acquired permit
53
- * at the point of invocation.
53
+ * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
54
54
*/
55
55
public fun release ()
56
56
}
@@ -83,8 +83,8 @@ private class SemaphoreImpl(
83
83
private val permits : Int , acquiredPermits : Int
84
84
) : Semaphore, SegmentQueue<SemaphoreSegment>() {
85
85
init {
86
- require(permits > 0 ) { " Semaphore should have at least 1 permit" }
87
- require(acquiredPermits in 0 .. permits) { " The number of acquired permits should be in 0..permits" }
86
+ require(permits > 0 ) { " Semaphore should have at least 1 permit, but had $permits " }
87
+ require(acquiredPermits in 0 .. permits) { " The number of acquired permits should be in 0..$ permits" }
88
88
}
89
89
90
90
override fun newSegment (id : Long , prev : SemaphoreSegment ? ) = SemaphoreSegment (id, prev)
@@ -126,8 +126,8 @@ private class SemaphoreImpl(
126
126
resumeNextFromQueue()
127
127
}
128
128
129
- internal fun incPermits () = _availablePermits .getAndUpdate { cur ->
130
- check(cur < permits) { " The number of acquired permits cannot be greater than ` permits` " }
129
+ fun incPermits () = _availablePermits .getAndUpdate { cur ->
130
+ check(cur < permits) { " The number of released permits cannot be greater than $ permits" }
131
131
cur + 1
132
132
}
133
133
@@ -176,6 +176,8 @@ private class CancelSemaphoreAcquisitionHandler(
176
176
177
177
private class SemaphoreSegment (id : Long , prev : SemaphoreSegment ? ): Segment<SemaphoreSegment>(id, prev) {
178
178
val acquirers = atomicArrayOfNulls<Any ?>(SEGMENT_SIZE )
179
+ private val cancelledSlots = atomic(0 )
180
+ override val removed get() = cancelledSlots.value == SEGMENT_SIZE
179
181
180
182
@Suppress(" NOTHING_TO_INLINE" )
181
183
inline fun get (index : Int ): Any? = acquirers[index].value
@@ -186,9 +188,6 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
186
188
@Suppress(" NOTHING_TO_INLINE" )
187
189
inline fun getAndSet (index : Int , value : Any? ) = acquirers[index].getAndSet(value)
188
190
189
- private val cancelledSlots = atomic(0 )
190
- override val removed get() = cancelledSlots.value == SEGMENT_SIZE
191
-
192
191
// Cleans the acquirer slot located by the specified index
193
192
// and removes this segment physically if all slots are cleaned.
194
193
fun cancel (index : Int ): Boolean {
0 commit comments