Skip to content

Commit 76146bb

Browse files
committed
Merge branch 'await-all' into develop
2 parents 0406a9b + b4c7b40 commit 76146bb

File tree

39 files changed

+1151
-181
lines changed

39 files changed

+1151
-181
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ internal abstract class AbstractContinuation<in T>(
7575
if (trySuspend()) return COROUTINE_SUSPENDED
7676
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
7777
val state = this.state
78-
if (state is CompletedExceptionally) throw state.exception
78+
if (state is CompletedExceptionally) throw state.cause
7979
return getSuccessfulResult(state)
8080
}
8181

@@ -99,8 +99,8 @@ internal abstract class AbstractContinuation<in T>(
9999
}
100100
is Cancelled -> {
101101
// Ignore resumes in cancelled coroutines, but handle exception if a different one here
102-
if (proposedUpdate is CompletedExceptionally && proposedUpdate.exception != state.exception)
103-
handleException(proposedUpdate.exception)
102+
if (proposedUpdate is CompletedExceptionally && proposedUpdate.cause != state.cause)
103+
handleException(proposedUpdate.cause)
104104
return
105105
}
106106
else -> error("Already resumed, but got $proposedUpdate")

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ public abstract class AbstractCoroutine<in T>(
7676
* This function is invoked once when this coroutine is cancelled or is completed,
7777
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
7878
*
79-
* @param cause the cause that was passed to [Job.cancel] function or `null` if coroutine was cancelled
80-
* without cause or is completing normally.
79+
* The meaning of [cause] parameter:
80+
* * Cause is `null` when job has completed normally.
81+
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
82+
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
83+
* * Otherwise, the job had _failed_.
8184
*/
8285
protected open fun onCancellation(cause: Throwable?) {}
8386

@@ -98,7 +101,7 @@ public abstract class AbstractCoroutine<in T>(
98101
@Suppress("UNCHECKED_CAST")
99102
internal override fun onCompletionInternal(state: Any?, mode: Int) {
100103
if (state is CompletedExceptionally)
101-
onCompletedExceptionally(state.exception)
104+
onCompletedExceptionally(state.cause)
102105
else
103106
onCompleted(state as T)
104107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlinx.atomicfu.atomic
20+
21+
/**
22+
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
23+
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
24+
* complete exceptionally including cancellation.
25+
*
26+
* This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
27+
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
28+
*
29+
* This suspending function is cancellable.
30+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
31+
* this function immediately resumes with [CancellationException].
32+
*/
33+
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
34+
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
35+
36+
/**
37+
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
38+
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
39+
* complete exceptionally including cancellation.
40+
*
41+
* This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
42+
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
43+
*
44+
* This suspending function is cancellable.
45+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
46+
* this function immediately resumes with [CancellationException].
47+
*/
48+
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
49+
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
50+
51+
/**
52+
* Suspends current coroutine until all given jobs are complete.
53+
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
54+
*
55+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
56+
* this function immediately resumes with [CancellationException].
57+
*/
58+
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
59+
60+
/**
61+
* Suspends current coroutine until all given jobs are complete.
62+
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
63+
*
64+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
65+
* this function immediately resumes with [CancellationException].
66+
*/
67+
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
68+
69+
private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
70+
private val notCompletedCount = atomic(deferreds.size)
71+
72+
suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
73+
deferreds.forEach {
74+
it.start() // To properly await lazily started deferreds
75+
cont.disposeOnCompletion(it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler))
76+
}
77+
}
78+
79+
inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
80+
override fun invoke(cause: Throwable?) {
81+
if (cause != null) {
82+
val token = continuation.tryResumeWithException(cause)
83+
if (token != null) {
84+
continuation.completeResume(token)
85+
}
86+
} else if (notCompletedCount.decrementAndGet() == 0) {
87+
continuation.resume(deferreds.map { it.getCompleted() })
88+
}
89+
}
90+
}
91+
}

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private open class StandaloneCoroutine(
171171
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
172172
override fun onFinishingInternal(update: Any?) {
173173
// note the use of the parent's job context below!
174-
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.exception)
174+
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause)
175175
}
176176
}
177177

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt

