Skip to content

Commit f2604f6

Browse files
elizarovqwwdfsad
authored andcommitted
Tweaked order of notifications for better error handling & consistency
* JobSupport.onCompletionInternal(onCompleted/onCancelled) is now invoked before all the user-installed listeners with is consistent with how the onCancelling/invokeOnCancelling(onCancelling=true) works. * Now all the state processing (updating the future, reporting unhandled exception, etc) in onCompletionInternal happens before observers that .join/.wait the coroutine are resumed and even before the state is set to final (to avoid exception handing races) with the exception of fast-path successful completion of coroutine. * JobSupport.afterCompletionInternal is introduced. It is invoked after all use-installed listeners and that is where scoped coroutines resume the rest of the code. * Remove empty AbstractCoroutine.onCancellation, move docs to JobSupport * onCancellation renamed to onCancelling for consistency with invokeOnCompletion(onCancelling=true)
1 parent 6227c64 commit f2604f6

File tree

12 files changed

+95
-76
lines changed

12 files changed

+95
-76
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
55
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
66
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
77
public fun isActive ()Z
8-
protected fun onCancellation (Ljava/lang/Throwable;)V
98
protected fun onCancelled (Ljava/lang/Throwable;Z)V
109
protected fun onCompleted (Ljava/lang/Object;)V
10+
protected final fun onCompletionInternal (Ljava/lang/Object;)V
1111
protected fun onStart ()V
1212
public final fun resumeWith (Ljava/lang/Object;)V
1313
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
@@ -368,6 +368,7 @@ public final class kotlinx/coroutines/JobKt {
368368

369369
public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlinx/coroutines/Job, kotlinx/coroutines/ParentJob, kotlinx/coroutines/selects/SelectClause0 {
370370
public fun <init> (Z)V
371+
protected fun afterCompletionInternal (Ljava/lang/Object;I)V
371372
public final fun attachChild (Lkotlinx/coroutines/ChildJob;)Lkotlinx/coroutines/ChildHandle;
372373
public synthetic fun cancel ()V
373374
public synthetic fun cancel (Ljava/lang/Throwable;)Z
@@ -396,7 +397,8 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
396397
public final fun isCompletedExceptionally ()Z
397398
public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
398399
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
399-
protected fun onCancellation (Ljava/lang/Throwable;)V
400+
protected fun onCancelling (Ljava/lang/Throwable;)V
401+
protected fun onCompletionInternal (Ljava/lang/Object;)V
400402
public final fun parentCancelled (Lkotlinx/coroutines/ParentJob;)V
401403
public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
402404
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ import kotlin.jvm.*
2020
*
2121
* The following methods are available for override:
2222
*
23-
* * [onStart] is invoked when coroutine is create in not active state and is [started][Job.start].
24-
* * [onCancellation] is invoked as soon as coroutine is _failing_, or is cancelled,
25-
* or when it completes for any reason.
23+
* * [onStart] is invoked when coroutine was created in not active state and is being [started][Job.start].
24+
* * [onCancelling] is invoked as soon as coroutine is being cancelled for any reason (or completes).
2625
* * [onCompleted] is invoked when coroutine completes with a value.
2726
* * [onCancelled] in invoked when coroutines completes with exception (cancelled).
2827
*
@@ -77,31 +76,26 @@ public abstract class AbstractCoroutine<in T>(
7776
}
7877

7978
/**
80-
* This function is invoked once when this coroutine is cancelled
81-
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
82-
*
83-
* The meaning of [cause] parameter:
84-
* * Cause is `null` when job has completed normally.
85-
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
86-
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
87-
* * Otherwise, the job had been cancelled or failed with exception.
88-
*/
89-
protected override fun onCancellation(cause: Throwable?) {}
90-
91-
/**
92-
* This function is invoked once when job was completed normally with the specified [value].
79+
* This function is invoked once when job was completed normally with the specified [value],
80+
* right before all the waiters for coroutine's completion are notified.
9381
*/
9482
protected open fun onCompleted(value: T) {}
9583

9684
/**
97-
* This function is invoked once when job was cancelled with the specified [cause].
85+
* This function is invoked once when job was cancelled with the specified [cause],
86+
* right before all the waiters for coroutine's completion are notified.
87+
*
88+
* **Note:** the state of the coroutine might not be final yet in this function and should not be queried.
89+
* You can use [completionCause] and [completionCauseHandled] to recover parameters that we passed
90+
* to this `onCancelled` invocation only when [isCompleted] returns `true`.
91+
*
9892
* @param cause The cancellation (failure) cause
9993
* @param handled `true` if the exception was handled by parent (always `true` when it is a [CancellationException])
10094
*/
10195
protected open fun onCancelled(cause: Throwable, handled: Boolean) {}
10296

10397
@Suppress("UNCHECKED_CAST")
104-
internal override fun onCompletionInternal(state: Any?, mode: Int) {
98+
protected final override fun onCompletionInternal(state: Any?) {
10599
if (state is CompletedExceptionally)
106100
onCancelled(state.cause, state.handled)
107101
else

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,10 @@ private class DispatchedCoroutine<in T>(
241241
}
242242
}
243243

244-
override fun onCompletionInternal(state: Any?, mode: Int) {
244+
override fun afterCompletionInternal(state: Any?, mode: Int) {
245245
if (tryResume()) return // completed before getResult invocation -- bail out
246246
// otherwise, getResult has already commenced, i.e. completed later or in other thread
247-
super.onCompletionInternal(state, mode)
247+
super.afterCompletionInternal(state, mode)
248248
}
249249

250250
fun getResult(): Any? {

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
9898
~ active coroutine is working (or scheduled to execution)
9999
>> childCancelled / cancelImpl invoked
100100
## CANCELLING: state is Finishing, state.rootCause != null
101-
------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancellation=true) returns NonDisposableHandle
101+
------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
102102
------ new children get immediately cancelled, but are still admitted to the list
103-
+ onCancellation
103+
+ onCancelling
104104
+ notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
105105
+ cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
106106
~ waits for completion of coroutine body
@@ -203,7 +203,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
203203
require(state.isCompleting) // consistency check -- must be marked as completing
204204
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
205205
// Create the final exception and seal the state so that no more exceptions can be added
206+
var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized
206207
val finalException = synchronized(state) {
208+
wasCancelling = state.isCancelling
207209
val exceptions = state.sealLocked(proposedException)
208210
val finalCause = getFinalRootCause(state, exceptions)
209211
if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
@@ -223,6 +225,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
223225
val handled = cancelParent(finalException) || handleJobException(finalException)
224226
if (handled) (finalState as CompletedExceptionally).makeHandled()
225227
}
228+
// Process state updates for the final state before the state of the Job is actually set to the final state
229+
// to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
230+
if (!wasCancelling) onCancelling(finalException)
231+
onCompletionInternal(finalState)
226232
// Then CAS to completed state -> it must succeed
227233
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
228234
// And process all post-completion actions
@@ -257,6 +263,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
257263
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
258264
check(update !is CompletedExceptionally) // only for normal completion
259265
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
266+
onCancelling(null) // simple state is not a failure
267+
onCompletionInternal(update)
260268
completeStateFinalization(state, update, mode)
261269
return true
262270
}
@@ -275,14 +283,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
275283
}
276284
val cause = (update as? CompletedExceptionally)?.cause
277285
/*
278-
* 2) Invoke onCancellation: for resource cancellation resource cancellation etc.
279-
* Only notify is was not notified yet.
280-
* Note: we do not use notifyCancelling here, since we are going to invoke all completion as our next step
281-
*/
282-
if (!state.isCancelling) onCancellation(cause)
283-
/*
284-
* 3) Invoke completion handlers: .join(), callbacks etc.
285-
* It's important to invoke them only AFTER exception handling, see #208
286+
* 2) Invoke completion handlers: .join(), callbacks etc.
287+
* It's important to invoke them only AFTER exception handling and everything else, see #208
286288
*/
287289
if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
288290
try {
@@ -294,16 +296,15 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
294296
state.list?.notifyCompletion(cause)
295297
}
296298
/*
297-
* 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
298-
* It should be last so all callbacks observe consistent state
299-
* of the job which doesn't depend on callback scheduling.
299+
* 3) Resumes the rest of the code in scoped coroutines
300+
* (runBlocking, coroutineScope, withContext, withTimeout, etc)
300301
*/
301-
onCompletionInternal(update, mode)
302+
afterCompletionInternal(update, mode)
302303
}
303304

304305
private fun notifyCancelling(list: NodeList, cause: Throwable) {
305306
// first cancel our own children
306-
onCancellation(cause)
307+
onCancelling(cause)
307308
notifyHandlers<JobCancellingNode<*>>(list, cause)
308309
// then cancel parent
309310
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
@@ -671,7 +672,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
671672
val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
672673
state.addExceptionLocked(causeException)
673674
}
674-
// take cause for notification is was not cancelling before
675+
// take cause for notification if was not in cancelling state before
675676
state.rootCause.takeIf { !wasCancelling }
676677
}
677678
notifyRootCause?.let { notifyCancelling(state.list, it) }
@@ -890,12 +891,21 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
890891
}
891892

