Skip to content

Commit 023b194

Browse files
committed
Forbid adding new child jobs after the parent decides to complete
1 parent ffaa790 commit 023b194

File tree

2 files changed

+63
-21
lines changed

2 files changed

+63
-21
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

+36-21
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
909909
val child = firstChild(state)
910910
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
911911
return COMPLETING_WAITING_CHILDREN
912+
list.close(LIST_CHILD_PERMISSION)
913+
val anotherChild = firstChild(state)
914+
if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
915+
return COMPLETING_WAITING_CHILDREN
912916
// otherwise -- we have not children left (all were already cancelled?)
913917
return finalizeFinishingState(finishing, proposedUpdate)
914918
}
@@ -938,7 +942,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
938942
val waitChild = lastChild.nextChild()
939943
// try wait for next child
940944
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
941-
// no more children to wait -- try update state
945+
// no more children to wait -- stop accepting children
946+
state.list.close(LIST_CHILD_PERMISSION)
947+
// did any children get added?
948+
val waitChildAgain = lastChild.nextChild()
949+
// try wait for next child
950+
if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child
951+
// no more children, now we are sure; try to update the state
942952
val finalState = finalizeFinishingState(state, proposedUpdate)
943953
afterCompletion(finalState)
944954
}
@@ -978,41 +988,45 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
978988
val node = ChildHandleNode(child).also { it.job = this }
979989
val added = tryPutNodeIntoList(node) { state, list ->
980990
if (state is Finishing) {
981-
val rootCause: Throwable
991+
val rootCause: Throwable?
982992
val handle: ChildHandle
983993
synchronized(state) {
984994
// check if we are installing cancellation handler on job that is being cancelled
985-
val maybeRootCause = state.rootCause // != null if cancelling job
995+
rootCause = state.rootCause // != null if cancelling job
986996
// We add the node to the list in two cases --- either the job is not being cancelled,
987997
// or we are adding a child to a coroutine that is not completing yet
988-
if (maybeRootCause == null || !state.isCompleting) {
998+
if (rootCause == null || !state.isCompleting) {
989999
// Note: add node the list while holding lock on state (make sure it cannot change)
990-
if (!list.addLast(node, LIST_MAX_PERMISSION))
991-
return@tryPutNodeIntoList false // retry
1000+
handle = if (list.addLast(node, LIST_CHILD_PERMISSION)) {
1001+
node
1002+
} else {
1003+
NonDisposableHandle
1004+
}
9921005
// just return the node if we don't have to invoke the handler (not cancelling yet)
993-
rootCause = maybeRootCause ?: return@tryPutNodeIntoList true
9941006
// otherwise handler is invoked immediately out of the synchronized section & handle returned
995-
handle = node
9961007
} else {
997-
rootCause = maybeRootCause
9981008
handle = NonDisposableHandle
9991009
}
10001010
}
10011011
node.invoke(rootCause)
10021012
return handle
1003-
} else list.addLast(node, LIST_MAX_PERMISSION).also { success ->
1004-
if (success) {
1005-
/** Handling the following case:
1006-
* - A child requested to be added to the list;
1007-
* - We checked the state and saw that it wasn't `Finishing`;
1008-
* - Then, the job got cancelled and notified everyone about it;
1009-
* - Only then did we add the child to the list
1010-
* - and ended up here.
1011-
*/
1012-
val latestState = this@JobSupport.state
1013-
if (latestState is Finishing) {
1014-
synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) }
1013+
} else {
1014+
list.addLast(node, LIST_CHILD_PERMISSION).also { success ->
1015+
if (success) {
1016+
/** Handling the following case:
1017+
* - A child requested to be added to the list;
1018+
* - We checked the state and saw that it wasn't `Finishing`;
1019+
* - Then, the job got cancelled and notified everyone about it;
1020+
* - Only then did we add the child to the list
1021+
* - and ended up here.
1022+
*/
1023+
val latestState = this@JobSupport.state
1024+
if (latestState is Finishing) {
1025+
synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) }
1026+
}
10151027
}
1028+
// if we didn't add the node to the list, we'll loop and notice
1029+
// either `Finishing` or the final state, so no spin loop here
10161030
}
10171031
}
10181032
}
@@ -1348,6 +1362,7 @@ private val EMPTY_NEW = Empty(false)
13481362
private val EMPTY_ACTIVE = Empty(true)
13491363

13501364
private const val LIST_MAX_PERMISSION = Int.MAX_VALUE
1365+
private const val LIST_CHILD_PERMISSION = 1
13511366
private const val LIST_CANCELLATION_PERMISSION = 0
13521367

13531368
private class Empty(override val isActive: Boolean) : Incomplete {

kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt

+27
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.*
44
import org.junit.*
55
import org.junit.Test
66
import java.util.concurrent.*
7+
import java.util.concurrent.atomic.*
78
import kotlin.test.*
89

910
class JobChildStressTest : TestBase() {
@@ -54,4 +55,30 @@ class JobChildStressTest : TestBase() {
5455
}
5556
}
5657
}
58+
59+
@Test
60+
fun testFailingChildIsAddedWhenJobFinalizesItsState() {
61+
// All exceptions should get aggregated here
62+
repeat(N_ITERATIONS) {
63+
runBlocking {
64+
val rogueJob = AtomicReference<Job?>()
65+
val deferred = CompletableDeferred<Unit>()
66+
launch(pool + deferred) {
67+
deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child
68+
// **Asynchronously** submit task that launches a child so it races with completion
69+
pool.executor.execute {
70+
rogueJob.set(launch(pool + deferred) {
71+
throw TestException("isCancelled: ${coroutineContext.job.isCancelled}")
72+
})
73+
}
74+
}
75+
76+
deferred.join()
77+
val rogue = rogueJob.get()
78+
if (rogue?.isActive == true) {
79+
throw TestException("Rogue job $rogue with parent " + rogue.parent + " and children list: " + rogue.parent?.children?.toList())
80+
}
81+
}
82+
}
83+
}
5784
}

0 commit comments

Comments
 (0)