Skip to content

Commit 533666f

Browse files
committed
Introducing flowScope, builder necessary for creating cancellation-transparent flow operators
1 parent db52e97 commit 533666f

File tree

10 files changed

+188
-34
lines changed

10 files changed

+188
-34
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,11 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
377377
public fun cancel (Ljava/util/concurrent/CancellationException;)V
378378
public final fun cancelCoroutine (Ljava/lang/Throwable;)Z
379379
public fun cancelInternal (Ljava/lang/Throwable;)Z
380+
public fun cancelOnChildCancellation (Ljava/util/concurrent/CancellationException;)Z
380381
public fun childCancelled (Ljava/lang/Throwable;)Z
381382
public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
382383
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
383384
public final fun getCancellationException ()Ljava/util/concurrent/CancellationException;
384-
protected fun getCancelsParent ()Z
385385
public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
386386
public final fun getChildren ()Lkotlin/sequences/Sequence;
387387
protected final fun getCompletionCause ()Ljava/lang/Throwable;
@@ -396,6 +396,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
396396
public final fun isCancelled ()Z
397397
public final fun isCompleted ()Z
398398
public final fun isCompletedExceptionally ()Z
399+
protected fun isScopedCoroutine ()Z
399400
public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
400401
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
401402
protected fun onCancelling (Ljava/lang/Throwable;)V

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,31 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
319319
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
320320
}
321321

322+
/**
323+
* The method that is invoked when the job is cancelled to possible propagate cancellation to the parent.
324+
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
325+
*
326+
* Invariant: never returns `true` for instances of [CancellationException], otherwise such exception
327+
* may leak to the [CoroutineExceptionHandler].
328+
*/
329+
private fun cancelParent(cause: Throwable): Boolean {
330+
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
331+
* This allow parent to cancel its children (normally) without being cancelled itself, unless
332+
* child crashes and produce some other exception during its completion.
333+
*/
334+
val isCancellation = cause is CancellationException
335+
val parent = parentHandle
336+
// No parent -- ignore CE, report other exceptions.
337+
if (parent === null || parent === NonDisposableHandle) {
338+
return isCancellation
339+
}
340+
341+
// Is scoped coroutine -- don't propagate, will be rethrown
342+
if (isScopedCoroutine) return isCancellation
343+
// Notify parent but don't forget to check cancellation
344+
return parent.childCancelled(cause) || isCancellation
345+
}
346+
322347
private fun NodeList.notifyCompletion(cause: Throwable?) =
323348
notifyHandlers<JobNode<*>>(this, cause)
324349

@@ -594,17 +619,28 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
594619
cancelImpl(parentJob)
595620
}
596621

597-
// Child was cancelled with cause
598-
// It is overridden in supervisor implementations to ignore child cancellation
599-
public open fun childCancelled(cause: Throwable): Boolean =
600-
cancelImpl(cause) && handlesException
622+
/**
623+
* Returns `true` if job should cancel itself during on child [CancellationException].
624+
*/
625+
public open fun cancelOnChildCancellation(cause: CancellationException) = false
626+
627+
/**
628+
* Child was cancelled with a cause.
629+
* At this point parent decides whether it cancels itself (e.g. on a critical failure) and
630+
* whether it handles the exception of the child.
631+
*
632+
* It is overridden in supervisor implementations to completely ignore any child cancellation
633+
*/
634+
public open fun childCancelled(cause: Throwable): Boolean {
635+
if (cause is CancellationException && !cancelOnChildCancellation(cause)) return true
636+
return cancelImpl(cause) && handlesException
637+
}
601638

602639
/**
603640
* Makes this [Job] cancelled with a specified [cause].
604641
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
605642
*/
606-
public fun cancelCoroutine(cause: Throwable?) =
607-
cancelImpl(cause)
643+
public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause)
608644

609645
// cause is Throwable or ParentJob when cancelChild was invoked
610646
// returns true is exception was handled, false otherwise
@@ -912,14 +948,15 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
912948
protected open fun onCancelling(cause: Throwable?) {}
913949

914950
/**
915-
* When this function returns `true` the parent is cancelled on cancellation of this job.
916-
* Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
917-
* This allows parent to cancel its children (normally) without being cancelled itself, unless
918-
* child crashes and produce some other exception during its completion.
951+
* Returns `true` for scoped coroutines.
952+
* Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope
953+
* without any concurrency.
954+
* Scoped coroutines always handle any exception happened within -- they just rethrow it
955+
* to the enclosing scope.
919956
*
920-
* @suppress **This is unstable API and it is subject to change.*
957+
* Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
921958
*/
922-
protected open val cancelsParent: Boolean get() = true
959+
protected open val isScopedCoroutine: Boolean get() = false
923960

924961
/**
925962
* Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
@@ -939,20 +976,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
939976
*
940977
* This method is invoked **exactly once** when the final exception of the job is determined
941978
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
942-
*
943-
* @suppress **This is unstable API and it is subject to change.*
944979
*/
945980
protected open fun handleJobException(exception: Throwable): Boolean = false
946981

