Skip to content

Commit e0b6db0

Browse files
committed
Leaking handles in awaitAll are fixed
1 parent 3e9f244 commit e0b6db0

File tree

2 files changed

+77
-8
lines changed
  • common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental
  • core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental

2 files changed

+77
-8
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt

+36-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.atomicfu.atomic
20+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
2021

2122
/**
2223
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
@@ -70,29 +71,56 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
7071
private val notCompletedCount = atomic(deferreds.size)
7172

7273
suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
73-
val handlers = deferreds.map {
74-
it.start() // To properly await lazily started deferreds
75-
it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)
74+
// Intricate dance here
75+
// Step 1: Create nodes and install them as completion handlers, they may fire!
76+
val nodes = Array<AwaitAllNode>(deferreds.size) { i ->
77+
val deferred = deferreds[i]
78+
deferred.start() // To properly await lazily started deferreds
79+
AwaitAllNode(cont, deferred).apply {
80+
handle = deferred.invokeOnCompletion(asHandler)
81+
}
82+
}
83+
val disposer = DisposeHandlersOnCancel(nodes)
84+
// Step 2: Set disposer to each node
85+
nodes.forEach { it.disposer = disposer }
86+
// Here we know that if any code the nodes complete, it will dipsose the rest
87+
// Step 3: Now we can check if continuation is complete
88+
if (cont.isCompleted) {
89+
// it is already complete while handlers were being installed -- dispose them all
90+
disposer.disposeAll()
91+
} else {
92+
cont.invokeOnCancellation(handler = disposer.asHandler)
7693
}
77-
cont.invokeOnCancellation(handler = DisposeHandlersOnCancel(handlers).asHandler)
7894
}
7995

80-
private class DisposeHandlersOnCancel(private val handlers: List<DisposableHandle>) : CancelHandler() {
81-
override fun invoke(cause: Throwable?) {
82-
handlers.forEach { it.dispose() }
96+
private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
97+
fun disposeAll() {
98+
nodes.forEach { it.handle.dispose() }
8399
}
84-
override fun toString(): String = "DisposeHandlersOnCancel[$handlers]"
100+
101+
override fun invoke(cause: Throwable?) { disposeAll() }
102+
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
85103
}
86104

87105
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
106+
lateinit var handle: DisposableHandle
107+
108+
@Volatile
109+
var disposer: DisposeHandlersOnCancel? = null
110+
88111
override fun invoke(cause: Throwable?) {
89112
if (cause != null) {
90113
val token = continuation.tryResumeWithException(cause)
91114
if (token != null) {
92115
continuation.completeResume(token)
116+
// volatile read of disposer AFTER continuation is complete
117+
val disposer = this.disposer
118+
// and if disposer was already set (all handlers where already installed, then dispose them all)
119+
if (disposer != null) disposer.disposeAll()
93120
}
94121
} else if (notCompletedCount.decrementAndGet() == 0) {
95122
continuation.resume(deferreds.map { it.getCompleted() })
123+
// Note, that all deferreds are complete here, so we don't need to dispose their nodes
96124
}
97125
}
98126
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.junit.*
20+
21+
class AwaitJvmTest : TestBase() {
22+
@Test
23+
public fun testSecondLeak() = runTest {
24+
// This test is to make sure that handlers installed on the second deferred do not leak
25+
val d1 = CompletableDeferred<Int>()
26+
val d2 = CompletableDeferred<Int>()
27+
d1.completeExceptionally(TestException()) // first is crashed
28+
val iterations = 3_000_000 * stressTestMultiplier
29+
for (iter in 1..iterations) {
30+
try {
31+
awaitAll(d1, d2)
32+
expectUnreached()
33+
} catch (e: TestException) {
34+
expect(iter)
35+
}
36+
}
37+
finish(iterations + 1)
38+
}
39+
40+
private class TestException : Exception()
41+
}

0 commit comments

Comments
 (0)