Skip to content

Commit dd5c9ca

Browse files
committed
~ a different approach that actually works on JS, simplifies code
1 parent af95e9f commit dd5c9ca

File tree

6 files changed

+56
-65
lines changed

6 files changed

+56
-65
lines changed

kotlinx-coroutines-core/common/src/Await.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.atomicfu.*
8-
import kotlinx.coroutines.channels.*
98
import kotlin.coroutines.*
109

1110
/**
@@ -75,7 +74,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
7574
val nodes = Array(deferreds.size) { i ->
7675
val deferred = deferreds[i]
7776
deferred.start() // To properly await lazily started deferreds
78-
AwaitAllNode(cont, deferred).apply {
77+
AwaitAllNode(cont).apply {
7978
handle = deferred.invokeOnCompletion(asHandler)
8079
}
8180
}
@@ -101,7 +100,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
101100
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
102101
}
103102

104-
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
103+
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>) : JobNode() {
105104
lateinit var handle: DisposableHandle
106105

107106
private val _disposer = atomic<DisposeHandlersOnCancel?>(null)

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ internal open class CancellableContinuationImpl<in T>(
129129
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
130130
val handle = parent.invokeOnCompletion(
131131
onCancelling = true,
132-
handler = ChildContinuation(parent, this).asHandler
132+
handler = ChildContinuation(this).asHandler
133133
)
134134
parentHandle = handle
135135
// now check our state _after_ registering (could have completed while we were registering)

kotlinx-coroutines-core/common/src/Job.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ public interface ChildHandle : DisposableHandle {
490490
* ```
491491
*/
492492
internal fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
493-
invokeOnCompletion(handler = DisposeOnCompletion(this, handle).asHandler)
493+
invokeOnCompletion(handler = DisposeOnCompletion(handle).asHandler)
494494

495495
/**
496496
* Cancels the job and suspends the invoking coroutine until the cancelled job is complete.

kotlinx-coroutines-core/common/src/JobSupport.kt

+47-54
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
287287
// fast-path method to finalize normally completed coroutines without children
288288
// returns true if complete, and afterCompletion(update) shall be called
289289
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
290-
assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add
290+
assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
291291
assert { update !is CompletedExceptionally } // only for normal completion
292292
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
293293
onCancelling(null) // simple state is not a failure
@@ -313,7 +313,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
313313
* 2) Invoke completion handlers: .join(), callbacks etc.
314314
* It's important to invoke them only AFTER exception handling and everything else, see #208
315315
*/
316-
if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
316+
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
317317
try {
318318
state.invoke(cause)
319319
} catch (ex: Throwable) {
@@ -327,7 +327,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
327327
private fun notifyCancelling(list: NodeList, cause: Throwable) {
328328
// first cancel our own children
329329
onCancelling(cause)
330-
notifyHandlers<JobCancellingNode<*>>(list, cause)
330+
notifyHandlers<JobCancellingNode>(list, cause)
331331
// then cancel parent
332332
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
333333
}
@@ -359,9 +359,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
359359
}
360360

361361
private fun NodeList.notifyCompletion(cause: Throwable?) =
362-
notifyHandlers<JobNode<*>>(this, cause)
362+
notifyHandlers<JobNode>(this, cause)
363363

