Skip to content

Commit 05f7d5d

Browse files
authored
Simplify internal coroutines machinery (#2512)
* Merge onStartInternal and onStart to reduce the number of methods and make code a bit simpler * Rework initParentJob * Always establish a parent-child relationship when creating a subclass of AbstractCoroutine. That's our own internal class that we have full control of and it never has a chance to leak to the user-code (so cancellation handlers will be installed etc.). Force implementors of AbstractCoroutine deliberately choose whether parent-child relationship should be established * As a consequence, get rid of parentContext in all our coroutine classes that are not ScopeCoroutine * Remove some dead code * Get rid of an additional parent field from ScopeCoroutine Leverage already presented information in our implementation, just expose it via an already present internal interface
1 parent f2940d5 commit 05f7d5d

File tree

37 files changed

+259
-109
lines changed

37 files changed

+259
-109
lines changed

Diff for: integration/kotlinx-coroutines-guava/src/ListenableFuture.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ private class ToContinuation<T>(
299299
*/
300300
private class ListenableFutureCoroutine<T>(
301301
context: CoroutineContext
302-
) : AbstractCoroutine<T>(context) {
302+
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {
303303

304304
// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
305305
@JvmField

Diff for: integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.guava
@@ -747,4 +747,12 @@ class ListenableFutureTest : TestBase() {
747747
latch.countDown()
748748
return future
749749
}
750+
751+
@Test
752+
fun testCancelledParent() = runTest({ it is CancellationException }) {
753+
cancel()
754+
future { expectUnreached() }
755+
future(start = CoroutineStart.ATOMIC) { }
756+
future(start = CoroutineStart.UNDISPATCHED) { }
757+
}
750758
}

Diff for: integration/kotlinx-coroutines-jdk8/src/future/Future.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future(
4848
private class CompletableFutureCoroutine<T>(
4949
context: CoroutineContext,
5050
private val future: CompletableFuture<T>
51-
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
51+
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
5252
override fun accept(value: T?, exception: Throwable?) {
5353
cancel()
5454
}

Diff for: integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.future
@@ -567,4 +567,12 @@ class FutureTest : TestBase() {
567567
assertFailsWith<CancellationException> { stage.await() }
568568
finish(4)
569569
}
570+
571+
@Test
572+
fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) {
573+
cancel()
574+
future { expectUnreached() }
575+
future(start = CoroutineStart.ATOMIC) { }
576+
future(start = CoroutineStart.UNDISPATCHED) { }
577+
}
570578
}

Diff for: kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job {
2-
protected final field parentContext Lkotlin/coroutines/CoroutineContext;
3-
public fun <init> (Lkotlin/coroutines/CoroutineContext;Z)V
4-
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
2+
public fun <init> (Lkotlin/coroutines/CoroutineContext;ZZ)V
53
protected fun afterResume (Ljava/lang/Object;)V
64
protected fun cancellationExceptionMessage ()Ljava/lang/String;
75
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
@@ -10,10 +8,8 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
108
protected fun onCancelled (Ljava/lang/Throwable;Z)V
119
protected fun onCompleted (Ljava/lang/Object;)V
1210
protected final fun onCompletionInternal (Ljava/lang/Object;)V
13-
protected fun onStart ()V
1411
public final fun resumeWith (Ljava/lang/Object;)V
1512
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
16-
public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V
1713
}
1814

1915
public final class kotlinx/coroutines/AwaitKt {
@@ -89,6 +85,7 @@ public final class kotlinx/coroutines/CancellableContinuationKt {
8985

9086
public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle {
9187
public abstract fun childCancelled (Ljava/lang/Throwable;)Z
88+
public abstract fun getParent ()Lkotlinx/coroutines/Job;
9289
}
9390

9491
public abstract interface class kotlinx/coroutines/ChildJob : kotlinx/coroutines/Job {
@@ -420,6 +417,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
420417
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
421418
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
422419
protected fun handleJobException (Ljava/lang/Throwable;)Z
420+
protected final fun initParentJob (Lkotlinx/coroutines/Job;)V
423421
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
424422
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
425423
public fun isActive ()Z
@@ -431,6 +429,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
431429
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
432430
protected fun onCancelling (Ljava/lang/Throwable;)V
433431
protected fun onCompletionInternal (Ljava/lang/Object;)V
432+
protected fun onStart ()V
434433
public final fun parentCancelled (Lkotlinx/coroutines/ParentJob;)V
435434
public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
436435
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
@@ -473,6 +472,7 @@ public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/C
473472
public static final field INSTANCE Lkotlinx/coroutines/NonDisposableHandle;
474473
public fun childCancelled (Ljava/lang/Throwable;)Z
475474
public fun dispose ()V
475+
public fun getParent ()Lkotlinx/coroutines/Job;
476476
public fun toString ()Ljava/lang/String;
477477
}
478478

Diff for: kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+18-50
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package kotlinx.coroutines
88
import kotlinx.coroutines.CoroutineStart.*
99
import kotlinx.coroutines.intrinsics.*
1010
import kotlin.coroutines.*
11-
import kotlin.jvm.*
1211

1312
/**
1413
* Abstract base class for implementation of coroutines in coroutine builders.
@@ -26,20 +25,32 @@ import kotlin.jvm.*
2625
* * [onCancelled] in invoked when the coroutine completes with an exception (cancelled).
2726
*
2827
* @param parentContext the context of the parent coroutine.
28+
* @param initParentJob specifies whether the parent-child relationship should be instantiated directly
29+
* in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class
30+
* to invoke [initParentJob] manually.
2931
* @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state.
3032
* See [Job] for details.
3133
*
3234
* @suppress **This an internal API and should not be used from general code.**
3335
*/
3436
@InternalCoroutinesApi
3537
public abstract class AbstractCoroutine<in T>(
36-
/**
37-
* The context of the parent coroutine.
38-
*/
39-
@JvmField
40-
protected val parentContext: CoroutineContext,
41-
active: Boolean = true
38+
parentContext: CoroutineContext,
39+
initParentJob: Boolean,
40+
active: Boolean
4241
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
42+
43+
init {
44+
/*
45+
* Setup parent-child relationship between the parent in the context and the current coroutine.
46+
* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
47+
* It is dangerous to install parent-child relationship here if the coroutine class
48+
* operates its state from within onCancelled or onCancelling
49+
* (with exceptions for rx integrations that can't have any parent)
50+
*/
51+
if (initParentJob) initParentJob(parentContext[Job])
52+
}
53+
4354
/**
4455
* The context of this coroutine that includes this coroutine as a [Job].
4556
*/
@@ -53,28 +64,6 @@ public abstract class AbstractCoroutine<in T>(
5364

5465
override val isActive: Boolean get() = super.isActive
5566

56-
/**
57-
* Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction.
58-
* It shall be invoked at most once after construction after all other initialization.
59-
*
60-
* Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled,
61-
* in which case it synchronously invokes all the corresponding handlers.
62-
* @suppress **This is unstable API and it is subject to change.**
63-
*/
64-
internal fun initParentJob() {
65-
initParentJobInternal(parentContext[Job])
66-
}
67-
68-
/**
69-
* This function is invoked once when a non-active coroutine (constructed with `active` set to `false)
70-
* is [started][start].
71-
*/
72-
protected open fun onStart() {}
73-
74-
internal final override fun onStartInternal() {
75-
onStart()
76-
}
77-
7867
/**
7968
* This function is invoked once when the job was completed normally with the specified [value],
8069
* right before all the waiters for the coroutine's completion are notified.
@@ -127,34 +116,13 @@ public abstract class AbstractCoroutine<in T>(
127116
/**
128117
* Starts this coroutine with the given code [block] and [start] strategy.
129118
* This function shall be invoked at most once on this coroutine.
130-
*
131-
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
132-
* during construction. Second, it starts the coroutine based on [start] parameter:
133-
*
134-
* * [DEFAULT] uses [startCoroutineCancellable].
135-
* * [ATOMIC] uses [startCoroutine].
136-
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
137-
* * [LAZY] does nothing.
138-
*/
139-
public fun start(start: CoroutineStart, block: suspend () -> T) {
140-
initParentJob()
141-
start(block, this)
142-
}
143-
144-
/**
145-
* Starts this coroutine with the given code [block] and [start] strategy.
146-
* This function shall be invoked at most once on this coroutine.
147-
*
148-
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
149-
* during construction. Second, it starts the coroutine based on [start] parameter:
150119
*
151120
* * [DEFAULT] uses [startCoroutineCancellable].
152121
* * [ATOMIC] uses [startCoroutine].
153122
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
154123
* * [LAZY] does nothing.
155124
*/
156125
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
157-
initParentJob()
158126
start(block, receiver, this)
159127
}
160128
}

Diff for: kotlinx-coroutines-core/common/src/Builders.common.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public fun <T> CoroutineScope.async(
9696
private open class DeferredCoroutine<T>(
9797
parentContext: CoroutineContext,
9898
active: Boolean
99-
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
99+
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
100100
override fun getCompleted(): T = getCompletedInternal() as T
101101
override suspend fun await(): T = awaitInternal() as T
102102
override val onAwait: SelectClause1<T> get() = this
@@ -167,7 +167,6 @@ public suspend fun <T> withContext(
167167
}
168168
// SLOW PATH -- use new dispatcher
169169
val coroutine = DispatchedCoroutine(newContext, uCont)
170-
coroutine.initParentJob()
171170
block.startCoroutineCancellable(coroutine, coroutine)
172171
coroutine.getResult()
173172
}
@@ -188,7 +187,7 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
188187
private open class StandaloneCoroutine(
189188
parentContext: CoroutineContext,
190189
active: Boolean
191-
) : AbstractCoroutine<Unit>(parentContext, active) {
190+
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
192191
override fun handleJobException(exception: Throwable): Boolean {
193192
handleCoroutineException(context, exception)
194193
return true

Diff for: kotlinx-coroutines-core/common/src/CompletableDeferred.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
8080
private class CompletableDeferredImpl<T>(
8181
parent: Job?
8282
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
83-
init { initParentJobInternal(parent) }
83+
init { initParentJob(parent) }
8484
override val onCancelComplete get() = true
8585
override fun getCompleted(): T = getCompletedInternal() as T
8686
override suspend fun await(): T = awaitInternal() as T

Diff for: kotlinx-coroutines-core/common/src/Job.kt

+11
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,14 @@ public interface ParentJob : Job {
457457
@InternalCoroutinesApi
458458
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
459459
public interface ChildHandle : DisposableHandle {
460+
461+
/**
462+
* Returns the parent of the current parent-child relationship.
463+
* @suppress **This is unstable API and it is subject to change.**
464+
*/
465+
@InternalCoroutinesApi
466+
public val parent: Job?
467+
460468
/**
461469
* Child is cancelling its parent by invoking this method.
462470
* This method is invoked by the child twice. The first time child report its root cause as soon as possible,
@@ -650,6 +658,9 @@ private fun Throwable?.orCancellation(job: Job): Throwable = this ?: JobCancella
650658
*/
651659
@InternalCoroutinesApi
652660
public object NonDisposableHandle : DisposableHandle, ChildHandle {
661+
662+
override val parent: Job? get() = null
663+
653664
/**
654665
* Does not do anything.
655666
* @suppress

Diff for: kotlinx-coroutines-core/common/src/JobSupport.kt

+7-6
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
9696
~ waits for start
9797
>> start / join / await invoked
9898
## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
99-
+ onStartInternal / onStart (lazy coroutine is started)
99+
+ onStart (lazy coroutine is started)
100100
~ active coroutine is working (or scheduled to execution)
101101
>> childCancelled / cancelImpl invoked
102102
## CANCELLING: state is Finishing, state.rootCause != null
@@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
139139
* Initializes parent job.
140140
* It shall be invoked at most once after construction after all other initialization.
141141
*/
142-
internal fun initParentJobInternal(parent: Job?) {
142+
protected fun initParentJob(parent: Job?) {
143143
assert { parentHandle == null }
144144
if (parent == null) {
145145
parentHandle = NonDisposableHandle
@@ -393,12 +393,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
393393
is Empty -> { // EMPTY_X state -- no completion handlers
394394
if (state.isActive) return FALSE // already active
395395
if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
396-
onStartInternal()
396+
onStart()
397397
return TRUE
398398
}
399399
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
400400
if (!_state.compareAndSet(state, state.list)) return RETRY
401-
onStartInternal()
401+
onStart()
402402
return TRUE
403403
}
404404
else -> return FALSE // not a new state
@@ -409,7 +409,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
409409
* Override to provide the actual [start] action.
410410
* This function is invoked exactly once when non-active coroutine is [started][start].
411411
*/
412-
internal open fun onStartInternal() {}
412+
protected open fun onStart() {}
413413

414414
public final override fun getCancellationException(): CancellationException =
415415
when (val state = this.state) {
@@ -1311,7 +1311,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
13111311
}
13121312

13131313
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
1314-
init { initParentJobInternal(parent) }
1314+
init { initParentJob(parent) }
13151315
override val onCancelComplete get() = true
13161316
/*
13171317
* Check whether parent is able to handle exceptions as well.
@@ -1459,6 +1459,7 @@ private class InvokeOnCancelling(
14591459
internal class ChildHandleNode(
14601460
@JvmField val childJob: ChildJob
14611461
) : JobCancellingNode(), ChildHandle {
1462+
override val parent: Job get() = job
14621463
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
14631464
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
14641465
}

Diff for: kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,13 @@ private open class BroadcastCoroutine<E>(
127127
parentContext: CoroutineContext,
128128
protected val _channel: BroadcastChannel<E>,
129129
active: Boolean
130-
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
130+
) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active),
131+
ProducerScope<E>, BroadcastChannel<E> by _channel {
132+
133+
init {
134+
initParentJob(parentContext[Job])
135+
}
136+
131137
override val isActive: Boolean get() = super.isActive
132138

133139
override val channel: SendChannel<E>

Diff for: kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import kotlin.coroutines.*
1111
internal open class ChannelCoroutine<E>(
1212
parentContext: CoroutineContext,
1313
protected val _channel: Channel<E>,
14+
initParentJob: Boolean,
1415
active: Boolean
15-
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
16+
) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {
17+
1618
val channel: Channel<E> get() = this
1719

1820
override fun cancel() {

Diff for: kotlinx-coroutines-core/common/src/channels/Produce.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce(
139139

140140
internal open class ProducerCoroutine<E>(
141141
parentContext: CoroutineContext, channel: Channel<E>
142-
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
142+
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
143143
override val isActive: Boolean
144144
get() = super.isActive
145145

Diff for: kotlinx-coroutines-core/common/src/internal/Scopes.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ import kotlin.jvm.*
1515
internal open class ScopeCoroutine<in T>(
1616
context: CoroutineContext,
1717
@JvmField val uCont: Continuation<T> // unintercepted continuation
18-
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
18+
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
19+
1920
final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
2021
final override fun getStackTraceElement(): StackTraceElement? = null
21-
final override val isScopedCoroutine: Boolean get() = true
2222

23-
internal val parent: Job? get() = parentContext[Job]
23+
final override val isScopedCoroutine: Boolean get() = true
24+
internal val parent: Job? get() = parentHandle?.parent
2425

2526
override fun afterCompletion(state: Any?) {
2627
// Resume in a cancellable way by default when resuming from another context

0 commit comments

Comments
 (0)