Skip to content

Remove DCSS #3992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,9 @@ public object NonDisposableHandle : DisposableHandle, ChildHandle {
*/
override fun toString(): String = "NonDisposableHandle"
}

private class DisposeOnCompletion(
private val handle: DisposableHandle
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()
}
97 changes: 55 additions & 42 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

/** `true` means that the Job is cancelling and shouldn't accept [invokeOnCompletion] with `onCancelling = true` */
private val onCancellingHandlersNotAccepted = atomic(false)

private val _parentHandle = atomic<ChildHandle?>(null)
internal var parentHandle: ChildHandle?
get() = _parentHandle.value
Expand All @@ -148,7 +151,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return
}
parent.start() // make sure the parent is started
@Suppress("DEPRECATION")
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
Expand Down Expand Up @@ -236,6 +238,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (!wasCancelling) onCancelling(finalException)
onCompletionInternal(finalState)
// Then CAS to completed state -> it must succeed
// forbid any new children
onCancellingHandlersNotAccepted.value = true
val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
assert { casSuccess }
// And process all post-completion actions
Expand Down Expand Up @@ -327,7 +331,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
// then cancel our own children
onCancelling(cause)
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent
Expand Down Expand Up @@ -360,8 +364,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return parent.childCancelled(cause) || isCancellation
}

private fun NodeList.notifyCompletion(cause: Throwable?) =
private fun NodeList.notifyCompletion(cause: Throwable?) {
close()
notifyHandlers<JobNode>(this, cause)
}

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
Expand Down Expand Up @@ -459,52 +465,63 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
when {
state !is Incomplete || onCancelling && onCancellingHandlersNotAccepted.value -> {
if (invokeImmediately) {
val exception = when (state) {
is CompletedExceptionally -> state.cause
is Finishing -> state.rootCause
else -> null
}
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
handler.invokeIt(exception)
}
return NonDisposableHandle
}
state is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> {
else -> {
// is Incomplete
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
if (onCancelling) {
if (state is Finishing) {
val rootCause: Throwable?
val handle: DisposableHandle
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!list.addLastIf(node) { !onCancellingHandlersNotAccepted.value }) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
} else {
handle = NonDisposableHandle
}
}
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
}
}
if (rootCause != null) {
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
if (list.addLastIf(node) { !onCancellingHandlersNotAccepted.value }) return node
} else {
if (addLastAtomic(state, list, node)) return node
if (list.addLast(node)) return node
}
}
}
else -> { // is complete
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
}
}
}
Expand Down Expand Up @@ -881,6 +898,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val finishing = state as? Finishing ?: Finishing(list, false, null)
// must synchronize updates to finishing state
var notifyRootCause: Throwable? = null
// forbid any new children
onCancellingHandlersNotAccepted.value = true
synchronized(finishing) {
// check if this state is already completing
if (finishing.isCompleting) return COMPLETING_ALREADY
Expand Down Expand Up @@ -1175,7 +1194,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return parent.getCancellationException()
}

protected override fun nameString(): String =
override fun nameString(): String =
"AwaitContinuation"
}

Expand Down Expand Up @@ -1372,7 +1391,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
if (DEBUG) getString("Active") else super.toString()
}

internal class InactiveNodeList(
private class InactiveNodeList(
override val list: NodeList
) : Incomplete {
override val isActive: Boolean get() = false
Expand Down Expand Up @@ -1408,12 +1427,6 @@ private class ResumeAwaitOnCompletion<T>(
}
}

internal class DisposeOnCompletion(
private val handle: DisposableHandle
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()
}

// -------- invokeOnCancellation nodes

/**
Expand All @@ -1432,7 +1445,7 @@ private class InvokeOnCancelling(
}
}

internal class ChildHandleNode(
private class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
Expand Down
8 changes: 0 additions & 8 deletions kotlinx-coroutines-core/common/src/internal/Atomic.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ public abstract class OpDescriptor {
*/
abstract fun perform(affected: Any?): Any?

/**
* Returns reference to atomic operation that this descriptor is a part of or `null`
* if not a part of any [AtomicOp].
*/
abstract val atomicOp: AtomicOp<*>?

override fun toString(): String = "$classSimpleName@$hexAddress" // debug
}

