Skip to content

Commit 6227c64

Browse files
elizarovqwwdfsad
authored andcommitted
Improved coroutine exception handling logic
* JobSupport.handleJobException (which is called before the coroutine goes to the final state) is now called only when parent did not handle exception and is used only in launch-like coroutines (launch and actor) to report uncaught exception. * The final result of other types coroutines that, by default, have an object "to store exceptions" is consistently processed after all other notifications when the coroutine state is already complete and is reported as uncaught exception as a last-resort processing tactic when it cannot be otherwise handled. * The above change makes the order of notifications for future { ... } and other async-like coroutine builders consistent with the order in which async { ... }.await() is notified. * The "handled" state of exception (whether it was processed or not) is now stored in CompletedExceptionally marker class (pulled this field up from CancelledContinuation). * AbstractCoroutine.onCompletedExceptionally is renamed to onCancelled and includes "handled" flag. * protected val JobSupport.completionCauseHandled is introduced to access "handled" flag, which simplified reactive streams integration. * "suppressed" flag is dropped from JobSupport.onCompletionInternal. * Fixed exception processing in reactive integrations -- exception that is thrown from a cancelled (disposed) coroutine is not lost anymore.
1 parent 9a81342 commit 6227c64

File tree

27 files changed

+360
-129
lines changed

27 files changed

+360
-129
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
66
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
77
public fun isActive ()Z
88
protected fun onCancellation (Ljava/lang/Throwable;)V
9+
protected fun onCancelled (Ljava/lang/Throwable;Z)V
910
protected fun onCompleted (Ljava/lang/Object;)V
10-
protected fun onCompletedExceptionally (Ljava/lang/Throwable;)V
1111
protected fun onStart ()V
1212
public final fun resumeWith (Ljava/lang/Object;)V
1313
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
@@ -382,11 +382,12 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
382382
public fun getChildJobCancellationCause ()Ljava/lang/Throwable;
383383
public final fun getChildren ()Lkotlin/sequences/Sequence;
384384
protected final fun getCompletionCause ()Ljava/lang/Throwable;
385+
protected final fun getCompletionCauseHandled ()Z
385386
public final fun getCompletionExceptionOrNull ()Ljava/lang/Throwable;
386387
protected fun getHandlesException ()Z
387388
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
388389
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
389-
protected fun handleJobException (Ljava/lang/Throwable;Z)V
390+
protected fun handleJobException (Ljava/lang/Throwable;)Z
390391
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
391392
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
392393
public fun isActive ()Z

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ private class ListenableFutureCoroutine<T>(
6868
future.set(value)
6969
}
7070

