Skip to content

Structured concurrency & Job cancellation improvements #1020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 13, 2019

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testAwaitWithContextCancellation() = runTest(expected = {it is IOException}) {
fun testAwaitWithCancellation() = runTest(expected = {it is TestCancellationException}) {
val future = SettableFuture.create<Int>()
val deferred = async {
withContext(Dispatchers.Default) {
future.await()
}
}

deferred.cancel(IOException())
deferred.await()
deferred.cancel(TestCancellationException())
deferred.await() // throws TCE
expectUnreached()
}

@Test
Expand Down
7 changes: 6 additions & 1 deletion integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.future

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import java.util.concurrent.*
import java.util.function.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -70,7 +71,11 @@ private class CompletableFutureCoroutine<T>(
*/
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
val future = CompletableFuture<T>()
future.whenComplete { _, exception -> cancel(exception) }
future.whenComplete { _, exception ->
cancel(exception?.let {
it as? CancellationException ?: CancellationException("Future failed", it)
})
}
invokeOnCompletion {
try {
future.complete(getCompleted())
Expand Down
10 changes: 6 additions & 4 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,12 @@ public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())

/**
* Cancels this scope, including its job and all its children.
* Cancels this scope, including its job and all its children with an optional cancellation [cause].
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* Throws [IllegalStateException] if the scope does not have a job in it.
**/
public fun CoroutineScope.cancel() {
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel()
job.cancel(cause)
}
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/src/Exceptions.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public expect class CompletionHandlerException(message: String, cause: Throwable

public expect open class CancellationException(message: String?) : IllegalStateException

@Suppress("FunctionName")
public expect fun CancellationException(message: String?, cause: Throwable?) : CancellationException

internal expect class JobCancellationException(
message: String,
cause: Throwable?,
Expand Down
95 changes: 50 additions & 45 deletions kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,25 @@ public interface Job : CoroutineContext.Element {
*/
public fun start(): Boolean


/**
* @suppress
* Cancels this job with an optional cancellation [cause].
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* See [Job] documentation for full explanation of cancellation machinery.
*/
@Suppress("INAPPLICABLE_JVM_NAME", "DEPRECATION")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
@JvmName("cancel")
public fun cancel0(): Boolean = cancel(null)
public fun cancel(cause: CancellationException? = null)

/**
* Cancels this job.
* See [Job] documentation for full explanation of cancellation machinery.
* @suppress This method implements old version of JVM ABI. Use [cancel].
*/
public fun cancel(): Unit
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
public fun cancel() = cancel(null)

/**
* @suppress
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "Use CompletableDeferred.completeExceptionally(cause) or Job.cancel() instead",
replaceWith = ReplaceWith("cancel()")
)
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
public fun cancel(cause: Throwable? = null): Boolean

// ------------ parent-child ------------
Expand Down Expand Up @@ -479,21 +477,26 @@ public suspend fun Job.cancelAndJoin() {
}

/**
* @suppress
* Cancels all [children][Job.children] jobs of this coroutine using [Job.cancel] for all of them
* with an optional cancellation [cause].
* Unlike [Job.cancel] on this job as a whole, the state of this job itself is not affected.
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "Use cancelChildren() without cause", replaceWith = ReplaceWith("cancelChildren()"))
public fun Job.cancelChildren(cause: Throwable? = null) {
@Suppress("DEPRECATION")
public fun Job.cancelChildren(cause: CancellationException? = null) {
children.forEach { it.cancel(cause) }
}

/**
* Cancels all [children][Job.children] jobs of this coroutine using [Job.cancel] for all of them.
* Unlike [Job.cancel] on this job as a whole, the state of this job itself is not affected.
* @suppress This method implements old version of JVM ABI. Use [cancel].
*/
public fun Job.cancelChildren() {
children.forEach { it.cancel() }
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun Job.cancelChildren() = cancelChildren(null)

/**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [Job.cancelChildren].
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun Job.cancelChildren(cause: Throwable? = null) {
children.forEach { (it as? JobSupport)?.cancelInternal(cause) }
}

// -------------------- CoroutineContext extensions --------------------
Expand All @@ -517,47 +520,49 @@ public fun Job.cancelChildren() {
public val CoroutineContext.isActive: Boolean
get() = this[Job]?.isActive == true


/**
* @suppress
* Cancels [Job] of this context with an optional cancellation cause.
* See [Job.cancel] for details.
*/
@Suppress("unused")
@JvmName("cancel")
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun CoroutineContext.cancel0(): Boolean {
this[Job]?.cancel()
return true
public fun CoroutineContext.cancel(cause: CancellationException? = null) {
this[Job]?.cancel(cause)
}

/**
* Cancels [Job] of this context. See [Job.cancel] for details.
* @suppress This method implements old version of JVM ABI. Use [CoroutineContext.cancel].
*/
public fun CoroutineContext.cancel(): Unit {
this[Job]?.cancel()
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun CoroutineContext.cancel() = cancel(null)

/**
* @suppress
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [CoroutineContext.cancel].
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "Use cancel() without cause", replaceWith = ReplaceWith("cancel()"))
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun CoroutineContext.cancel(cause: Throwable? = null): Boolean =
@Suppress("DEPRECATION")
this[Job]?.cancel(cause) ?: false
(this[Job] as? JobSupport)?.cancelInternal(cause) ?: false

/**
* Cancels all children of the [Job] in this context, without touching the the state of this job itself.
* Cancels all children of the [Job] in this context, without touching the the state of this job itself
* with an optional cancellation cause. See [Job.cancel].
* It does not do anything if there is no job in the context or it has no children.
*/
public fun CoroutineContext.cancelChildren() {
this[Job]?.children?.forEach { it.cancel() }
public fun CoroutineContext.cancelChildren(cause: CancellationException? = null) {
this[Job]?.children?.forEach { it.cancel(cause) }
}

@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "Use cancelChildren() without cause", replaceWith = ReplaceWith("cancelChildren()"))
/**
* @suppress This method implements old version of JVM ABI. Use [CoroutineContext.cancelChildren].
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun CoroutineContext.cancelChildren() = cancelChildren(null)

/**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [CoroutineContext.cancelChildren].
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
public fun CoroutineContext.cancelChildren(cause: Throwable? = null) {
@Suppress("DEPRECATION")
this[Job]?.children?.forEach { it.cancel(cause) }
this[Job]?.children?.forEach { (it as? JobSupport)?.cancelInternal(cause) }
}

/**
Expand Down
33 changes: 22 additions & 11 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,17 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
public final override fun getCancellationException(): CancellationException {
val state = this.state
return when (state) {
is Finishing -> state.rootCause?.toCancellationException("Job is cancelling")
is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling")
?: error("Job is still new or active: $this")
is Incomplete -> error("Job is still new or active: $this")
is CompletedExceptionally -> state.cause.toCancellationException("Job was cancelled")
else -> JobCancellationException("Job has completed normally", null, this)
is CompletedExceptionally -> state.cause.toCancellationException()
else -> JobCancellationException("$classSimpleName has completed normally", null, this)
}
}

private fun Throwable.toCancellationException(message: String): CancellationException =
this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
this as? CancellationException ?:
JobCancellationException(message ?: "$classSimpleName was cancelled", this, this@JobSupport)

/**
* Returns the cause that signals the completion of this job -- it returns the original
Expand Down Expand Up @@ -565,14 +566,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
*/
internal open val onCancelComplete: Boolean get() = false

// external cancel without cause, never invoked implicitly from internal machinery
public override fun cancel() {
@Suppress("DEPRECATION")
cancel(null) // must delegate here, because some classes override cancel(x)
// external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause) // must delegate here, because some classes override cancelInternal(x)
}

// HIDDEN in Job interface. Invoked only by legacy compiled code.
// external cancel with (optional) cause, never invoked implicitly from internal machinery
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
public override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)

// It is overridden in channel-linked implementation
// Note: Boolean result is used only in DEPRECATED functions
public open fun cancelInternal(cause: Throwable?): Boolean =
cancelImpl(cause) && handlesException

// Parent is cancelling child
Expand All @@ -581,11 +588,15 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

// Child was cancelled with cause
// It is overridden in supervisor implementations to ignore child cancellation
public open fun childCancelled(cause: Throwable): Boolean =
cancelImpl(cause) && handlesException

// For AbstractCoroutine implementations
protected fun cancelCoroutine(cause: Throwable?) =
/**
* Makes this [Job] cancelled with a specified [cause].
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
*/
public fun cancelCoroutine(cause: Throwable?) =
cancelImpl(cause)

// cause is Throwable or ParentJob when cancelChild was invoked
Expand Down
8 changes: 3 additions & 5 deletions kotlinx-coroutines-core/common/src/NonCancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,13 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job {
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
@Suppress("RETURN_TYPE_MISMATCH_ON_OVERRIDE")
override fun cancel(): Unit {
}
override fun cancel(cause: CancellationException?) {}

/**
* Always returns `false`.
* @suppress **This an internal API and should not be used from general code.**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
*/
@InternalCoroutinesApi
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
override fun cancel(cause: Throwable?): Boolean = false // never handles exceptions

/**
Expand Down
12 changes: 8 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}

override fun cancel() {
@Suppress("DEPRECATION")
cancel(null)
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
final override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
}

override fun cancel(cause: Throwable?): Boolean =
// It needs to be internal to support deprecated cancel(Throwable?) API
internal open fun cancelInternal(cause: Throwable?): Boolean =
close(cause).also {
cleanupSendQueueOnCancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,17 @@ internal class ArrayBroadcastChannel<E>(
return true
}

public override fun cancel(cause: Throwable?): Boolean =
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)

override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
}

private fun cancelInternal(cause: Throwable?): Boolean =
close(cause).also {
@Suppress("DEPRECATION")
for (sub in subscribers) sub.cancel(cause)
for (sub in subscribers) sub.cancelInternal(cause)
}

// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
Expand Down Expand Up @@ -201,7 +208,7 @@ internal class ArrayBroadcastChannel<E>(
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
override val isBufferFull: Boolean get() = error("Should not be used")

override fun cancel(cause: Throwable?): Boolean =
override fun cancelInternal(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.updateHead(removeSub = this)
clearBuffer()
Expand Down
17 changes: 12 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,18 @@ private open class BroadcastCoroutine<E>(
override val channel: SendChannel<E>
get() = this

override fun cancel(cause: Throwable?): Boolean {
val wasCancelled = _channel.cancel(cause)
@Suppress("DEPRECATION")
if (wasCancelled) cancelCoroutine(cause) // cancel the job
return wasCancelled
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
final override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
}

override fun cancelInternal(cause: Throwable?): Boolean {
_channel.cancel(cause?.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
return true // does not matter - result is used in DEPRECATED functions only
}

override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
Expand Down
Loading