947-
private fun cancelParent(cause: Throwable): Boolean {
948-
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
949-
// This allow parent to cancel its children (normally) without being cancelled itself, unless
950-
// child crashes and produce some other exception during its completion.
951-
if (cause is CancellationException) return true
952-
if (!cancelsParent) return false
953-
return parentHandle?.childCancelled(cause) == true
954-
}
955-
956982
/**
957983
* Override for completion actions that need to update some external object depending on job's state,
958984
* right before all the waiters for coroutine's completion are notified.

kotlinx-coroutines-core/common/src/Timeout.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ private open class TimeoutCoroutine<U, in T: U>(
8585
override val defaultResumeMode: Int get() = MODE_DIRECT
8686
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
8787
override fun getStackTraceElement(): StackTraceElement? = null
88-
89-
override val cancelsParent: Boolean
90-
get() = false // it throws exception to parent instead of cancelling it
88+
override val isScopedCoroutine: Boolean get() = true
9189

9290
@Suppress("LeakingThis", "Deprecation")
9391
override fun run() {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.internal.*
9+
import kotlinx.coroutines.intrinsics.*
10+
import kotlin.coroutines.*
11+
import kotlin.coroutines.intrinsics.*
12+
13+
/**
14+
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
15+
* This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
16+
* and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
17+
*
18+
* For example:
19+
* ```
20+
* flowScope {
21+
* launch {
22+
* throw CancellationException()
23+
* }
24+
* } // <- CE will be rethrown here
25+
* ```
26+
*/
27+
internal suspend fun <R> flowScope(block: suspend CoroutineScope.() -> R): R =
28+
suspendCoroutineUninterceptedOrReturn { uCont ->
29+
val coroutine = FlowScope(uCont.context, uCont)
30+
coroutine.startUndispatchedOrReturn(coroutine, block)
31+
}
32+
33+
private class FlowScope<T>(context: CoroutineContext, uCont: Continuation<T>) : ScopeCoroutine<T>(context, uCont) {
34+
public override fun cancelOnChildCancellation(cause: CancellationException) = cause !is ChildCancelledException
35+
}
36+
37+
internal class ChildCancelledException : CancellationException(null)

kotlinx-coroutines-core/common/src/internal/Scopes.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@ internal open class ScopeCoroutine<in T>(
1717
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
1818
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
1919
final override fun getStackTraceElement(): StackTraceElement? = null
20+
final override val isScopedCoroutine: Boolean get() = true
21+
2022
override val defaultResumeMode: Int get() = MODE_DIRECT
2123

2224
internal val parent: Job? get() = parentContext[Job]
2325

24-
override val cancelsParent: Boolean
25-
get() = false // it throws exception to parent instead of cancelling it
26-
2726
@Suppress("UNCHECKED_CAST")
2827
override fun afterCompletionInternal(state: Any?, mode: Int) {
2928
if (state is CompletedExceptionally) {

kotlinx-coroutines-core/common/test/SupervisorTest.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,22 @@ class SupervisorTest : TestBase() {
219219
yield() // to coroutineScope
220220
finish(7)
221221
}
222+
223+
@Test
224+
fun testSupervisorJobCancellationException() = runTest {
225+
val job = SupervisorJob()
226+
val child = launch(job + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
227+
expect(1)
228+
hang {
229+
expect(3)
230+
}
231+
}
232+
233+
yield()
234+
expect(2)
235+
child.cancelAndJoin()
236+
job.complete()
237+
job.join()
238+
finish(4)
239+
}
222240
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
class FlowScopeTest : TestBase() {
11+
12+
@Test
13+
fun testCancellation() = runTest {
14+
assertFailsWith<CancellationException> {
15+
flowScope {
16+
expect(1)
17+
val child = launch {
18+
expect(3)
19+
hang { expect(5) }
20+
}
21+
expect(2)
22+
yield()
23+
expect(4)
24+
child.cancel()
25+
}
26+
}
27+
finish(6)
28+
}
29+
30+
@Test
31+
fun testCancellationWithChildCancelled() = runTest {
32+
flowScope {
33+
expect(1)
34+
val child = launch {
35+
expect(3)
36+
hang { expect(5) }
37+
}
38+
expect(2)
39+
yield()
40+
expect(4)
41+
child.cancel(ChildCancelledException())
42+
}
43+
finish(6)
44+
}
45+
46+
@Test
47+
fun testCancellationWithSuspensionPoint() = runTest {
48+
assertFailsWith<CancellationException> {
49+
flowScope {
50+
expect(1)
51+
val child = launch {
52+
expect(3)
53+
hang { expect(6) }
54+
}
55+
expect(2)
56+
yield()
57+
expect(4)
58+
child.cancel()
59+
hang { expect(5) }
60+
}
61+
}
62+
finish(7)
63+
}
64+
65+
@Test
66+
fun testNestedScopes() = runTest {
67+
assertFailsWith<CancellationException> {
68+
flowScope {
69+
flowScope {
70+
launch {
71+
throw CancellationException(null)
72+
}
73+
}
74+
}
75+
}
76+
}
77+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ private class BlockingCoroutine<T>(
5959
private val blockedThread: Thread,
6060
private val eventLoop: EventLoop?
6161
) : AbstractCoroutine<T>(parentContext, true) {
62-
override val cancelsParent: Boolean
63-
get() = false // it throws exception to parent instead of cancelling it
62+
override val isScopedCoroutine: Boolean get() = true
6463

6564
override fun afterCompletionInternal(state: Any?, mode: Int) {
6665
// wake up blocked thread

kotlinx-coroutines-core/jvm/src/channels/Actor.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ private open class ActorCoroutine<E>(
127127
channel: Channel<E>,
128128
active: Boolean
129129
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
130-
override val cancelsParent: Boolean get() = true
131130

132131
override fun onCancelling(cause: Throwable?) {
133132
_channel.cancel(cause?.let {

kotlinx-coroutines-core/native/src/Builders.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private class BlockingCoroutine<T>(
5454
parentContext: CoroutineContext,
5555
private val eventLoop: EventLoop?
5656
) : AbstractCoroutine<T>(parentContext, true) {
57-
override val cancelsParent: Boolean
57+
override val isCoroutine: Boolean
5858
get() = false // it throws exception to parent instead of cancelling it
5959

6060
@Suppress("UNCHECKED_CAST")

0 commit comments

Comments
 (0)