71-
override fun handleJobException(exception: Throwable, handled: Boolean) {
72-
if (!future.setException(exception) && !handled) {
71+
override fun onCancelled(cause: Throwable, handled: Boolean) {
72+
if (!future.setException(cause) && !handled) {
7373
// prevents loss of exception that was not handled by parent & could not be set to SettableFuture
74-
handleCoroutineException(context, exception)
74+
handleCoroutineException(context, cause)
7575
}
7676
}
7777
}

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ private class CompletableFutureCoroutine<T>(
5757
future.complete(value)
5858
}
5959

60-
override fun handleJobException(exception: Throwable, handled: Boolean) {
61-
if (!future.completeExceptionally(exception) && !handled) {
60+
override fun onCancelled(cause: Throwable, handled: Boolean) {
61+
if (!future.completeExceptionally(cause) && !handled) {
6262
// prevents loss of exception that was not handled by parent & could not be set to CompletableFuture
63-
handleCoroutineException(context, exception)
63+
handleCoroutineException(context, cause)
6464
}
6565
}
6666
}

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlin.jvm.*
2424
* * [onCancellation] is invoked as soon as coroutine is _failing_, or is cancelled,
2525
* or when it completes for any reason.
2626
* * [onCompleted] is invoked when coroutine completes with a value.
27-
* * [onCompletedExceptionally] in invoked when coroutines completes with exception.
27+
* * [onCancelled] in invoked when coroutines completes with exception (cancelled).
2828
*
2929
* @param parentContext context of the parent coroutine.
3030
* @param active when `true` (by default) coroutine is created in _active_ state, when `false` in _new_ state.
@@ -89,20 +89,21 @@ public abstract class AbstractCoroutine<in T>(
8989
protected override fun onCancellation(cause: Throwable?) {}
9090

9191
/**
92-
* This function is invoked once when job is completed normally with the specified [value].
92+
* This function is invoked once when job was completed normally with the specified [value].
9393
*/
9494
protected open fun onCompleted(value: T) {}
9595

9696
/**
97-
* This function is invoked once when job is completed exceptionally with the specified [exception].
97+
* This function is invoked once when job was cancelled with the specified [cause].
98+
* @param cause The cancellation (failure) cause
99+
* @param handled `true` if the exception was handled by parent (always `true` when it is a [CancellationException])
98100
*/
99-
// todo: rename to onCancelled
100-
protected open fun onCompletedExceptionally(exception: Throwable) {}
101+
protected open fun onCancelled(cause: Throwable, handled: Boolean) {}
101102

102103
@Suppress("UNCHECKED_CAST")
103-
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
104+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
104105
if (state is CompletedExceptionally)
105-
onCompletedExceptionally(state.cause)
106+
onCancelled(state.cause, state.handled)
106107
else
107108
onCompleted(state as T)
108109
}

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ private open class StandaloneCoroutine(
179179
parentContext: CoroutineContext,
180180
active: Boolean
181181
) : AbstractCoroutine<Unit>(parentContext, active) {
182-
override fun handleJobException(exception: Throwable, handled: Boolean) {
183-
if (!handled) handleCoroutineException(context, exception)
182+
override fun handleJobException(exception: Throwable): Boolean {
183+
handleCoroutineException(context, exception)
184+
return true
184185
}
185186
}
186187

@@ -240,10 +241,10 @@ private class DispatchedCoroutine<in T>(
240241
}
241242
}
242243

243-
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
244+
override fun onCompletionInternal(state: Any?, mode: Int) {
244245
if (tryResume()) return // completed before getResult invocation -- bail out
245246
// otherwise, getResult has already commenced, i.e. completed later or in other thread
246-
super.onCompletionInternal(state, mode, suppressed)
247+
super.onCompletionInternal(state, mode)
247248
}
248249

249250
fun getResult(): Any? {

kotlinx-coroutines-core/common/src/CompletedExceptionally.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ internal fun <T> Result<T>.toState(): Any? =
1818
* or artificial [CancellationException] if no cause was provided
1919
*/
2020
internal open class CompletedExceptionally(
21-
@JvmField public val cause: Throwable
21+
@JvmField public val cause: Throwable,
22+
handled: Boolean = false
2223
) {
24+
private val _handled = atomic(handled)
25+
val handled: Boolean get() = _handled.value
26+
fun makeHandled(): Boolean = _handled.compareAndSet(false, true)
2327
override fun toString(): String = "$classSimpleName[$cause]"
2428
}
2529

@@ -34,10 +38,7 @@ internal class CancelledContinuation(
3438
continuation: Continuation<*>,
3539
cause: Throwable?,
3640
handled: Boolean
37-
) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally")) {
38-
private val resumed = atomic(false)
39-
private val handled = atomic(handled)
40-
41-
fun makeResumed(): Boolean = resumed.compareAndSet(false, true)
42-
fun makeHandled(): Boolean = handled.compareAndSet(false, true)
41+
) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally"), handled) {
42+
private val _resumed = atomic(false)
43+
fun makeResumed(): Boolean = _resumed.compareAndSet(false, true)
4344
}

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
118118
------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
119119
+ parentHandle.dispose
120120
+ notifyCompletion (invoke all completion listeners)
121-
+ onCompletionInternal / onCompleted / onCompletedExceptionally
121+
+ onCompletionInternal / onCompleted / onCancelled
122122
123123
---------------------------------------------------------------------------------
124124
*/
@@ -193,22 +193,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
193193
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
194194
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
195195
/*
196-
* Note: proposed state can be Incompleted, e.g.
196+
* Note: proposed state can be Incomplete, e.g.
197197
* async {
198-
* smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
198+
* something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
199199
* }
200200
*/
201201
require(this.state === state) // consistency check -- it cannot change
202202
require(!state.isSealed) // consistency check -- cannot be sealed yet
203203
require(state.isCompleting) // consistency check -- must be marked as completing
204204
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
205205
// Create the final exception and seal the state so that no more exceptions can be added
206-
var suppressed = false
207206
val finalException = synchronized(state) {
208207
val exceptions = state.sealLocked(proposedException)
209208
val finalCause = getFinalRootCause(state, exceptions)
210-
// Report suppressed exceptions if initial cause doesn't match final cause (due to JCE unwrapping)
211-
if (finalCause != null) suppressed = suppressExceptions(finalCause, exceptions) || finalCause !== state.rootCause
209+
if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
212210
finalCause
213211
}
214212
// Create the final state object
@@ -222,13 +220,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
222220
}
223221
// Now handle the final exception
224222
if (finalException != null) {
225-
val handledByParent = cancelParent(finalException)
226-
handleJobException(finalException, handledByParent)
223+
val handled = cancelParent(finalException) || handleJobException(finalException)
224+
if (handled) (finalState as CompletedExceptionally).makeHandled()
227225
}
228226
// Then CAS to completed state -> it must succeed
229227
require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
230228
// And process all post-completion actions
231-
completeStateFinalization(state, finalState, mode, suppressed)
229+
completeStateFinalization(state, finalState, mode)
232230
return true
233231
}
234232

