@@ -9,6 +9,7 @@ import kotlinx.coroutines.*
9
9
import kotlinx.coroutines.internal.*
10
10
import kotlinx.coroutines.selects.*
11
11
import kotlin.contracts.*
12
+ import kotlin.coroutines.*
12
13
import kotlin.jvm.*
13
14
14
15
/* *
@@ -131,7 +132,7 @@ public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T
131
132
}
132
133
133
134
134
- internal open class MutexImpl (locked : Boolean ) : SemaphoreImpl( 1 , if (locked) 1 else 0 ), Mutex {
135
+ internal open class MutexImpl (locked : Boolean ) : SegmentQueueSynchronizer<Unit>( ), Mutex {
135
136
/* *
136
137
* After the lock is acquired, the corresponding owner is stored in this field.
137
138
* The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
@@ -140,13 +141,15 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
140
141
*/
141
142
private val owner = atomic<Any ?>(if (locked) null else NO_OWNER )
142
143
144
+ private val availablePermits = atomic(if (locked) 0 else 1 )
145
+
143
146
private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
144
147
{ _: SelectInstance <* >, owner: Any? , _: Any? ->
145
148
{ unlock(owner) }
146
149
}
147
150
148
151
override val isLocked: Boolean get() =
149
- availablePermits = = 0
152
+ availablePermits.value < = 0
150
153
151
154
override fun holdsLock (owner : Any ): Boolean {
152
155
while (true ) {
@@ -161,13 +164,84 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
161
164
}
162
165
163
166
override suspend fun lock (owner : Any? ) {
164
- if (tryLock(owner)) return
167
+ // if (tryLock(owner)) return
165
168
lockSuspend(owner)
166
169
}
167
170
168
- private suspend fun lockSuspend (owner : Any? ) = suspendCancellableCoroutineReusable { cont ->
171
+ private suspend fun lockSuspend (owner : Any? ) = suspendCancellableCoroutineReusable<Unit > { cont ->
172
+ cont as CancellableContinuationImpl <Unit >
169
173
val contWithOwner = CancellableContinuationWithOwner (cont, owner)
170
- acquire(contWithOwner)
174
+ lockImpl(contWithOwner, owner)
175
+ }
176
+
177
+ private fun lockImpl (waiter : Waiter , owner : Any? ) {
178
+ xxx@ while (true ) {
179
+ // Get the current number of available permits.
180
+ val p = availablePermits.getAndDecrement()
181
+ // Try to decrement the number of available
182
+ // permits if it is greater than zero.
183
+ if (p <= 0 ) {
184
+ // The semaphore permit acquisition has failed.
185
+ // However, we need to check that this mutex is not
186
+ // locked by our owner.
187
+ if (owner != null ) {
188
+ // Is this mutex locked by our owner?
189
+ var curOwner = this .owner.value
190
+
191
+ if (curOwner == = owner) {
192
+ if (suspendCancelled() != null ) release()
193
+ when (waiter) {
194
+ is CancellableContinuation <* > -> {
195
+ waiter.resumeWithException(IllegalStateException (" ERROR" ))
196
+ }
197
+ is SelectInstance <* > -> {
198
+ waiter.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER )
199
+ }
200
+ }
201
+ return
202
+ }
203
+
204
+ while (curOwner == = NO_OWNER ) {
205
+ curOwner = this .owner.value
206
+ if (! isLocked) {
207
+ if (suspendCancelled() != null ) release()
208
+ continue @xxx
209
+ }
210
+ }
211
+ if (curOwner == = owner) {
212
+ if (suspendCancelled() != null ) release()
213
+ when (waiter) {
214
+ is CancellableContinuation <* > -> {
215
+ waiter.resumeWithException(IllegalStateException (" ERROR" ))
216
+ }
217
+ is SelectInstance <* > -> {
218
+ waiter.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER )
219
+ }
220
+ }
221
+ return
222
+ }
223
+ // This mutex is either locked by another owner or unlocked.
224
+ // In the latter case, it is possible that it WAS locked by
225
+ // our owner when the semaphore permit acquisition has failed.
226
+ // To preserve linearizability, the operation restarts in this case.
227
+ // if (!isLocked) continuex
228
+ }
229
+ if (suspend (waiter)) return
230
+ } else {
231
+ assert { p == 1 }
232
+ assert { this .owner.value == = NO_OWNER }
233
+ when (waiter) {
234
+ is CancellableContinuation <* > -> {
235
+ waiter as CancellableContinuation <Unit >
236
+ waiter.resume(Unit , null )
237
+ }
238
+ is SelectInstance <* > -> {
239
+ waiter.selectInRegistrationPhase(Unit )
240
+ }
241
+ }
242
+ return
243
+ }
244
+ }
171
245
}
172
246
173
247
override fun tryLock (owner : Any? ): Boolean = when (tryLockImpl(owner)) {
@@ -179,25 +253,27 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
179
253
180
254
private fun tryLockImpl (owner : Any? ): Int {
181
255
while (true ) {
182
- if (tryAcquire()) {
183
- assert { this .owner. value == = NO_OWNER }
184
- this .owner.value = owner
185
- return TRY_LOCK_SUCCESS
186
- } else {
256
+ // Get the current number of available permits.
257
+ val p = availablePermits. value
258
+ // Try to decrement the number of available
259
+ // permits if it is greater than zero.
260
+ if (p <= 0 ) {
187
261
// The semaphore permit acquisition has failed.
188
262
// However, we need to check that this mutex is not
189
263
// locked by our owner.
190
264
if (owner != null ) {
191
265
// Is this mutex locked by our owner?
192
- if (holdsLock(owner)) return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
193
- // This mutex is either locked by another owner or unlocked.
194
- // In the latter case, it is possible that it WAS locked by
195
- // our owner when the semaphore permit acquisition has failed.
196
- // To preserve linearizability, the operation restarts in this case.
197
- if (! isLocked) continue
266
+ val curOwner = this .owner.value
267
+ if (curOwner == = NO_OWNER ) continue
268
+ if (curOwner == = owner) return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
198
269
}
199
270
return TRY_LOCK_FAILED
200
271
}
272
+ if (availablePermits.compareAndSet(p, p - 1 )) {
273
+ assert { this .owner.value == = NO_OWNER }
274
+ this .owner.value = owner
275
+ return TRY_LOCK_SUCCESS
276
+ }
201
277
}
202
278
}
203
279
@@ -218,6 +294,27 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
218
294
}
219
295
}
220
296
297
+ fun release () {
298
+ while (true ) {
299
+ // Increment the number of available permits.
300
+ val p = availablePermits.value
301
+ // Is this `release` call correct and does not
302
+ // exceed the maximal number of permits?
303
+ if (p >= 1 ) {
304
+ error(" This mutex is not locked" )
305
+ }
306
+ if (availablePermits.compareAndSet(p, p + 1 )) {
307
+ // Is there a waiter that should be resumed?
308
+ if (p == 0 ) return
309
+ // Try to resume the first waiter, and
310
+ // restart the operation if either this
311
+ // first waiter is cancelled or
312
+ // due to `SYNC` resumption mode.
313
+ if (resume(Unit )) return
314
+ }
315
+ }
316
+ }
317
+
221
318
@Suppress(" UNCHECKED_CAST" , " OverridingDeprecatedMember" , " OVERRIDE_DEPRECATION" )
222
319
override val onLock: SelectClause2 <Any ?, Mutex > get() = SelectClause2Impl (
223
320
clauseObject = this ,
@@ -227,11 +324,7 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
227
324
)
228
325
229
326
protected open fun onLockRegFunction (select : SelectInstance <* >, owner : Any? ) {
230
- if (owner != null && holdsLock(owner)) {
231
- select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER )
232
- } else {
233
- onAcquireRegFunction(SelectInstanceWithOwner (select, owner), owner)
234
- }
327
+ lockImpl(SelectInstanceWithOwner (select as SelectInstanceInternal <* >, owner), owner)
235
328
}
236
329
237
330
protected open fun onLockProcessResult (owner : Any? , result : Any? ): Any? {
@@ -243,10 +336,10 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
243
336
244
337
private inner class CancellableContinuationWithOwner (
245
338
@JvmField
246
- val cont : CancellableContinuation <Unit >,
339
+ val cont : CancellableContinuationImpl <Unit >,
247
340
@JvmField
248
341
val owner : Any?
249
- ) : CancellableContinuation<Unit> by cont {
342
+ ) : CancellableContinuation<Unit> by cont, Waiter by cont {
250
343
override fun tryResume (value : Unit , idempotent : Any? , onCancellation : ((cause: Throwable ) -> Unit )? ): Any? {
251
344
assert { this @MutexImpl.owner.value == = NO_OWNER }
252
345
val token = cont.tryResume(value, idempotent) {
@@ -270,10 +363,10 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
270
363
271
364
private inner class SelectInstanceWithOwner <Q >(
272
365
@JvmField
273
- val select : SelectInstance <Q >,
366
+ val select : SelectInstanceInternal <Q >,
274
367
@JvmField
275
368
val owner : Any?
276
- ) : SelectInstanceInternal<Q> by select as SelectInstanceInternal<Q> {
369
+ ) : SelectInstanceInternal<Q> by select {
277
370
override fun trySelect (clauseObject : Any , result : Any? ): Boolean {
278
371
assert { this @MutexImpl.owner.value == = NO_OWNER }
279
372
return select.trySelect(clauseObject, result).also { success ->
@@ -282,12 +375,16 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
282
375
}
283
376
284
377
override fun selectInRegistrationPhase (internalResult : Any? ) {
285
- assert { this @MutexImpl.owner.value == = NO_OWNER }
286
- this @MutexImpl.owner.value = owner
378
+ if (internalResult != = ON_LOCK_ALREADY_LOCKED_BY_OWNER ) {
379
+ assert { this @MutexImpl.owner.value == = NO_OWNER }
380
+ this @MutexImpl.owner.value = owner
381
+ }
287
382
select.selectInRegistrationPhase(internalResult)
288
383
}
289
384
}
290
385
386
+ internal val debugStateRepresentation: String get() = " p=${availablePermits.value} ,owner=${owner.value} ,SQS=${super .toString()} "
387
+
291
388
override fun toString () = " Mutex@${hexAddress} [isLocked=$isLocked ,owner=${owner.value} ]"
292
389
}
293
390
0 commit comments