Skip to content

Commit a4ae389

Browse files
authored
Revert "JobSupport: non-atomic adding to a Job's list of listeners (#2096)" (#3034)
This reverts commit 2b1e276.
1 parent 9d1be50 commit a4ae389

File tree

10 files changed

+110
-136
lines changed

10 files changed

+110
-136
lines changed

kotlinx-coroutines-core/common/src/Await.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
107107
var disposer: DisposeHandlersOnCancel?
108108
get() = _disposer.value
109109
set(value) { _disposer.value = value }
110-
111-
override fun invokeOnce(cause: Throwable?) {
110+
111+
override fun invoke(cause: Throwable?) {
112112
if (cause != null) {
113113
val token = continuation.tryResumeWithException(cause)
114114
if (token != null) {

kotlinx-coroutines-core/common/src/CompletionHandler.common.kt

-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,4 @@ internal expect val CancelHandlerBase.asHandler: CompletionHandler
4444
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
4545
internal expect fun CompletionHandler.invokeIt(cause: Throwable?)
4646

47-
// :KLUDGE: We have to use `isHandlerOf` extension, because performing this type check directly in the code
48-
// causes Incompatible types error during Kotlin/JS compilation
4947
internal inline fun <reified T> CompletionHandler.isHandlerOf(): Boolean = this is T

kotlinx-coroutines-core/common/src/JobSupport.kt

+46-117
Original file line numberDiff line numberDiff line change
@@ -456,86 +456,51 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
456456
// Create node upfront -- for common cases it just initializes JobNode.job field,
457457
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
458458
val node: JobNode = makeNode(handler, onCancelling)
459-
/*
460-
* LinkedList algorithm does not support removing and re-adding the same node,
461-
* so here we check whether the node is already added to the list to avoid adding the node twice.
462-
*/
463-
var isNodeAdded = false
464459
loopOnState { state ->
465460
when (state) {
466461
is Empty -> { // EMPTY_X state -- no completion handlers
467462
if (state.isActive) {
468463
// try move to SINGLE state
469464
if (_state.compareAndSet(state, node)) return node
470-
} else {
465+
} else
471466
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
472-
}
473467
}
474468
is Incomplete -> {
475469
val list = state.list
476470
if (list == null) { // SINGLE/SINGLE+
477471
promoteSingleToNodeList(state as JobNode)
478-
return@loopOnState // retry
479-
}
480-
// ...else {, but without nesting
481-
var rootCause: Throwable? = null
482-
var handle: DisposableHandle = NonDisposableHandle
483-
if (onCancelling && state is Finishing) {
484-
synchronized(state) {
485-
// check if we are installing cancellation handler on job that is being cancelled
486-
rootCause = state.rootCause // != null if cancelling job
487-
// We add node to the list in two cases --- either the job is not being cancelled
488-
// or we are adding a child to a coroutine that is not completing yet
489-
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
490-
// Note: add node the list while holding lock on state (make sure it cannot change)
491-
if (!isNodeAdded) list.addLast(node)
492-
if (this.state !== state) {
493-
/*
494-
* Here we have an additional check for ChildCompletion. Invoking the handler once is not enough
495-
* for this particular kind of node -- the caller makes a decision based on whether the node was added
496-
* or not and that decision should be made **once**.
497-
* To be more precise, the caller of ChildCompletion, in case when it's the last child,
498-
* should make a decision whether to start transition to the final state, based on
499-
* whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that.
500-
* See comment to JobNode.invoke, we cannot differentiate the situation when external state updater
501-
* invoked or skipped the node, thus we additionally synchronize on 'markInvoked'.
502-
*/
503-
if (node is ChildCompletion && !node.markInvoked()) return node
504-
// The state can only change to Complete here, so the node can stay in the list, just retry
505-
return@loopOnState
472+
} else {
473+
var rootCause: Throwable? = null
474+
var handle: DisposableHandle = NonDisposableHandle
475+
if (onCancelling && state is Finishing) {
476+
synchronized(state) {
477+
// check if we are installing cancellation handler on job that is being cancelled
478+
rootCause = state.rootCause // != null if cancelling job
479+
// We add node to the list in two cases --- either the job is not being cancelled
480+
// or we are adding a child to a coroutine that is not completing yet
481+
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
482+
// Note: add node the list while holding lock on state (make sure it cannot change)
483+
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
484+
// just return node if we don't have to invoke handler (not cancelling yet)
485+
if (rootCause == null) return node
486+
// otherwise handler is invoked immediately out of the synchronized section & handle returned
487+
handle = node
506488
}
507-
// just return node if we don't have to invoke handler (not cancelling yet)
508-
if (rootCause == null) return node
509-
// otherwise handler is invoked immediately out of the synchronized section & handle returned
510-
handle = node
511489
}
512490
}
513-
}
514-
if (rootCause != null) {
515-
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
516-
if (invokeImmediately) {
517-
node.invoke(rootCause)
518-
}
519-
return handle
520-
} else {
521-
if (!isNodeAdded) list.addLast(node)
522-
if (this.state !== state) {
523-
// Here again, we try to prevent concurrent finalization of the parent,
524-
// if the parent fails to add ChildCompletion because the child changed it's state to Completed.
525-
if (node is ChildCompletion && !node.markInvoked()) return node
526-
// If the state has changed to Complete, the node can stay in the list, just retry.
527-
if (this.state !is Incomplete) return@loopOnState
528-
// If the state is Incomplete, set the flag that the node is already added to the list instead of removing it.
529-
isNodeAdded = true
491+
if (rootCause != null) {
492+
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
493+
if (invokeImmediately) handler.invokeIt(rootCause)
494+
return handle
530495
} else {
531-
return node
496+
if (addLastAtomic(state, list, node)) return node
532497
}
533498
}
534499
}
535500
else -> { // is complete
536-
if (invokeImmediately) {
537-
node.invoke((state as? CompletedExceptionally)?.cause)
538-
}
501+
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
502+
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
503+
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
539504
return NonDisposableHandle
540505
}
541506
}
@@ -555,6 +520,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
555520
return node
556521
}
557522

523+
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
524+
list.addLastIf(node) { this.state === expect }
525+
558526
private fun promoteEmptyToNodeList(state: Empty) {
559527
// try to promote it to LIST state with the corresponding state
560528
val list = NodeList()
@@ -628,13 +596,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
628596
}
629597
is Incomplete -> { // may have a list of completion handlers
630598
// remove node from the list if there is a list
631-
if (state.list != null) {
632-
if (!node.remove() && node.isRemoved) {
633-
// Note: .remove() returns 'false' if the node wasn't added at all, e.g.
634-
// because it was an optimized "single element list"
635-
node.helpRemove()
636-
}
637-
}
599+
if (state.list != null) node.remove()
638600
return
639601
}
640602
else -> return // it is complete and does not have any completion handlers
@@ -852,7 +814,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
852814
}
853815
}
854816
}
855-
}
817+
}
856818

857819
/**
858820
* Completes this job. Used by [AbstractCoroutine.resume].
@@ -1189,11 +1151,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11891151
private val child: ChildHandleNode,
11901152
private val proposedUpdate: Any?
11911153
) : JobNode() {
1192-
override fun invokeOnce(cause: Throwable?) {
1154+
override fun invoke(cause: Throwable?) {
11931155
parent.continueCompleting(state, child, proposedUpdate)
11941156
}
1195-
override fun toString(): String =
1196-
"ChildCompletion[$child, $proposedUpdate]"
11971157
}
11981158

11991159
private class AwaitContinuation<T>(
@@ -1395,31 +1355,6 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc
13951355
override val isActive: Boolean get() = true
13961356
override val list: NodeList? get() = null
13971357
override fun dispose() = job.removeNode(this)
1398-
private val isInvoked = atomic(false)
1399-
1400-
/*
1401-
* This method is guaranteed to be invoked once per JobNode lifecycle.
1402-
*/
1403-
protected abstract fun invokeOnce(cause: Throwable?)
1404-
1405-
/*
1406-
* This method can be invoked more than once thus it's protected
1407-
* with atomic flag.
1408-
*
1409-
* It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)]
1410-
* when addLastIf fails due to the race with Job state update.
1411-
* In that case, we cannot distinguish the situation when the node was added
1412-
* and the external state updater actually invoked it from the situation
1413-
* when the state updater already invoked all elements from a list and then
1414-
* we added a new node to already abandoned list.
1415-
* So, when addLastIf fails, we invoke handler in-place, using
1416-
* markInvoked as protection against the former case.
1417-
*/
1418-
final override fun invoke(cause: Throwable?) {
1419-
if (!markInvoked()) return
1420-
invokeOnce(cause)
1421-
}
1422-
fun markInvoked() = isInvoked.compareAndSet(false, true)
14231358
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
14241359
}
14251360