@@ -243,31 +241,28 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
243241
return exceptions.firstOrNull { it !is CancellationException } ?: exceptions[0]
244242
}
245243

246-
private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
247-
if (exceptions.size <= 1) return false // nothing more to do here
244+
private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
245+
if (exceptions.size <= 1) return // nothing more to do here
248246
val seenExceptions = identitySet<Throwable>(exceptions.size)
249-
var suppressed = false
250247
for (exception in exceptions) {
251248
val unwrapped = unwrap(exception)
252249
if (unwrapped !== rootCause && unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
253250
rootCause.addSuppressedThrowable(unwrapped)
254-
suppressed = true
255251
}
256252
}
257-
return suppressed
258253
}
259254

260255
// fast-path method to finalize normally completed coroutines without children
261256
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
262257
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
263258
check(update !is CompletedExceptionally) // only for normal completion
264259
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
265-
completeStateFinalization(state, update, mode, false)
260+
completeStateFinalization(state, update, mode)
266261
return true
267262
}
268263

269264
// suppressed == true when any exceptions were suppressed while building the final completion cause
270-
private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
265+
private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int) {
271266
/*
272267
* Now the job in THE FINAL state. We need to properly handle the resulting state.
273268
* Order of various invocations here is important.
@@ -303,7 +298,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
303298
* It should be last so all callbacks observe consistent state
304299
* of the job which doesn't depend on callback scheduling.
305300
*/
306-
onCompletionInternal(update, mode, suppressed)
301+
onCompletionInternal(update, mode)
307302
}
308303

