Skip to content

Commit f6430f4

Browse files
committed
Cleanup:
Implementation-independent extensions Remove and encapsulate parts of JobSupport Assert invariants in AbstractContinuation Use separate code path for Cancelling state
1 parent 80a2947 commit f6430f4

File tree

5 files changed

+82
-83
lines changed

5 files changed

+82
-83
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

+41-44
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ internal abstract class AbstractContinuation<in T>(
104104
val handle = parent.invokeOnCompletion(onCancelling = true, handler = ChildContinuation(parent, this).asHandler)
105105

106106
parentHandle = handle
107-
// now check our state _after_ registering (see updateState order of actions)
107+
// now check our state _after_ registering (see updateStateToFinal order of actions)
108108
if (isCompleted) {
109109
handle.dispose()
110110
parentHandle = NonDisposableHandle // release it just in case, to aid GC
@@ -117,7 +117,7 @@ internal abstract class AbstractContinuation<in T>(
117117
loopOnState { state ->
118118
if (state !is NotCompleted) return false // quit if already complete
119119
if (state is Cancelling) return false // someone else succeeded
120-
if (updateStateCancelled(state, cause)) return true
120+
if (tryCancel(state, cause)) return true
121121
}
122122
}
123123

@@ -170,8 +170,8 @@ internal abstract class AbstractContinuation<in T>(
170170
is CancelledContinuation -> {
171171
/*
172172
* Continuation is complete, invoke directly.
173-
* NOTE: multiple invokeOnCancellation calls with different handlers are allowed on completed coroutine.
174-
* It's slightly inconsistent with running coroutine, but currently, we have no mechanism to check
173+
* NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
174+
* It's inconsistent with running continuation, but currently, we have no mechanism to check
175175
* whether any handler was registered during continuation lifecycle without additional overhead.
176176
* This may be changed in the future.
177177
*
@@ -181,19 +181,28 @@ internal abstract class AbstractContinuation<in T>(
181181
handler.invokeIt((state as? CompletedExceptionally)?.cause)
182182
return
183183
}
184+
is Cancelling -> error("Cancellation handlers for continuations with 'Cancelling' state are not supported")
184185
else -> return
185186
}
186187
}
187188
}
188189

189-
private fun updateStateCancelled(state: NotCompleted, cause: Throwable?): Boolean {
190-
val update: Any = if (useCancellingState) {
191-
Cancelling(CancelledContinuation(this, cause))
192-
} else {
193-
CancelledContinuation(this, cause)
190+
private fun makeHandler(handler: CompletionHandler): CancellationHandlerImpl<*> {
191+
if (handler is CancellationHandlerImpl<*>) {
192+
require(handler.continuation === this) { "Handler has non-matching continuation ${handler.continuation}, current: $this" }
193+
return handler
194+
}
195+
196+
return InvokeOnCancel(this, handler)
197+
}
198+
199+
private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean {
200+
if (useCancellingState) {
201+
require(state !is CancellationHandler) { "Invariant: 'Cancelling' state and cancellation handlers cannot be used together" }
202+
return _state.compareAndSet(state, Cancelling(CancelledContinuation(this, cause)))
194203
}
195204

196-
return updateState(state, update, mode = MODE_ATOMIC_DEFAULT)
205+
return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
197206
}
198207

199208
private fun onCompletionInternal(mode: Int) {
@@ -226,7 +235,7 @@ internal abstract class AbstractContinuation<in T>(
226235
*/
227236
if (proposedUpdate !is CompletedExceptionally) {
228237
val update = state.cancel
229-
if (updateState(state, update, resumeMode)) return
238+
if (updateStateToFinal(state, update, resumeMode)) return
230239
} else {
231240
/*
232241
* If already cancelled block is resumed with an exception,
@@ -271,14 +280,14 @@ internal abstract class AbstractContinuation<in T>(
271280
update = CompletedExceptionally(exception)
272281
}
273282

274-
if (updateState(state, update, resumeMode)) {
283+
if (updateStateToFinal(state, update, resumeMode)) {
275284
return
276285
}
277286
}
278287
}
279288

280289
is NotCompleted -> {
281-
if (updateState(state, proposedUpdate, resumeMode)) return
290+
if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
282291
}
283292
is CancelledContinuation -> {
284293
if (proposedUpdate is NotCompleted || proposedUpdate is CompletedExceptionally) {
@@ -307,32 +316,30 @@ internal abstract class AbstractContinuation<in T>(
307316
}
308317
}
309318

310-
private fun makeHandler(handler: CompletionHandler): CancellationHandlerImpl<*> {
311-
if (handler is CancellationHandlerImpl<*>) {
312-
require(handler.continuation === this) { "Handler has non-matching continuation ${handler.continuation}, current: $this" }
313-
return handler
319+
/**
320+
* Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
321+
*/
322+
private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
323+
if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
324+
return false
314325
}
315326

316-
return InvokeOnCancel(this, handler)
317-
}
318-
319-
private fun handleException(exception: Throwable) {
320-
handleCoroutineException(context, exception)
327+
completeStateUpdate(expect, proposedUpdate, mode)
328+
return true
321329
}
322330

323-
private fun updateState(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
324-
// TODO slow path for cancelling?
325-
if (!tryUpdateState(expect, proposedUpdate)) {
326-
return false
327-
}
328-
329-
if (proposedUpdate !is Cancelling) {
330-
completeUpdateState(expect, proposedUpdate, mode)
331+
protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean {
332+
require(update !is NotCompleted) // only NotCompleted -> completed transition is allowed
333+
if (!_state.compareAndSet(expect, update)) return false
334+
// Unregister from parent job
335+
parentHandle?.let {
336+
it.dispose() // volatile read parentHandle _after_ state was updated
337+
parentHandle = NonDisposableHandle // release it just in case, to aid GC
331338
}
332-
return true
339+
return true // continues in completeStateUpdate
333340
}
334341

335-
protected fun completeUpdateState(expect: NotCompleted, update: Any?, mode: Int) {
342+
protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
336343
val exceptionally = update as? CompletedExceptionally
337344
onCompletionInternal(mode)
338345

@@ -346,18 +353,8 @@ internal abstract class AbstractContinuation<in T>(
346353
}
347354
}
348355

349-
protected fun tryUpdateState(expect: NotCompleted, update: Any?): Boolean {
350-
if (!_state.compareAndSet(expect, update)) return false
351-
352-
// TODO separate code path?
353-
if (update !is Cancelling) {
354-
// Unregister from parent job
355-
parentHandle?.let {
356-
it.dispose() // volatile read parentHandle _after_ state was updated
357-
parentHandle = NonDisposableHandle // release it just in case, to aid GC
358-
}
359-
}
360-
return true // continues in completeUpdateState
356+
private fun handleException(exception: Throwable) {
357+
handleCoroutineException(context, exception)
361358
}
362359

363360
// For nicer debugging

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

+28-14
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlin.coroutines.experimental.intrinsics.*
2424
// --------------- cancellable continuations ---------------
2525

2626
/**
27-
* Cancellable continuation. Its job is _completed_ when it is resumed or cancelled.
27+
* Cancellable continuation. It is _completed_ when it is resumed or cancelled.
2828
* When [cancel] function is explicitly invoked, this continuation immediately resumes with [CancellationException] or
2929
* with the specified cancel cause.
3030
*
@@ -211,16 +211,24 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
211211
replaceWith = ReplaceWith("removeOnCancellation(handler)"),
212212
level = DeprecationLevel.HIDDEN)
213213
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): DisposableHandle {
214-
invokeOnCancellation(handler = RemoveOnCancel(this as CancellableContinuationImpl<*>, node).asHandler)
214+
removeOnCancellation(node)
215215
return NonDisposableHandle
216216
}
217217

218218
/**
219219
* Removes a given node on cancellation.
220220
* @suppress **This is unstable API and it is subject to change.**
221221
*/
222-
public fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode): Unit =
223-
invokeOnCancellation(handler = RemoveOnCancel(this as CancellableContinuationImpl<*>, node).asHandler)
222+
public fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedListNode): Unit {
223+
val handler: CompletionHandler
224+
if (this is CancellableContinuationImpl<*>) {
225+
handler = RemoveOnCancel(this, node).asHandler
226+
} else {
227+
handler = { node.remove() }
228+
}
229+
230+
invokeOnCancellation(handler)
231+
}
224232

225233
/**
226234
* Disposes a specified [handle] when this continuation is cancelled.
@@ -235,7 +243,7 @@ public fun CancellableContinuation<*>.removeOnCancellation(node: LockFreeLinkedL
235243
replaceWith = ReplaceWith("disposeOnCancellation(handler)"),
236244
level = DeprecationLevel.HIDDEN)
237245
public fun CancellableContinuation<*>.disposeOnCompletion(handle: DisposableHandle): DisposableHandle {
238-
invokeOnCancellation(handler = DisposeOnCancellation(this as CancellableContinuationImpl<*>, handle).asHandler)
246+
disposeOnCancellation(handle)
239247
return NonDisposableHandle
240248
}
241249

@@ -247,8 +255,17 @@ public fun CancellableContinuation<*>.disposeOnCompletion(handle: DisposableHand
247255
* invokeOnCancellation { handle.dispose() }
248256
* ```
249257
*/
250-
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle) =
251-
invokeOnCancellation(handler = DisposeOnCancellation(this as CancellableContinuationImpl<*>, handle).asHandler)
258+
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle) {
259+
val handler: CompletionHandler
260+
if (this is CancellableContinuationImpl<*>) {
261+
handler = DisposeOnCancellation(this, handle).asHandler
262+
} else {
263+
handler = { handle.dispose() }
264+
}
265+
266+
267+
invokeOnCancellation(handler)
268+
}
252269

253270
// --------------- implementation details ---------------
254271

@@ -259,8 +276,7 @@ private class RemoveOnCancel(
259276
) : CancellationHandlerImpl<CancellableContinuationImpl<*>>(cont) {
260277

261278
override fun invoke(cause: Throwable?) {
262-
if (continuation.isCancelled)
263-
node.remove()
279+
node.remove()
264280
}
265281

266282
override fun toString() = "RemoveOnCancel[$node]"
@@ -299,7 +315,7 @@ internal class CancellableContinuationImpl<in T>(
299315
is NotCompleted -> {
300316
val update: Any? = if (idempotent == null) value else
301317
CompletedIdempotentResult(idempotent, value, state)
302-
if (tryUpdateState(state, update)) return state
318+
if (tryUpdateStateToFinal(state, update)) return state
303319
}
304320
is CompletedIdempotentResult -> {
305321
if (state.idempotentResume === idempotent) {
@@ -317,16 +333,14 @@ internal class CancellableContinuationImpl<in T>(
317333
loopOnState { state ->
318334
when (state) {
319335
is NotCompleted -> {
320-
if (tryUpdateState(state, CompletedExceptionally(exception))) return state
336+
if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state
321337
}
322338
else -> return null // cannot resume -- not active anymore
323339
}
324340
}
325341
}
326342

327-
override fun completeResume(token: Any) {
328-
completeUpdateState(token as NotCompleted, state, resumeMode)
329-
}
343+
override fun completeResume(token: Any) = completeStateUpdate(token as NotCompleted, state, resumeMode)
330344

331345
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
332346
val dc = delegate as? DispatchedContinuation

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/JobSupport.kt

+7-19
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
124124
/**
125125
* @suppress **This is unstable API and it is subject to change.**
126126
*/
127-
internal inline fun loopOnState(block: (Any?) -> Unit): Nothing {
127+
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
128128
while (true) {
129129
block(state)
130130
}
@@ -148,7 +148,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
148148
* Updates current [state] of this job. Returns `false` if current state is not equal to expected.
149149
* @suppress **This is unstable API and it is subject to change.**
150150
*/
151-
internal fun updateState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
151+
private fun updateState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
152152
val update = coerceProposedUpdate(expect, proposedUpdate)
153153
if (!tryUpdateState(expect, update)) return false
154154
completeUpdateState(expect, update, mode)
@@ -181,7 +181,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
181181
* Tries to initiate update of the current [state] of this job.
182182
* @suppress **This is unstable API and it is subject to change.**
183183
*/
184-
internal fun tryUpdateState(expect: Incomplete, update: Any?): Boolean {
184+
private fun tryUpdateState(expect: Incomplete, update: Any?): Boolean {
185185
require(update !is Incomplete) // only incomplete -> completed transition is allowed
186186
if (!_state.compareAndSet(expect, update)) return false
187187
// Unregister from parent job
@@ -196,7 +196,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
196196
* Completes update of the current [state] of this job.
197197
* @suppress **This is unstable API and it is subject to change.**
198198
*/
199-
internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
199+
private fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
200200
val exceptionally = update as? CompletedExceptionally
201201
// Do overridable processing before completion handlers
202202
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
@@ -338,7 +338,6 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
338338
promoteSingleToNodeList(state as JobNode<*>)
339339
} else {
340340
if (state is Finishing && state.cancelled != null && onCancelling) {
341-
check(onCancelMode != ON_CANCEL_MAKE_CANCELLED) // cannot be in this state unless were support cancelling state
342341
// installing cancellation handler on job that is being cancelled
343342
if (invokeImmediately) handler(state.cancelled.cause)
344343
return NonDisposableHandle
@@ -358,12 +357,11 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
358357
}
359358

360359
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
361-
val hasCancellingState = onCancelMode != ON_CANCEL_MAKE_CANCELLED
362-
return if (onCancelling && hasCancellingState)
360+
return if (onCancelling)
363361
(handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
364362
?: InvokeOnCancellation(this, handler)
365363
else
366-
(handler as? JobNode<*>)?.also { require(it.job === this && (!hasCancellingState || it !is JobCancellationNode)) }
364+
(handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellationNode) }
367365
?: InvokeOnCompletion(this, handler)
368366
}
369367

@@ -458,7 +456,6 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
458456
internal open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
459457

460458
public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
461-
ON_CANCEL_MAKE_CANCELLED -> makeCancelled(cause)
462459
ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
463460
ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
464461
else -> error("Invalid onCancelMode $onCancelMode")
@@ -469,14 +466,6 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
469466
private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
470467
updateState(state, Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
471468

472-
// transitions to Cancelled state
473-
private fun makeCancelled(cause: Throwable?): Boolean {
474-
loopOnState { state ->
475-
if (state !is Incomplete) return false // quit if already complete
476-
if (updateStateCancelled(state, cause)) return true
477-
}
478-
}
479-
480469
// transitions to Cancelling state
481470
private fun makeCancelling(cause: Throwable?): Boolean {
482471
loopOnState { state ->
@@ -772,7 +761,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
772761
}
773762

774763
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
775-
// We have to invoke await() ha ndler only on cancellation, on completion we will be resumed regularly without handlers
764+
// We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
776765
cont.disposeOnCancellation(invokeOnCompletion {
777766
val state = this.state
778767
check(state !is Incomplete)
@@ -827,7 +816,6 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
827816
// --------------- helper classes to simplify job implementation
828817

829818

830-
internal const val ON_CANCEL_MAKE_CANCELLED = 0
831819
internal const val ON_CANCEL_MAKE_CANCELLING = 1
832820
internal const val ON_CANCEL_MAKE_COMPLETING = 2
833821

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Future.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ private class CancelFutureOnCompletion(
7676
}
7777

7878
private class CancelFutureOnCancellation(
79-
continuation: CancellableContinuation<*>,
79+
continuation: AbstractContinuation<*>,
8080
private val future: Future<*>
81-
) : CancellationHandlerImpl<AbstractContinuation<*>>(continuation as AbstractContinuation<*>) {
81+
) : CancellationHandlerImpl<AbstractContinuation<*>>(continuation) {
8282

8383
override fun invoke(reason: Throwable?) {
8484
// Don't interrupt when cancelling future on completion, because no one is going to reset this

0 commit comments

Comments
 (0)