892893
/**
893-
* This function is invoked once when job is being cancelled, fails, or is completed.
894-
* It's an optimization for [invokeOnCompletion] with `onCancellation` set to `true`.
894+
* This function is invoked once as soon as this job is being cancelled for any reason or completes,
895+
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
896+
*
897+
* The meaning of [cause] parameter:
898+
* * Cause is `null` when job has completed normally.
899+
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
900+
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
901+
* * Otherwise, the job had been cancelled or failed with exception.
902+
*
903+
* The specified [cause] is not the final cancellation cause of this job.
904+
* A job may produce other exceptions while it is failing and the final cause might be different.
895905
*
896906
* @suppress **This is unstable API and it is subject to change.*
897907
*/
898-
protected open fun onCancellation(cause: Throwable?) {}
908+
protected open fun onCancelling(cause: Throwable?) {}
899909

900910
/**
901911
* When this function returns `true` the parent is cancelled on cancellation of this job.
@@ -940,13 +950,24 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
940950
}
941951

942952
/**
943-
* Override for post-completion actions that need to do something with the state.
953+
* Override for completion actions that need to update some external object depending on job's state,
954+
* right before all the waiters for coroutine's completion are notified.
955+
*
956+
* @param state the final state.
957+
*
958+
* @suppress **This is unstable API and it is subject to change.**
959+
*/
960+
protected open fun onCompletionInternal(state: Any?) {}
961+
962+
/**
963+
* Override for the very last action on job's completion to resume the rest of the code in scoped coroutines.
964+
*
944965
* @param state the final state.
945966
* @param mode completion mode.
946967
*
947968
* @suppress **This is unstable API and it is subject to change.**
948969
*/
949-
internal open fun onCompletionInternal(state: Any?, mode: Int) {}
970+
protected open fun afterCompletionInternal(state: Any?, mode: Int) {}
950971

