Skip to content

Commit 523638f

Browse files
committed
Force implementors of AbstractCoroutine deliberately choose whether parent-child relationship should be established
* Test all coroutine builders * Fix FlowSubscription
1 parent 45e0652 commit 523638f

File tree

35 files changed

+230
-53
lines changed

35 files changed

+230
-53
lines changed

integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ private class ToContinuation<T>(
299299
*/
300300
private class ListenableFutureCoroutine<T>(
301301
context: CoroutineContext
302-
) : AbstractCoroutine<T>(context) {
302+
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
303303

304304
// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
305305
@JvmField

integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.guava
@@ -747,4 +747,12 @@ class ListenableFutureTest : TestBase() {
747747
latch.countDown()
748748
return future
749749
}
750+
751+
@Test
752+
fun testCancelledParent() = runTest({ it is CancellationException }) {
753+
cancel()
754+
future { expectUnreached() }
755+
future(start = CoroutineStart.ATOMIC) { }
756+
future(start = CoroutineStart.UNDISPATCHED) { }
757+
}
750758
}

integration/kotlinx-coroutines-jdk8/src/future/Future.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future(
4848
private class CompletableFutureCoroutine<T>(
4949
context: CoroutineContext,
5050
private val future: CompletableFuture<T>
51-
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
51+
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
5252
override fun accept(value: T?, exception: Throwable?) {
5353
cancel()
5454
}

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.future
@@ -567,4 +567,12 @@ class FutureTest : TestBase() {
567567
assertFailsWith<CancellationException> { stage.await() }
568568
finish(4)
569569
}
570+
571+
@Test
572+
fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) {
573+
cancel()
574+
future { expectUnreached() }
575+
future(start = CoroutineStart.ATOMIC) { }
576+
future(start = CoroutineStart.UNDISPATCHED) { }
577+
}
570578
}

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job {
2-
public fun <init> (Lkotlin/coroutines/CoroutineContext;Z)V
3-
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
2+
public fun <init> (Lkotlin/coroutines/CoroutineContext;ZZ)V
43
protected fun afterResume (Ljava/lang/Object;)V
54
protected fun cancellationExceptionMessage ()Ljava/lang/String;
65
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
@@ -418,6 +417,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
418417
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
419418
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
420419
protected fun handleJobException (Ljava/lang/Throwable;)Z
420+
protected final fun initParentJob (Lkotlinx/coroutines/Job;)V
421421
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
422422
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
423423
public fun isActive ()Z

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+17-12
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,38 @@ import kotlin.coroutines.*
2525
* * [onCancelled] in invoked when the coroutine completes with an exception (cancelled).
2626
*
2727
* @param parentContext the context of the parent coroutine.
28+
* @param initParentJob specifies whether the parent-child relationship should be instantiated directly
29+
* in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class
30+
* to invoke [initParentJob] manually.
2831
* @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state.
2932
* See [Job] for details.
3033
*
3134
* @suppress **This an internal API and should not be used from general code.**
3235
*/
3336
@InternalCoroutinesApi
3437
public abstract class AbstractCoroutine<in T>(
35-
/**
36-
* The context of the parent coroutine.
37-
*/
3838
parentContext: CoroutineContext,
39-
active: Boolean = true
39+
initParentJob: Boolean,
40+
active: Boolean
4041
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
42+
43+
init {
44+
/*
45+
* Setup parent-child relationship between the parent in the context and the current coroutine.
46+
* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
47+
* It is dangerous to install parent-child relationship here if the coroutine class
48+
* operates its state from within onCancelled or onCancelling
49+
* (with exceptions for rx integrations that can't have any parent)
50+
*/
51+
if (initParentJob) initParentJob(parentContext[Job])
52+
}
53+
4154
/**
4255
* The context of this coroutine that includes this coroutine as a [Job].
4356
*/
4457
@Suppress("LeakingThis")
4558
public final override val context: CoroutineContext = parentContext + this
4659

47-
init {
48-
/*
49-
* Setup parent-child relationship between the parent in the context
50-
* and the current coroutine.
51-
* It may cause this coroutine to become cancelled if the parent is already cancelled
52-
*/
53-
initParentJobInternal(parentContext[Job])
54-
}
5560
/**
5661
* The context of this scope which is the same as the [context] of this coroutine.
5762
*/

kotlinx-coroutines-core/common/src/Builders.common.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public fun <T> CoroutineScope.async(
9696
private open class DeferredCoroutine<T>(
9797
parentContext: CoroutineContext,
9898
active: Boolean
99-
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
99+
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
100100
override fun getCompleted(): T = getCompletedInternal() as T
101101
override suspend fun await(): T = awaitInternal() as T
102102
override val onAwait: SelectClause1<T> get() = this
@@ -187,7 +187,7 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
187187
private open class StandaloneCoroutine(
188188
parentContext: CoroutineContext,
189189
active: Boolean
190-
) : AbstractCoroutine<Unit>(parentContext, active) {
190+
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
191191
override fun handleJobException(exception: Throwable): Boolean {
192192
handleCoroutineException(context, exception)
193193
return true

kotlinx-coroutines-core/common/src/CompletableDeferred.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
8080
private class CompletableDeferredImpl<T>(
8181
parent: Job?
8282
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
83-
init { initParentJobInternal(parent) }
83+
init { initParentJob(parent) }
8484
override val onCancelComplete get() = true
8585
override fun getCompleted(): T = getCompletedInternal() as T
8686
override suspend fun await(): T = awaitInternal() as T

kotlinx-coroutines-core/common/src/JobSupport.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
139139
* Initializes parent job.
140140
* It shall be invoked at most once after construction after all other initialization.
141141
*/
142-
internal fun initParentJobInternal(parent: Job?) {
142+
protected fun initParentJob(parent: Job?) {
143143
assert { parentHandle == null }
144144
if (parent == null) {
145145
parentHandle = NonDisposableHandle
@@ -1311,7 +1311,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
13111311
}
13121312

13131313
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
1314-
init { initParentJobInternal(parent) }
1314+
init { initParentJob(parent) }
13151315
override val onCancelComplete get() = true
13161316
/*
13171317
* Check whether parent is able to handle exceptions as well.

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,13 @@ private open class BroadcastCoroutine<E>(
127127
parentContext: CoroutineContext,
128128
protected val _channel: BroadcastChannel<E>,
129129
active: Boolean
130-
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
130+
) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active),
131+
ProducerScope<E>, BroadcastChannel<E> by _channel {
132+
133+
init {
134+
initParentJob(parentContext[Job])
135+
}
136+
131137
override val isActive: Boolean get() = super.isActive
132138

133139
override val channel: SendChannel<E>

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import kotlin.coroutines.*
1111
internal open class ChannelCoroutine<E>(
1212
parentContext: CoroutineContext,
1313
protected val _channel: Channel<E>,
14+
initParentJob: Boolean,
1415
active: Boolean
15-
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
16+
) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {
17+
1618
val channel: Channel<E> get() = this
1719

1820
override fun cancel() {

kotlinx-coroutines-core/common/src/channels/Produce.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce(
139139

140140
internal open class ProducerCoroutine<E>(
141141
parentContext: CoroutineContext, channel: Channel<E>
142-
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
142+
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
143143
override val isActive: Boolean
144144
get() = super.isActive
145145

kotlinx-coroutines-core/common/src/internal/Scopes.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import kotlin.jvm.*
1515
internal open class ScopeCoroutine<in T>(
1616
context: CoroutineContext,
1717
@JvmField val uCont: Continuation<T> // unintercepted continuation
18-
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
18+
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
1919

2020
final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
2121
final override fun getStackTraceElement(): StackTraceElement? = null

kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -13,7 +13,7 @@ class AbstractCoroutineTest : TestBase() {
1313
fun testNotifications() = runTest {
1414
expect(1)
1515
val coroutineContext = coroutineContext // workaround for KT-22984
16-
val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
16+
val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) {
1717
override fun onStart() {
1818
expect(3)
1919
}
@@ -53,7 +53,7 @@ class AbstractCoroutineTest : TestBase() {
5353
fun testNotificationsWithException() = runTest {
5454
expect(1)
5555
val coroutineContext = coroutineContext // workaround for KT-22984
56-
val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, false) {
56+
val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, true, false) {
5757
override fun onStart() {
5858
expect(3)
5959
}
@@ -91,4 +91,4 @@ class AbstractCoroutineTest : TestBase() {
9191
coroutine.resumeWithException(TestException2())
9292
finish(10)
9393
}
94-
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.channels.*
8+
import kotlinx.coroutines.flow.internal.*
9+
import kotlin.test.*
10+
11+
class CancelledParentAttachTest : TestBase() {
12+
13+
@Test
14+
fun testAsync() = CoroutineStart.values().forEach(::testAsyncCancelledParent)
15+
16+
private fun testAsyncCancelledParent(start: CoroutineStart) =
17+
runTest({ it is CancellationException }) {
18+
cancel()
19+
expect(1)
20+
val d = async<Int>(start = start) { 42 }
21+
expect(2)
22+
d.invokeOnCompletion {
23+
finish(3)
24+
reset()
25+
}
26+
}
27+
28+
@Test
29+
fun testLaunch() = CoroutineStart.values().forEach(::testLaunchCancelledParent)
30+
31+
private fun testLaunchCancelledParent(start: CoroutineStart) =
32+
runTest({ it is CancellationException }) {
33+
cancel()
34+
expect(1)
35+
val d = launch(start = start) { }
36+
expect(2)
37+
d.invokeOnCompletion {
38+
finish(3)
39+
reset()
40+
}
41+
}
42+
43+
@Test
44+
fun testProduce() =
45+
runTest({ it is CancellationException }) {
46+
cancel()
47+
expect(1)
48+
val d = produce<Int> { }
49+
expect(2)
50+
(d as Job).invokeOnCompletion {
51+
finish(3)
52+
reset()
53+
}
54+
}
55+
56+
@Test
57+
fun testBroadcast() = CoroutineStart.values().forEach(::testBroadcastCancelledParent)
58+
59+
private fun testBroadcastCancelledParent(start: CoroutineStart) =
60+
runTest({ it is CancellationException }) {
61+
cancel()
62+
expect(1)
63+
val bc = broadcast<Int>(start = start) {}
64+
expect(2)
65+
(bc as Job).invokeOnCompletion {
66+
finish(3)
67+
reset()
68+
}
69+
}
70+
71+
@Test
72+
fun testScopes() {
73+
testScope { coroutineScope { } }
74+
testScope { supervisorScope { } }
75+
testScope { flowScope { } }
76+
testScope { withTimeout(Long.MAX_VALUE) { } }
77+
testScope { withContext(Job()) { } }
78+
testScope { withContext(CoroutineName("")) { } }
79+
}
80+
81+
private inline fun testScope(crossinline block: suspend () -> Unit) = runTest({ it is CancellationException }) {
82+
cancel()
83+
block()
84+
}
85+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ private class BlockingCoroutine<T>(
6363
parentContext: CoroutineContext,
6464
private val blockedThread: Thread,
6565
private val eventLoop: EventLoop?
66-
) : AbstractCoroutine<T>(parentContext, true) {
66+
) : AbstractCoroutine<T>(parentContext, true, true) {
67+
6768
override val isScopedCoroutine: Boolean get() = true
6869

6970
override fun afterCompletion(state: Any?) {

kotlinx-coroutines-core/jvm/src/channels/Actor.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@ private open class ActorCoroutine<E>(
127127
parentContext: CoroutineContext,
128128
channel: Channel<E>,
129129
active: Boolean
130-
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
130+
) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {
131+
132+
init {
133+
initParentJob(parentContext[Job])
134+
}
131135

132136
override fun onCancelling(cause: Throwable?) {
133137
_channel.cancel(cause?.let {

kotlinx-coroutines-core/jvm/test/JoinStrTest.kt renamed to kotlinx-coroutines-core/jvm/test/JoinStressTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines

kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() {
171171
}
172172
rb.hashCode() // unused
173173
}
174+
175+
@Test
176+
fun testCancelledParent() {
177+
val job = Job()
178+
job.cancel()
179+
assertFailsWith<CancellationException> {
180+
runBlocking(job) {
181+
expectUnreached()
182+
}
183+
}
184+
}
174185
}

0 commit comments

Comments
 (0)