@@ -1452,22 +1387,20 @@ internal class InactiveNodeList(
14521387

14531388
private class InvokeOnCompletion(
14541389
private val handler: CompletionHandler
1455-
) : JobNode() {
1456-
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
1457-
override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
1390+
) : JobNode() {
1391+
override fun invoke(cause: Throwable?) = handler.invoke(cause)
14581392
}
14591393

14601394
private class ResumeOnCompletion(
14611395
private val continuation: Continuation<Unit>
14621396
) : JobNode() {
1463-
override fun invokeOnce(cause: Throwable?) = continuation.resume(Unit)
1464-
override fun toString() = "ResumeOnCompletion[$continuation]"
1397+
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
14651398
}
14661399

14671400
private class ResumeAwaitOnCompletion<T>(
14681401
private val continuation: CancellableContinuationImpl<T>
14691402
) : JobNode() {
1470-
override fun invokeOnce(cause: Throwable?) {
1403+
override fun invoke(cause: Throwable?) {
14711404
val state = job.state
14721405
assert { state !is Incomplete }
14731406
if (state is CompletedExceptionally) {
@@ -1479,36 +1412,32 @@ private class ResumeAwaitOnCompletion<T>(
14791412
continuation.resume(state.unboxState() as T)
14801413
}
14811414
}
1482-
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
14831415
}
14841416

14851417
internal class DisposeOnCompletion(
14861418
private val handle: DisposableHandle
14871419
) : JobNode() {
1488-
override fun invokeOnce(cause: Throwable?) = handle.dispose()
1489-
override fun toString(): String = "DisposeOnCompletion[${handle}]"
1420+
override fun invoke(cause: Throwable?) = handle.dispose()
14901421
}
14911422

14921423
private class SelectJoinOnCompletion<R>(
14931424
private val select: SelectInstance<R>,
14941425
private val block: suspend () -> R
14951426
) : JobNode() {
1496-
override fun invokeOnce(cause: Throwable?) {
1427+
override fun invoke(cause: Throwable?) {
14971428
if (select.trySelect())
14981429
block.startCoroutineCancellable(select.completion)
14991430
}
1500-
override fun toString(): String = "SelectJoinOnCompletion[$select]"
15011431
}
15021432

15031433
private class SelectAwaitOnCompletion<T, R>(
15041434
private val select: SelectInstance<R>,
15051435
private val block: suspend (T) -> R
15061436
) : JobNode() {
1507-
override fun invokeOnce(cause: Throwable?) {
1437+
override fun invoke(cause: Throwable?) {
15081438
if (select.trySelect())
15091439
job.selectAwaitCompletion(select, block)
15101440
}
1511-
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
15121441
}
15131442

15141443
// -------- invokeOnCancellation nodes
@@ -1521,28 +1450,28 @@ internal abstract class JobCancellingNode : JobNode()
15211450

15221451
private class InvokeOnCancelling(
15231452
private val handler: CompletionHandler
1524-
) : JobCancellingNode() {
1453+
) : JobCancellingNode() {
15251454
// delegate handler shall be invoked at most once, so here is an additional flag
1526-
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
1527-
override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
1455+
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
1456+
override fun invoke(cause: Throwable?) {
1457+
if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
1458+
}
15281459
}
15291460

15301461
internal class ChildHandleNode(
15311462
@JvmField val childJob: ChildJob
15321463
) : JobCancellingNode(), ChildHandle {
15331464
override val parent: Job get() = job
1534-
override fun invokeOnce(cause: Throwable?) = childJob.parentCancelled(job)
1465+
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
15351466
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
1536-
override fun toString(): String = "ChildHandle[$childJob]"
15371467
}
15381468

15391469
// Same as ChildHandleNode, but for cancellable continuation
15401470
internal class ChildContinuation(
15411471
@JvmField val child: CancellableContinuationImpl<*>
15421472
) : JobCancellingNode() {
1543-
override fun invokeOnce(cause: Throwable?) =
1473+
override fun invoke(cause: Throwable?) {
15441474
child.parentCancelled(child.getContinuationCancellationCause(job))
1545-
override fun toString(): String =
1546-
"ChildContinuation[$child]"
1475+
}
15471476
}
15481477

kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public expect open class LockFreeLinkedListNode() {
1515
public val prevNode: LockFreeLinkedListNode
1616
public fun addLast(node: LockFreeLinkedListNode)
1717
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
18+
public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean
1819
public inline fun addLastIfPrev(
1920
node: LockFreeLinkedListNode,
2021
predicate: (LockFreeLinkedListNode) -> Boolean

kotlinx-coroutines-core/common/src/selects/Select.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,9 @@ internal class SelectBuilderImpl<in R>(
336336

337337
private inner class SelectOnCancelling : JobCancellingNode() {
338338
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
339-
override fun invokeOnce(cause: Throwable?) {
340-
if (trySelect()) {
339+
override fun invoke(cause: Throwable?) {
340+
if (trySelect())
341341
resumeSelectWithException(job.getCancellationException())
342-
}
343342
}
344343
}
345344

kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt

+21-3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public actual open class LockFreeLinkedListNode {
8686
}
8787
}
8888

89+
@PublishedApi
90+
internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
91+
object : CondAddOp(node) {
92+
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
93+
}
94+
8995
public actual open val isRemoved: Boolean get() = next is Removed
9096

9197
// LINEARIZABLE. Returns Node | Removed
@@ -141,6 +147,20 @@ public actual open class LockFreeLinkedListNode {
141147

142148
public fun <T : Node> describeAddLast(node: T): AddLastDesc<T> = AddLastDesc(this, node)
143149

150+
/**
151+
* Adds last item to this list atomically if the [condition] is true.
152+
*/
153+
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
154+
val condAdd = makeCondAddOp(node, condition)
155+
while (true) { // lock-free loop on prev.next
156+
val prev = prevNode // sentinel node is never removed, so prev is always defined
157+
when (prev.tryCondAddNext(node, this, condAdd)) {
158+
SUCCESS -> return true
159+
FAILURE -> return false
160+
}
161+
}
162+
}
163+
144164
public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
145165
while (true) { // lock-free loop on prev.next
146166
val prev = prevNode // sentinel node is never removed, so prev is always defined
@@ -154,9 +174,7 @@ public actual open class LockFreeLinkedListNode {
154174
predicate: (Node) -> Boolean, // prev node predicate
155175
crossinline condition: () -> Boolean // atomically checked condition
156176
): Boolean {
157-
val condAdd = object : CondAddOp(node) {
158-
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
159-
}
177+
val condAdd = makeCondAddOp(node, condition)
160178
while (true) { // lock-free loop on prev.next
161179
val prev = prevNode // sentinel node is never removed, so prev is always defined
162180
if (!predicate(prev)) return false

kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,20 @@ class LockFreeLinkedListTest {
3232
assertContents(list)
3333
}
3434

35+
@Test
36+
fun testCondOps() {
37+
val list = LockFreeLinkedListHead()
38+
assertContents(list)
39+
assertTrue(list.addLastIf(IntNode(1)) { true })
40+
assertContents(list, 1)
41+
assertFalse(list.addLastIf(IntNode(2)) { false })
42+
assertContents(list, 1)
43+
assertTrue(list.addLastIf(IntNode(3)) { true })
44+
assertContents(list, 1, 3)
45+
assertFalse(list.addLastIf(IntNode(4)) { false })
46+
assertContents(list, 1, 3)
47+
}
48+
3549
@Test
3650
fun testAtomicOpsSingle() {
3751
val list = LockFreeLinkedListHead()

0 commit comments

Comments
 (0)