@@ -65,12 +65,12 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
65
65
val deferred = deferreds[i]
66
66
deferred.start() // To properly await lazily started deferreds
67
67
AwaitAllNode (cont, deferred).apply {
68
- handle = deferred.invokeOnCompletion(asHandler)
68
+ setHandle( deferred.invokeOnCompletion(asHandler) )
69
69
}
70
70
}
71
71
val disposer = DisposeHandlersOnCancel (nodes)
72
72
// Step 2: Set disposer to each node
73
- nodes.forEach { it.disposer = disposer }
73
+ nodes.forEach { it.setDisposer( disposer) }
74
74
// Here we know that if any code the nodes complete, it will dispose the rest
75
75
// Step 3: Now we can check if continuation is complete
76
76
if (cont.isCompleted) {
@@ -83,34 +83,41 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
83
83
84
84
private inner class DisposeHandlersOnCancel (private val nodes : Array <AwaitAllNode >) : CancelHandler() {
85
85
fun disposeAll () {
86
- nodes.forEach { it.handle.dispose () }
86
+ nodes.forEach { it.disposeHandle () }
87
87
}
88
88
89
89
override fun invoke (cause : Throwable ? ) { disposeAll() }
90
90
override fun toString (): String = " DisposeHandlersOnCancel[$nodes ]"
91
91
}
92
92
93
93
private inner class AwaitAllNode (private val continuation : CancellableContinuation <List <T >>, job : Job ) : JobNode<Job>(job) {
94
- lateinit var handle: DisposableHandle
95
-
94
+ private val _handle = atomic<DisposableHandle ?>(null )
96
95
private val _disposer = atomic<DisposeHandlersOnCancel ?>(null )
97
- var disposer: DisposeHandlersOnCancel ?
98
- get() = _disposer .value
99
- set(value) { _disposer .value = value }
100
-
96
+
97
+ fun setHandle (handle : DisposableHandle ) { _handle .value = handle }
98
+ fun setDisposer (disposer : DisposeHandlersOnCancel ) { _disposer .value = disposer }
99
+
100
+ fun disposeHandle () {
101
+ _handle .value?.dispose()
102
+ _handle .value = null
103
+ }
104
+
101
105
override fun invoke (cause : Throwable ? ) {
102
106
if (cause != null ) {
103
107
val token = continuation.tryResumeWithException(cause)
104
108
if (token != null ) {
105
109
continuation.completeResume(token)
106
110
// volatile read of disposer AFTER continuation is complete
107
111
// and if disposer was already set (all handlers where already installed, then dispose them all)
108
- disposer ?.disposeAll()
112
+ _disposer .value ?.disposeAll()
109
113
}
110
114
} else if (notCompletedCount.decrementAndGet() == 0 ) {
111
115
continuation.resume(deferreds.map { it.getCompleted() })
112
116
// Note that all deferreds are complete here, so we don't need to dispose their nodes
113
117
}
118
+ // Release all the refs for Kotlin/Native
119
+ _handle .value = null
120
+ _disposer .value = null
114
121
}
115
122
}
116
123
}
0 commit comments