diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 5f40dfc194..6fb83a42f0 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -682,3 +682,9 @@ public object NonDisposableHandle : DisposableHandle, ChildHandle { */ override fun toString(): String = "NonDisposableHandle" } + +private class DisposeOnCompletion( + private val handle: DisposableHandle +) : JobNode() { + override fun invoke(cause: Throwable?) = handle.dispose() +} diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 0c63b6a4b4..602cebcccc 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -127,6 +127,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102 private val _state = atomic(if (active) EMPTY_ACTIVE else EMPTY_NEW) + /** `true` means that the Job is cancelling and shouldn't accept [invokeOnCompletion] with `onCancelling = true` */ + private val onCancellingHandlersNotAccepted = atomic(false) + private val _parentHandle = atomic(null) internal var parentHandle: ChildHandle? get() = _parentHandle.value @@ -148,7 +151,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return } parent.start() // make sure the parent is started - @Suppress("DEPRECATION") val handle = parent.attachChild(this) parentHandle = handle // now check our state _after_ registering (see tryFinalizeSimpleState order of actions) @@ -236,6 +238,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (!wasCancelling) onCancelling(finalException) onCompletionInternal(finalState) // Then CAS to completed state -> it must succeed + // forbid any new children + onCancellingHandlersNotAccepted.value = true val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete()) assert { casSuccess } // And process all post-completion actions @@ -327,7 +331,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } private fun notifyCancelling(list: NodeList, cause: Throwable) { - // first cancel our own children + // then cancel our own children onCancelling(cause) notifyHandlers(list, cause) // then cancel parent @@ -360,8 +364,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return parent.childCancelled(cause) || isCancellation } - private fun NodeList.notifyCompletion(cause: Throwable?) = + private fun NodeList.notifyCompletion(cause: Throwable?) { + close() notifyHandlers(this, cause) + } private inline fun notifyHandlers(list: NodeList, cause: Throwable?) { var exception: Throwable? = null @@ -459,52 +465,63 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. val node: JobNode = makeNode(handler, onCancelling) loopOnState { state -> - when (state) { - is Empty -> { // EMPTY_X state -- no completion handlers + when { + state !is Incomplete || onCancelling && onCancellingHandlersNotAccepted.value -> { + if (invokeImmediately) { + val exception = when (state) { + is CompletedExceptionally -> state.cause + is Finishing -> state.rootCause + else -> null + } + // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, + // because we play type tricks on Kotlin/JS and handler is not necessarily a function there + handler.invokeIt(exception) + } + return NonDisposableHandle + } + state is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state if (_state.compareAndSet(state, node)) return node } else promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine } - is Incomplete -> { + else -> { + // is Incomplete val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode) } else { - var rootCause: Throwable? = null - var handle: DisposableHandle = NonDisposableHandle - if (onCancelling && state is Finishing) { - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add node to the list in two cases --- either the job is not being cancelled - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - if (!addLastAtomic(state, list, node)) return@loopOnState // retry - // just return node if we don't have to invoke handler (not cancelling yet) - if (rootCause == null) return node - // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node + if (onCancelling) { + if (state is Finishing) { + val rootCause: Throwable? + val handle: DisposableHandle + synchronized(state) { + // check if we are installing cancellation handler on job that is being cancelled + rootCause = state.rootCause // != null if cancelling job + // We add node to the list in two cases --- either the job is not being cancelled + // or we are adding a child to a coroutine that is not completing yet + if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { + // Note: add node the list while holding lock on state (make sure it cannot change) + if (!list.addLastIf(node) { !onCancellingHandlersNotAccepted.value }) return@loopOnState // retry + // just return node if we don't have to invoke handler (not cancelling yet) + if (rootCause == null) return node + // otherwise handler is invoked immediately out of the synchronized section & handle returned + handle = node + } else { + handle = NonDisposableHandle + } } + // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job + if (invokeImmediately) handler.invokeIt(rootCause) + return handle } - } - if (rootCause != null) { - // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) handler.invokeIt(rootCause) - return handle + if (list.addLastIf(node) { !onCancellingHandlersNotAccepted.value }) return node } else { - if (addLastAtomic(state, list, node)) return node + if (list.addLast(node)) return node } } } - else -> { // is complete - // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, - // because we play type tricks on Kotlin/JS and handler is not necessarily a function there - if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) - return NonDisposableHandle - } } } } @@ -881,6 +898,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val finishing = state as? Finishing ?: Finishing(list, false, null) // must synchronize updates to finishing state var notifyRootCause: Throwable? = null + // forbid any new children + onCancellingHandlersNotAccepted.value = true synchronized(finishing) { // check if this state is already completing if (finishing.isCompleting) return COMPLETING_ALREADY @@ -1175,7 +1194,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return parent.getCancellationException() } - protected override fun nameString(): String = + override fun nameString(): String = "AwaitContinuation" } @@ -1372,7 +1391,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete { if (DEBUG) getString("Active") else super.toString() } -internal class InactiveNodeList( +private class InactiveNodeList( override val list: NodeList ) : Incomplete { override val isActive: Boolean get() = false @@ -1408,12 +1427,6 @@ private class ResumeAwaitOnCompletion( } } -internal class DisposeOnCompletion( - private val handle: DisposableHandle -) : JobNode() { - override fun invoke(cause: Throwable?) = handle.dispose() -} - // -------- invokeOnCancellation nodes /** @@ -1432,7 +1445,7 @@ private class InvokeOnCancelling( } } -internal class ChildHandleNode( +private class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobCancellingNode(), ChildHandle { override val parent: Job get() = job diff --git a/kotlinx-coroutines-core/common/src/internal/Atomic.kt b/kotlinx-coroutines-core/common/src/internal/Atomic.kt index ff4320e0b3..d04edfe9cc 100644 --- a/kotlinx-coroutines-core/common/src/internal/Atomic.kt +++ b/kotlinx-coroutines-core/common/src/internal/Atomic.kt @@ -22,12 +22,6 @@ public abstract class OpDescriptor { */ abstract fun perform(affected: Any?): Any? - /** - * Returns reference to atomic operation that this descriptor is a part of or `null` - * if not a part of any [AtomicOp]. - */ - abstract val atomicOp: AtomicOp<*>? - override fun toString(): String = "$classSimpleName@$hexAddress" // debug } @@ -49,8 +43,6 @@ internal val NO_DECISION: Any = Symbol("NO_DECISION") public abstract class AtomicOp : OpDescriptor() { private val _consensus = atomic(NO_DECISION) - override val atomicOp: AtomicOp<*> get() = this - private fun decide(decision: Any?): Any? { assert { decision !== NO_DECISION } val current = _consensus.value diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 121cdedc9c..3ff42f29ab 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -13,10 +13,11 @@ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean public val nextNode: LockFreeLinkedListNode public val prevNode: LockFreeLinkedListNode - public fun addLast(node: LockFreeLinkedListNode) + public fun addLast(node: LockFreeLinkedListNode): Boolean public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean public open fun remove(): Boolean + public fun close() } diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index 533fba6436..009f5b02a4 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -94,7 +94,7 @@ public actual open class LockFreeLinkedListNode { // prev.next correction, which does not provide linearizable backwards iteration, but can be used to // resume forward iteration when current node was removed. public actual val prevNode: Node - get() = correctPrev(null) ?: findPrevNonRemoved(_prev.value) + get() = correctPrev() ?: findPrevNonRemoved(_prev.value) private tailrec fun findPrevNonRemoved(current: Node): Node { if (!current.isRemoved) return current @@ -120,14 +120,21 @@ public actual open class LockFreeLinkedListNode { // ------ addLastXXX ------ /** - * Adds last item to this list. + * Adds last item to this list. Returns `false` if the list is closed. */ - public actual fun addLast(node: Node) { + public actual fun addLast(node: Node): Boolean { while (true) { // lock-free loop on prev.next - if (prevNode.addNext(node, this)) return + val currentPrev = prevNode + if (currentPrev is LIST_CLOSED) return false + if (currentPrev.addNext(node, this)) return true } } + /** + * Forbids adding new items to this list. + */ + public actual fun close() { addLast(LIST_CLOSED()) } + /** * Adds last item to this list atomically if the [condition] is true. */ @@ -135,6 +142,7 @@ public actual open class LockFreeLinkedListNode { val condAdd = makeCondAddOp(node, condition) while (true) { // lock-free loop on prev.next val prev = prevNode // sentinel node is never removed, so prev is always defined + if (prev is LIST_CLOSED) return false when (prev.tryCondAddNext(node, this, condAdd)) { SUCCESS -> return true FAILURE -> return false @@ -210,7 +218,7 @@ public actual open class LockFreeLinkedListNode { val removed = (next as Node).removed() if (_next.compareAndSet(next, removed)) { // was removed successfully (linearized remove) -- fixup the list - next.correctPrev(null) + next.correctPrev() return null } } @@ -250,7 +258,7 @@ public actual open class LockFreeLinkedListNode { if (next._prev.compareAndSet(nextPrev, this)) { // This newly added node could have been removed, and the above CAS would have added it physically again. // Let us double-check for this situation and correct if needed - if (isRemoved) next.correctPrev(null) + if (isRemoved) next.correctPrev() return } } @@ -268,7 +276,7 @@ public actual open class LockFreeLinkedListNode { * remover of this node will ultimately call [correctPrev] on the next node and that will fix all * the links from this node, too. */ - private tailrec fun correctPrev(op: OpDescriptor?): Node? { + private tailrec fun correctPrev(): Node? { val oldPrev = _prev.value var prev: Node = oldPrev var last: Node? = null // will be set so that last.next === prev @@ -281,22 +289,21 @@ public actual open class LockFreeLinkedListNode { // otherwise need to update prev if (!this._prev.compareAndSet(oldPrev, prev)) { // Note: retry from scratch on failure to update prev - return correctPrev(op) + return correctPrev() } return prev // return the correct prev } // slow path when we need to help remove operations this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time - prevNext === op -> return prev // part of the same op -- don't recurse, didn't correct prev prevNext is OpDescriptor -> { // help & retry prevNext.perform(prev) - return correctPrev(op) // retry from scratch + return correctPrev() // retry from scratch } prevNext is Removed -> { if (last !== null) { // newly added (prev) node is already removed, correct last.next around it if (!last._next.compareAndSet(prev, prevNext.ref)) { - return correctPrev(op) // retry from scratch on failure to update next + return correctPrev() // retry from scratch on failure to update next } prev = last last = null @@ -363,3 +370,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { validateNode(prev, next as Node) } } + +// not private due to what seems to be a compiler bug +internal class LIST_CLOSED: LockFreeLinkedListNode() diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt index c4b1fddd03..4af9ff6152 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/LinkedList.kt @@ -26,12 +26,18 @@ public open class LinkedListNode : DisposableHandle { public inline val prevNode get() = _prev public inline val isRemoved get() = _removed - public fun addLast(node: Node) { + public fun addLast(node: Node): Boolean { val prev = this._prev + if (prev is CLOSED) return false node._next = this node._prev = prev prev._next = node this._prev = node + return true + } + + public fun close() { + addLast(CLOSED()) } /* @@ -41,15 +47,6 @@ public open class LinkedListNode : DisposableHandle { * invokes handler on remove */ public open fun remove(): Boolean { - return removeImpl() - } - - override fun dispose() { - remove() - } - - @PublishedApi - internal fun removeImpl(): Boolean { if (_removed) return false val prev = this._prev val next = this._next @@ -59,6 +56,10 @@ public open class LinkedListNode : DisposableHandle { return true } + override fun dispose() { + remove() + } + public fun addOneIfEmpty(node: Node): Boolean { if (_next !== this) return false addLast(node) @@ -67,34 +68,7 @@ public open class LinkedListNode : DisposableHandle { public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { if (!condition()) return false - addLast(node) - return true - } - - public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { - if (!predicate(_prev)) return false - addLast(node) - return true - } - - public inline fun addLastIfPrevAndIf( - node: Node, - predicate: (Node) -> Boolean, // prev node predicate - crossinline condition: () -> Boolean // atomically checked condition - ): Boolean { - if (!predicate(_prev)) return false - if (!condition()) return false - addLast(node) - return true - } - - public fun helpRemove() {} // No concurrency on JS -> no removal - - public fun removeFirstOrNull(): Node? { - val next = _next - if (next === this) return null - check(next.removeImpl()) { "Should remove" } - return next + return addLast(node) } } @@ -116,3 +90,5 @@ public open class LinkedListHead : LinkedListNode() { // just a defensive programming -- makes sure that list head sentinel is never removed public final override fun remove(): Nothing = throw UnsupportedOperationException() } + +private class CLOSED: LinkedListNode() diff --git a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt index be467cc5cd..0900acd547 100644 --- a/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MemoryFootprintTest.kt @@ -12,7 +12,7 @@ import kotlin.test.* class MemoryFootprintTest : TestBase(true) { @Test - fun testJobLayout() = assertLayout(Job().javaClass, 24) + fun testJobLayout() = assertLayout(Job().javaClass, 32) @Test fun testCancellableContinuationFootprint() = assertLayout(CancellableContinuationImpl::class.java, 48)