Skip to content

Commit 638760f

Browse files
committed
In progress
Signed-off-by: Nikita Koval <[email protected]>
1 parent 34b7b98 commit 638760f

File tree

7 files changed

+74
-56
lines changed

7 files changed

+74
-56
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
3333
internal open class CancellableContinuationImpl<in T>(
3434
final override val delegate: Continuation<T>,
3535
resumeMode: Int
36-
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter {
36+
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter, SegmentDisposable {
3737
init {
3838
assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
3939
}
@@ -246,7 +246,7 @@ internal open class CancellableContinuationImpl<in T>(
246246
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
247247
val index = _decisionAndIndex.value.index
248248
check(index != NO_INDEX) { "The index for segment.invokeOnCancellation(..) is broken" }
249-
callCancelHandlerSafely { segment.invokeOnCancellation(index, cause) }
249+
callCancelHandlerSafely { segment.onCancellation(index, cause) }
250250
}
251251

252252

@@ -372,11 +372,11 @@ internal open class CancellableContinuationImpl<in T>(
372372
* [segment] and [index] in this [CancellableContinuationImpl].
373373
* ```
374374
* invokeOnCancellation { cause ->
375-
* segment.invokeOnCancellation(index, cause)
375+
* segment.onCancellation(index, cause)
376376
* }
377377
* ```
378378
*/
379-
internal fun invokeOnCancellation(segment: Segment<*>, index: Int) {
379+
override fun disposeOnCancellation(segment: Segment<*>, index: Int) {
380380
_decisionAndIndex.update {
381381
check(it.index == NO_INDEX) {
382382
"invokeOnCancellation should be invoked at most once"

kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt

+12-16
Original file line numberDiff line numberDiff line change
@@ -175,24 +175,13 @@ internal open class BufferedChannel<E>(
175175
index: Int
176176
) {
177177
if (onUndeliveredElement == null) {
178-
invokeOnCancellation(SenderOrReceiverCancellationHandler(segment, index).asHandler)
178+
this as CancellableContinuationImpl<*>
179+
disposeOnCancellation(segment, index)
179180
} else {
180181
invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context).asHandler)
181182
}
182183
}
183184

184-
// TODO: Replace with a more efficient cancellation mechanism for segments when #3084 is finished.
185-
private inner class SenderOrReceiverCancellationHandler(
186-
private val segment: ChannelSegment<E>,
187-
private val index: Int
188-
) : BeforeResumeCancelHandler(), DisposableHandle {
189-
override fun dispose() {
190-
segment.onCancellation(index)
191-
}
192-
193-
override fun invoke(cause: Throwable?) = dispose()
194-
}
195-
196185
private inner class SenderWithOnUndeliveredElementCancellationHandler(
197186
private val segment: ChannelSegment<E>,
198187
private val index: Int,
@@ -743,7 +732,8 @@ internal open class BufferedChannel<E>(
743732

744733
private fun CancellableContinuation<*>.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
745734
onReceiveEnqueued()
746-
invokeOnCancellation(SenderOrReceiverCancellationHandler(segment, index).asHandler)
735+
this as CancellableContinuationImpl<*>
736+
disposeOnCancellation(segment, index)
747737
}
748738

749739
private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
@@ -1508,7 +1498,8 @@ internal open class BufferedChannel<E>(
15081498
index: Int
15091499
) {
15101500
if (onUndeliveredElement == null) {
1511-
disposeOnCompletion(SenderOrReceiverCancellationHandler(segment, index))
1501+
this as SelectImplementation<*>
1502+
disposeOnCancellation(segment, index)
15121503
} else {
15131504
disposeOnCompletion(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context))
15141505
}
@@ -1567,7 +1558,8 @@ internal open class BufferedChannel<E>(
15671558
index: Int
15681559
) {
15691560
onReceiveEnqueued()
1570-
disposeOnCompletion(SenderOrReceiverCancellationHandler(segment, index))
1561+
this as SelectImplementation<*>
1562+
disposeOnCancellation(segment, index)
15711563
}
15721564

15731565
private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
@@ -2871,6 +2863,10 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
28712863
}
28722864
}
28732865

2866+
override fun onCancellation(index: Int, cause: Throwable?) {
2867+
onCancellation(index)
2868+
}
2869+
28742870
/**
28752871
* Returns `true` if the request is successfully cancelled,
28762872
* and no rendezvous has happened. We need this knowledge

kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,9 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
223223
/**
224224
* This function is invoked on continuation cancellation when this segment
225225
* with the specified [index] are installed as cancellation handler via
226-
* `CancellableContinuationImpl.invokeOnCancellation(Segment, Int)`.
226+
* `SegmentDisposable.disposeOnCancellation(Segment, Int)`.
227227
*/
228-
internal open fun invokeOnCancellation(index: Int, cause: Throwable?) {}
228+
internal open fun onCancellation(index: Int, cause: Throwable?) {}
229229

230230
/**
231231
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
@@ -235,6 +235,10 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
235235
}
236236
}
237237

238+
internal interface SegmentDisposable {
239+
fun disposeOnCancellation(segment: Segment<*>, index: Int)
240+
}
241+
238242
private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean {
239243
while (true) {
240244
val cur = this.value

kotlinx-coroutines-core/common/src/selects/Select.kt

+36-10
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public sealed interface SelectInstance<in R> {
238238
*/
239239
public fun selectInRegistrationPhase(internalResult: Any?)
240240
}
241-
internal interface SelectInstanceInternal<R>: SelectInstance<R>, Waiter
241+
internal interface SelectInstanceInternal<R>: SelectInstance<R>, Waiter, SegmentDisposable
242242

243243
@PublishedApi
244244
internal open class SelectImplementation<R> constructor(
@@ -380,7 +380,12 @@ internal open class SelectImplementation<R> constructor(
380380
* After that, if the clause is successfully registered (so, it has not completed immediately),
381381
* this [DisposableHandle] is stored into the corresponding [ClauseData] instance.
382382
*/
383-
private var disposableHandle: DisposableHandle? = null
383+
private var disposableHandleOrSegment: Any? = null
384+
385+
/**
386+
* TODO
387+
*/
388+
private var indexInSegment: Int = -1
384389

385390
/**
386391
* Stores the result passed via [selectInRegistrationPhase] during clause registration
@@ -469,8 +474,10 @@ internal open class SelectImplementation<R> constructor(
469474
// This also guarantees that the list of clauses cannot be cleared
470475
// in the registration phase, so it is safe to read it with "!!".
471476
if (!reregister) clauses!! += this
472-
disposableHandle = this@SelectImplementation.disposableHandle
473-
this@SelectImplementation.disposableHandle = null
477+
disposableHandleOrSegment = this@SelectImplementation.disposableHandleOrSegment
478+
indexInSegment = this@SelectImplementation.indexInSegment
479+
this@SelectImplementation.disposableHandleOrSegment = null
480+
this@SelectImplementation.indexInSegment = -1
474481
} else {
475482
// This clause has been selected!
476483
// Update the state correspondingly.
@@ -493,7 +500,12 @@ internal open class SelectImplementation<R> constructor(
493500
}
494501

495502
override fun disposeOnCompletion(disposableHandle: DisposableHandle) {
496-
this.disposableHandle = disposableHandle
503+
this.disposableHandleOrSegment = disposableHandle
504+
}
505+
506+
override fun disposeOnCancellation(segment: Segment<*>, index: Int) {
507+
this.disposableHandleOrSegment = segment
508+
this.indexInSegment = index
497509
}
498510

499511
override fun selectInRegistrationPhase(internalResult: Any?) {
@@ -556,7 +568,8 @@ internal open class SelectImplementation<R> constructor(
556568
*/
557569
private fun reregisterClause(clauseObject: Any) {
558570
val clause = findClause(clauseObject)!! // it is guaranteed that the corresponding clause is presented
559-
clause.disposableHandle = null
571+
clause.disposableHandleOrSegment = null
572+
clause.indexInSegment = -1
560573
clause.register(reregister = true)
561574
}
562575

@@ -692,7 +705,7 @@ internal open class SelectImplementation<R> constructor(
692705
// Invoke all cancellation handlers except for the
693706
// one related to the selected clause, if specified.
694707
clauses.forEach { clause ->
695-
if (clause !== selectedClause) clause.disposableHandle?.dispose()
708+
if (clause !== selectedClause) clause.dispose()
696709
}
697710
// We do need to clean all the data to avoid memory leaks.
698711
this.state.value = STATE_COMPLETED
@@ -716,7 +729,7 @@ internal open class SelectImplementation<R> constructor(
716729
// a concurrent clean-up procedure has already completed, and it is safe to finish.
717730
val clauses = this.clauses ?: return
718731
// Remove this `select` instance from all the clause object (channels, mutexes, etc.).
719-
clauses.forEach { it.disposableHandle?.dispose() }
732+
clauses.forEach { it.dispose() }
720733
// We do need to clean all the data to avoid memory leaks.
721734
this.internalResult = NO_RESULT
722735
this.clauses = null
@@ -731,9 +744,11 @@ internal open class SelectImplementation<R> constructor(
731744
private val processResFunc: ProcessResultFunction,
732745
private val param: Any?, // the user-specified param
733746
private val block: Any, // the user-specified block, which should be called if this clause becomes selected
734-
@JvmField val onCancellationConstructor: OnCancellationConstructor?,
735-
@JvmField var disposableHandle: DisposableHandle? = null
747+
@JvmField val onCancellationConstructor: OnCancellationConstructor?
736748
) {
749+
@JvmField var disposableHandleOrSegment: Any? = null
750+
@JvmField var indexInSegment: Int = -1
751+
737752
/**
738753
* Tries to register the specified [select] instance in [clauseObject] and check
739754
* whether the registration succeeded or a rendezvous has happened during the registration.
@@ -788,6 +803,17 @@ internal open class SelectImplementation<R> constructor(
788803
}
789804
}
790805

806+
fun dispose() {
807+
with(disposableHandleOrSegment) {
808+
if (this is Segment<*>) {
809+
this.onCancellation(indexInSegment, null)
810+
} else {
811+
this as DisposableHandle
812+
this.dispose()
813+
}
814+
}
815+
}
816+
791817
fun createOnCancellationAction(select: SelectInstance<*>, internalResult: Any?) =
792818
onCancellationConstructor?.invoke(select, param, internalResult)
793819
}

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+7-6
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
165165
lockSuspend(owner)
166166
}
167167

168-
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable { cont ->
168+
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
169+
cont as CancellableContinuationImpl<Unit>
169170
val contWithOwner = CancellableContinuationWithOwner(cont, owner)
170171
acquire(contWithOwner)
171172
}
@@ -205,7 +206,7 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
205206
)
206207

207208
protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
208-
onAcquireRegFunction(SelectInstanceWithOwner(select, owner), owner)
209+
onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
209210
}
210211

211212
protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
@@ -214,10 +215,10 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
214215

215216
private inner class CancellableContinuationWithOwner(
216217
@JvmField
217-
val cont: CancellableContinuation<Unit>,
218+
val cont: CancellableContinuationImpl<Unit>,
218219
@JvmField
219220
val owner: Any?
220-
) : CancellableContinuation<Unit> by cont {
221+
) : CancellableContinuation<Unit> by cont, SegmentDisposable by cont {
221222
override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
222223
assert { this@MutexImpl.owner.value === NO_OWNER }
223224
val token = cont.tryResume(value, idempotent) {
@@ -241,10 +242,10 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
241242

242243
private inner class SelectInstanceWithOwner<Q>(
243244
@JvmField
244-
val select: SelectInstance<Q>,
245+
val select: SelectInstanceInternal<Q>,
245246
@JvmField
246247
val owner: Any?
247-
) : SelectInstanceInternal<Q> by select as SelectInstanceInternal<Q> {
248+
) : SelectInstanceInternal<Q> by select {
248249
override fun trySelect(clauseObject: Any, result: Any?): Boolean {
249250
assert { this@MutexImpl.owner.value === NO_OWNER }
250251
return select.trySelect(clauseObject, result).also { success ->

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+6-15
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int
185185

186186
private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
187187
// Try to suspend.
188-
if (addAcquireToQueue(cont)) return@sc
188+
if (addAcquireToQueue(cont as SegmentDisposable)) return@sc
189189
// The suspension has been failed
190190
// due to the synchronous resumption mode.
191191
// Restart the whole `acquire`.
@@ -195,7 +195,7 @@ internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int
195195
@JsName("acquireCont")
196196
protected fun acquire(waiter: CancellableContinuation<Unit>) = acquire(
197197
waiter = waiter,
198-
suspend = { cont -> addAcquireToQueue(cont) },
198+
suspend = { cont -> addAcquireToQueue(cont as SegmentDisposable) },
199199
onAcquired = { cont -> cont.resume(Unit, onCancellationRelease) }
200200
)
201201

@@ -219,7 +219,7 @@ internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int
219219
protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
220220
acquire(
221221
waiter = select,
222-
suspend = { s -> addAcquireToQueue(s) },
222+
suspend = { s -> addAcquireToQueue(s as SegmentDisposable) },
223223
onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
224224
)
225225

@@ -281,24 +281,15 @@ internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int
281281
/**
282282
* Returns `false` if the received permit cannot be used and the calling operation should restart.
283283
*/
284-
private fun addAcquireToQueue(waiter: Any): Boolean {
284+
private fun addAcquireToQueue(waiter: SegmentDisposable): Boolean {
285285
val curTail = this.tail.value
286286
val enqIdx = enqIdx.getAndIncrement()
287287
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
288288
createNewSegment = ::createSegment).segment // cannot be closed
289289
val i = (enqIdx % SEGMENT_SIZE).toInt()
290290
// the regular (fast) path -- if the cell is empty, try to install continuation
291291
if (segment.cas(i, null, waiter)) { // installed continuation successfully
292-
when (waiter) {
293-
is CancellableContinuation<*> -> {
294-
waiter as CancellableContinuationImpl<*>
295-
waiter.invokeOnCancellation(segment, i)
296-
}
297-
is SelectInstance<*> -> {
298-
waiter.disposeOnCompletion(CancelSemaphoreAcquisitionHandler(segment, i))
299-
}
300-
else -> error("unexpected: $waiter")
301-
}
292+
waiter.disposeOnCancellation(segment, i)
302293
return true
303294
}
304295
// On CAS failure -- the cell must be either PERMIT or BROKEN
@@ -385,7 +376,7 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
385376

386377
// Cleans the acquirer slot located by the specified index
387378
// and removes this segment physically if all slots are cleaned.
388-
override fun invokeOnCancellation(index: Int, cause: Throwable?) {
379+
override fun onCancellation(index: Int, cause: Throwable?) {
389380
// Clean the slot
390381
set(index, CANCELLED)
391382
// Remove this segment if needed

kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ class CancellableContinuationHandlersTest : TestBase() {
164164
@Test
165165
fun testSegmentAsHandler() = runTest {
166166
class MySegment : Segment<MySegment>(0, null, 0) {
167-
override val maxSlots: Int get() = 0
167+
override val numberOfSlots: Int get() = 0
168168

169169
var invokeOnCancellationCalled = false
170-
override fun invokeOnCancellation(index: Int, cause: Throwable?) {
170+
override fun onCancellation(index: Int, cause: Throwable?) {
171171
invokeOnCancellationCalled = true
172172
}
173173
}
@@ -177,7 +177,7 @@ class CancellableContinuationHandlersTest : TestBase() {
177177
suspendCancellableCoroutine<Unit> { c ->
178178
expect(2)
179179
c as CancellableContinuationImpl<*>
180-
c.invokeOnCancellation(s, 0)
180+
c.disposeOnCancellation(s, 0)
181181
c.cancel()
182182
}
183183
} catch (e: CancellationException) {

0 commit comments

Comments
 (0)