From ea913b00650591ab29bba124950d72410ace198e Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 20 Dec 2023 11:23:52 +0100 Subject: [PATCH 01/13] Remove DCSS --- .../common/src/JobSupport.kt | 41 ++++++--- .../common/src/internal/Atomic.kt | 67 --------------- .../src/internal/LockFreeLinkedList.common.kt | 10 +-- .../common/test/JobTest.kt | 14 +++ .../src/internal/LockFreeLinkedList.kt | 86 +++++-------------- .../src/internal/LinkedList.kt | 41 +++++---- .../test/internal/LinkedListTest.kt | 8 +- .../LockFreeLinkedListLongStressTest.kt | 2 +- 8 files changed, 96 insertions(+), 173 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/src/internal/Atomic.kt diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5fda74fc88..d07592631a 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -159,12 +159,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox] * and should be [unboxed][unboxState] before returning to user code. */ - internal val state: Any? get() { - _state.loop { state -> // helper loop on state (complete in-progress atomic operations) - if (state !is OpDescriptor) return state - state.perform(this) - } - } + internal val state: Any? get() = _state.value /** * @suppress **This is unstable API and it is subject to change.** @@ -324,6 +319,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) + list.closeForSome() notifyHandlers(list, cause) { it.onCancelling } // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent @@ -355,8 +351,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return parent.childCancelled(cause) || isCancellation } - private fun NodeList.notifyCompletion(cause: Throwable?) = + private fun NodeList.notifyCompletion(cause: Throwable?) { + close() notifyHandlers(this, cause) { true } + } private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) { var exception: Throwable? = null @@ -488,7 +486,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // or we are adding a child to a coroutine that is not completing yet if (rootCause == null || node is ChildHandleNode && !state.isCompleting) { // Note: add node the list while holding lock on state (make sure it cannot change) - if (!addLastAtomic(state, list, node)) return@loopOnState // retry + if (!list.addLast( + node, + allowedAfterPartialClosing = node is ChildHandleNode + ) + ) return@loopOnState // retry // just return node if we don't have to invoke handler (not cancelling yet) if (rootCause == null) return node // otherwise handler is invoked immediately out of the synchronized section & handle returned @@ -500,8 +502,24 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job if (invokeImmediately) node.invoke(rootCause) return handle - } else { - if (addLastAtomic(state, list, node)) return node + } else if (list.addLast( + node, allowedAfterPartialClosing = !node.onCancelling || node is ChildHandleNode + )) { + if (node is ChildHandleNode) { + /** Handling the following case: + * - A child requested to be added to the list; + * - We checked the state and saw that it wasn't `Finishing`; + * - Then, the job got cancelled and notified everyone about it; + * - Only then did we add the child to the list + * - and ended up here. + */ + val latestState = this@JobSupport.state + if (latestState is Finishing) { + assert { invokeImmediately } + synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } + } + } + return node } } } @@ -515,9 +533,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } } - private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) = - list.addLastIf(node) { this.state === expect } - private fun promoteEmptyToNodeList(state: Empty) { // try to promote it to LIST state with the corresponding state val list = NodeList() diff --git a/kotlinx-coroutines-core/common/src/internal/Atomic.kt b/kotlinx-coroutines-core/common/src/internal/Atomic.kt deleted file mode 100644 index eddddc72f1..0000000000 --- a/kotlinx-coroutines-core/common/src/internal/Atomic.kt +++ /dev/null @@ -1,67 +0,0 @@ -@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE") - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.* -import kotlin.jvm.* - -/** - * The most abstract operation that can be in process. Other threads observing an instance of this - * class in the fields of their object shall invoke [perform] to help. - * - * @suppress **This is unstable API and it is subject to change.** - */ -public abstract class OpDescriptor { - /** - * Returns `null` is operation was performed successfully or some other - * object that indicates the failure reason. - */ - abstract fun perform(affected: Any?): Any? - - override fun toString(): String = "$classSimpleName@$hexAddress" // debug -} - -@JvmField -internal val NO_DECISION: Any = Symbol("NO_DECISION") - -/** - * Descriptor for multi-word atomic operation. - * Based on paper - * ["A Practical Multi-Word Compare-and-Swap Operation"](https://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf) - * by Timothy L. Harris, Keir Fraser and Ian A. Pratt. - * - * Note: parts of atomic operation must be globally ordered. Otherwise, this implementation will produce - * `StackOverflowError`. - * - * @suppress **This is unstable API and it is subject to change.** - */ -@InternalCoroutinesApi -public abstract class AtomicOp : OpDescriptor() { - private val _consensus = atomic(NO_DECISION) - - private fun decide(decision: Any?): Any? { - assert { decision !== NO_DECISION } - val current = _consensus.value - if (current !== NO_DECISION) return current - if (_consensus.compareAndSet(NO_DECISION, decision)) return decision - return _consensus.value - } - - abstract fun prepare(affected: T): Any? // `null` if Ok, or failure reason - - abstract fun complete(affected: T, failure: Any?) // failure != null if failed to prepare op - - // returns `null` on success - @Suppress("UNCHECKED_CAST") - final override fun perform(affected: Any?): Any? { - // make decision on status - var decision = this._consensus.value - if (decision === NO_DECISION) { - decision = decide(prepare(affected as T)) - } - // complete operation - complete(affected as T, decision) - return decision - } -} diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 7f9ee666ab..8a7814ecf6 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -2,22 +2,18 @@ package kotlinx.coroutines.internal -import kotlinx.coroutines.* -import kotlin.jvm.* - /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean public val nextNode: LockFreeLinkedListNode public val prevNode: LockFreeLinkedListNode + public fun addLast(node: LockFreeLinkedListNode, allowedAfterPartialClosing: Boolean): Boolean public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean - public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean public open fun remove(): Boolean - + public fun close() + public fun closeForSome() } -internal fun LockFreeLinkedListNode.addLast(node: LockFreeLinkedListNode) = addLastIf(node) { true } - /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListHead() : LockFreeLinkedListNode { public inline fun forEach(block: (LockFreeLinkedListNode) -> Unit) diff --git a/kotlinx-coroutines-core/common/test/JobTest.kt b/kotlinx-coroutines-core/common/test/JobTest.kt index b86ac73138..55119ab65c 100644 --- a/kotlinx-coroutines-core/common/test/JobTest.kt +++ b/kotlinx-coroutines-core/common/test/JobTest.kt @@ -174,6 +174,20 @@ class JobTest : TestBase() { finish(4) } + @Test + fun testInvokeOnCancellingFiringOnNormalExit() = runTest { + val job = launch { + expect(2) + } + job.invokeOnCompletion(onCancelling = true) { + assertNull(it) + expect(3) + } + expect(1) + job.join() + finish(4) + } + @Test fun testOverriddenParent() = runTest { val parent = Job() diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index b2d069e6aa..d89bc71825 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -8,18 +8,6 @@ import kotlin.jvm.* private typealias Node = LockFreeLinkedListNode -@PublishedApi -internal const val UNDECIDED: Int = 0 - -@PublishedApi -internal const val SUCCESS: Int = 1 - -@PublishedApi -internal const val FAILURE: Int = 2 - -@PublishedApi -internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") - /** * Doubly-linked concurrent list node with remove support. * Based on paper @@ -49,37 +37,10 @@ public actual open class LockFreeLinkedListNode { private fun removed(): Removed = _removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) } - @PublishedApi - internal abstract class CondAddOp( - @JvmField val newNode: Node - ) : AtomicOp() { - @JvmField var oldNext: Node? = null - - override fun complete(affected: Node, failure: Any?) { - val success = failure == null - val update = if (success) newNode else oldNext - if (update != null && affected._next.compareAndSet( this, update)) { - // only the thread the makes this update actually finishes add operation - if (success) newNode.finishAdd(oldNext!!) - } - } - } - - @PublishedApi - internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp = - object : CondAddOp(node) { - override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE - } - public actual open val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed - public val next: Any get() { - _next.loop { next -> - if (next !is OpDescriptor) return next - next.perform(this) - } - } + public val next: Any get() = _next.value // LINEARIZABLE. Returns next non-removed Node public actual val nextNode: Node get() = @@ -117,20 +78,30 @@ public actual open class LockFreeLinkedListNode { // ------ addLastXXX ------ /** - * Adds last item to this list atomically if the [condition] is true. + * Adds last item to this list. Returns `false` if the list is closed. */ - public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - val condAdd = makeCondAddOp(node, condition) + public actual fun addLast(node: Node, allowedAfterPartialClosing: Boolean): Boolean { while (true) { // lock-free loop on prev.next - val prev = prevNode // sentinel node is never removed, so prev is always defined - when (prev.tryCondAddNext(node, this, condAdd)) { - SUCCESS -> return true - FAILURE -> return false + val currentPrev = prevNode + return when { + currentPrev is ListClosedForAll -> false + currentPrev is ListClosedForSome -> + allowedAfterPartialClosing && currentPrev.addLast(node, allowedAfterPartialClosing) + currentPrev.addNext(node, this) -> true + else -> continue } } } - // ------ addXXX util ------ + /** + * Forbids adding some of the new items to this list. + */ + public actual fun closeForSome() { addLast(ListClosedForSome(), allowedAfterPartialClosing = false) } + + /** + * Forbids adding new items to this list. + */ + public actual fun close() { addLast(ListClosedForAll(), allowedAfterPartialClosing = true) } /** * Given: @@ -165,17 +136,6 @@ public actual open class LockFreeLinkedListNode { return true } - // returns UNDECIDED, SUCCESS or FAILURE - @PublishedApi - internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int { - node._prev.lazySet(this) - node._next.lazySet(next) - condAdd.oldNext = next - if (!_next.compareAndSet(next, condAdd)) return UNDECIDED - // added operation successfully (linearized) -- complete it & fixup the list - return if (condAdd.perform(this) == null) SUCCESS else FAILURE - } - // ------ removeXXX ------ /** @@ -273,10 +233,6 @@ public actual open class LockFreeLinkedListNode { } // slow path when we need to help remove operations this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time - prevNext is OpDescriptor -> { // help & retry - prevNext.perform(prev) - return correctPrev() // retry from scratch - } prevNext is Removed -> { if (last !== null) { // newly added (prev) node is already removed, correct last.next around it @@ -332,3 +288,7 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { // optimization: because head is never removed, we don't have to read _next.value to check these: override val isRemoved: Boolean get() = false } + +private class ListClosedForSome: LockFreeLinkedListNode() + +private class ListClosedForAll: LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt index 6e81c79f40..abed3ac498 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt @@ -14,13 +14,24 @@ public actual open class LockFreeLinkedListNode { inline actual val prevNode get() = _prev inline actual val isRemoved get() = _removed - @PublishedApi - internal fun addLast(node: Node) { - val prev = this._prev - node._next = this - node._prev = prev - prev._next = node - this._prev = node + public actual fun addLast(node: Node, allowedAfterPartialClosing: Boolean): Boolean = when (val prev = this._prev) { + is ListClosedForAll -> false + is ListClosedForSome -> allowedAfterPartialClosing && prev.addLast(node, allowedAfterPartialClosing) + else -> { + node._next = this + node._prev = prev + prev._next = node + this._prev = node + true + } + } + + public actual fun closeForSome() { + addLast(ListClosedForSome(), allowedAfterPartialClosing = true) + } + + public actual fun close() { + addLast(ListClosedForAll(), allowedAfterPartialClosing = false) } /* @@ -30,10 +41,6 @@ public actual open class LockFreeLinkedListNode { * invokes handler on remove */ public actual open fun remove(): Boolean { - return removeImpl() - } - - private fun removeImpl(): Boolean { if (_removed) return false val prev = this._prev val next = this._next @@ -45,13 +52,7 @@ public actual open class LockFreeLinkedListNode { public actual fun addOneIfEmpty(node: Node): Boolean { if (_next !== this) return false - addLast(node) - return true - } - - public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - if (!condition()) return false - addLast(node) + addLast(node, allowedAfterPartialClosing = false) return true } } @@ -72,3 +73,7 @@ public actual open class LockFreeLinkedListHead : Node() { // just a defensive programming -- makes sure that list head sentinel is never removed public actual final override fun remove(): Nothing = throw UnsupportedOperationException() } + +private class ListClosedForSome: LockFreeLinkedListNode() + +private class ListClosedForAll: LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt index d88ddf18f3..8e5610fa89 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt @@ -12,13 +12,13 @@ class LinkedListTest { fun testSimpleAddLastRemove() { val list = LockFreeLinkedListHead() assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this) } + val n1 = IntNode(1).apply { list.addLast(this, allowedAfterPartialClosing = false) } assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this) } + val n2 = IntNode(2).apply { list.addLast(this, allowedAfterPartialClosing = false) } assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this) } + val n3 = IntNode(3).apply { list.addLast(this, allowedAfterPartialClosing = false) } assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this) } + val n4 = IntNode(4).apply { list.addLast(this, allowedAfterPartialClosing = false) } assertContents(list, 1, 2, 3, 4) assertTrue(n1.remove()) assertContents(list, 2, 3, 4) diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt index 21adeb401e..ce041f23df 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt @@ -31,7 +31,7 @@ class LockFreeLinkedListLongStressTest : TestBase() { for (j in 0 until nAddThreads) threads += thread(start = false, name = "adder-$j") { for (i in j until nAdded step nAddThreads) { - list.addLast(IntNode(i)) + list.addLast(IntNode(i), allowedAfterPartialClosing = false) } println("${Thread.currentThread().name} completed") workingAdders.decrementAndGet() From a69748ceb63b0347ca99d5fabe3999a2c9dcc819 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 13 Mar 2024 14:33:12 +0100 Subject: [PATCH 02/13] Extract child-handling logic into another function This change is mostly a refactoring, except now, an arbitrary `onCancelling` handler that's not a child will not add itself in a `synchronized` block. Instead, only the root cause is read under a lock. --- .../common/src/JobSupport.kt | 160 ++++++++++-------- 1 file changed, 94 insertions(+), 66 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index d07592631a..b615570776 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -462,73 +462,57 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren node: JobNode ): DisposableHandle { node.job = this + // Create node upfront -- for common cases it just initializes JobNode.job field, + // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. + val added = tryPutNodeIntoList(node) { state, list -> + if (node.onCancelling) { + val rootCause = (state as? Finishing)?.let { synchronized(it) { it.rootCause } } + if (rootCause == null) { + list.addLast(node, allowedAfterPartialClosing = false) + } else { + if (invokeImmediately) node.invoke(rootCause) + return NonDisposableHandle + } + } else { + list.addLast(node, allowedAfterPartialClosing = true) + } + } + when { + added -> return node + invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause) + } + return NonDisposableHandle + } + + /** + * Puts [node] into the current state's list of completion handlers. + * + * Returns `false` if the state is already complete and doesn't accept new handlers. + * Returns `true` if the handler was successfully added to the list. + * + * [tryAdd] is invoked when the state is [Incomplete] and the list is not `null`, to decide on the specific + * behavior in this case. It must return + * - `true` if the element was successfully added to the list + * - `false` if the operation needs to be retried + */ + private inline fun tryPutNodeIntoList( + node: JobNode, + tryAdd: (Incomplete, NodeList) -> Boolean + ): Boolean { loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { - // try move to SINGLE state - if (_state.compareAndSet(state, node)) return node + // try to move to the SINGLE state + if (_state.compareAndSet(state, node)) return true } else promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine } - is Incomplete -> { - val list = state.list - if (list == null) { // SINGLE/SINGLE+ - promoteSingleToNodeList(state as JobNode) - } else { - var rootCause: Throwable? = null - var handle: DisposableHandle = NonDisposableHandle - if (node.onCancelling && state is Finishing) { - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add node to the list in two cases --- either the job is not being cancelled - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || node is ChildHandleNode && !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - if (!list.addLast( - node, - allowedAfterPartialClosing = node is ChildHandleNode - ) - ) return@loopOnState // retry - // just return node if we don't have to invoke handler (not cancelling yet) - if (rootCause == null) return node - // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node - } - } - } - if (rootCause != null) { - // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) node.invoke(rootCause) - return handle - } else if (list.addLast( - node, allowedAfterPartialClosing = !node.onCancelling || node is ChildHandleNode - )) { - if (node is ChildHandleNode) { - /** Handling the following case: - * - A child requested to be added to the list; - * - We checked the state and saw that it wasn't `Finishing`; - * - Then, the job got cancelled and notified everyone about it; - * - Only then did we add the child to the list - * - and ended up here. - */ - val latestState = this@JobSupport.state - if (latestState is Finishing) { - assert { invokeImmediately } - synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } - } - } - return node - } - } - } - else -> { // is complete - // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, - // because we play type tricks on Kotlin/JS and handler is not necessarily a function there - if (invokeImmediately) node.invoke((state as? CompletedExceptionally)?.cause) - return NonDisposableHandle + is Incomplete -> when (val list = state.list) { + null -> promoteSingleToNodeList(state as JobNode) + else -> if (tryAdd(state, list)) return true } + else -> return false } } } @@ -973,14 +957,58 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object - * is handled in a special way on completion on the coroutine (we wait for all of them) and - * is handled specially by invokeOnCompletion itself -- it adds this node to the list even - * if the job is already cancelling. For cancelling state child is attached under state lock. - * It's required to properly wait all children before completion and provide linearizable hierarchy view: - * If child is attached when the job is already being cancelled, such child will receive immediate notification on - * cancellation, but parent *will* wait for that child before completion and will handle its exception. + * is handled in a special way on completion on the coroutine (we wait for all of them) and also + * can't be added simply with `invokeOnCompletionInternal` -- we add this node to the list even + * if the job is already cancelling. For cancelling state, the child is attached under state lock. + * It's required to properly await all children before completion and provide a linearizable hierarchy view: + * If the child is attached when the job is already being cancelled, such a child will receive + * an immediate notification on cancellation, + * but the parent *will* wait for that child before completion and will handle its exception. */ - return invokeOnCompletion(handler = ChildHandleNode(child)) as ChildHandle + val node = ChildHandleNode(child).also { it.job = this } + val added = tryPutNodeIntoList(node) { state, list -> + if (state is Finishing) { + val rootCause: Throwable + val handle: ChildHandle + synchronized(state) { + // check if we are installing cancellation handler on job that is being cancelled + val maybeRootCause = state.rootCause // != null if cancelling job + // We add the node to the list in two cases --- either the job is not being cancelled, + // or we are adding a child to a coroutine that is not completing yet + if (maybeRootCause == null || !state.isCompleting) { + // Note: add node the list while holding lock on state (make sure it cannot change) + if (!list.addLast(node, allowedAfterPartialClosing = true)) + return@tryPutNodeIntoList false // retry + // just return the node if we don't have to invoke the handler (not cancelling yet) + rootCause = maybeRootCause ?: return@tryPutNodeIntoList true + // otherwise handler is invoked immediately out of the synchronized section & handle returned + handle = node + } else { + rootCause = maybeRootCause + handle = NonDisposableHandle + } + } + node.invoke(rootCause) + return handle + } else list.addLast(node, allowedAfterPartialClosing = true).also { success -> + if (success) { + /** Handling the following case: + * - A child requested to be added to the list; + * - We checked the state and saw that it wasn't `Finishing`; + * - Then, the job got cancelled and notified everyone about it; + * - Only then did we add the child to the list + * - and ended up here. + */ + val latestState = this@JobSupport.state + if (latestState is Finishing) { + synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } + } + } + } + } + if (added) return node + node.invoke((state as? CompletedExceptionally)?.cause) + return NonDisposableHandle } /** From 8a76e3188ce79ccca23909c0bf5cfb95f3525b0a Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 14 Mar 2024 14:33:45 +0100 Subject: [PATCH 03/13] Generalize the concept of list closing to any number of levels --- .../common/src/JobSupport.kt | 15 +++++++++------ .../src/internal/LockFreeLinkedList.common.kt | 9 ++++++--- .../src/internal/LockFreeLinkedList.kt | 18 +++++------------- .../src/internal/LinkedList.kt | 19 ++++++------------- .../test/internal/LinkedListTest.kt | 8 ++++---- .../LockFreeLinkedListLongStressTest.kt | 3 +-- 6 files changed, 31 insertions(+), 41 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index b615570776..d3d30b2011 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -319,7 +319,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) - list.closeForSome() + list.close(LIST_CANCELLATION_PERMISSION) notifyHandlers(list, cause) { it.onCancelling } // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent @@ -352,7 +352,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } private fun NodeList.notifyCompletion(cause: Throwable?) { - close() + close(LIST_MAX_PERMISSION) notifyHandlers(this, cause) { true } } @@ -468,13 +468,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (node.onCancelling) { val rootCause = (state as? Finishing)?.let { synchronized(it) { it.rootCause } } if (rootCause == null) { - list.addLast(node, allowedAfterPartialClosing = false) + list.addLast(node, LIST_CANCELLATION_PERMISSION) } else { if (invokeImmediately) node.invoke(rootCause) return NonDisposableHandle } } else { - list.addLast(node, allowedAfterPartialClosing = true) + list.addLast(node, LIST_MAX_PERMISSION) } } when { @@ -977,7 +977,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // or we are adding a child to a coroutine that is not completing yet if (maybeRootCause == null || !state.isCompleting) { // Note: add node the list while holding lock on state (make sure it cannot change) - if (!list.addLast(node, allowedAfterPartialClosing = true)) + if (!list.addLast(node, LIST_MAX_PERMISSION)) return@tryPutNodeIntoList false // retry // just return the node if we don't have to invoke the handler (not cancelling yet) rootCause = maybeRootCause ?: return@tryPutNodeIntoList true @@ -990,7 +990,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } node.invoke(rootCause) return handle - } else list.addLast(node, allowedAfterPartialClosing = true).also { success -> + } else list.addLast(node, LIST_MAX_PERMISSION).also { success -> if (success) { /** Handling the following case: * - A child requested to be added to the list; @@ -1339,6 +1339,9 @@ private val SEALED = Symbol("SEALED") private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) +private const val LIST_MAX_PERMISSION = Int.MAX_VALUE +private const val LIST_CANCELLATION_PERMISSION = 0 + private class Empty(override val isActive: Boolean) : Incomplete { override val list: NodeList? get() = null override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}" diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 8a7814ecf6..2be4119f23 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -7,11 +7,14 @@ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean public val nextNode: LockFreeLinkedListNode public val prevNode: LockFreeLinkedListNode - public fun addLast(node: LockFreeLinkedListNode, allowedAfterPartialClosing: Boolean): Boolean + public fun addLast(node: LockFreeLinkedListNode, clearanceLevel: Int): Boolean public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean public open fun remove(): Boolean - public fun close() - public fun closeForSome() + + /** + * Closes the list for [maxForbidden] and all numbers below. + */ + public fun close(maxForbidden: Int) } /** @suppress **This is unstable API and it is subject to change.** */ diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index d89bc71825..0ffcd27c4c 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -80,28 +80,22 @@ public actual open class LockFreeLinkedListNode { /** * Adds last item to this list. Returns `false` if the list is closed. */ - public actual fun addLast(node: Node, allowedAfterPartialClosing: Boolean): Boolean { + public actual fun addLast(node: Node, clearanceLevel: Int): Boolean { while (true) { // lock-free loop on prev.next val currentPrev = prevNode return when { - currentPrev is ListClosedForAll -> false - currentPrev is ListClosedForSome -> - allowedAfterPartialClosing && currentPrev.addLast(node, allowedAfterPartialClosing) + currentPrev is ListClosed -> + currentPrev.maxForbidden < clearanceLevel && currentPrev.addLast(node, clearanceLevel) currentPrev.addNext(node, this) -> true else -> continue } } } - /** - * Forbids adding some of the new items to this list. - */ - public actual fun closeForSome() { addLast(ListClosedForSome(), allowedAfterPartialClosing = false) } - /** * Forbids adding new items to this list. */ - public actual fun close() { addLast(ListClosedForAll(), allowedAfterPartialClosing = true) } + public actual fun close(maxForbidden: Int) { addLast(ListClosed(maxForbidden), maxForbidden) } /** * Given: @@ -289,6 +283,4 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { override val isRemoved: Boolean get() = false } -private class ListClosedForSome: LockFreeLinkedListNode() - -private class ListClosedForAll: LockFreeLinkedListNode() +private class ListClosed(val maxForbidden: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt index abed3ac498..16fa8331cf 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt @@ -14,9 +14,8 @@ public actual open class LockFreeLinkedListNode { inline actual val prevNode get() = _prev inline actual val isRemoved get() = _removed - public actual fun addLast(node: Node, allowedAfterPartialClosing: Boolean): Boolean = when (val prev = this._prev) { - is ListClosedForAll -> false - is ListClosedForSome -> allowedAfterPartialClosing && prev.addLast(node, allowedAfterPartialClosing) + public actual fun addLast(node: Node, clearanceLevel: Int): Boolean = when (val prev = this._prev) { + is ListClosed -> prev.maxForbidden < clearanceLevel && prev.addLast(node, clearanceLevel) else -> { node._next = this node._prev = prev @@ -26,12 +25,8 @@ public actual open class LockFreeLinkedListNode { } } - public actual fun closeForSome() { - addLast(ListClosedForSome(), allowedAfterPartialClosing = true) - } - - public actual fun close() { - addLast(ListClosedForAll(), allowedAfterPartialClosing = false) + public actual fun close(maxForbidden: Int) { + addLast(ListClosed(maxForbidden), maxForbidden) } /* @@ -52,7 +47,7 @@ public actual open class LockFreeLinkedListNode { public actual fun addOneIfEmpty(node: Node): Boolean { if (_next !== this) return false - addLast(node, allowedAfterPartialClosing = false) + addLast(node, Int.MIN_VALUE) return true } } @@ -74,6 +69,4 @@ public actual open class LockFreeLinkedListHead : Node() { public actual final override fun remove(): Nothing = throw UnsupportedOperationException() } -private class ListClosedForSome: LockFreeLinkedListNode() - -private class ListClosedForAll: LockFreeLinkedListNode() +private class ListClosed(val maxForbidden: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt index 8e5610fa89..305484f741 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/test/internal/LinkedListTest.kt @@ -12,13 +12,13 @@ class LinkedListTest { fun testSimpleAddLastRemove() { val list = LockFreeLinkedListHead() assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this, allowedAfterPartialClosing = false) } + val n1 = IntNode(1).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this, allowedAfterPartialClosing = false) } + val n2 = IntNode(2).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this, allowedAfterPartialClosing = false) } + val n3 = IntNode(3).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this, allowedAfterPartialClosing = false) } + val n4 = IntNode(4).apply { list.addLast(this, Int.MAX_VALUE) } assertContents(list, 1, 2, 3, 4) assertTrue(n1.remove()) assertContents(list, 2, 3, 4) diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt index ce041f23df..95be1cbe62 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListLongStressTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.internal -import kotlinx.coroutines.testing.* import kotlinx.coroutines.testing.TestBase import org.junit.Test import java.util.* @@ -31,7 +30,7 @@ class LockFreeLinkedListLongStressTest : TestBase() { for (j in 0 until nAddThreads) threads += thread(start = false, name = "adder-$j") { for (i in j until nAdded step nAddThreads) { - list.addLast(IntNode(i), allowedAfterPartialClosing = false) + list.addLast(IntNode(i), Int.MAX_VALUE) } println("${Thread.currentThread().name} completed") workingAdders.decrementAndGet() From ef43ce5cb190566bec20ae8bc5de9089d5fb221f Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 14 Mar 2024 16:25:04 +0100 Subject: [PATCH 04/13] Forbid adding new child jobs after the parent decides to complete --- .../common/src/JobSupport.kt | 57 ++++++++++++------- .../jvm/test/JobChildStressTest.kt | 27 +++++++++ 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index d3d30b2011..e4699df8fb 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -899,6 +899,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val child = firstChild(state) if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN + list.close(LIST_CHILD_PERMISSION) + val anotherChild = firstChild(state) + if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) + return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) } @@ -928,7 +932,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val waitChild = lastChild.nextChild() // try wait for next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child - // no more children to wait -- try update state + // no more children to wait -- stop accepting children + state.list.close(LIST_CHILD_PERMISSION) + // did any children get added? + val waitChildAgain = lastChild.nextChild() + // try wait for next child + if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child + // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) } @@ -968,41 +978,45 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val node = ChildHandleNode(child).also { it.job = this } val added = tryPutNodeIntoList(node) { state, list -> if (state is Finishing) { - val rootCause: Throwable + val rootCause: Throwable? val handle: ChildHandle synchronized(state) { // check if we are installing cancellation handler on job that is being cancelled - val maybeRootCause = state.rootCause // != null if cancelling job + rootCause = state.rootCause // != null if cancelling job // We add the node to the list in two cases --- either the job is not being cancelled, // or we are adding a child to a coroutine that is not completing yet - if (maybeRootCause == null || !state.isCompleting) { + if (rootCause == null || !state.isCompleting) { // Note: add node the list while holding lock on state (make sure it cannot change) - if (!list.addLast(node, LIST_MAX_PERMISSION)) - return@tryPutNodeIntoList false // retry + handle = if (list.addLast(node, LIST_CHILD_PERMISSION)) { + node + } else { + NonDisposableHandle + } // just return the node if we don't have to invoke the handler (not cancelling yet) - rootCause = maybeRootCause ?: return@tryPutNodeIntoList true // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node } else { - rootCause = maybeRootCause handle = NonDisposableHandle } } node.invoke(rootCause) return handle - } else list.addLast(node, LIST_MAX_PERMISSION).also { success -> - if (success) { - /** Handling the following case: - * - A child requested to be added to the list; - * - We checked the state and saw that it wasn't `Finishing`; - * - Then, the job got cancelled and notified everyone about it; - * - Only then did we add the child to the list - * - and ended up here. - */ - val latestState = this@JobSupport.state - if (latestState is Finishing) { - synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } + } else { + list.addLast(node, LIST_CHILD_PERMISSION).also { success -> + if (success) { + /** Handling the following case: + * - A child requested to be added to the list; + * - We checked the state and saw that it wasn't `Finishing`; + * - Then, the job got cancelled and notified everyone about it; + * - Only then did we add the child to the list + * - and ended up here. + */ + val latestState = this@JobSupport.state + if (latestState is Finishing) { + synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } + } } + // if we didn't add the node to the list, we'll loop and notice + // either `Finishing` or the final state, so no spin loop here } } } @@ -1340,6 +1354,7 @@ private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) private const val LIST_MAX_PERMISSION = Int.MAX_VALUE +private const val LIST_CHILD_PERMISSION = 1 private const val LIST_CANCELLATION_PERMISSION = 0 private class Empty(override val isActive: Boolean) : Incomplete { diff --git a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt index 3ac1967b9f..291845e75f 100644 --- a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.* import org.junit.* import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.atomic.* import kotlin.test.* class JobChildStressTest : TestBase() { @@ -54,4 +55,30 @@ class JobChildStressTest : TestBase() { } } } + + @Test + fun testFailingChildIsAddedWhenJobFinalizesItsState() { + // All exceptions should get aggregated here + repeat(N_ITERATIONS) { + runBlocking { + val rogueJob = AtomicReference() + val deferred = CompletableDeferred() + launch(pool + deferred) { + deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child + // **Asynchronously** submit task that launches a child so it races with completion + pool.executor.execute { + rogueJob.set(launch(pool + deferred) { + throw TestException("isCancelled: ${coroutineContext.job.isCancelled}") + }) + } + } + + deferred.join() + val rogue = rogueJob.get() + if (rogue?.isActive == true) { + throw TestException("Rogue job $rogue with parent " + rogue.parent + " and children list: " + rogue.parent?.children?.toList()) + } + } + } + } } From 85c45a0a60732b387592c3774ab81a0ea7968e60 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 15 Mar 2024 10:42:21 +0100 Subject: [PATCH 05/13] Simplify adding the children to just two operations on a list --- .../common/src/JobSupport.kt | 74 +++++++++---------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index e4699df8fb..cd169b4efb 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -879,7 +879,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (finishing.isCompleting) return COMPLETING_ALREADY // mark as completing finishing.isCompleting = true - // if we need to promote to finishing then atomically do it here. + // if we need to promote to finishing, then atomically do it here. // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap. if (finishing !== state) { @@ -893,7 +893,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // If it just becomes cancelling --> must process cancelling notifications notifyRootCause = finishing.rootCause.takeIf { !wasCancelling } } - // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!) + // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children val child = firstChild(state) @@ -976,47 +976,39 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * but the parent *will* wait for that child before completion and will handle its exception. */ val node = ChildHandleNode(child).also { it.job = this } - val added = tryPutNodeIntoList(node) { state, list -> - if (state is Finishing) { - val rootCause: Throwable? - val handle: ChildHandle - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add the node to the list in two cases --- either the job is not being cancelled, - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - handle = if (list.addLast(node, LIST_CHILD_PERMISSION)) { - node - } else { - NonDisposableHandle - } - // just return the node if we don't have to invoke the handler (not cancelling yet) - // otherwise handler is invoked immediately out of the synchronized section & handle returned - } else { - handle = NonDisposableHandle - } - } - node.invoke(rootCause) - return handle + val added = tryPutNodeIntoList(node) { _, list -> + // First, try to add a child along the cancellation handlers + val addedBeforeCancellation = list.addLast(node, LIST_CANCELLATION_PERMISSION) + if (addedBeforeCancellation) { + // The child managed to be added before the parent started to cancel or complete. Success. + true } else { - list.addLast(node, LIST_CHILD_PERMISSION).also { success -> - if (success) { - /** Handling the following case: - * - A child requested to be added to the list; - * - We checked the state and saw that it wasn't `Finishing`; - * - Then, the job got cancelled and notified everyone about it; - * - Only then did we add the child to the list - * - and ended up here. - */ - val latestState = this@JobSupport.state - if (latestState is Finishing) { - synchronized(latestState) { latestState.rootCause }?.let { node.invoke(it) } - } + // Either cancellation or completion already happened, the child was not added. + // Now we need to try adding it for completion. + val addedBeforeCompletion = list.addLast(node, LIST_CHILD_PERMISSION) + // Whether or not we managed to add the child before the parent completed, we need to investigate: + // why didn't we manage to add it before cancellation? + // If it's because cancellation happened in the meantime, we need to notify the child. + // We check the latest state because the original state with which we started may not have had + // the information about the cancellation yet. + val rootCause = when (val latestState = this.state) { + is Finishing -> { + // The state is still incomplete, so we need to notify the child about the completion cause. + synchronized(latestState) { latestState.rootCause } + } + else -> { + // Since the list is already closed for `onCancelling`, the job is either Finishing or + // already completed. We need to notify the child about the completion cause. + assert { latestState !is Incomplete } + (latestState as? CompletedExceptionally)?.cause } - // if we didn't add the node to the list, we'll loop and notice - // either `Finishing` or the final state, so no spin loop here + } + if (addedBeforeCompletion) { + if (rootCause != null) node.invoke(rootCause) + true + } else { + node.invoke(rootCause) + return NonDisposableHandle } } } From 815db365ea9a555eaa2f0b55e31ea82292839d78 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 15 Mar 2024 11:19:44 +0100 Subject: [PATCH 06/13] Fix a spinlock in invokeOnCompletion with onCancelling = true Before this change, when children were prohibited from adding themselves to the list, cancellation handlers were also prohibited from doing so. This was plain incorrect, because a list that's closed for new children could still get cancelled later, and also because being closed for new children happened before advancing the state to the final one. --- .../common/src/JobSupport.kt | 23 ++++++++++++------- .../src/internal/LockFreeLinkedList.common.kt | 7 +++--- .../src/internal/LockFreeLinkedList.kt | 11 +++++---- .../src/internal/LinkedList.kt | 11 +++++---- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index cd169b4efb..8c1f9ef3de 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -352,7 +352,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } private fun NodeList.notifyCompletion(cause: Throwable?) { - close(LIST_MAX_PERMISSION) + close(LIST_ON_COMPLETION_PERMISSION) notifyHandlers(this, cause) { true } } @@ -468,13 +468,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (node.onCancelling) { val rootCause = (state as? Finishing)?.let { synchronized(it) { it.rootCause } } if (rootCause == null) { - list.addLast(node, LIST_CANCELLATION_PERMISSION) + list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION) } else { if (invokeImmediately) node.invoke(rootCause) return NonDisposableHandle } } else { - list.addLast(node, LIST_MAX_PERMISSION) + list.addLast(node, LIST_ON_COMPLETION_PERMISSION) } } when { @@ -978,14 +978,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val node = ChildHandleNode(child).also { it.job = this } val added = tryPutNodeIntoList(node) { _, list -> // First, try to add a child along the cancellation handlers - val addedBeforeCancellation = list.addLast(node, LIST_CANCELLATION_PERMISSION) + val addedBeforeCancellation = list.addLast( + node, + LIST_ON_COMPLETION_PERMISSION or LIST_CHILD_PERMISSION or LIST_CANCELLATION_PERMISSION + ) if (addedBeforeCancellation) { // The child managed to be added before the parent started to cancel or complete. Success. true } else { // Either cancellation or completion already happened, the child was not added. // Now we need to try adding it for completion. - val addedBeforeCompletion = list.addLast(node, LIST_CHILD_PERMISSION) + val addedBeforeCompletion = list.addLast( + node, + LIST_CHILD_PERMISSION or LIST_ON_COMPLETION_PERMISSION + ) // Whether or not we managed to add the child before the parent completed, we need to investigate: // why didn't we manage to add it before cancellation? // If it's because cancellation happened in the meantime, we need to notify the child. @@ -1345,9 +1351,10 @@ private val SEALED = Symbol("SEALED") private val EMPTY_NEW = Empty(false) private val EMPTY_ACTIVE = Empty(true) -private const val LIST_MAX_PERMISSION = Int.MAX_VALUE -private const val LIST_CHILD_PERMISSION = 1 -private const val LIST_CANCELLATION_PERMISSION = 0 +// bit mask +private const val LIST_ON_COMPLETION_PERMISSION = 1 +private const val LIST_CHILD_PERMISSION = 2 +private const val LIST_CANCELLATION_PERMISSION = 4 private class Empty(override val isActive: Boolean) : Incomplete { override val list: NodeList? get() = null diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 2be4119f23..32209fc0ac 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -7,14 +7,15 @@ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean public val nextNode: LockFreeLinkedListNode public val prevNode: LockFreeLinkedListNode - public fun addLast(node: LockFreeLinkedListNode, clearanceLevel: Int): Boolean + public fun addLast(node: LockFreeLinkedListNode, permissionsBitmask: Int): Boolean public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean public open fun remove(): Boolean /** - * Closes the list for [maxForbidden] and all numbers below. + * Closes the list for anything that requests the permission [forbiddenElementsBit]. + * Only a single permission can be forbidden at a time, but this isn't checked. */ - public fun close(maxForbidden: Int) + public fun close(forbiddenElementsBit: Int) } /** @suppress **This is unstable API and it is subject to change.** */ diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index 0ffcd27c4c..92630a45a5 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -80,12 +80,13 @@ public actual open class LockFreeLinkedListNode { /** * Adds last item to this list. Returns `false` if the list is closed. */ - public actual fun addLast(node: Node, clearanceLevel: Int): Boolean { + public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean { while (true) { // lock-free loop on prev.next val currentPrev = prevNode return when { currentPrev is ListClosed -> - currentPrev.maxForbidden < clearanceLevel && currentPrev.addLast(node, clearanceLevel) + currentPrev.forbiddenElementsBitmask and permissionsBitmask == 0 && + currentPrev.addLast(node, permissionsBitmask) currentPrev.addNext(node, this) -> true else -> continue } @@ -95,7 +96,9 @@ public actual open class LockFreeLinkedListNode { /** * Forbids adding new items to this list. */ - public actual fun close(maxForbidden: Int) { addLast(ListClosed(maxForbidden), maxForbidden) } + public actual fun close(forbiddenElementsBit: Int) { + addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) + } /** * Given: @@ -283,4 +286,4 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { override val isRemoved: Boolean get() = false } -private class ListClosed(val maxForbidden: Int): LockFreeLinkedListNode() +private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt index 16fa8331cf..6810d614d1 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt @@ -14,8 +14,9 @@ public actual open class LockFreeLinkedListNode { inline actual val prevNode get() = _prev inline actual val isRemoved get() = _removed - public actual fun addLast(node: Node, clearanceLevel: Int): Boolean = when (val prev = this._prev) { - is ListClosed -> prev.maxForbidden < clearanceLevel && prev.addLast(node, clearanceLevel) + public actual fun addLast(node: Node, permissionsBitmask: Int): Boolean = when (val prev = this._prev) { + is ListClosed -> + prev.forbiddenElementsBitmask and permissionsBitmask == 0 && prev.addLast(node, permissionsBitmask) else -> { node._next = this node._prev = prev @@ -25,8 +26,8 @@ public actual open class LockFreeLinkedListNode { } } - public actual fun close(maxForbidden: Int) { - addLast(ListClosed(maxForbidden), maxForbidden) + public actual fun close(forbiddenElementsBit: Int) { + addLast(ListClosed(forbiddenElementsBit), forbiddenElementsBit) } /* @@ -69,4 +70,4 @@ public actual open class LockFreeLinkedListHead : Node() { public actual final override fun remove(): Nothing = throw UnsupportedOperationException() } -private class ListClosed(val maxForbidden: Int): LockFreeLinkedListNode() +private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() From 970da72c6b41afeef1ca8a3fbb59e9635cac931b Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 15 Mar 2024 12:27:57 +0100 Subject: [PATCH 07/13] Improve the docs --- .../common/src/JobSupport.kt | 72 +++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 8c1f9ef3de..2085ec3dfd 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -466,14 +466,47 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. val added = tryPutNodeIntoList(node) { state, list -> if (node.onCancelling) { + /** + * We are querying whether the job was already cancelled when we entered this block. + * We can't naively attempt to add the node to the list, because a lot of time could pass between + * notifying the cancellation handlers (and thus closing the list, forcing us to retry) + * and reaching a final state. + * + * Alternatively, we could also try to add the node to the list first and then read the latest state + * to check for an exception, but that logic would need to manually handle the final state, which is + * less straightforward. + */ val rootCause = (state as? Finishing)?.let { synchronized(it) { it.rootCause } } if (rootCause == null) { + /** + * There is no known root cause yet, so we can add the node to the list of state handlers. + * + * If this call fails, because of the bitmask, this means one of the two happened: + * - [notifyCancelling] was already called. + * This means that the job is already being cancelled: otherwise, with what exception would we + * notify the handler? + * So, we can retry the operation: either the state is already final, or the `rootCause` check + * above will give a different result. + * - [notifyCompletion] was already called. + * This means that the job is already complete. + * We can retry the operation and will observe the final state. + */ list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION) } else { + /** + * The root cause is known, so we can invoke the handler immediately and avoid adding it. + */ if (invokeImmediately) node.invoke(rootCause) return NonDisposableHandle } } else { + /** + * The non-[onCancelling]-handlers are interested in completions only, so it's safe to add them at + * any time before [notifyCompletion] is called (which closes the list). + * + * If the list *is* closed, on a retry, we'll observe the final state, as [notifyCompletion] is only + * called after the state transition. + */ list.addLast(node, LIST_ON_COMPLETION_PERMISSION) } } @@ -969,7 +1002,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Note: This function attaches a special ChildHandleNode node object. This node object * is handled in a special way on completion on the coroutine (we wait for all of them) and also * can't be added simply with `invokeOnCompletionInternal` -- we add this node to the list even - * if the job is already cancelling. For cancelling state, the child is attached under state lock. + * if the job is already cancelling. * It's required to properly await all children before completion and provide a linearizable hierarchy view: * If the child is attached when the job is already being cancelled, such a child will receive * an immediate notification on cancellation, @@ -986,39 +1019,54 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // The child managed to be added before the parent started to cancel or complete. Success. true } else { - // Either cancellation or completion already happened, the child was not added. - // Now we need to try adding it for completion. + /* Either cancellation or completion already happened, the child was not added. + * Now we need to try adding it just for completion. */ val addedBeforeCompletion = list.addLast( node, LIST_CHILD_PERMISSION or LIST_ON_COMPLETION_PERMISSION ) - // Whether or not we managed to add the child before the parent completed, we need to investigate: - // why didn't we manage to add it before cancellation? - // If it's because cancellation happened in the meantime, we need to notify the child. - // We check the latest state because the original state with which we started may not have had - // the information about the cancellation yet. + /* + * Whether or not we managed to add the child before the parent completed, we need to investigate: + * why didn't we manage to add it before cancellation? + * If it's because cancellation happened in the meantime, we need to notify the child about it. + * We check the latest state because the original state with which we started may not have had + * the information about the cancellation yet. + */ val rootCause = when (val latestState = this.state) { is Finishing -> { // The state is still incomplete, so we need to notify the child about the completion cause. synchronized(latestState) { latestState.rootCause } } else -> { - // Since the list is already closed for `onCancelling`, the job is either Finishing or - // already completed. We need to notify the child about the completion cause. + /** Since the list is already closed for [onCancelling], the job is either Finishing or + * already completed. We need to notify the child about the completion cause. */ assert { latestState !is Incomplete } (latestState as? CompletedExceptionally)?.cause } } + /** + * We must cancel the child if the parent was cancelled already, even if we successfully attached, + * as this child didn't make it before [notifyCancelling] and won't be notified that it should be + * cancelled. + * + * And if the parent wasn't cancelled and the previous [LockFreeLinkedListNode.addLast] failed because + * the job is in its final state already, we won't be able to attach anyway, so we must just invoke + * the handler and return. + */ + node.invoke(rootCause) if (addedBeforeCompletion) { - if (rootCause != null) node.invoke(rootCause) + /** The root cause can't be null: since the earlier addition to the list failed, this means that + * the job was already cancelled or completed. */ + assert { rootCause != null } true } else { - node.invoke(rootCause) + /** No sense in retrying: we know it won't succeed, and we already invoked the handler. */ return NonDisposableHandle } } } if (added) return node + /** We can only end up here if [tryPutNodeIntoList] detected a final state. */ node.invoke((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } From 0c52fe8cfea85dd0c8a4086345a0e1ddbbf100d3 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 15 Mar 2024 13:05:14 +0100 Subject: [PATCH 08/13] Remove some unnecessary `synchronized` blocks `rootCause` is atomic anyway. --- kotlinx-coroutines-core/common/src/JobSupport.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 2085ec3dfd..3a1af77049 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -476,7 +476,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * to check for an exception, but that logic would need to manually handle the final state, which is * less straightforward. */ - val rootCause = (state as? Finishing)?.let { synchronized(it) { it.rootCause } } + val rootCause = (state as? Finishing)?.rootCause if (rootCause == null) { /** * There is no known root cause yet, so we can add the node to the list of state handlers. @@ -906,7 +906,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // atomically transition to finishing & completing state val finishing = state as? Finishing ?: Finishing(list, false, null) // must synchronize updates to finishing state - var notifyRootCause: Throwable? = null + val notifyRootCause: Throwable? synchronized(finishing) { // check if this state is already completing if (finishing.isCompleting) return COMPLETING_ALREADY @@ -1035,7 +1035,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val rootCause = when (val latestState = this.state) { is Finishing -> { // The state is still incomplete, so we need to notify the child about the completion cause. - synchronized(latestState) { latestState.rootCause } + latestState.rootCause } else -> { /** Since the list is already closed for [onCancelling], the job is either Finishing or From d989e06f236f8ae4941a1274daa0a581d173f8e3 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 15 Mar 2024 15:33:41 +0100 Subject: [PATCH 09/13] Fix a newcoming child being sometimes ignored The failure went like this: * A child arrives. * In the meantime, the parent enters `tryMakeCompletingSlowPath` and remembers the current list of handlers, which is an empty or a single-element one. * The parent updates the state to the finishing one. * The child enters the list. * The parent traverses *an old list*, the one from before the child arrived. It sees no children in the empty/single-element list and forgets about the child. Why, then, was it that this worked before? It was because there was a guarantee that no new children are going to be registered if three conditions are true: * The state of the `JobSupport` is a list, * The root cause of the error is set to something, * And the state is already "completing". `tryMakeCompletingSlowPath` sets the state to completing, and because it updates the state inside `synchronized`, there was a guarantee that the child would see either the old state (and, if it adds itself successfully, then `tryMakeCompletingSlowPath` will retry) or the complete new one, with `isCompleting` and the error set to something. So, there could be no case when a child entered a *list*, but this list was something different from what `tryMakeCompletingSlowPath` stores in its state. --- kotlinx-coroutines-core/common/src/JobSupport.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 3a1af77049..a099a93d34 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -929,11 +929,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children - val child = firstChild(state) + val child = list.nextChild() if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN list.close(LIST_CHILD_PERMISSION) - val anotherChild = firstChild(state) + val anotherChild = list.nextChild() if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) @@ -943,9 +943,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause - private fun firstChild(state: Incomplete) = - state as? ChildHandleNode ?: state.list?.nextChild() - // return false when there is no more incomplete children to wait // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { From 494d8cafae79cc8220b3445a9559f51f07fbd2d9 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 26 Mar 2024 13:02:26 +0100 Subject: [PATCH 10/13] Add and enhance stress tests for attaching completion handlers --- .../jvm/test/JobChildStressTest.kt | 37 +++- .../jvm/test/JobHandlersUpgradeStressTest.kt | 5 +- .../jvm/test/JobOnCompletionStressTest.kt | 192 ++++++++++++++++++ 3 files changed, 226 insertions(+), 8 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt index 291845e75f..a30e6393b6 100644 --- a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt @@ -1,27 +1,36 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* -import org.junit.* -import org.junit.Test import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.test.* +/** + * Testing the procedure of attaching a child to the parent job. + */ class JobChildStressTest : TestBase() { private val N_ITERATIONS = 10_000 * stressTestMultiplier private val pool = newFixedThreadPoolContext(3, "JobChildStressTest") - @After + @AfterTest fun tearDown() { pool.close() } /** - * Perform concurrent launch of a child job & cancellation of the explicit parent job + * Tests attaching a child while the parent is trying to finalize its state. + * + * Checks the following interleavings: + * - A child attaches before the parent is cancelled. + * - A child attaches after the parent is cancelled, but before the parent notifies anyone about it. + * - A child attaches after the parent notifies the children about being cancelled, + * but before it starts waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. */ @Test @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") - fun testChild() = runTest { + fun testChildAttachmentRacingWithCancellation() = runTest { val barrier = CyclicBarrier(3) repeat(N_ITERATIONS) { var wasLaunched = false @@ -30,7 +39,7 @@ class JobChildStressTest : TestBase() { unhandledException = ex } val scope = CoroutineScope(pool + handler) - val parent = CompletableDeferred() + val parent = createCompletableDeferredForTesting(it) // concurrent child launcher val launcher = scope.launch { barrier.await() @@ -56,13 +65,27 @@ class JobChildStressTest : TestBase() { } } + /** + * Tests attaching a child while the parent is waiting for the last child job to complete. + * + * Checks the following interleavings: + * - A child attaches while the parent is already completing, but is waiting for its children. + * - A child attempts to attach after the parent stops waiting for its children, + * which immediately cancels the child. + */ @Test - fun testFailingChildIsAddedWhenJobFinalizesItsState() { + fun testChildAttachmentRacingWithLastChildCompletion() { // All exceptions should get aggregated here repeat(N_ITERATIONS) { runBlocking { val rogueJob = AtomicReference() + /** not using [createCompletableDeferredForTesting] because we don't need extra children. */ val deferred = CompletableDeferred() + // optionally, add a completion handler to the parent job, so that the child tries to enter a list with + // multiple elements, not just one. + if (it.mod(2) == 0) { + deferred.invokeOnCompletion { } + } launch(pool + deferred) { deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child // **Asynchronously** submit task that launches a child so it races with completion diff --git a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt index dc2314bb6c..3f085b6f20 100644 --- a/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobHandlersUpgradeStressTest.kt @@ -30,6 +30,9 @@ class JobHandlersUpgradeStressTest : TestBase() { val state = atomic(0) } + /** + * Tests handlers not being invoked more than once. + */ @Test fun testStress() { println("--- JobHandlersUpgradeStressTest") @@ -91,4 +94,4 @@ class JobHandlersUpgradeStressTest : TestBase() { println(" Fired handler ${fired.value} times") } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt new file mode 100644 index 0000000000..3df62b666e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt @@ -0,0 +1,192 @@ +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.* +import kotlin.test.* +import kotlin.time.Duration.Companion.seconds + +class JobOnCompletionStressTest: TestBase() { + private val N_ITERATIONS = 10_000 * stressTestMultiplier + private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest") + + private val completionHandlerSeesCompletedParent = AtomicBoolean(false) + private val completionHandlerSeesCancelledParent = AtomicBoolean(false) + private val encounteredException = AtomicReference(null) + + @AfterTest + fun tearDown() { + pool.close() + } + + @Test + fun testOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = true, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCompletionRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = false, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCompletion() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { complete(Unit) } + ) { + assertNull(encounteredException.get()) + assertTrue(completionHandlerSeesCompletedParent.get()) + assertFalse(completionHandlerSeesCancelledParent.get()) + } + } + + @Test + fun testNonImmediateOnCancellingRacingWithCancellation() = runTest { + testHandlerRacingWithCancellation( + onCancelling = true, + invokeImmediately = false, + parentCompletion = { completeExceptionally(TestException()) } + ) { + assertIs(encounteredException.get()) + assertTrue(completionHandlerSeesCancelledParent.get()) + } + } + + private suspend fun testHandlerRacingWithCancellation( + onCancelling: Boolean, + invokeImmediately: Boolean, + parentCompletion: CompletableDeferred.() -> Unit, + validate: () -> Unit, + ) { + repeat(N_ITERATIONS) { + val entered = Channel(1) + completionHandlerSeesCompletedParent.set(false) + completionHandlerSeesCancelledParent.set(false) + encounteredException.set(null) + val parent = createCompletableDeferredForTesting(it) + val barrier = CyclicBarrier(2) + val handlerInstallJob = coroutineScope { + launch(pool) { + barrier.await() + parent.parentCompletion() + } + async(pool) { + barrier.await() + parent.invokeOnCompletion( + onCancelling = onCancelling, + invokeImmediately = invokeImmediately, + ) { exception -> + encounteredException.set(exception) + completionHandlerSeesCompletedParent.set(parent.isCompleted) + completionHandlerSeesCancelledParent.set(parent.isCancelled) + entered.trySend(Unit) + } + } + } + if (invokeImmediately || handlerInstallJob.getCompleted() !== NonDisposableHandle) { + withTimeout(1.seconds) { + entered.receive() + } + try { + validate() + } catch (e: Throwable) { + println("Iteration $it failed") + println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}") + throw e + } + } else { + assertTrue(entered.isEmpty) + } + } + } +} + +/** + * Creates a [CompletableDeferred], optionally adding completion handlers and/or other children to the job depending + * on [iteration]. + * The purpose is to test not just attaching completion handlers to empty or one-element lists (see the [JobSupport] + * implementation for details on what this means), but also to lists with multiple elements. + */ +fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred { + val parent = CompletableDeferred() + /* We optionally add completion handlers and/or other children to the parent job + to test the scenarios where a child is placed into an empty list, a single-element list, + or a list with multiple elements. */ + if (iteration.mod(2) == 0) { + parent.invokeOnCompletion { } + } + if (iteration.mod(3) == 0) { + GlobalScope.launch(parent) { } + } + return parent +} From 855ec80d87590d978f4c5a39f4a08060bc8783ba Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 8 May 2024 15:00:47 +0300 Subject: [PATCH 11/13] Update kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt Co-authored-by: Vsevolod Tolstopyatov --- .../concurrent/src/internal/LockFreeLinkedList.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index 92630a45a5..a172e66c8b 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -286,4 +286,4 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { override val isRemoved: Boolean get() = false } -private class ListClosed(val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() +private class ListClosed(@JvmField val forbiddenElementsBitmask: Int): LockFreeLinkedListNode() From e6a7f36f2a3ae786b38da7cb16ff5c20543687e1 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 10 May 2024 12:02:56 +0200 Subject: [PATCH 12/13] Add a comment about the sequence of closing the list for children --- .../common/src/JobSupport.kt | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index a099a93d34..2756878022 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -929,10 +929,15 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children + // we can't close the list yet: while there are active children, adding new ones is still allowed. val child = list.nextChild() if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN + // turns out, there are no children to await, so we close the list. list.close(LIST_CHILD_PERMISSION) + // some children could have sneaked into the list, so we try waiting for them again. + // it would be more correct to re-open the list (otherwise, we get non-linearizable behavior), + // but it's too difficult with the current lock-free list implementation. val anotherChild = list.nextChild() if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate)) return COMPLETING_WAITING_CHILDREN @@ -958,16 +963,21 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children - // figure out if we need to wait for next child + // figure out if we need to wait for the next child val waitChild = lastChild.nextChild() - // try wait for next child + // try to wait for the next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child - // no more children to wait -- stop accepting children + // no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children. state.list.close(LIST_CHILD_PERMISSION) - // did any children get added? + // did any new children sneak in? val waitChildAgain = lastChild.nextChild() - // try wait for next child - if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) return // waiting for next child + if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) { + // yes, so now we have to wait for them! + // ideally, we should re-open the list, + // but it's too difficult with the current lock-free list implementation, + // so we'll live with non-linearizable behavior for now. + return + } // no more children, now we are sure; try to update the state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) From 66254c8564acbdc700a7154202f0c00939125110 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 13 May 2024 11:38:34 +0200 Subject: [PATCH 13/13] Add a comment about the list being closed more than once --- kotlinx-coroutines-core/common/src/JobSupport.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 2756878022..3252d1047f 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -968,6 +968,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // try to wait for the next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child // no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children. + // potentially, the list can be closed for children more than once: if we detect that there are no more + // children, attempt to close the list, and then new children sneak in, this whole logic will be + // repeated, including closing the list. state.list.close(LIST_CHILD_PERMISSION) // did any new children sneak in? val waitChildAgain = lastChild.nextChild()