diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 0c63b6a4b4..3ef0f0b076 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -464,8 +464,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (state.isActive) { // try move to SINGLE state if (_state.compareAndSet(state, node)) return node - } else + } else { promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine + } } is Incomplete -> { val list = state.list @@ -522,7 +523,32 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return node } - private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) = + /** + * Adds the [node] to the given [list] if the state of this job is equal to [expect]. + * + * This method is invoked only from [invokeOnCompletion] (two places) and [expect] is always a snapshot of the + * `state` that [invokeOnCompletion] works on. + * + * It is required to preserve the following invariants: + * + * 1) Exactly once handler invocation. + * Without atomic addIf, the following scenario is possible: + * * T1 reads `state`, attempts to add a handler to the list, gets preempted + * * T2 finalizes the state, CASes it and invokes all the handlers from the list + * * T1 adds its handler to the list to be never invoked + * If T1 starts to re-check the state after the addition, then the handler might be invoked twice. + * + * 2) Parent finalization. + * + * NB: it's totally unobvious from the signature, but [tryWaitForChildren] if's over `invokeOnCompletion` result: + * if it's `NonDisposableHandle` then the addition failed, otherwise it's successfully added. + * If handler fails to be added, then child is cancelled/completed and there is no need to + * either wait for it **or** invoke its handler. + * + * Additional implementation note: all internal handlers implement [LockFreeLinkedListNode] and + * LFLL **does not** support additions and removals of the same node (with the same identity) more than once. + */ + private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode): Boolean = list.addLastIf(node) { this.state === expect } private fun promoteEmptyToNodeList(state: Empty) { @@ -904,7 +930,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children val child = firstChild(state) - if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) + if (child != null && tryWaitForChildren(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) @@ -916,16 +942,26 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun firstChild(state: Incomplete) = state as? ChildHandleNode ?: state.list?.nextChild() - // return false when there is no more incomplete children to wait - // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. - private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { + /** + * This method is invoked by the JobSupport lifecycle when it transitions into "finishing" state, + * and in order to be transitioned into its final state it has to wait for all the children. + * If this method returns `true` then the current job is waiting for its children. + * It's done via an addition of a special completion handler to the child that, when invoked, invokes + * [continueCompleting] that almost immediately fallbacks to [tryWaitForChildren]. + * + * If it returns `false`, then there are no children to wait for and it's safe to finalize the final job's state. + * Note that at this point, new children can appear: + * * If the state is `cancelling`, they are immediately cancelled. + * * If the state is `completing`, they are not cancelled; instead, we observe an inconsistent state where their parent is completed and doesn't store references to the children, but they are still active. + */ + private tailrec fun tryWaitForChildren(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, handler = ChildCompletion(this, state, child, proposedUpdate).asHandler ) if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it val nextChild = child.nextChild() ?: return false - return tryWaitForChild(state, nextChild, proposedUpdate) + return tryWaitForChildren(state, nextChild, proposedUpdate) } // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. @@ -934,7 +970,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // figure out if we need to wait for next child val waitChild = lastChild.nextChild() // try wait for next child - if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child + if (waitChild != null && tryWaitForChildren(state, waitChild, proposedUpdate)) return // waiting for next child // no more children to wait -- try update state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index a692e26f7c..5f8c412000 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -72,6 +72,8 @@ public actual open class LockFreeLinkedListNode { } } + // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation + // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) // fixme replace the suppress with AllowDifferentMembersInActual once stdlib is updated to 1.9.20 https://github.com/Kotlin/kotlinx.coroutines/issues/3846 @Suppress("NON_ACTUAL_MEMBER_DECLARED_IN_EXPECT_NON_FINAL_CLASSIFIER_ACTUALIZATION") @PublishedApi @@ -231,9 +233,6 @@ public actual open class LockFreeLinkedListNode { } } - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) - // ------ other helpers ------ diff --git a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt index be467cc5cd..d9b5c65eb8 100644 --- a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt @@ -5,14 +5,32 @@ package kotlinx.coroutines import org.junit.Test -import org.openjdk.jol.info.ClassLayout +import org.openjdk.jol.info.* import kotlin.test.* class MemoryFootprintTest : TestBase(true) { @Test - fun testJobLayout() = assertLayout(Job().javaClass, 24) + fun testJobLayout() { + assertLayout(jobWithChildren(0).javaClass, 24) + } + + @Test + fun testJobSize() { + assertTotalSize(jobWithChildren(1), 112) + assertTotalSize(jobWithChildren(2), 192) // + 80 + assertTotalSize(jobWithChildren(3), 248) // + 56 + assertTotalSize(jobWithChildren(4), 304) // + 56 + } + + private fun jobWithChildren(numberOfChildren: Int): Job { + val result = Job() + repeat(numberOfChildren) { + Job(result) + } + return result + } @Test fun testCancellableContinuationFootprint() = assertLayout(CancellableContinuationImpl::class.java, 48) @@ -22,4 +40,9 @@ class MemoryFootprintTest : TestBase(true) { // println(ClassLayout.parseClass(clz).toPrintable()) assertEquals(expectedSize.toLong(), size) } + + private fun assertTotalSize(instance: Job, expectedSize: Int) { + val size = GraphLayout.parseInstance(instance).totalSize() + assertEquals(expectedSize.toLong(), size) + } }