Skip to content

Commit 52bfdab

Browse files
committed
async and async-like builders cancel parent on failure
* Affects async, CompletableDeferred, and all Rx integration builders. * This makes all coroutine builders totally consistent. They all cancel parent on failure, but they all consider "CancellationException" to be the case of "normal cancellation" that does not propagate to parent. The only missing case is Job() that should be fixed together with introduction of SupervisorJob(). * Note that "scoping" builders don't "cancel the parent", but rethrow the corresponding exception instead, so it that is how it gets propagated up the stack. * This makes parallel decomposition exception-safe. You cannot loose an exception as along as default (child async) behavior is not overridden. Fixes #552
1 parent ffda1da commit 52bfdab

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+507
-153
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,14 @@ public final class kotlinx/coroutines/experimental/CancellableContinuation$Defau
7171
public static synthetic fun tryResume$default (Lkotlinx/coroutines/experimental/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
7272
}
7373

74-
public final class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation {
74+
public class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation {
7575
public fun <init> (Lkotlin/coroutines/experimental/Continuation;I)V
7676
public fun completeResume (Ljava/lang/Object;)V
7777
public fun getContext ()Lkotlin/coroutines/experimental/CoroutineContext;
7878
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
7979
public fun initCancellability ()V
8080
public fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/experimental/DisposableHandle;
81+
protected fun nameString ()Ljava/lang/String;
8182
public fun resumeUndispatched (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Object;)V
8283
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Throwable;)V
8384
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;

common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt

+15-5
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,23 @@ internal abstract class AbstractContinuation<in T>(
9191

9292
override fun takeState(): Any? = state
9393

94-
public fun cancel(cause: Throwable?): Boolean {
94+
public fun cancel(cause: Throwable?): Boolean =
95+
cancelImpl(cause)
96+
97+
fun cancelImpl(cause: Throwable?): Boolean {
9598
loopOnState { state ->
9699
if (state !is NotCompleted) return false // quit if already complete
97-
if (tryCancel(state, cause)) return true
100+
val update = CancelledContinuation(this, cause)
101+
if (updateStateToFinal(state, update, mode = MODE_ATOMIC_DEFAULT)) return true
98102
}
99103
}
100104

105+
/**
106+
* It is used when parent is cancelled to get the cancellation cause for this continuation.
107+
*/
108+
open fun getParentCancellationCause(parent: Job): Throwable =
109+
parent.getCancellationException()
110+
101111
private fun trySuspend(): Boolean {
102112
_decision.loop { decision ->
103113
when (decision) {
@@ -133,6 +143,9 @@ internal abstract class AbstractContinuation<in T>(
133143
override fun resumeWithException(exception: Throwable) =
134144
resumeImpl(CompletedExceptionally(exception), resumeMode)
135145

146+
internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
147+
resumeImpl(CompletedExceptionally(exception), mode)
148+
136149
public fun invokeOnCancellation(handler: CompletionHandler) {
137150
var handleCache: CancelHandler? = null
138151
loopOnState { state ->
@@ -166,9 +179,6 @@ internal abstract class AbstractContinuation<in T>(
166179
private fun makeHandler(handler: CompletionHandler): CancelHandler =
167180
if (handler is CancelHandler) handler else InvokeOnCancel(handler)
168181

169-
private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean =
170-
updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
171-
172182
private fun dispatchResume(mode: Int) {
173183
if (tryResume()) return // completed before getResult invocation -- bail out
174184
// otherwise, getResult has already commenced, i.e. completed later or in other thread

common/kotlinx-coroutines-core-common/src/Builders.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ private open class DeferredCoroutine<T>(
157157
parentContext: CoroutineContext,
158158
active: Boolean
159159
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
160+
override val cancelsParent: Boolean get() = true
160161
override fun getCompleted(): T = getCompletedInternal() as T
161162
override suspend fun await(): T = awaitInternal() as T
162163
override val onAwait: SelectClause1<T> get() = this

common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand
278278
}
279279

280280
@PublishedApi
281-
internal class CancellableContinuationImpl<in T>(
281+
internal open class CancellableContinuationImpl<in T>(
282282
delegate: Continuation<T>,
283283
resumeMode: Int
284284
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
@@ -317,7 +317,7 @@ internal class CancellableContinuationImpl<in T>(
317317

318318
override fun tryResumeWithException(exception: Throwable): Any? {
319319
loopOnState { state ->
320-
when (state) {
320+
when (state) {
321321
is NotCompleted -> {
322322
if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state
323323
}

common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private class CompletableDeferredImpl<T>(
6363
parent: Job?
6464
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
6565
init { initParentJobInternal(parent) }
66+
override val cancelsParent: Boolean get() = true
6667
override val onCancelComplete get() = true
6768
override fun getCompleted(): T = getCompletedInternal() as T
6869
override suspend fun await(): T = awaitInternal() as T

common/kotlinx-coroutines-core-common/src/JobSupport.kt

+50-11
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.experimental.internal.*
99
import kotlinx.coroutines.experimental.intrinsics.*
1010
import kotlinx.coroutines.experimental.selects.*
1111
import kotlin.coroutines.experimental.*
12+
import kotlin.coroutines.experimental.intrinsics.*
1213

1314
/**
1415
* A concrete implementation of [Job]. It is optionally a child to a parent job.
@@ -1055,6 +1056,25 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel
10551056
"ChildCompletion[$child, $proposedUpdate]"
10561057
}
10571058

1059+
private class AwaitContinuation<T>(
1060+
delegate: Continuation<T>,
1061+
private val job: JobSupport
1062+
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
1063+
override fun getParentCancellationCause(parent: Job): Throwable {
1064+
val state = job.state
1065+
/*
1066+
* When the job we are waiting for had already completely completed exceptionally or
1067+
* is failing, we shall use its root/completion cause for await's result.
1068+
*/
1069+
if (state is Finishing) state.rootCause?.let { return it }
1070+
if (state is CompletedExceptionally) return state.cause
1071+
return parent.getCancellationException()
1072+
}
1073+
1074+
protected override fun nameString(): String =
1075+
"AwaitContinuation(${delegate.toDebugString()})"
1076+
}
1077+
10581078
/*
10591079
* =================================================================================================
10601080
* This is ready-to-use implementation for Deferred interface.
@@ -1099,16 +1119,16 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel
10991119
return awaitSuspend() // slow-path
11001120
}
11011121

1102-
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
1103-
// We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
1104-
cont.disposeOnCancellation(invokeOnCompletion {
1105-
val state = this.state
1106-
check(state !is Incomplete)
1107-
if (state is CompletedExceptionally)
1108-
cont.resumeWithException(state.cause)
1109-
else
1110-
cont.resume(state)
1111-
})
1122+
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
1123+
/*
1124+
* Custom code here, so that parent coroutine that is using await
1125+
* on its child deferred (async) coroutine would throw the exception that this child had
1126+
* thrown and not a JobCancellationException.
1127+
*/
1128+
val cont = AwaitContinuation(uCont.intercepted(), this)
1129+
cont.initCancellability()
1130+
invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
1131+
cont.getResult()
11121132
}
11131133

11141134
/**
@@ -1236,6 +1256,25 @@ private class ResumeOnCompletion(
12361256
override fun toString() = "ResumeOnCompletion[$continuation]"
12371257
}
12381258

1259+
private class ResumeAwaitOnCompletion<T>(
1260+
job: JobSupport,
1261+
private val continuation: AbstractContinuation<T>
1262+
) : JobNode<JobSupport>(job) {
1263+
override fun invoke(cause: Throwable?) {
1264+
val state = job.state
1265+
check(state !is Incomplete)
1266+
if (state is CompletedExceptionally) {
1267+
// Resume with exception in atomic way to preserve exception
1268+
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
1269+
} else {
1270+
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
1271+
@Suppress("UNCHECKED_CAST")
1272+
continuation.resume(state as T)
1273+
}
1274+
}
1275+
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
1276+
}
1277+
12391278
internal class DisposeOnCompletion(
12401279
job: Job,
12411280
private val handle: DisposableHandle
@@ -1303,7 +1342,7 @@ internal class ChildContinuation(
13031342
@JvmField val child: AbstractContinuation<*>
13041343
) : JobCancellingNode<Job>(parent) {
13051344
override fun invoke(cause: Throwable?) {
1306-
child.cancel(job.getCancellationException())
1345+
child.cancelImpl(child.getParentCancellationCause(job))
13071346
}
13081347
override fun toString(): String =
13091348
"ChildContinuation[$child]"

common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private open class BroadcastCoroutine<E>(
105105
protected val _channel: BroadcastChannel<E>,
106106
active: Boolean
107107
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
108-
108+
override val cancelsParent: Boolean get() = true
109109
override val isActive: Boolean get() = super<AbstractCoroutine>.isActive
110110

111111
override val channel: SendChannel<E>

common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ internal open class ChannelCoroutine<E>(
1212
protected val _channel: Channel<E>,
1313
active: Boolean
1414
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
15+
override val cancelsParent: Boolean get() = true
16+
1517
val channel: Channel<E> get() = this
1618

1719
override fun cancel() = cancel(null)

common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package kotlinx.coroutines.experimental
77
import kotlin.test.*
88

99
class AbstractCoroutineTest : TestBase() {
10-
1110
@Test
1211
fun testNotifications() = runTest {
1312
expect(1)
@@ -18,7 +17,7 @@ class AbstractCoroutineTest : TestBase() {
1817
}
1918

2019
override fun onCancellation(cause: Throwable?) {
21-
assertTrue(cause == null)
20+
assertEquals(null, cause)
2221
expect(5)
2322
}
2423

@@ -33,12 +32,12 @@ class AbstractCoroutineTest : TestBase() {
3332
}
3433

3534
coroutine.invokeOnCompletion(onCancelling = true) {
36-
assertTrue(it == null)
35+
assertEquals(null, it)
3736
expect(6)
3837
}
3938

4039
coroutine.invokeOnCompletion {
41-
assertTrue(it == null)
40+
assertEquals(null, it)
4241
expect(7)
4342
}
4443
expect(2)
@@ -52,7 +51,7 @@ class AbstractCoroutineTest : TestBase() {
5251
fun testNotificationsWithException() = runTest {
5352
expect(1)
5453
val coroutineContext = coroutineContext // workaround for KT-22984
55-
val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
54+
val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, false) {
5655
override fun onStart() {
5756
expect(3)
5857
}

common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class AsyncLazyTest : TestBase() {
104104
@Test
105105
fun testCatchException() = runTest {
106106
expect(1)
107-
val d = async(start = CoroutineStart.LAZY) {
107+
val d = async(NonCancellable, start = CoroutineStart.LAZY) {
108108
expect(3)
109109
throw TestException()
110110
}

common/kotlinx-coroutines-core-common/test/AsyncTest.kt

+30-28
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ class AsyncTest : TestBase() {
5151
}
5252

5353
@Test
54-
fun testCancellationWithCause() = runTest(expected = { it is AssertionError }) {
54+
fun testCancellationWithCause() = runTest(expected = { it is TestException }) {
5555
expect(1)
56-
val d = async(start = CoroutineStart.ATOMIC) {
56+
val d = async(NonCancellable, start = CoroutineStart.ATOMIC) {
5757
finish(3)
5858
yield()
5959
}
6060

6161
expect(2)
62-
d.cancel(AssertionError())
62+
d.cancel(TestException())
6363
d.await()
6464
}
6565

@@ -78,46 +78,50 @@ class AsyncTest : TestBase() {
7878

7979
@Test
8080
fun testParallelDecompositionCaughtException() = runTest {
81-
val deferred = async(Job()) {
82-
val decomposed = async {
83-
throw AssertionError()
81+
val deferred = async(NonCancellable) {
82+
val decomposed = async(NonCancellable) {
83+
throw TestException()
8484
1
8585
}
86-
8786
try {
8887
decomposed.await()
89-
} catch (e: AssertionError) {
88+
} catch (e: TestException) {
9089
42
9190
}
9291
}
93-
9492
assertEquals(42, deferred.await())
9593
}
9694

97-
9895
@Test
9996
fun testParallelDecompositionCaughtExceptionWithInheritedParent() = runTest {
100-
val deferred = async {
101-
val decomposed = async {
102-
throw AssertionError()
97+
expect(1)
98+
val deferred = async(NonCancellable) {
99+
expect(2)
100+
val decomposed = async { // inherits parent job!
101+
expect(3)
102+
throw TestException()
103103
1
104104
}
105-
106105
try {
107106
decomposed.await()
108-
} catch (e: AssertionError) {
107+
} catch (e: TestException) {
108+
expect(4) // Should catch this exception, but parent is already cancelled
109109
42
110110
}
111111
}
112-
113-
assertEquals(42, deferred.await())
112+
try {
113+
// This will fail
114+
assertEquals(42, deferred.await())
115+
} catch (e: TestException) {
116+
finish(5)
117+
}
114118
}
115119

116120
@Test
117-
fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is AssertionError }) {
118-
val deferred = async {
121+
fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is TestException }) {
122+
val deferred = async(NonCancellable) {
119123
val decomposed = async {
120-
throw AssertionError()
124+
throw TestException()
121125
1
122126
}
123127

@@ -129,10 +133,10 @@ class AsyncTest : TestBase() {
129133
}
130134

131135
@Test
132-
fun testParallelDecompositionUncaughtException() = runTest(expected = { it is AssertionError }) {
133-
val deferred = async(Job()) {
136+
fun testParallelDecompositionUncaughtException() = runTest(expected = { it is TestException }) {
137+
val deferred = async(NonCancellable) {
134138
val decomposed = async {
135-
throw AssertionError()
139+
throw TestException()
136140
1
137141
}
138142

@@ -145,17 +149,15 @@ class AsyncTest : TestBase() {
145149

146150
@Test
147151
fun testCancellationTransparency() = runTest {
148-
val deferred = async(kotlin.coroutines.experimental.coroutineContext, CoroutineStart.ATOMIC) {
152+
val deferred = async(NonCancellable, start = CoroutineStart.ATOMIC) {
149153
expect(2)
150154
throw TestException()
151155
}
152-
153156
expect(1)
154-
deferred.cancel(UnsupportedOperationException())
155-
157+
deferred.cancel(TestException())
156158
try {
157159
deferred.await()
158-
} catch (e: UnsupportedOperationException) {
160+
} catch (e: TestException) {
159161
finish(3)
160162
}
161163
}

0 commit comments

Comments
 (0)