Expand All @@ -49,8 +43,6 @@ internal val NO_DECISION: Any = Symbol("NO_DECISION")
public abstract class AtomicOp<in T> : OpDescriptor() {
private val _consensus = atomic<Any?>(NO_DECISION)

override val atomicOp: AtomicOp<*> get() = this

private fun decide(decision: Any?): Any? {
assert { decision !== NO_DECISION }
val current = _consensus.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
public val nextNode: LockFreeLinkedListNode
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addLast(node: LockFreeLinkedListNode): Boolean
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean
public open fun remove(): Boolean
public fun close()

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public actual open class LockFreeLinkedListNode {
// prev.next correction, which does not provide linearizable backwards iteration, but can be used to
// resume forward iteration when current node was removed.
public actual val prevNode: Node
get() = correctPrev(null) ?: findPrevNonRemoved(_prev.value)
get() = correctPrev() ?: findPrevNonRemoved(_prev.value)

private tailrec fun findPrevNonRemoved(current: Node): Node {
if (!current.isRemoved) return current
Expand All @@ -120,21 +120,29 @@ public actual open class LockFreeLinkedListNode {
// ------ addLastXXX ------

/**
* Adds last item to this list.
* Adds last item to this list. Returns `false` if the list is closed.
*/
public actual fun addLast(node: Node) {
public actual fun addLast(node: Node): Boolean {
while (true) { // lock-free loop on prev.next
if (prevNode.addNext(node, this)) return
val currentPrev = prevNode
if (currentPrev is LIST_CLOSED) return false
if (currentPrev.addNext(node, this)) return true
}
}

/**
* Forbids adding new items to this list.
*/
public actual fun close() { addLast(LIST_CLOSED()) }

/**
* Adds last item to this list atomically if the [condition] is true.
*/
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
if (prev is LIST_CLOSED) return false
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
Expand Down Expand Up @@ -210,7 +218,7 @@ public actual open class LockFreeLinkedListNode {
val removed = (next as Node).removed()
if (_next.compareAndSet(next, removed)) {
// was removed successfully (linearized remove) -- fixup the list
next.correctPrev(null)
next.correctPrev()
return null
}
}
Expand Down Expand Up @@ -250,7 +258,7 @@ public actual open class LockFreeLinkedListNode {
if (next._prev.compareAndSet(nextPrev, this)) {
// This newly added node could have been removed, and the above CAS would have added it physically again.
// Let us double-check for this situation and correct if needed
if (isRemoved) next.correctPrev(null)
if (isRemoved) next.correctPrev()
return
}
}
Expand All @@ -268,7 +276,7 @@ public actual open class LockFreeLinkedListNode {
* remover of this node will ultimately call [correctPrev] on the next node and that will fix all
* the links from this node, too.
*/
private tailrec fun correctPrev(op: OpDescriptor?): Node? {
private tailrec fun correctPrev(): Node? {
val oldPrev = _prev.value
var prev: Node = oldPrev
var last: Node? = null // will be set so that last.next === prev
Expand All @@ -281,22 +289,21 @@ public actual open class LockFreeLinkedListNode {
// otherwise need to update prev
if (!this._prev.compareAndSet(oldPrev, prev)) {
// Note: retry from scratch on failure to update prev
return correctPrev(op)
return correctPrev()
}
return prev // return the correct prev
}
// slow path when we need to help remove operations
this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time
prevNext === op -> return prev // part of the same op -- don't recurse, didn't correct prev
prevNext is OpDescriptor -> { // help & retry
prevNext.perform(prev)
return correctPrev(op) // retry from scratch
return correctPrev() // retry from scratch
}
prevNext is Removed -> {
if (last !== null) {
// newly added (prev) node is already removed, correct last.next around it
if (!last._next.compareAndSet(prev, prevNext.ref)) {
return correctPrev(op) // retry from scratch on failure to update next
return correctPrev() // retry from scratch on failure to update next
}
prev = last
last = null
Expand Down Expand Up @@ -363,3 +370,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
validateNode(prev, next as Node)
}
}

// not private due to what seems to be a compiler bug
internal class LIST_CLOSED: LockFreeLinkedListNode()
Loading