@@ -456,51 +456,86 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
456
456
// Create node upfront -- for common cases it just initializes JobNode.job field,
457
457
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
458
458
val node: JobNode = makeNode(handler, onCancelling)
459
+ /*
460
+ * LinkedList algorithm does not support removing and re-adding the same node,
461
+ * so here we check whether the node is already added to the list to avoid adding the node twice.
462
+ */
463
+ var isNodeAdded = false
459
464
loopOnState { state ->
460
465
when (state) {
461
466
is Empty -> { // EMPTY_X state -- no completion handlers
462
467
if (state.isActive) {
463
468
// try move to SINGLE state
464
469
if (_state .compareAndSet(state, node)) return node
465
- } else
470
+ } else {
466
471
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
472
+ }
467
473
}
468
474
is Incomplete -> {
469
475
val list = state.list
470
476
if (list == null ) { // SINGLE/SINGLE+
471
477
promoteSingleToNodeList(state as JobNode )
472
- } else {
473
- var rootCause: Throwable ? = null
474
- var handle: DisposableHandle = NonDisposableHandle
475
- if (onCancelling && state is Finishing ) {
476
- synchronized(state) {
477
- // check if we are installing cancellation handler on job that is being cancelled
478
- rootCause = state.rootCause // != null if cancelling job
479
- // We add node to the list in two cases --- either the job is not being cancelled
480
- // or we are adding a child to a coroutine that is not completing yet
481
- if (rootCause == null || handler.isHandlerOf<ChildHandleNode >() && ! state.isCompleting) {
482
- // Note: add node the list while holding lock on state (make sure it cannot change)
483
- if (! addLastAtomic(state, list, node)) return @loopOnState // retry
484
- // just return node if we don't have to invoke handler (not cancelling yet)
485
- if (rootCause == null ) return node
486
- // otherwise handler is invoked immediately out of the synchronized section & handle returned
487
- handle = node
478
+ return @loopOnState // retry
479
+ }
480
+ // ...else {, but without nesting
481
+ var rootCause: Throwable ? = null
482
+ var handle: DisposableHandle = NonDisposableHandle
483
+ if (onCancelling && state is Finishing ) {
484
+ synchronized(state) {
485
+ // check if we are installing cancellation handler on job that is being cancelled
486
+ rootCause = state.rootCause // != null if cancelling job
487
+ // We add node to the list in two cases --- either the job is not being cancelled
488
+ // or we are adding a child to a coroutine that is not completing yet
489
+ if (rootCause == null || handler.isHandlerOf<ChildHandleNode >() && ! state.isCompleting) {
490
+ // Note: add node the list while holding lock on state (make sure it cannot change)
491
+ if (! isNodeAdded) list.addLast(node)
492
+ if (this .state != = state) {
493
+ /*
494
+ * Here we have an additional check for ChildCompletion. Invoking the handler once is not enough
495
+ * for this particular kind of node -- the caller makes a decision based on whether the node was added
496
+ * or not and that decision should be made **once**.
497
+ * To be more precise, the caller of ChildCompletion, in case when it's the last child,
498
+ * should make a decision whether to start transition to the final state, based on
499
+ * whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that.
500
+ * See comment to JobNode.invoke, we cannot differentiate the situation when external state updater
501
+ * invoked or skipped the node, thus we additionally synchronize on 'markInvoked'.
502
+ */
503
+ if (node is ChildCompletion && ! node.markInvoked()) return node
504
+ // The state can only change to Complete here, so the node can stay in the list, just retry
505
+ return @loopOnState
488
506
}
507
+ // just return node if we don't have to invoke handler (not cancelling yet)
508
+ if (rootCause == null ) return node
509
+ // otherwise handler is invoked immediately out of the synchronized section & handle returned
510
+ handle = node
489
511
}
490
512
}
491
- if (rootCause != null ) {
492
- // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
493
- if (invokeImmediately) handler.invokeIt(rootCause)
494
- return handle
513
+ }
514
+ if (rootCause != null ) {
515
+ // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
516
+ if (invokeImmediately) {
517
+ node.invoke(rootCause)
518
+ }
519
+ return handle
520
+ } else {
521
+ if (! isNodeAdded) list.addLast(node)
522
+ if (this .state != = state) {
523
+ // Here again, we try to prevent concurrent finalization of the parent,
524
+ // if the parent fails to add ChildCompletion because the child changed it's state to Completed.
525
+ if (node is ChildCompletion && ! node.markInvoked()) return node
526
+ // If the state has changed to Complete, the node can stay in the list, just retry.
527
+ if (this .state !is Incomplete ) return @loopOnState
528
+ // If the state is Incomplete, set the flag that the node is already added to the list instead of removing it.
529
+ isNodeAdded = true
495
530
} else {
496
- if (addLastAtomic(state, list, node)) return node
531
+ return node
497
532
}
498
533
}
499
534
}
500
535
else -> { // is complete
501
- // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
502
- // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
503
- if (invokeImmediately) handler.invokeIt((state as ? CompletedExceptionally )?.cause)
536
+ if (invokeImmediately) {
537
+ node.invoke((state as ? CompletedExceptionally )?.cause)
538
+ }
504
539
return NonDisposableHandle
505
540
}
506
541
}
@@ -520,9 +555,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
520
555
return node
521
556
}
522
557
523
- private fun addLastAtomic (expect : Any , list : NodeList , node : JobNode ) =
524
- list.addLastIf(node) { this .state == = expect }
525
-
526
558
private fun promoteEmptyToNodeList (state : Empty ) {
527
559
// try to promote it to LIST state with the corresponding state
528
560
val list = NodeList ()
@@ -596,7 +628,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
596
628
}
597
629
is Incomplete -> { // may have a list of completion handlers
598
630
// remove node from the list if there is a list
599
- if (state.list != null ) node.remove()
631
+ if (state.list != null ) {
632
+ if (! node.remove() && node.isRemoved) {
633
+ // Note: .remove() returns 'false' if the node wasn't added at all, e.g.
634
+ // because it was an optimized "single element list"
635
+ node.helpRemove()
636
+ }
637
+ }
600
638
return
601
639
}
602
640
else -> return // it is complete and does not have any completion handlers
@@ -814,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
814
852
}
815
853
}
816
854
}
817
- }
855
+ }
818
856
819
857
/* *
820
858
* Completes this job. Used by [AbstractCoroutine.resume].
@@ -1151,9 +1189,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
1151
1189
private val child : ChildHandleNode ,
1152
1190
private val proposedUpdate : Any?
1153
1191
) : JobNode() {
1154
- override fun invoke (cause : Throwable ? ) {
1192
+ override fun invokeOnce (cause : Throwable ? ) {
1155
1193
parent.continueCompleting(state, child, proposedUpdate)
1156
1194
}
1195
+ override fun toString (): String =
1196
+ " ChildCompletion[$child , $proposedUpdate ]"
1157
1197
}
1158
1198
1159
1199
private class AwaitContinuation <T >(
@@ -1355,6 +1395,31 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc
1355
1395
override val isActive: Boolean get() = true
1356
1396
override val list: NodeList ? get() = null
1357
1397
override fun dispose () = job.removeNode(this )
1398
+ private val isInvoked = atomic(false )
1399
+
1400
+ /*
1401
+ * This method is guaranteed to be invoked once per JobNode lifecycle.
1402
+ */
1403
+ protected abstract fun invokeOnce (cause : Throwable ? )
1404
+
1405
+ /*
1406
+ * This method can be invoked more than once thus it's protected
1407
+ * with atomic flag.
1408
+ *
1409
+ * It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)]
1410
+ * when addLastIf fails due to the race with Job state update.
1411
+ * In that case, we cannot distinguish the situation when the node was added
1412
+ * and the external state updater actually invoked it from the situation
1413
+ * when the state updater already invoked all elements from a list and then
1414
+ * we added a new node to already abandoned list.
1415
+ * So, when addLastIf fails, we invoke handler in-place, using
1416
+ * markInvoked as protection against the former case.
1417
+ */
1418
+ final override fun invoke (cause : Throwable ? ) {
1419
+ if (! markInvoked()) return
1420
+ invokeOnce(cause)
1421
+ }
1422
+ fun markInvoked () = isInvoked.compareAndSet(false , true )
1358
1423
override fun toString () = " $classSimpleName @$hexAddress [job@${job.hexAddress} ]"
1359
1424
}
1360
1425
@@ -1387,20 +1452,22 @@ internal class InactiveNodeList(
1387
1452
1388
1453
private class InvokeOnCompletion (
1389
1454
private val handler : CompletionHandler
1390
- ) : JobNode() {
1391
- override fun invoke (cause : Throwable ? ) = handler.invoke(cause)
1455
+ ) : JobNode() {
1456
+ override fun invokeOnce (cause : Throwable ? ) = handler.invoke(cause)
1457
+ override fun toString () = " InvokeOnCompletion[$classSimpleName @$hexAddress ]"
1392
1458
}
1393
1459
1394
1460
private class ResumeOnCompletion (
1395
1461
private val continuation : Continuation <Unit >
1396
1462
) : JobNode() {
1397
- override fun invoke (cause : Throwable ? ) = continuation.resume(Unit )
1463
+ override fun invokeOnce (cause : Throwable ? ) = continuation.resume(Unit )
1464
+ override fun toString () = " ResumeOnCompletion[$continuation ]"
1398
1465
}
1399
1466
1400
1467
private class ResumeAwaitOnCompletion <T >(
1401
1468
private val continuation : CancellableContinuationImpl <T >
1402
1469
) : JobNode() {
1403
- override fun invoke (cause : Throwable ? ) {
1470
+ override fun invokeOnce (cause : Throwable ? ) {
1404
1471
val state = job.state
1405
1472
assert { state !is Incomplete }
1406
1473
if (state is CompletedExceptionally ) {
@@ -1412,32 +1479,36 @@ private class ResumeAwaitOnCompletion<T>(
1412
1479
continuation.resume(state.unboxState() as T )
1413
1480
}
1414
1481
}
1482
+ override fun toString () = " ResumeAwaitOnCompletion[$continuation ]"
1415
1483
}
1416
1484
1417
1485
internal class DisposeOnCompletion (
1418
1486
private val handle : DisposableHandle
1419
1487
) : JobNode() {
1420
- override fun invoke (cause : Throwable ? ) = handle.dispose()
1488
+ override fun invokeOnce (cause : Throwable ? ) = handle.dispose()
1489
+ override fun toString (): String = " DisposeOnCompletion[${handle} ]"
1421
1490
}
1422
1491
1423
1492
private class SelectJoinOnCompletion <R >(
1424
1493
private val select : SelectInstance <R >,
1425
1494
private val block : suspend () -> R
1426
1495
) : JobNode() {
1427
- override fun invoke (cause : Throwable ? ) {
1496
+ override fun invokeOnce (cause : Throwable ? ) {
1428
1497
if (select.trySelect())
1429
1498
block.startCoroutineCancellable(select.completion)
1430
1499
}
1500
+ override fun toString (): String = " SelectJoinOnCompletion[$select ]"
1431
1501
}
1432
1502
1433
1503
private class SelectAwaitOnCompletion <T , R >(
1434
1504
private val select : SelectInstance <R >,
1435
1505
private val block : suspend (T ) -> R
1436
1506
) : JobNode() {
1437
- override fun invoke (cause : Throwable ? ) {
1507
+ override fun invokeOnce (cause : Throwable ? ) {
1438
1508
if (select.trySelect())
1439
1509
job.selectAwaitCompletion(select, block)
1440
1510
}
1511
+ override fun toString (): String = " SelectAwaitOnCompletion[$select ]"
1441
1512
}
1442
1513
1443
1514
// -------- invokeOnCancellation nodes
@@ -1450,28 +1521,28 @@ internal abstract class JobCancellingNode : JobNode()
1450
1521
1451
1522
private class InvokeOnCancelling (
1452
1523
private val handler : CompletionHandler
1453
- ) : JobCancellingNode() {
1524
+ ) : JobCancellingNode() {
1454
1525
// delegate handler shall be invoked at most once, so here is an additional flag
1455
- private val _invoked = atomic(0 ) // todo: replace with atomic boolean after migration to recent atomicFu
1456
- override fun invoke (cause : Throwable ? ) {
1457
- if (_invoked .compareAndSet(0 , 1 )) handler.invoke(cause)
1458
- }
1526
+ override fun invokeOnce (cause : Throwable ? ) = handler.invoke(cause)
1527
+ override fun toString () = " InvokeOnCancelling[$classSimpleName @$hexAddress ]"
1459
1528
}
1460
1529
1461
1530
internal class ChildHandleNode (
1462
1531
@JvmField val childJob : ChildJob
1463
1532
) : JobCancellingNode(), ChildHandle {
1464
1533
override val parent: Job get() = job
1465
- override fun invoke (cause : Throwable ? ) = childJob.parentCancelled(job)
1534
+ override fun invokeOnce (cause : Throwable ? ) = childJob.parentCancelled(job)
1466
1535
override fun childCancelled (cause : Throwable ): Boolean = job.childCancelled(cause)
1536
+ override fun toString (): String = " ChildHandle[$childJob ]"
1467
1537
}
1468
1538
1469
1539
// Same as ChildHandleNode, but for cancellable continuation
1470
1540
internal class ChildContinuation (
1471
1541
@JvmField val child : CancellableContinuationImpl <* >
1472
1542
) : JobCancellingNode() {
1473
- override fun invoke (cause : Throwable ? ) {
1543
+ override fun invokeOnce (cause : Throwable ? ) =
1474
1544
child.parentCancelled(child.getContinuationCancellationCause(job))
1475
- }
1545
+ override fun toString (): String =
1546
+ " ChildContinuation[$child ]"
1476
1547
}
1477
1548
0 commit comments