951972
// for nicer debugging
952973
public override fun toString(): String =

kotlinx-coroutines-core/common/src/Timeout.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private open class TimeoutCoroutine<U, in T: U>(
9595
}
9696

9797
@Suppress("UNCHECKED_CAST")
98-
internal override fun onCompletionInternal(state: Any?, mode: Int) {
98+
override fun afterCompletionInternal(state: Any?, mode: Int) {
9999
if (state is CompletedExceptionally)
100100
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
101101
else

kotlinx-coroutines-core/common/src/internal/Scopes.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal open class ScopeCoroutine<in T>(
2323
get() = false // it throws exception to parent instead of cancelling it
2424

2525
@Suppress("UNCHECKED_CAST")
26-
internal override fun onCompletionInternal(state: Any?, mode: Int) {
26+
override fun afterCompletionInternal(state: Any?, mode: Int) {
2727
if (state is CompletedExceptionally) {
2828
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
2929
uCont.resumeUninterceptedWithExceptionMode(exception, mode)

kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ class AbstractCoroutineTest : TestBase() {
1818
expect(3)
1919
}
2020

21-
override fun onCancellation(cause: Throwable?) {
21+
override fun onCancelling(cause: Throwable?) {
2222
assertEquals(null, cause)
2323
expect(5)
2424
}
2525

2626
override fun onCompleted(value: String) {
2727
assertEquals("OK", value)
28-
expect(8)
28+
expect(6)
2929
}
3030

3131
override fun onCancelled(cause: Throwable, handled: Boolean) {
@@ -35,12 +35,12 @@ class AbstractCoroutineTest : TestBase() {
3535

3636
coroutine.invokeOnCompletion(onCancelling = true) {
3737
assertEquals(null, it)
38-
expect(6)
38+
expect(7)
3939
}
4040

4141
coroutine.invokeOnCompletion {
4242
assertEquals(null, it)
43-
expect(7)
43+
expect(8)
4444
}
4545
expect(2)
4646
coroutine.start()
@@ -58,7 +58,7 @@ class AbstractCoroutineTest : TestBase() {
5858
expect(3)
5959
}
6060

61-
override fun onCancellation(cause: Throwable?) {
61+
override fun onCancelling(cause: Throwable?) {
6262
assertTrue(cause is TestException1)
6363
expect(5)
6464
}
@@ -69,7 +69,7 @@ class AbstractCoroutineTest : TestBase() {
6969

7070
override fun onCancelled(cause: Throwable, handled: Boolean) {
7171
assertTrue(cause is TestException1)
72-
expect(9)
72+
expect(8)
7373
}
7474
}
7575

@@ -80,7 +80,7 @@ class AbstractCoroutineTest : TestBase() {
8080

8181
coroutine.invokeOnCompletion {
8282
assertTrue(it is TestException1)
83-
expect(8)
83+
expect(9)
8484
}
8585

8686
expect(2)

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private class BlockingCoroutine<T>(
6161
override val cancelsParent: Boolean
6262
get() = false // it throws exception to parent instead of cancelling it
6363

64-
override fun onCompletionInternal(state: Any?, mode: Int) {
64+
override fun afterCompletionInternal(state: Any?, mode: Int) {
6565
// wake up blocked thread
6666
if (Thread.currentThread() != blockedThread)
6767
LockSupport.unpark(blockedThread)

kotlinx-coroutines-core/jvm/src/channels/Actor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private open class ActorCoroutine<E>(
129129
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
130130
override val cancelsParent: Boolean get() = true
131131

132-
override fun onCancellation(cause: Throwable?) {
132+
override fun onCancelling(cause: Throwable?) {
133133
_channel.cancel(cause?.let {
134134
it as? CancellationException ?: CancellationException("$classSimpleName was cancelled", it)
135135
})

kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class SuppressionTests : TestBase() {
2121
expect(3)
2222
}
2323

24-
override fun onCancellation(cause: Throwable?) {
24+
override fun onCancelling(cause: Throwable?) {
2525
assertTrue(cause is ArithmeticException)
2626
assertTrue(cause.suppressed.isEmpty())
2727
expect(5)
@@ -34,7 +34,7 @@ class SuppressionTests : TestBase() {
3434
override fun onCancelled(cause: Throwable, handled: Boolean) {
3535
assertTrue(cause is ArithmeticException)
3636
checkException<IOException>(cause.suppressed[0])
37-
expect(9)
37+
expect(8)
3838
}
3939
}
4040

@@ -47,7 +47,7 @@ class SuppressionTests : TestBase() {
4747
coroutine.invokeOnCompletion {
4848
assertTrue(it is ArithmeticException)
4949
checkException<IOException>(it.suppressed[0])
50-
expect(8)
50+
expect(9)
5151
}
5252

5353
expect(2)

0 commit comments

Comments
 (0)