+10-37
Original file line numberDiff line numberDiff line change
@@ -22,55 +22,28 @@ import kotlinx.coroutines.experimental.internalAnnotations.*
2222
* Class for an internal state of a job that had completed exceptionally, including cancellation.
2323
*
2424
* **Note: This class cannot be used outside of internal coroutines framework**.
25+
* **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
2526
*
26-
* @param cause the exceptional completion cause. If `cause` is null, then an exception is
27-
* if created via [createException] on first get from [exception] property.
28-
* @param allowNullCause if `null` cause is allowed.
27+
* @param cause the exceptional completion cause. It's either original exceptional cause
28+
* or artificial JobCancellationException if no cause was provided
2929
* @suppress **This is unstable API and it is subject to change.**
3030
*/
31-
public open class CompletedExceptionally protected constructor(
32-
@JvmField public val cause: Throwable?,
33-
allowNullCause: Boolean
31+
open class CompletedExceptionally(
32+
@JvmField public val cause: Throwable
3433
) {
35-
/**
36-
* Creates exceptionally completed state.
37-
* @param cause the exceptional completion cause.
38-
*/
39-
public constructor(cause: Throwable) : this(cause, false)
40-
41-
@Volatile
42-
private var _exception: Throwable? = cause // will materialize JobCancellationException on first need
43-
44-
init {
45-
require(allowNullCause || cause != null) { "Null cause is not allowed" }
46-
}
47-
48-
/**
49-
* Returns completion exception.
50-
*/
51-
public val exception: Throwable get() =
52-
_exception ?: // atomic read volatile var or else create new
53-
createException().also { _exception = it }
54-
55-
protected open fun createException(): Throwable = error("Completion exception was not specified")
56-
57-
override fun toString(): String = "$classSimpleName[$exception]"
34+
override fun toString(): String = "$classSimpleName[$cause]"
5835
}
5936

6037
/**
6138
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
6239
*
6340
* **Note: This class cannot be used outside of internal coroutines framework**.
64-
*
41+
*
6542
* @param job the job that was cancelled.
66-
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
67-
* if created on first get from [exception] property.
43+
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] is created.
6844
* @suppress **This is unstable API and it is subject to change.**
6945
*/
70-
public class Cancelled(
46+
internal class Cancelled(
7147
private val job: Job,
7248
cause: Throwable?
73-
) : CompletedExceptionally(cause, true) {
74-
override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
75-
}
76-
49+
) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import kotlinx.coroutines.experimental.internal.*
2424
* Installed handler should not throw any exceptions. If it does, they will get caught,
2525
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
2626
*
27+
* The meaning of `cause` that is passed to the handler:
28+
* * Cause is `null` when job has completed normally.
29+
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
30+
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
31+
* * Otherwise, the job had _failed_.
32+
*
2733
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
2834
* and allows for implementation of suspending functions that wait on the Job's state.
2935
* This type should not be used in general application code.

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public interface Deferred<out T> : Job {
152152
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
153153
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,,
154154
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
155-
* function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await].
155+
* function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
156156
*
157157
* @param context context of the coroutine. The default value is [DefaultDispatcher].
158158
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public interface DispatchedTask<in T> : Runnable {
142142
state as T
143143

144144
public fun getExceptionalResult(state: Any?): Throwable? =
145-
(state as? CompletedExceptionally)?.exception
145+
(state as? CompletedExceptionally)?.cause
146146

147147
public override fun run() {
148148
try {

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public expect class JobCancellationException(
2525
cause: Throwable?,
2626
job: Job
2727
) : CancellationException {
28-
val job: Job
28+
internal val job: Job
2929
}
3030

3131
internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException

0 commit comments

Comments
 (0)