Skip to content

Commit 0daffdb

Browse files
qwwdfsadelizarov
authored andcommitted
awaitAll improvements: create defensive copy of jobs to consistently handle concurrent mutations, make handler JobNode to avoid extra object allocation
1 parent c692477 commit 0daffdb

File tree

2 files changed

+50
-17
lines changed
  • core/kotlinx-coroutines-core/src

2 files changed

+50
-17
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt

+17-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public suspend fun awaitAll(vararg jobs: Job): Unit = jobs.asList().awaitAll()
2020
*/
2121
public suspend fun Collection<Job>.awaitAll() {
2222
if (isEmpty()) return
23-
AwaitAll(this).await()
23+
val snapshot = ArrayList(this)
24+
AwaitAll(snapshot).await()
2425
}
2526

2627
/**
@@ -43,21 +44,23 @@ private class AwaitAll(private val jobs: Collection<Job>) {
4344

4445
suspend fun await() {
4546
suspendCancellableCoroutine<Unit> { cont ->
46-
// todo: create a separate named class instance of JobNode to avoid extra object
47-
val handler: (Throwable?) -> Unit = {
48-
if (it != null) {
49-
val token = cont.tryResumeWithException(it)
50-
if (token != null) {
51-
cont.completeResume(token)
52-
}
53-
} else if (notCompletedCount.decrementAndGet() == 0) {
54-
cont.resume(Unit)
55-
}
56-
}
57-
5847
jobs.forEach {
5948
it.start() // To properly await lazily started jobs
60-
cont.disposeOnCompletion(it.invokeOnCompletion(handler))
49+
cont.disposeOnCompletion(it.invokeOnCompletion(AwaitAllNode(cont, it)))
50+
}
51+
}
52+
}
53+
54+
inner class AwaitAllNode(private val continuation: CancellableContinuation<Unit>, job: Job) : JobNode<Job>(job), CompletionHandler {
55+
56+
override fun invoke(cause: Throwable?) {
57+
if (cause != null) {
58+
val token = continuation.tryResumeWithException(cause)
59+
if (token != null) {
60+
continuation.completeResume(token)
61+
}
62+
} else if (notCompletedCount.decrementAndGet() == 0) {
63+
continuation.resume(Unit)
6164
}
6265
}
6366
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt

+33-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package kotlinx.coroutines.experimental
22

3-
import org.junit.After
4-
import java.util.concurrent.CyclicBarrier
5-
import kotlin.test.Test
3+
import org.junit.*
4+
import org.junit.Test
5+
import java.util.concurrent.*
66

77
class AwaitStressTest : TestBase() {
88

@@ -100,4 +100,34 @@ class AwaitStressTest : TestBase() {
100100

101101
require(cancelledOnce) { "Cancellation exception wasn't properly caught" }
102102
}
103+
104+
@Test
105+
fun testMutatingCollection() = runTest {
106+
val barrier = CyclicBarrier(4)
107+
108+
repeat(iterations) {
109+
val jobs = mutableListOf<Job>()
110+
111+
jobs += async(pool) {
112+
barrier.await()
113+
1L
114+
}
115+
116+
jobs += async(pool) {
117+
barrier.await()
118+
2L
119+
}
120+
121+
jobs += async(pool) {
122+
barrier.await()
123+
jobs.removeAt(2)
124+
}
125+
126+
val allJobs = ArrayList(jobs)
127+
barrier.await()
128+
jobs.awaitAll() // shouldn't hang
129+
allJobs.awaitAll()
130+
barrier.reset()
131+
}
132+
}
103133
}

0 commit comments

Comments
 (0)