364-
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
364+
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
365365
var exception: Throwable? = null
366366
list.forEach<T> { node ->
367367
try {
@@ -453,21 +453,22 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
453453
invokeImmediately: Boolean,
454454
handler: CompletionHandler
455455
): DisposableHandle {
456-
var nodeCache: JobNode<*>? = null
456+
// Create node upfront -- for common cases it just initializes JobNode.job field,
457+
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
458+
val node: JobNode = makeNode(handler, onCancelling)
457459
loopOnState { state ->
458460
when (state) {
459461
is Empty -> { // EMPTY_X state -- no completion handlers
460462
if (state.isActive) {
461463
// try move to SINGLE state
462-
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
463464
if (_state.compareAndSet(state, node)) return node
464465
} else
465466
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
466467
}
467468
is Incomplete -> {
468469
val list = state.list
469470
if (list == null) { // SINGLE/SINGLE+
470-
promoteSingleToNodeList(state as JobNode<*>)
471+
promoteSingleToNodeList(state as JobNode)
471472
} else {
472473
var rootCause: Throwable? = null
473474
var handle: DisposableHandle = NonDisposableHandle
@@ -479,7 +480,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
479480
// or we are adding a child to a coroutine that is not completing yet
480481
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
481482
// Note: add node the list while holding lock on state (make sure it cannot change)
482-
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
483483
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
484484
// just return node if we don't have to invoke handler (not cancelling yet)
485485
if (rootCause == null) return node
@@ -493,7 +493,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
493493
if (invokeImmediately) handler.invokeIt(rootCause)
494494
return handle
495495
} else {
496-
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
497496
if (addLastAtomic(state, list, node)) return node
498497
}
499498
}
@@ -508,19 +507,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
508507
}
509508
}
510509

511-
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> =
512-
if (onCancelling) {
513-
(handler as? JobCancellingNode<*>)
514-
?.takeIf { it.job === this }
515-
?: InvokeOnCancelling(this, handler)
510+
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
511+
val node = if (onCancelling) {
512+
(handler as? JobCancellingNode)
513+
?: InvokeOnCancelling(handler)
516514
} else {
517-
(handler as? JobNode<*>)
515+
(handler as? JobNode)
518516
?.also { assert { it !is JobCancellingNode } }
519-
?.takeIf { it.job === this }
520-
?: InvokeOnCompletion(this, handler)
517+
?: InvokeOnCompletion(handler)
521518
}
519+
node.job = this
520+
return node
521+
}
522522

523-
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
523+
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
524524
list.addLastIf(node) { this.state === expect }
525525

526526
private fun promoteEmptyToNodeList(state: Empty) {
@@ -530,7 +530,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
530530
_state.compareAndSet(state, update)
531531
}
532532

533-
private fun promoteSingleToNodeList(state: JobNode<*>) {
533+
private fun promoteSingleToNodeList(state: JobNode) {
534534
// try to promote it to list (SINGLE+ state)
535535
state.addOneIfEmpty(NodeList())
536536
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
@@ -556,7 +556,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
556556

557557
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
558558
// We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
559-
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
559+
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
560560
}
561561

562562
public final override val onJoin: SelectClause0
@@ -576,7 +576,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
576576
}
577577
if (startInternal(state) == 0) {
578578
// slow-path -- register waiter for completion
579-
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
579+
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
580580
return
581581
}
582582
}
@@ -585,11 +585,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
585585
/**
586586
* @suppress **This is unstable API and it is subject to change.**
587587
*/
588-
internal fun removeNode(node: JobNode<*>) {
588+
internal fun removeNode(node: JobNode) {
589589
// remove logic depends on the state of the job
590590
loopOnState { state ->
591591
when (state) {
592-
is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
592+
is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
593593
if (state !== node) return // a different job node --> we were already removed
594594
// try remove and revert back to empty state
595595
if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
@@ -773,7 +773,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
773773
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
774774
when (state) {
775775
is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
776-
is JobNode<*> -> {
776+
is JobNode -> {
777777
// SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
778778
// correctly capture a reference to it
779779
promoteSingleToNodeList(state)
@@ -852,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
852852
* Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
853853
* which may miss unhandled exception.
854854
*/
855-
if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
855+
if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
856856
if (tryFinalizeSimpleState(state, proposedUpdate)) {
857857
// Completed successfully on fast path -- return updated state
858858
return proposedUpdate
@@ -967,7 +967,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
967967
* If child is attached when the job is already being cancelled, such child will receive immediate notification on
968968
* cancellation, but parent *will* wait for that child before completion and will handle its exception.
969969
*/
970-
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
970+
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
971971
}
972972

973973
/**
@@ -1150,7 +1150,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
11501150
private val state: Finishing,
11511151
private val child: ChildHandleNode,
11521152
private val proposedUpdate: Any?
1153-
) : JobNode<Job>(child.childJob) {
1153+
) : JobNode() {
11541154
override fun invoke(cause: Throwable?) {
11551155
parent.continueCompleting(state, child, proposedUpdate)
11561156
}
@@ -1228,7 +1228,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12281228
* thrown and not a JobCancellationException.
12291229
*/
12301230
val cont = AwaitContinuation(uCont.intercepted(), this)
1231-
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
1231+
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
12321232
cont.getResult()
12331233
}
12341234

@@ -1255,7 +1255,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12551255
}
12561256
if (startInternal(state) == 0) {
12571257
// slow-path -- register waiter for completion
1258-
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
1258+
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
12591259
return
12601260
}
12611261
}
@@ -1345,12 +1345,14 @@ internal interface Incomplete {
13451345
val list: NodeList? // is null only for Empty and JobNode incomplete state objects
13461346
}
13471347

1348-
internal abstract class JobNode<out J : Job>(
1349-
@JvmField val job: J
1350-
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
1348+
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
1349+
/**
1350+
* Initialized by [JobSupport.makeNode].
1351+
*/
1352+
lateinit var job: JobSupport
13511353
override val isActive: Boolean get() = true
13521354
override val list: NodeList? get() = null
1353-
override fun dispose() = (job as JobSupport).removeNode(this)
1355+
override fun dispose() = job.removeNode(this)
13541356
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
13551357
}
13561358

@@ -1363,7 +1365,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
13631365
append(state)
13641366
append("}[")
13651367
var first = true
1366-
this@NodeList.forEach<JobNode<*>> { node ->
1368+
this@NodeList.forEach<JobNode> { node ->
13671369
if (first) first = false else append(", ")
13681370
append(node)
13691371
}
@@ -1382,23 +1384,20 @@ internal class InactiveNodeList(
13821384
}
13831385

13841386
private class InvokeOnCompletion(
1385-
job: Job,
13861387
private val handler: CompletionHandler
1387-
) : JobNode<Job>(job) {
1388+
) : JobNode() {
13881389
override fun invoke(cause: Throwable?) = handler.invoke(cause)
13891390
}
13901391

13911392
private class ResumeOnCompletion(
1392-
job: Job,
13931393
private val continuation: Continuation<Unit>
1394-
) : JobNode<Job>(job) {
1394+
) : JobNode() {
13951395
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
13961396
}
13971397

13981398
private class ResumeAwaitOnCompletion<T>(
1399-
job: JobSupport,
14001399
private val continuation: CancellableContinuationImpl<T>
1401-
) : JobNode<JobSupport>(job) {
1400+
) : JobNode() {
14021401
override fun invoke(cause: Throwable?) {
14031402
val state = job.state
14041403
assert { state !is Incomplete }
@@ -1414,28 +1413,25 @@ private class ResumeAwaitOnCompletion<T>(
14141413
}
14151414

14161415
internal class DisposeOnCompletion(
1417-
job: Job,
14181416
private val handle: DisposableHandle
1419-
) : JobNode<Job>(job) {
1417+
) : JobNode() {
14201418
override fun invoke(cause: Throwable?) = handle.dispose()
14211419
}
14221420

14231421
private class SelectJoinOnCompletion<R>(
1424-
job: JobSupport,
14251422
private val select: SelectInstance<R>,
14261423
private val block: suspend () -> R
1427-
) : JobNode<JobSupport>(job) {
1424+
) : JobNode() {
14281425
override fun invoke(cause: Throwable?) {
14291426
if (select.trySelect())
14301427
block.startCoroutineCancellable(select.completion)
14311428
}
14321429
}
14331430

14341431
private class SelectAwaitOnCompletion<T, R>(
1435-
job: JobSupport,
14361432
private val select: SelectInstance<R>,
14371433
private val block: suspend (T) -> R
1438-
) : JobNode<JobSupport>(job) {
1434+
) : JobNode() {
14391435
override fun invoke(cause: Throwable?) {
14401436
if (select.trySelect())
14411437
job.selectAwaitCompletion(select, block)
@@ -1448,12 +1444,11 @@ private class SelectAwaitOnCompletion<T, R>(
14481444
* Marker for node that shall be invoked on in _cancelling_ state.
14491445
* **Note: may be invoked multiple times.**
14501446
*/
1451-
internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
1447+
internal abstract class JobCancellingNode : JobNode()
14521448

14531449
private class InvokeOnCancelling(
1454-
job: Job,
14551450
private val handler: CompletionHandler
1456-
) : JobCancellingNode<Job>(job) {
1451+
) : JobCancellingNode() {
14571452
// delegate handler shall be invoked at most once, so here is an additional flag
14581453
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
14591454
override fun invoke(cause: Throwable?) {
@@ -1462,18 +1457,16 @@ private class InvokeOnCancelling(
14621457
}
14631458

14641459
internal class ChildHandleNode(
1465-
parent: JobSupport,
14661460
@JvmField val childJob: ChildJob
1467-
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
1461+
) : JobCancellingNode(), ChildHandle {
14681462
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
14691463
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
14701464
}
14711465

14721466
// Same as ChildHandleNode, but for cancellable continuation
14731467
internal class ChildContinuation(
1474-
parent: Job,
14751468
@JvmField val child: CancellableContinuationImpl<*>
1476-
) : JobCancellingNode<Job>(parent) {
1469+
) : JobCancellingNode() {
14771470
override fun invoke(cause: Throwable?) {
14781471
child.parentCancelled(child.getContinuationCancellationCause(job))
14791472
}

kotlinx-coroutines-core/common/src/selects/Select.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,13 @@ internal class SelectBuilderImpl<in R>(
327327
private fun initCancellability() {
328328
val parent = context[Job] ?: return
329329
val newRegistration = parent.invokeOnCompletion(
330-
onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
330+
onCancelling = true, handler = SelectOnCancelling().asHandler)
331331
parentHandle = newRegistration
332332
// now check our state _after_ registering
333333
if (isSelected) newRegistration.dispose()
334334
}
335335

336-
private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
336+
private inner class SelectOnCancelling : JobCancellingNode() {
337337
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
338338
override fun invoke(cause: Throwable?) {
339339
if (trySelect())
@@ -552,7 +552,7 @@ internal class SelectBuilderImpl<in R>(
552552
return decision
553553
}
554554

555-
override val atomicOp: AtomicOp<*>?
555+
override val atomicOp: AtomicOp<*>
556556
get() = otherOp.atomicOp
557557
}
558558

0 commit comments

Comments
 (0)