@@ -193,7 +193,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
193
193
194
194
// Finalizes Finishing -> Completed (terminal state) transition.
195
195
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
196
- private fun tryFinalizeFinishingState (state : Finishing , proposedUpdate : Any? , mode : Int ): Boolean {
196
+ // Returns final state that was created and updated to
197
+ private fun tryFinalizeFinishingState (state : Finishing , proposedUpdate : Any? ): Any? {
197
198
/*
198
199
* Note: proposed state can be Incomplete, e.g.
199
200
* async {
@@ -234,8 +235,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
234
235
// Then CAS to completed state -> it must succeed
235
236
require(_state .compareAndSet(state, finalState.boxIncomplete())) { " Unexpected state: ${_state .value} , expected: $state , update: $finalState " }
236
237
// And process all post-completion actions
237
- completeStateFinalization(state, finalState, mode )
238
- return true
238
+ completeStateFinalization(state, finalState)
239
+ return finalState
239
240
}
240
241
241
242
private fun getFinalRootCause (state : Finishing , exceptions : List <Throwable >): Throwable ? {
@@ -268,18 +269,19 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
268
269
}
269
270
270
271
// fast-path method to finalize normally completed coroutines without children
271
- private fun tryFinalizeSimpleState (state : Incomplete , update : Any? , mode : Int ): Boolean {
272
+ // returns true if complete, and afterCompletionInternal(update, mode) shall be called
273
+ private fun tryFinalizeSimpleState (state : Incomplete , update : Any? ): Boolean {
272
274
assert { state is Empty || state is JobNode <* > } // only simple state without lists where children can concurrently add
273
275
assert { update !is CompletedExceptionally } // only for normal completion
274
276
if (! _state .compareAndSet(state, update.boxIncomplete())) return false
275
277
onCancelling(null ) // simple state is not a failure
276
278
onCompletionInternal(update)
277
- completeStateFinalization(state, update, mode )
279
+ completeStateFinalization(state, update)
278
280
return true
279
281
}
280
282
281
283
// suppressed == true when any exceptions were suppressed while building the final completion cause
282
- private fun completeStateFinalization (state : Incomplete , update : Any? , mode : Int ) {
284
+ private fun completeStateFinalization (state : Incomplete , update : Any? ) {
283
285
/*
284
286
* Now the job in THE FINAL state. We need to properly handle the resulting state.
285
287
* Order of various invocations here is important.
@@ -304,11 +306,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
304
306
} else {
305
307
state.list?.notifyCompletion(cause)
306
308
}
307
- /*
308
- * 3) Resumes the rest of the code in scoped coroutines
309
- * (runBlocking, coroutineScope, withContext, withTimeout, etc)
310
- */
311
- afterCompletionInternal(update, mode)
312
309
}
313
310
314
311
private fun notifyCancelling (list : NodeList , cause : Throwable ) {
@@ -641,28 +638,41 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
641
638
// cause is Throwable or ParentJob when cancelChild was invoked
642
639
// returns true is exception was handled, false otherwise
643
640
internal fun cancelImpl (cause : Any? ): Boolean {
641
+ var finalState: Any? = COMPLETING_ALREADY
644
642
if (onCancelComplete) {
645
- // make sure it is completing, if cancelMakeCompleting returns true it means it had make it
643
+ // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
646
644
// completing and had recorded exception
647
- if (cancelMakeCompleting(cause)) return true
648
- // otherwise just record exception via makeCancelling below
645
+ finalState = cancelMakeCompleting(cause)
646
+ if (finalState == = COMPLETING_WAITING_CHILDREN ) return true
647
+ }
648
+ if (finalState == = COMPLETING_ALREADY ) {
649
+ finalState = makeCancelling(cause)
650
+ }
651
+ return when {
652
+ finalState == = COMPLETING_ALREADY -> true
653
+ finalState == = COMPLETING_WAITING_CHILDREN -> true
654
+ finalState == = TOO_LATE_TO_CANCEL -> false
655
+ else -> {
656
+ afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT )
657
+ true
658
+ }
649
659
}
650
- return makeCancelling(cause)
651
660
}
652
661
653
662
// cause is Throwable or ParentJob when cancelChild was invoked
654
- private fun cancelMakeCompleting (cause : Any? ): Boolean {
663
+ // It contains a loop and never returns COMPLETING_RETRY, can return
664
+ // COMPLETING_ALREADY -- if already completed/completing
665
+ // COMPLETING_WAITING_CHILDREN -- if started waiting for children
666
+ // final state -- when completed, for call to afterCompletionInternal
667
+ private fun cancelMakeCompleting (cause : Any? ): Any? {
655
668
loopOnState { state ->
656
669
if (state !is Incomplete || state is Finishing && state.isCompleting) {
657
- return false // already completed/completing, do not even propose update
670
+ // already completed/completing, do not even create exception to propose update
671
+ return COMPLETING_ALREADY
658
672
}
659
673
val proposedUpdate = CompletedExceptionally (createCauseException(cause))
660
- when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT )) {
661
- COMPLETING_ALREADY_COMPLETING -> return false
662
- COMPLETING_COMPLETED , COMPLETING_WAITING_CHILDREN -> return true
663
- COMPLETING_RETRY -> return @loopOnState
664
- else -> error(" unexpected result" )
665
- }
674
+ val finalState = tryMakeCompleting(state, proposedUpdate)
675
+ if (finalState != = COMPLETING_RETRY ) return finalState
666
676
}
667
677
}
668
678
@@ -689,13 +699,18 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
689
699
690
700
// transitions to Cancelling state
691
701
// cause is Throwable or ParentJob when cancelChild was invoked
692
- private fun makeCancelling (cause : Any? ): Boolean {
702
+ // It contains a loop and never returns COMPLETING_RETRY, can return
703
+ // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
704
+ // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
705
+ // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
706
+ // final state -- when completed, for call to afterCompletionInternal
707
+ private fun makeCancelling (cause : Any? ): Any? {
693
708
var causeExceptionCache: Throwable ? = null // lazily init result of createCauseException(cause)
694
709
loopOnState { state ->
695
710
when (state) {
696
711
is Finishing -> { // already finishing -- collect exceptions
697
712
val notifyRootCause = synchronized(state) {
698
- if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
713
+ if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
699
714
// add exception, do nothing is parent is cancelling child that is already being cancelled
700
715
val wasCancelling = state.isCancelling // will notify if was not cancelling
701
716
// Materialize missing exception if it is the first exception (otherwise -- don't)
@@ -707,25 +722,25 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
707
722
state.rootCause.takeIf { ! wasCancelling }
708
723
}
709
724
notifyRootCause?.let { notifyCancelling(state.list, it) }
710
- return true
725
+ return COMPLETING_ALREADY
711
726
}
712
727
is Incomplete -> {
713
728
// Not yet finishing -- try to make it cancelling
714
729
val causeException = causeExceptionCache ? : createCauseException(cause).also { causeExceptionCache = it }
715
730
if (state.isActive) {
716
731
// active state becomes cancelling
717
- if (tryMakeCancelling(state, causeException)) return true
732
+ if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
718
733
} else {
719
734
// non active state starts completing
720
- when ( tryMakeCompleting(state, CompletedExceptionally (causeException), mode = MODE_ATOMIC_DEFAULT )) {
721
- COMPLETING_ALREADY_COMPLETING -> error( " Cannot happen in $state " )
722
- COMPLETING_COMPLETED , COMPLETING_WAITING_CHILDREN -> return true // ok
723
- COMPLETING_RETRY -> return @loopOnState
724
- else -> error( " unexpected result " )
735
+ val finalState = tryMakeCompleting(state, CompletedExceptionally (causeException))
736
+ when {
737
+ finalState == = COMPLETING_ALREADY -> error( " Cannot happen in $state " )
738
+ finalState == = COMPLETING_RETRY -> return @loopOnState
739
+ else -> return finalState
725
740
}
726
741
}
727
742
}
728
- else -> return false // already complete
743
+ else -> return TOO_LATE_TO_CANCEL // already complete
729
744
}
730
745
}
731
746
}
@@ -759,60 +774,78 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
759
774
}
760
775
761
776
/* *
762
- * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
763
- * It returns `false` on repeated invocation (when this job is already completing).
764
- *
765
- * @suppress **This is unstable API and it is subject to change.**
777
+ * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
778
+ * and by [JobImpl.cancel]. It returns `false` on repeated invocation
779
+ * (when this job is already completing).
766
780
*/
767
- internal fun makeCompleting (proposedUpdate : Any? ): Boolean = loopOnState { state ->
768
- when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT )) {
769
- COMPLETING_ALREADY_COMPLETING -> return false
770
- COMPLETING_COMPLETED , COMPLETING_WAITING_CHILDREN -> return true
771
- COMPLETING_RETRY -> return @loopOnState
772
- else -> error(" unexpected result" )
781
+ internal fun makeCompleting (proposedUpdate : Any? ): Boolean {
782
+ loopOnState { state ->
783
+ val finalState = tryMakeCompleting(state, proposedUpdate)
784
+ when {
785
+ finalState == = COMPLETING_ALREADY -> return false
786
+ finalState == = COMPLETING_WAITING_CHILDREN -> return true
787
+ finalState == = COMPLETING_RETRY -> return @loopOnState
788
+ else -> {
789
+ afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT )
790
+ return true
791
+ }
792
+ }
773
793
}
774
- }
794
+ }
775
795
776
796
/* *
777
- * This function is used by [AbstractCoroutine.resume].
778
- * It throws exception on repeated invocation (when this job is already completing).
779
- *
797
+ * Completes this job. Used by [AbstractCoroutine.resume].
798
+ * It throws [IllegalStateException] on repeated invocation (when this job is already completing).
780
799
* Returns:
781
- * * `true` if state was updated to completed/cancelled;
782
- * * `false` if made completing or it is cancelling and is waiting for children.
783
- *
784
- * @throws IllegalStateException if job is already complete or completing
785
- * @suppress **This is unstable API and it is subject to change.**
800
+ * * [COMPLETING_WAITING_CHILDREN] if started waiting for children.
801
+ * * Final state otherwise (caller should do [afterCompletionInternal])
786
802
*/
787
- internal fun makeCompletingOnce (proposedUpdate : Any? , mode : Int ): Boolean = loopOnState { state ->
788
- when (tryMakeCompleting(state, proposedUpdate, mode)) {
789
- COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException (" Job $this is already complete or completing, " +
790
- " but is being completed with $proposedUpdate " , proposedUpdate.exceptionOrNull)
791
- COMPLETING_COMPLETED -> return true
792
- COMPLETING_WAITING_CHILDREN -> return false
793
- COMPLETING_RETRY -> return @loopOnState
794
- else -> error(" unexpected result" )
803
+ internal fun makeCompletingOnce (proposedUpdate : Any? ): Any? {
804
+ loopOnState { state ->
805
+ val finalState = tryMakeCompleting(state, proposedUpdate)
806
+ when {
807
+ finalState == = COMPLETING_ALREADY ->
808
+ throw IllegalStateException (
809
+ " Job $this is already complete or completing, " +
810
+ " but is being completed with $proposedUpdate " , proposedUpdate.exceptionOrNull
811
+ )
812
+ finalState == = COMPLETING_RETRY -> return @loopOnState
813
+ else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
814
+ }
795
815
}
796
816
}
797
817
798
- private fun tryMakeCompleting (state : Any? , proposedUpdate : Any? , mode : Int ): Int {
818
+ // Returns one of COMPLETING symbols or final state:
819
+ // COMPLETING_ALREADY -- when already complete or completing
820
+ // COMPLETING_RETRY -- when need to retry due to interference
821
+ // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
822
+ // final state -- when completed, for call to afterCompletionInternal
823
+ private fun tryMakeCompleting (state : Any? , proposedUpdate : Any? ): Any? {
799
824
if (state !is Incomplete )
800
- return COMPLETING_ALREADY_COMPLETING
825
+ return COMPLETING_ALREADY
801
826
/*
802
827
* FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
803
828
* Cancellation (failures) always have to go through Finishing state to serialize exception handling.
804
829
* Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
805
830
* which may miss unhandled exception.
806
831
*/
807
832
if ((state is Empty || state is JobNode <* >) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally ) {
808
- if (! tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
809
- return COMPLETING_COMPLETED
833
+ if (tryFinalizeSimpleState(state, proposedUpdate)) {
834
+ // Completed successfully on fast path -- return updated state
835
+ return proposedUpdate
836
+ }
837
+ return COMPLETING_RETRY
810
838
}
811
839
// The separate slow-path function to simplify profiling
812
- return tryMakeCompletingSlowPath(state, proposedUpdate, mode )
840
+ return tryMakeCompletingSlowPath(state, proposedUpdate)
813
841
}
814
842
815
- private fun tryMakeCompletingSlowPath (state : Incomplete , proposedUpdate : Any? , mode : Int ): Int {
843
+ // Returns one of COMPLETING symbols or final state:
844
+ // COMPLETING_ALREADY -- when already complete or completing
845
+ // COMPLETING_RETRY -- when need to retry due to interference
846
+ // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
847
+ // final state -- when completed, for call to afterCompletionInternal
848
+ private fun tryMakeCompletingSlowPath (state : Incomplete , proposedUpdate : Any? ): Any? {
816
849
// get state's list or else promote to list to correctly operate on child lists
817
850
val list = getOrPromoteCancellingList(state) ? : return COMPLETING_RETRY
818
851
// promote to Finishing state if we are not in it yet
@@ -823,7 +856,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
823
856
var notifyRootCause: Throwable ? = null
824
857
synchronized(finishing) {
825
858
// check if this state is already completing
826
- if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
859
+ if (finishing.isCompleting) return COMPLETING_ALREADY
827
860
// mark as completing
828
861
finishing.isCompleting = true
829
862
// if we need to promote to finishing then atomically do it here.
@@ -847,10 +880,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
847
880
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
848
881
return COMPLETING_WAITING_CHILDREN
849
882
// otherwise -- we have not children left (all were already cancelled?)
850
- if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
851
- return COMPLETING_COMPLETED
852
- // otherwise retry
853
- return COMPLETING_RETRY
883
+ return tryFinalizeFinishingState(finishing, proposedUpdate)
854
884
}
855
885
856
886
private val Any? .exceptionOrNull: Throwable ?
@@ -879,7 +909,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
879
909
// try wait for next child
880
910
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
881
911
// no more children to wait -- try update state
882
- if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT )) return
912
+ val finalState = tryFinalizeFinishingState(state, proposedUpdate)
913
+ afterCompletionInternal(finalState, MODE_ATOMIC_DEFAULT )
883
914
}
884
915
885
916
private fun LockFreeLinkedListNode.nextChild (): ChildHandleNode ? {
@@ -1233,10 +1264,15 @@ internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: t
1233
1264
1234
1265
// --------------- helper classes & constants for job implementation
1235
1266
1236
- private const val COMPLETING_ALREADY_COMPLETING = 0
1237
- private const val COMPLETING_COMPLETED = 1
1238
- private const val COMPLETING_WAITING_CHILDREN = 2
1239
- private const val COMPLETING_RETRY = 3
1267
+ @SharedImmutable
1268
+ private val COMPLETING_ALREADY = Symbol (" COMPLETING_ALREADY" )
1269
+ @JvmField
1270
+ @SharedImmutable
1271
+ internal val COMPLETING_WAITING_CHILDREN = Symbol (" COMPLETING_WAITING_CHILDREN" )
1272
+ @SharedImmutable
1273
+ private val COMPLETING_RETRY = Symbol (" COMPLETING_RETRY" )
1274
+ @SharedImmutable
1275
+ private val TOO_LATE_TO_CANCEL = Symbol (" TOO_LATE_TO_CANCEL" )
1240
1276
1241
1277
private const val RETRY = - 1
1242
1278
private const val FALSE = 0
0 commit comments