Skip to content

Commit f415656

Browse files
committed
Fix native-mt awaitAll
Fixes #2025
1 parent c90afac commit f415656

File tree

2 files changed

+30
-10
lines changed

2 files changed

+30
-10
lines changed

kotlinx-coroutines-core/common/src/Await.kt

+17-10
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
6565
val deferred = deferreds[i]
6666
deferred.start() // To properly await lazily started deferreds
6767
AwaitAllNode(cont, deferred).apply {
68-
handle = deferred.invokeOnCompletion(asHandler)
68+
setHandle(deferred.invokeOnCompletion(asHandler))
6969
}
7070
}
7171
val disposer = DisposeHandlersOnCancel(nodes)
7272
// Step 2: Set disposer to each node
73-
nodes.forEach { it.disposer = disposer }
73+
nodes.forEach { it.setDisposer(disposer) }
7474
// Here we know that if any code the nodes complete, it will dispose the rest
7575
// Step 3: Now we can check if continuation is complete
7676
if (cont.isCompleted) {
@@ -83,34 +83,41 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
8383

8484
private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
8585
fun disposeAll() {
86-
nodes.forEach { it.handle.dispose() }
86+
nodes.forEach { it.disposeHandle() }
8787
}
8888

8989
override fun invoke(cause: Throwable?) { disposeAll() }
9090
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
9191
}
9292

9393
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
94-
lateinit var handle: DisposableHandle
95-
94+
private val _handle = atomic<DisposableHandle?>(null)
9695
private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
97-
var disposer: DisposeHandlersOnCancel?
98-
get() = _disposer.value
99-
set(value) { _disposer.value = value }
100-
96+
97+
fun setHandle(handle: DisposableHandle) { _handle.value = handle }
98+
fun setDisposer(disposer: DisposeHandlersOnCancel) { _disposer.value = disposer }
99+
100+
fun disposeHandle() {
101+
_handle.value?.dispose()
102+
_handle.value = null
103+
}
104+
101105
override fun invoke(cause: Throwable?) {
102106
if (cause != null) {
103107
val token = continuation.tryResumeWithException(cause)
104108
if (token != null) {
105109
continuation.completeResume(token)
106110
// volatile read of disposer AFTER continuation is complete
107111
// and if disposer was already set (all handlers where already installed, then dispose them all)
108-
disposer?.disposeAll()
112+
_disposer.value?.disposeAll()
109113
}
110114
} else if (notCompletedCount.decrementAndGet() == 0) {
111115
continuation.resume(deferreds.map { it.getCompleted() })
112116
// Note that all deferreds are complete here, so we don't need to dispose their nodes
113117
}
118+
// Release all the refs for Kotlin/Native
119+
_handle.value = null
120+
_disposer.value = null
114121
}
115122
}
116123
}

kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt

+13
Original file line numberDiff line numberDiff line change
@@ -313,5 +313,18 @@ class WorkerDispatcherTest : TestBase() {
313313
finish(6)
314314
}
315315

316+
@Test
317+
fun testAwaitAll() = runTest {
318+
expect(1)
319+
val d1 = async(dispatcher) {
320+
"A"
321+
}
322+
val d2 = async(dispatcher) {
323+
"B"
324+
}
325+
assertEquals("AB", awaitAll(d1, d2).joinToString(""))
326+
finish(2)
327+
}
328+
316329
private data class Data(val s: String)
317330
}

0 commit comments

Comments
 (0)