Skip to content

awaitAll and joinAll extensions #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ internal abstract class AbstractContinuation<in T>(
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw state.exception
if (state is CompletedExceptionally) throw state.cause
return getSuccessfulResult(state)
}

Expand All @@ -99,8 +99,8 @@ internal abstract class AbstractContinuation<in T>(
}
is Cancelled -> {
// Ignore resumes in cancelled coroutines, but handle exception if a different one here
if (proposedUpdate is CompletedExceptionally && proposedUpdate.exception != state.exception)
handleException(proposedUpdate.exception)
if (proposedUpdate is CompletedExceptionally && proposedUpdate.cause != state.cause)
handleException(proposedUpdate.cause)
return
}
else -> error("Already resumed, but got $proposedUpdate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ public abstract class AbstractCoroutine<in T>(
* This function is invoked once when this coroutine is cancelled or is completed,
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
*
* @param cause the cause that was passed to [Job.cancel] function or `null` if coroutine was cancelled
* without cause or is completing normally.
* The meaning of [cause] parameter:
* * Cause is `null` when job has completed normally.
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
* * Otherwise, the job had _failed_.
*/
protected open fun onCancellation(cause: Throwable?) {}

Expand All @@ -98,7 +101,7 @@ public abstract class AbstractCoroutine<in T>(
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally)
onCompletedExceptionally(state.exception)
onCompletedExceptionally(state.cause)
else
onCompleted(state as T)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental

import kotlinx.atomicfu.atomic

/**
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
* complete exceptionally including cancellation.
*
* This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
*/
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()

/**
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
* complete exceptionally including cancellation.
*
* This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
*/
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()

/**
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
*/
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }

/**
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
*/
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }

private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
private val notCompletedCount = atomic(deferreds.size)

suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
deferreds.forEach {
it.start() // To properly await lazily started deferreds
cont.disposeOnCompletion(it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler))
}
}

inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) {
if (cause != null) {
val token = continuation.tryResumeWithException(cause)
if (token != null) {
continuation.completeResume(token)
}
} else if (notCompletedCount.decrementAndGet() == 0) {
continuation.resume(deferreds.map { it.getCompleted() })
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private open class StandaloneCoroutine(
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
override fun onFinishingInternal(update: Any?) {
// note the use of the parent's job context below!
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.exception)
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,28 @@ import kotlinx.coroutines.experimental.internalAnnotations.*
* Class for an internal state of a job that had completed exceptionally, including cancellation.
*
* **Note: This class cannot be used outside of internal coroutines framework**.
* **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
*
* @param cause the exceptional completion cause. If `cause` is null, then an exception is
* if created via [createException] on first get from [exception] property.
* @param allowNullCause if `null` cause is allowed.
* @param cause the exceptional completion cause. It's either original exceptional cause
* or artificial JobCancellationException if no cause was provided
* @suppress **This is unstable API and it is subject to change.**
*/
public open class CompletedExceptionally protected constructor(
@JvmField public val cause: Throwable?,
allowNullCause: Boolean
open class CompletedExceptionally(
@JvmField public val cause: Throwable
) {
/**
* Creates exceptionally completed state.
* @param cause the exceptional completion cause.
*/
public constructor(cause: Throwable) : this(cause, false)

@Volatile
private var _exception: Throwable? = cause // will materialize JobCancellationException on first need

init {
require(allowNullCause || cause != null) { "Null cause is not allowed" }
}

/**
* Returns completion exception.
*/
public val exception: Throwable get() =
_exception ?: // atomic read volatile var or else create new
createException().also { _exception = it }

protected open fun createException(): Throwable = error("Completion exception was not specified")

override fun toString(): String = "$classSimpleName[$exception]"
override fun toString(): String = "$classSimpleName[$cause]"
}

/**
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
*
* **Note: This class cannot be used outside of internal coroutines framework**.
*
*
* @param job the job that was cancelled.
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
* if created on first get from [exception] property.
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] is created.
* @suppress **This is unstable API and it is subject to change.**
*/
public class Cancelled(
internal class Cancelled(
private val job: Job,
cause: Throwable?
) : CompletedExceptionally(cause, true) {
override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
}

) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import kotlinx.coroutines.experimental.internal.*
* Installed handler should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
*
* The meaning of `cause` that is passed to the handler:
* * Cause is `null` when job has completed normally.
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
* * Otherwise, the job had _failed_.
*
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This type should not be used in general application code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public interface Deferred<out T> : Job {
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,,
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
* function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await].
* function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public interface DispatchedTask<in T> : Runnable {
state as T

public fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.exception
(state as? CompletedExceptionally)?.cause

public override fun run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public expect class JobCancellationException(
cause: Throwable?,
job: Job
) : CancellationException {
val job: Job
internal val job: Job
}

internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException
Expand Down
Loading