Skip to content

Structured concurrency & Job cancellation improvements #1020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 13, 2019

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ public fun <T> CoroutineScope.future(

private class ListenableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: SettableFuture<T>
private val future: SettableFuture<T>
) : AbstractCoroutine<T>(context), FutureCallback<T> {

/*
* We register coroutine as callback to the future this coroutine completes.
* But when future is cancelled externally, we'd like to cancel coroutine,
Expand All @@ -66,12 +65,13 @@ private class ListenableFutureCoroutine<T>(
}

override fun onCompleted(value: T) {
completion.set(value)
future.set(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.setException(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.setException(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
handleCoroutineException(context, exception)
}
}
}
Expand Down
41 changes: 36 additions & 5 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testAwaitWithContextCancellation() = runTest(expected = {it is IOException}) {
fun testAwaitWithCancellation() = runTest(expected = {it is TestCancellationException}) {
val future = SettableFuture.create<Int>()
val deferred = async {
withContext(Dispatchers.Default) {
future.await()
}
}

deferred.cancel(IOException())
deferred.await()
deferred.cancel(TestCancellationException())
deferred.await() // throws TCE
expectUnreached()
}

@Test
Expand Down Expand Up @@ -258,13 +259,24 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

Expand Down Expand Up @@ -295,7 +307,26 @@ class ListenableFutureTest : TestBase() {
throw TestException()
}
}
result.cancel(true)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCancellation() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.cancel(true)
finish(3)
}
Expand Down
19 changes: 12 additions & 7 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.future

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import java.util.concurrent.*
import java.util.function.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -46,20 +47,20 @@ public fun <T> CoroutineScope.future(

private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val completion: CompletableFuture<T>
private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {

override fun accept(value: T?, exception: Throwable?) {
cancel()
}

override fun onCompleted(value: T) {
completion.complete(value)
future.complete(value)
}

override fun onCompletedExceptionally(exception: Throwable) {
if (!completion.completeExceptionally(exception)) {
handleCoroutineException(parentContext, exception, this)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!future.completeExceptionally(exception) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
handleCoroutineException(context, exception)
}
}
}
Expand All @@ -70,7 +71,11 @@ private class CompletableFutureCoroutine<T>(
*/
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
val future = CompletableFuture<T>()
future.whenComplete { _, exception -> cancel(exception) }
future.whenComplete { _, exception ->
cancel(exception?.let {
it as? CancellationException ?: CancellationException("CompletableFuture was completed exceptionally", it)
})
}
invokeOnCompletion {
try {
future.complete(getCompleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fun main(args: Array<String>) {
log("g should not execute this line")
}
log("Started futures f && g... will not wait -- cancel them!!!")
job.cancel(CancellationException("I don't want it"))
job.cancel()
check(f.isCancelled)
check(g.isCancelled)
log("f result = ${Try<Unit> { f.get() }}")
Expand Down
47 changes: 39 additions & 8 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,39 @@ class FutureTest : TestBase() {
}

@Test
fun testChildException() = runTest {
fun testStructuredException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future<Int>(Dispatchers.Unconfined) {
throw TestException("FAIL")
}
result.checkFutureException<TestException>()
}

@Test
fun testChildException() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch { throw TestException("FAIL") }
42
}

result.checkFutureException<TestException>()
}

@Test
fun testExceptionAggregation() = runTest {
fun testExceptionAggregation() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
val result = future(Dispatchers.Unconfined) {
// child crashes
launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
throw TestException()
}

expect(1)
result.checkFutureException<TestException>(TestException1::class, TestException2::class)
yield()
finish(2) // we are not cancelled
finish(1)
}

@Test
Expand All @@ -409,7 +419,9 @@ class FutureTest : TestBase() {
}

@Test
fun testExceptionOnExternalCompletion() = runTest(expected = {it is TestException}) {
fun testExceptionOnExternalCompletion() = runTest(
expected = { it is TestException } // exception propagates to parent with structured concurrency
) {
expect(1)
val result = future(Dispatchers.Unconfined) {
try {
Expand All @@ -419,7 +431,26 @@ class FutureTest : TestBase() {
throw TestException()
}
}
result.complete(Unit)
finish(3)
}

@Test
fun testUnhandledExceptionOnExternalCompletion() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
}
}
result.complete(Unit)
finish(3)
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public abstract class AbstractCoroutine<in T>(
}

internal final override fun handleOnCompletionException(exception: Throwable) {
handleCoroutineException(parentContext, exception, this)
handleCoroutineException(context, exception)
}

internal override fun nameString(): String {
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override val cancelsParent: Boolean get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down Expand Up @@ -169,8 +168,9 @@ private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override val cancelsParent: Boolean get() = true
override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
override fun handleJobException(exception: Throwable, handled: Boolean) {
if (!handled) handleCoroutineException(context, exception)
}
}

private class LazyStandaloneCoroutine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ internal open class CancellableContinuationImpl<in T>(
try {
block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
context,
CompletionHandlerException("Exception in cancellation handler for $this", ex)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
33 changes: 7 additions & 26 deletions kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,16 @@ import kotlin.coroutines.*
internal expect fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable)

/**
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
* Helper function for coroutine builder implementations to handle uncaught and unexpected exceptions in coroutines,
* that could not be otherwise handled in a normal way through structured concurrency, saving them to a future, and
* cannot be rethrown. This is a last resort handler to prevent lost exceptions.
*
* It tries to handle uncaught exception in the following way:
* If current exception is [CancellationException], it's ignored: [CancellationException] is a normal way to cancel
* coroutine.
*
* If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
* Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
* If there is [CoroutineExceptionHandler] in the context, then it is used. If it throws an exception during handling
* or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and
* [Thread.uncaughtExceptionHandler] are invoked.
*/
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
// Ignore CancellationException (they are normal ways to terminate a coroutine)
if (exception is CancellationException) return // nothing to do
// Try propagate exception to parent
val job = context[Job]
@Suppress("DEPRECATION")
if (job !== null && job !== caller && job.cancel(exception)) return // handle by parent
// otherwise -- use exception handlers
handleExceptionViaHandler(context, exception)
}

/**
* @suppress This is an internal API and it is subject to change.
*/
@InternalCoroutinesApi
public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
// Invoke exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
Expand All @@ -47,7 +29,6 @@ public fun handleExceptionViaHandler(context: CoroutineContext, exception: Throw
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}

// If handler is not present in the context or exception was thrown, fallback to the global handler
handleCoroutineExceptionImpl(context, exception)
}
Expand Down
10 changes: 6 additions & 4 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,12 @@ public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())

/**
* Cancels this scope, including its job and all its children.
* Cancels this scope, including its job and all its children with an optional cancellation [cause].
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* Throws [IllegalStateException] if the scope does not have a job in it.
**/
public fun CoroutineScope.cancel() {
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel()
job.cancel(cause)
}
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/common/src/Exceptions.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public expect class CompletionHandlerException(message: String, cause: Throwable

public expect open class CancellationException(message: String?) : IllegalStateException

@Suppress("FunctionName")
public expect fun CancellationException(message: String?, cause: Throwable?) : CancellationException

internal expect class JobCancellationException(
message: String,
cause: Throwable?,
Expand All @@ -22,4 +25,7 @@ internal expect class JobCancellationException(

internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException

internal expect fun Throwable.addSuppressedThrowable(other: Throwable)
internal expect fun Throwable.addSuppressedThrowable(other: Throwable)

// For use in tests
internal expect val RECOVER_STACK_TRACES: Boolean
Loading