Skip to content

Commit 11e6677

Browse files
committed
Merge develop into decouple-job
# Conflicts: # common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt # common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt # common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
2 parents f6430f4 + 76146bb commit 11e6677

File tree

97 files changed

+2163
-632
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+2163
-632
lines changed

build.gradle

+40-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
allprojects {
2-
def deployVersion = properties['DeployVersion']
3-
if (deployVersion != null) version = deployVersion
4-
}
5-
61
buildscript {
7-
if (rootProject.properties['kotlinSnapshot'] != null) {
2+
ext.useKotlinSnapshot = rootProject.properties['kotlinSnapshot'] != null
3+
if (useKotlinSnapshot) {
84
ext.kotlin_version = '1.2-SNAPSHOT'
95
repositories {
106
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
@@ -17,6 +13,18 @@ buildscript {
1713
maven { url "https://jetbrains.bintray.com/kotlin-native-dependencies" }
1814
maven { url "https://plugins.gradle.org/m2/" }
1915
}
16+
configurations.classpath {
17+
resolutionStrategy {
18+
eachDependency { DependencyResolveDetails details ->
19+
if (details.requested.group == 'org.jetbrains.kotlin' && details.requested.name != 'kotlin-native-gradle-plugin') {
20+
// fix version of all dependencies from org.jetbrains.kotlin group
21+
// even when other dependencies require other versions indirectly,
22+
// except kotlin-native, which has its own pre-release versioning
23+
details.useVersion kotlin_version
24+
}
25+
}
26+
}
27+
}
2028
dependencies {
2129
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
2230
classpath "org.jetbrains.kotlin:kotlin-native-gradle-plugin:$kotlin_native_version"
@@ -27,6 +35,15 @@ buildscript {
2735
}
2836
}
2937

38+
allprojects {
39+
def deployVersion = properties['DeployVersion']
40+
if (deployVersion != null) version = deployVersion
41+
if (useKotlinSnapshot) {
42+
kotlin_version = '1.2-SNAPSHOT'
43+
}
44+
}
45+
46+
3047
// Report Kotlin compiler version when building project
3148
println("Using Kotlin compiler version: $org.jetbrains.kotlin.config.KotlinCompilerVersion.VERSION")
3249

@@ -46,6 +63,23 @@ static def platformLib(base, platform) {
4663
return "$base-$platform"
4764
}
4865

66+
subprojects {
67+
if (useKotlinSnapshot) {
68+
repositories {
69+
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
70+
}
71+
}
72+
configurations.all {
73+
resolutionStrategy {
74+
eachDependency { DependencyResolveDetails details ->
75+
if (details.requested.group == 'org.jetbrains.kotlin') {
76+
details.useVersion kotlin_version
77+
}
78+
}
79+
}
80+
}
81+
}
82+
4983
configure(subprojects.findAll { !sourceless.contains(it.name) }) {
5084
def platform = platformOf(it)
5185
apply from: rootProject.file("gradle/compile-${platform}.gradle")

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

+11-13
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ internal abstract class AbstractContinuation<in T>(
101101
return
102102
}
103103
parent.start() // make sure the parent is started
104-
val handle = parent.invokeOnCompletion(onCancelling = true, handler = ChildContinuation(parent, this).asHandler)
104+
val handle = parent.invokeOnCompletion(onCancelling = true,
105+
handler = ChildContinuation(parent, this).asHandler)
105106

106107
parentHandle = handle
107108
// now check our state _after_ registering (see updateStateToFinal order of actions)
@@ -146,7 +147,7 @@ internal abstract class AbstractContinuation<in T>(
146147
if (trySuspend()) return COROUTINE_SUSPENDED
147148
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
148149
val state = this.state
149-
if (state is CompletedExceptionally) throw state.exception
150+
if (state is CompletedExceptionally) throw state.cause
150151
return getSuccessfulResult(state)
151152
}
152153

@@ -254,7 +255,7 @@ internal abstract class AbstractContinuation<in T>(
254255
* }
255256
* ```
256257
*/
257-
if (proposedUpdate.exception is CancellationException) {
258+
if (proposedUpdate.cause is CancellationException) {
258259
// Keep original cancellation cause and try add to suppressed exception from proposed cancel
259260
update = state.cancel
260261
coerceWithCancellation(state, proposedUpdate, update)
@@ -269,11 +270,10 @@ internal abstract class AbstractContinuation<in T>(
269270
* }
270271
* ```
271272
*/
272-
val exception = proposedUpdate.exception
273-
// TODO clashes with await all
273+
val exception = proposedUpdate.cause
274274
val currentException = state.cancel.cause
275275
// Add to suppressed if original cancellation differs from proposed exception
276-
if (currentException != null && (currentException !is CancellationException || currentException.cause !== exception)) {
276+
if (currentException !is CancellationException || currentException.cause !== exception) {
277277
exception.addSuppressedThrowable(currentException)
278278
}
279279

@@ -304,15 +304,13 @@ internal abstract class AbstractContinuation<in T>(
304304
// Coerce current cancelling state with proposed cancellation
305305
private fun coerceWithCancellation(state: Cancelling, proposedUpdate: CompletedExceptionally, update: CompletedExceptionally) {
306306
val originalCancellation = state.cancel
307-
val originalException = originalCancellation.exception
307+
val originalException = originalCancellation.cause
308308
val updateCause = proposedUpdate.cause
309-
310309
// Cause of proposed update is present and differs from one in current state TODO clashes with await all
311-
val isSameCancellation = originalCancellation.exception is CancellationException
312-
&& originalException.cause === updateCause?.cause
313-
314-
if (!isSameCancellation && updateCause !== null && originalException.cause !== updateCause) {
315-
update.exception.addSuppressedThrowable(updateCause)
310+
val isSameCancellation = originalCancellation.cause is CancellationException
311+
&& originalException.cause === updateCause.cause
312+
if (!isSameCancellation && originalException.cause !== updateCause) {
313+
update.cause.addSuppressedThrowable(updateCause)
316314
}
317315
}
318316

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ public abstract class AbstractCoroutine<in T>(
7676
* This function is invoked once when this coroutine is cancelled or is completed,
7777
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
7878
*
79-
* @param cause the cause that was passed to [Job.cancel] function or `null` if coroutine was cancelled
80-
* without cause or is completing normally.
79+
* The meaning of [cause] parameter:
80+
* * Cause is `null` when job has completed normally.
81+
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
82+
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
83+
* * Otherwise, the job had _failed_.
8184
*/
8285
protected open fun onCancellation(cause: Throwable?) {}
8386

@@ -98,7 +101,7 @@ public abstract class AbstractCoroutine<in T>(
98101
@Suppress("UNCHECKED_CAST")
99102
internal override fun onCompletionInternal(state: Any?, mode: Int) {
100103
if (state is CompletedExceptionally)
101-
onCompletedExceptionally(state.exception)
104+
onCompletedExceptionally(state.cause)
102105
else
103106
onCompleted(state as T)
104107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlinx.atomicfu.atomic
20+
21+
/**
22+
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
23+
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
24+
* complete exceptionally including cancellation.
25+
*
26+
* This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
27+
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
28+
*
29+
* This suspending function is cancellable.
30+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
31+
* this function immediately resumes with [CancellationException].
32+
*/
33+
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
34+
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
35+
36+
/**
37+
* Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
38+
* when all deferred computations are complete or resumes with the first thrown exception if any of computations
39+
* complete exceptionally including cancellation.
40+
*
41+
* This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
42+
* gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
43+
*
44+
* This suspending function is cancellable.
45+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
46+
* this function immediately resumes with [CancellationException].
47+
*/
48+
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
49+
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
50+
51+
/**
52+
* Suspends current coroutine until all given jobs are complete.
53+
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
54+
*
55+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
56+
* this function immediately resumes with [CancellationException].
57+
*/
58+
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
59+
60+
/**
61+
* Suspends current coroutine until all given jobs are complete.
62+
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
63+
*
64+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
65+
* this function immediately resumes with [CancellationException].
66+
*/
67+
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
68+
69+
private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
70+
private val notCompletedCount = atomic(deferreds.size)
71+
72+
suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
73+
val handlers = deferreds.map {
74+
it.start() // To properly await lazily started deferreds
75+
it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)
76+
}
77+
cont.invokeOnCancellation {
78+
handlers.forEach { it.dispose() }
79+
}
80+
}
81+
82+
inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
83+
override fun invoke(cause: Throwable?) {
84+
if (cause != null) {
85+
val token = continuation.tryResumeWithException(cause)
86+
if (token != null) {
87+
continuation.completeResume(token)
88+
}
89+
} else if (notCompletedCount.decrementAndGet() == 0) {
90+
continuation.resume(deferreds.map { it.getCompleted() })
91+
}
92+
}
93+
}
94+
}

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private open class StandaloneCoroutine(
171171
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
172172
override fun onFinishingInternal(update: Any?) {
173173
// note the use of the parent's job context below!
174-
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.exception)
174+
if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause)
175175
}
176176
}
177177

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt

+12-40
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,16 @@ import kotlin.coroutines.experimental.*
2323
* Class for an internal state of a job that had completed exceptionally, including cancellation.
2424
*
2525
* **Note: This class cannot be used outside of internal coroutines framework**.
26+
* **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
2627
*
27-
* @param cause the exceptional completion cause. If `cause` is null, then an exception is
28-
* if created via [createException] on first get from [exception] property.
29-
* @param allowNullCause if `null` cause is allowed.
28+
* @param cause the exceptional completion cause. It's either original exceptional cause
29+
* or artificial JobCancellationException if no cause was provided
3030
* @suppress **This is unstable API and it is subject to change.**
3131
*/
32-
public open class CompletedExceptionally protected constructor(
33-
@JvmField public val cause: Throwable?,
34-
allowNullCause: Boolean
32+
open class CompletedExceptionally(
33+
@JvmField public val cause: Throwable
3534
) {
36-
/**
37-
* Creates exceptionally completed state.
38-
* @param cause the exceptional completion cause.
39-
*/
40-
public constructor(cause: Throwable) : this(cause, false)
41-
42-
@Volatile
43-
private var _exception: Throwable? = cause // will materialize JobCancellationException on first need
44-
45-
init {
46-
require(allowNullCause || cause != null) { "Null cause is not allowed" }
47-
}
48-
49-
/**
50-
* Returns completion exception.
51-
*/
52-
public val exception: Throwable get() =
53-
_exception ?: // atomic read volatile var or else create new
54-
createException().also { _exception = it }
55-
56-
protected open fun createException(): Throwable = error("Completion exception was not specified")
57-
58-
override fun toString(): String = "$classSimpleName[$exception]"
35+
override fun toString(): String = "$classSimpleName[$cause]"
5936
}
6037

6138
/**
@@ -65,16 +42,13 @@ public open class CompletedExceptionally protected constructor(
6542
* TODO rename to CancelledJob?
6643
*
6744
* @param job the job that was cancelled.
68-
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
69-
* if created on first get from [exception] property.
45+
* @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] is created.
7046
* @suppress **This is unstable API and it is subject to change.**
7147
*/
72-
public class Cancelled(
73-
private val job: Job,
48+
internal class Cancelled(
49+
job: Job,
7450
cause: Throwable?
75-
) : CompletedExceptionally(cause, true) {
76-
override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
77-
}
51+
) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))
7852

7953
/**
8054
* A specific subclass of [CompletedExceptionally] for cancelled [AbstractContinuation].
@@ -87,8 +61,6 @@ public class Cancelled(
8761
* @suppress **This is unstable API and it is subject to change.**
8862
*/
8963
public class CancelledContinuation(
90-
private val continuation: Continuation<*>,
64+
continuation: Continuation<*>,
9165
cause: Throwable?
92-
) : CompletedExceptionally(cause, true) {
93-
override fun createException(): Throwable = CancellationException("Coroutine $continuation was cancelled normally")
94-
}
66+
) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally"))

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import kotlinx.coroutines.experimental.internal.*
2424
* Installed handler should not throw any exceptions. If it does, they will get caught,
2525
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
2626
*
27+
* The meaning of `cause` that is passed to the handler:
28+
* * Cause is `null` when job has completed normally.
29+
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
30+
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
31+
* * Otherwise, the job had _failed_.
32+
*
2733
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
2834
* and allows for implementation of suspending functions that wait on the Job's state.
2935
* This type should not be used in general application code.

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public interface Deferred<out T> : Job {
152152
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
153153
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,,
154154
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
155-
* function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await].
155+
* function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
156156
*
157157
* @param context context of the coroutine. The default value is [DefaultDispatcher].
158158
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public interface DispatchedTask<in T> : Runnable {
142142
state as T
143143

144144
public fun getExceptionalResult(state: Any?): Throwable? =
145-
(state as? CompletedExceptionally)?.exception
145+
(state as? CompletedExceptionally)?.cause
146146

147147
public override fun run() {
148148
try {

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ package kotlinx.coroutines.experimental
1818

1919
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
2020

21-
public expect open class CancellationException(message: String) : IllegalStateException
21+
public expect open class CancellationException(message: String?) : IllegalStateException
2222

2323
public expect class JobCancellationException(
2424
message: String,
2525
cause: Throwable?,
2626
job: Job
2727
) : CancellationException {
28-
val job: Job
28+
internal val job: Job
2929
}
3030

3131
internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException

0 commit comments

Comments
 (0)