Skip to content

async and async-like builders cancel parent on failure #630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ public final class kotlinx/coroutines/experimental/CancellableContinuation$Defau
public static synthetic fun tryResume$default (Lkotlinx/coroutines/experimental/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation {
public class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/experimental/Continuation;I)V
public fun completeResume (Ljava/lang/Object;)V
public fun getContext ()Lkotlin/coroutines/experimental/CoroutineContext;
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public fun initCancellability ()V
public fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/experimental/DisposableHandle;
protected fun nameString ()Ljava/lang/String;
public fun resumeUndispatched (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
Expand Down
20 changes: 15 additions & 5 deletions common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,23 @@ internal abstract class AbstractContinuation<in T>(

override fun takeState(): Any? = state

public fun cancel(cause: Throwable?): Boolean {
public fun cancel(cause: Throwable?): Boolean =
cancelImpl(cause)

fun cancelImpl(cause: Throwable?): Boolean {
loopOnState { state ->
if (state !is NotCompleted) return false // quit if already complete
if (tryCancel(state, cause)) return true
val update = CancelledContinuation(this, cause)
if (updateStateToFinal(state, update, mode = MODE_ATOMIC_DEFAULT)) return true
}
}

/**
* It is used when parent is cancelled to get the cancellation cause for this continuation.
*/
open fun getParentCancellationCause(parent: Job): Throwable =
parent.getCancellationException()

private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
Expand Down Expand Up @@ -133,6 +143,9 @@ internal abstract class AbstractContinuation<in T>(
override fun resumeWithException(exception: Throwable) =
resumeImpl(CompletedExceptionally(exception), resumeMode)

internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
resumeImpl(CompletedExceptionally(exception), mode)

public fun invokeOnCancellation(handler: CompletionHandler) {
var handleCache: CancelHandler? = null
loopOnState { state ->
Expand Down Expand Up @@ -166,9 +179,6 @@ internal abstract class AbstractContinuation<in T>(
private fun makeHandler(handler: CompletionHandler): CancelHandler =
if (handler is CancelHandler) handler else InvokeOnCancel(handler)

private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean =
updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)

private fun dispatchResume(mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
// otherwise, getResult has already commenced, i.e. completed later or in other thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override val cancelsParent: Boolean get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
}

@PublishedApi
internal class CancellableContinuationImpl<in T>(
internal open class CancellableContinuationImpl<in T>(
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
Expand Down Expand Up @@ -317,7 +317,7 @@ internal class CancellableContinuationImpl<in T>(

override fun tryResumeWithException(exception: Throwable): Any? {
loopOnState { state ->
when (state) {
when (state) {
is NotCompleted -> {
if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
override val cancelsParent: Boolean get() = true
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
61 changes: 50 additions & 11 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*

/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
Expand Down Expand Up @@ -1051,6 +1052,25 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel
"ChildCompletion[$child, $proposedUpdate]"
}

private class AwaitContinuation<T>(
delegate: Continuation<T>,
private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
override fun getParentCancellationCause(parent: Job): Throwable {
val state = job.state
/*
* When the job we are waiting for had already completely completed exceptionally or
* is failing, we shall use its root/completion cause for await's result.
*/
if (state is Finishing) state.rootCause?.let { return it }
if (state is CompletedExceptionally) return state.cause
return parent.getCancellationException()
}

protected override fun nameString(): String =
"AwaitContinuation(${delegate.toDebugString()})"
}

/*
* =================================================================================================
* This is ready-to-use implementation for Deferred interface.
Expand Down Expand Up @@ -1095,16 +1115,16 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel
return awaitSuspend() // slow-path
}

private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
// We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
cont.disposeOnCancellation(invokeOnCompletion {
val state = this.state
check(state !is Incomplete)
if (state is CompletedExceptionally)
cont.resumeWithException(state.cause)
else
cont.resume(state)
})
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
/*
* Custom code here, so that parent coroutine that is using await
* on its child deferred (async) coroutine would throw the exception that this child had
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.initCancellability()
invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
cont.getResult()
}

/**
Expand Down Expand Up @@ -1232,6 +1252,25 @@ private class ResumeOnCompletion(
override fun toString() = "ResumeOnCompletion[$continuation]"
}

private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
val state = job.state
check(state !is Incomplete)
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

internal class DisposeOnCompletion(
job: Job,
private val handle: DisposableHandle
Expand Down Expand Up @@ -1299,7 +1338,7 @@ internal class ChildContinuation(
@JvmField val child: AbstractContinuation<*>
) : JobCancellingNode<Job>(parent) {
override fun invoke(cause: Throwable?) {
child.cancel(job.getCancellationException())
child.cancelImpl(child.getParentCancellationCause(job))
}
override fun toString(): String =
"ChildContinuation[$child]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private open class BroadcastCoroutine<E>(
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {

override val cancelsParent: Boolean get() = true
override val isActive: Boolean get() = super<AbstractCoroutine>.isActive

override val channel: SendChannel<E>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal open class ChannelCoroutine<E>(
protected val _channel: Channel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
override val cancelsParent: Boolean get() = true

val channel: Channel<E> get() = this

override fun cancel() = cancel(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package kotlinx.coroutines.experimental
import kotlin.test.*

class AbstractCoroutineTest : TestBase() {

@Test
fun testNotifications() = runTest {
expect(1)
Expand All @@ -18,7 +17,7 @@ class AbstractCoroutineTest : TestBase() {
}

override fun onCancellation(cause: Throwable?) {
assertTrue(cause == null)
assertEquals(null, cause)
expect(5)
}

Expand All @@ -33,12 +32,12 @@ class AbstractCoroutineTest : TestBase() {
}

coroutine.invokeOnCompletion(onCancelling = true) {
assertTrue(it == null)
assertEquals(null, it)
expect(6)
}

coroutine.invokeOnCompletion {
assertTrue(it == null)
assertEquals(null, it)
expect(7)
}
expect(2)
Expand All @@ -52,7 +51,7 @@ class AbstractCoroutineTest : TestBase() {
fun testNotificationsWithException() = runTest {
expect(1)
val coroutineContext = coroutineContext // workaround for KT-22984
val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, false) {
override fun onStart() {
expect(3)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class AsyncLazyTest : TestBase() {
@Test
fun testCatchException() = runTest {
expect(1)
val d = async(start = CoroutineStart.LAZY) {
val d = async(NonCancellable, start = CoroutineStart.LAZY) {
expect(3)
throw TestException()
}
Expand Down
58 changes: 30 additions & 28 deletions common/kotlinx-coroutines-core-common/test/AsyncTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ class AsyncTest : TestBase() {
}

@Test
fun testCancellationWithCause() = runTest(expected = { it is AssertionError }) {
fun testCancellationWithCause() = runTest(expected = { it is TestException }) {
expect(1)
val d = async(start = CoroutineStart.ATOMIC) {
val d = async(NonCancellable, start = CoroutineStart.ATOMIC) {
finish(3)
yield()
}

expect(2)
d.cancel(AssertionError())
d.cancel(TestException())
d.await()
}

Expand All @@ -78,46 +78,50 @@ class AsyncTest : TestBase() {

@Test
fun testParallelDecompositionCaughtException() = runTest {
val deferred = async(Job()) {
val decomposed = async {
throw AssertionError()
val deferred = async(NonCancellable) {
val decomposed = async(NonCancellable) {
throw TestException()
1
}

try {
decomposed.await()
} catch (e: AssertionError) {
} catch (e: TestException) {
42
}
}

assertEquals(42, deferred.await())
}


@Test
fun testParallelDecompositionCaughtExceptionWithInheritedParent() = runTest {
val deferred = async {
val decomposed = async {
throw AssertionError()
expect(1)
val deferred = async(NonCancellable) {
expect(2)
val decomposed = async { // inherits parent job!
expect(3)
throw TestException()
1
}

try {
decomposed.await()
} catch (e: AssertionError) {
} catch (e: TestException) {
expect(4) // Should catch this exception, but parent is already cancelled
42
}
}

assertEquals(42, deferred.await())
try {
// This will fail
assertEquals(42, deferred.await())
} catch (e: TestException) {
finish(5)
}
}

@Test
fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is AssertionError }) {
val deferred = async {
fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is TestException }) {
val deferred = async(NonCancellable) {
val decomposed = async {
throw AssertionError()
throw TestException()
1
}

Expand All @@ -129,10 +133,10 @@ class AsyncTest : TestBase() {
}

@Test
fun testParallelDecompositionUncaughtException() = runTest(expected = { it is AssertionError }) {
val deferred = async(Job()) {
fun testParallelDecompositionUncaughtException() = runTest(expected = { it is TestException }) {
val deferred = async(NonCancellable) {
val decomposed = async {
throw AssertionError()
throw TestException()
1
}

Expand All @@ -145,17 +149,15 @@ class AsyncTest : TestBase() {

@Test
fun testCancellationTransparency() = runTest {
val deferred = async(kotlin.coroutines.experimental.coroutineContext, CoroutineStart.ATOMIC) {
val deferred = async(NonCancellable, start = CoroutineStart.ATOMIC) {
expect(2)
throw TestException()
}

expect(1)
deferred.cancel(UnsupportedOperationException())

deferred.cancel(TestException())
try {
deferred.await()
} catch (e: UnsupportedOperationException) {
} catch (e: TestException) {
finish(3)
}
}
Expand Down
Loading