309304
private fun notifyCancelling(list: NodeList, cause: Throwable) {
@@ -387,18 +382,21 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
387382
* [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
388383
* This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
389384
* is being cancelled yet.
390-
*
391-
* @suppress **This is unstable API and it is subject to change.**
392385
*/
393-
protected fun getCompletionCause(): Throwable? = loopOnState { state ->
394-
return when (state) {
386+
protected val completionCause: Throwable?
387+
get() = when (val state = state) {
395388
is Finishing -> state.rootCause
396389
?: error("Job is still new or active: $this")
397390
is Incomplete -> error("Job is still new or active: $this")
398391
is CompletedExceptionally -> state.cause
399392
else -> null
400393
}
401-
}
394+
395+
/**
396+
* Returns `true` when [completionCause] exception was handled by parent coroutine.
397+
*/
398+
protected val completionCauseHandled: Boolean
399+
get() = state.let { it is CompletedExceptionally && it.handled }
402400

403401
@Suppress("OverridingDeprecatedMember")
404402
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
@@ -859,8 +857,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
859857
}
860858

861859
public final override val children: Sequence<Job> get() = sequence {
862-
val state = this@JobSupport.state
863-
when (state) {
860+
when (val state = this@JobSupport.state) {
864861
is ChildHandleNode -> yield(state.childJob)
865862
is Incomplete -> state.list?.let { list ->
866863
list.forEach<ChildHandleNode> { yield(it.childJob) }
@@ -885,6 +882,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
885882
/**
886883
* Override to process any exceptions that were encountered while invoking completion handlers
887884
* installed via [invokeOnCompletion].
885+
*
888886
* @suppress **This is unstable API and it is subject to change.**
889887
*/
890888
internal open fun handleOnCompletionException(exception: Throwable) {
@@ -910,7 +908,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
910908
protected open val cancelsParent: Boolean get() = true
911909

912910
/**
913-
* Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
911+
* Returns `true` for jobs that handle their exceptions or integrate them
914912
* into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
915913
* handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
916914
*
@@ -919,17 +917,18 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
919917
protected open val handlesException: Boolean get() = true
920918

921919
/**
922-
* Handles the final job [exception] after it was reported to the by the parent,
923-
* where [handled] is `true` when parent had already handled exception and `false` otherwise.
920+
* Handles the final job [exception] that was not handled by the parent coroutine.
921+
* Returns `true` if it handles exception (so handling at later stages is not needed).
922+
* It is designed to be overridden by launch-like coroutines
923+
* (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
924+
* that can represent exceptions.
924925
*
925926
* This method is invoked **exactly once** when the final exception of the job is determined
926927
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
927928
*
928-
* Note, [handled] is always `true` when [exception] is [CancellationException].
929-
*
930929
* @suppress **This is unstable API and it is subject to change.*
931930
*/
932-
protected open fun handleJobException(exception: Throwable, handled: Boolean) {}
931+
protected open fun handleJobException(exception: Throwable): Boolean = false
933932

934933
private fun cancelParent(cause: Throwable): Boolean {
935934
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
@@ -944,10 +943,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
944943
* Override for post-completion actions that need to do something with the state.
945944
* @param state the final state.
946945
* @param mode completion mode.
947-
* @param suppressed true when any exceptions were suppressed while building the final completion cause.
946+
*
948947
* @suppress **This is unstable API and it is subject to change.**
949948
*/
950-
internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
949+
internal open fun onCompletionInternal(state: Any?, mode: Int) {}
951950

952951
// for nicer debugging
953952
public override fun toString(): String =
@@ -1015,8 +1014,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10151014
return
10161015
}
10171016
if (exception === rootCause) return // nothing to do
1018-
val eh = _exceptionsHolder // volatile read
1019-
when (eh) {
1017+
when (val eh = _exceptionsHolder) { // volatile read
10201018
null -> _exceptionsHolder = exception
10211019
is Throwable -> {
10221020
if (exception === eh) return // nothing to do

kotlinx-coroutines-core/common/src/Timeout.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private open class TimeoutCoroutine<U, in T: U>(
9595
}
9696

9797
@Suppress("UNCHECKED_CAST")
98-
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
98+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
9999
if (state is CompletedExceptionally)
100100
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
101101
else

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,13 @@ private open class BroadcastCoroutine<E>(
109109
return true // does not matter - result is used in DEPRECATED functions only
110110
}
111111

112-
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
113-
val cause = (state as? CompletedExceptionally)?.cause
112+
override fun onCompleted(value: Unit) {
113+
_channel.close()
114+
}
115+
116+
override fun onCancelled(cause: Throwable, handled: Boolean) {
114117
val processed = _channel.close(cause)
115-
if (cause != null && !processed && suppressed) handleCoroutineException(context, cause)
118+
if (!processed && !handled) handleCoroutineException(context, cause)
116119
}
117120
}
118121

kotlinx-coroutines-core/common/src/channels/Produce.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ private class ProducerCoroutine<E>(
9494
override val isActive: Boolean
9595
get() = super.isActive
9696

97-
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
98-
val cause = (state as? CompletedExceptionally)?.cause
97+
override fun onCompleted(value: Unit) {
98+
_channel.close()
99+
}
100+
101+
override fun onCancelled(cause: Throwable, handled: Boolean) {
99102
val processed = _channel.close(cause)
100-
if (cause != null && !processed && suppressed) handleCoroutineException(context, cause)
103+
if (!processed && !handled) handleCoroutineException(context, cause)
101104
}
102105
}

0 commit comments

Comments
 (0)