diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt index 67a7fc64e7..707d6c2e53 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt @@ -75,7 +75,7 @@ internal abstract class AbstractContinuation( if (trySuspend()) return COROUTINE_SUSPENDED // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state val state = this.state - if (state is CompletedExceptionally) throw state.exception + if (state is CompletedExceptionally) throw state.cause return getSuccessfulResult(state) } @@ -99,8 +99,8 @@ internal abstract class AbstractContinuation( } is Cancelled -> { // Ignore resumes in cancelled coroutines, but handle exception if a different one here - if (proposedUpdate is CompletedExceptionally && proposedUpdate.exception != state.exception) - handleException(proposedUpdate.exception) + if (proposedUpdate is CompletedExceptionally && proposedUpdate.cause != state.cause) + handleException(proposedUpdate.cause) return } else -> error("Already resumed, but got $proposedUpdate") diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt index 5143c15a39..2078a3cc1e 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt @@ -76,8 +76,11 @@ public abstract class AbstractCoroutine( * This function is invoked once when this coroutine is cancelled or is completed, * similarly to [invokeOnCompletion] with `onCancelling` set to `true`. * - * @param cause the cause that was passed to [Job.cancel] function or `null` if coroutine was cancelled - * without cause or is completing normally. + * The meaning of [cause] parameter: + * * Cause is `null` when job has completed normally. + * * Cause is an instance of [CancellationException] when job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * * Otherwise, the job had _failed_. */ protected open fun onCancellation(cause: Throwable?) {} @@ -98,7 +101,7 @@ public abstract class AbstractCoroutine( @Suppress("UNCHECKED_CAST") internal override fun onCompletionInternal(state: Any?, mode: Int) { if (state is CompletedExceptionally) - onCompletedExceptionally(state.exception) + onCompletedExceptionally(state.cause) else onCompleted(state as T) } diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt new file mode 100644 index 0000000000..59285b3028 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental + +import kotlinx.atomicfu.atomic + +/** + * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values + * when all deferred computations are complete or resumes with the first thrown exception if any of computations + * complete exceptionally including cancellation. + * + * This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially + * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, + * this function immediately resumes with [CancellationException]. + */ +public suspend fun awaitAll(vararg deferreds: Deferred): List = + if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await() + +/** + * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values + * when all deferred computations are complete or resumes with the first thrown exception if any of computations + * complete exceptionally including cancellation. + * + * This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially + * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, + * this function immediately resumes with [CancellationException]. + */ +public suspend fun Collection>.awaitAll(): List = + if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await() + +/** + * Suspends current coroutine until all given jobs are complete. + * This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, + * this function immediately resumes with [CancellationException]. + */ +public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() } + +/** + * Suspends current coroutine until all given jobs are complete. + * This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, + * this function immediately resumes with [CancellationException]. + */ +public suspend fun Collection.joinAll(): Unit = forEach { it.join() } + +private class AwaitAll(private val deferreds: Array>) { + private val notCompletedCount = atomic(deferreds.size) + + suspend fun await(): List = suspendCancellableCoroutine { cont -> + deferreds.forEach { + it.start() // To properly await lazily started deferreds + cont.disposeOnCompletion(it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)) + } + } + + inner class AwaitAllNode(private val continuation: CancellableContinuation>, job: Job) : JobNode(job) { + override fun invoke(cause: Throwable?) { + if (cause != null) { + val token = continuation.tryResumeWithException(cause) + if (token != null) { + continuation.completeResume(token) + } + } else if (notCompletedCount.decrementAndGet() == 0) { + continuation.resume(deferreds.map { it.getCompleted() }) + } + } + } +} diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt index 5c6d1f20c1..63f7fe28e5 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt @@ -171,7 +171,7 @@ private open class StandaloneCoroutine( override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally override fun onFinishingInternal(update: Any?) { // note the use of the parent's job context below! - if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.exception) + if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause) } } diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt index 8e3276e24d..31c5c6dfa3 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt @@ -22,55 +22,28 @@ import kotlinx.coroutines.experimental.internalAnnotations.* * Class for an internal state of a job that had completed exceptionally, including cancellation. * * **Note: This class cannot be used outside of internal coroutines framework**. + * **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO** * - * @param cause the exceptional completion cause. If `cause` is null, then an exception is - * if created via [createException] on first get from [exception] property. - * @param allowNullCause if `null` cause is allowed. + * @param cause the exceptional completion cause. It's either original exceptional cause + * or artificial JobCancellationException if no cause was provided * @suppress **This is unstable API and it is subject to change.** */ -public open class CompletedExceptionally protected constructor( - @JvmField public val cause: Throwable?, - allowNullCause: Boolean +open class CompletedExceptionally( + @JvmField public val cause: Throwable ) { - /** - * Creates exceptionally completed state. - * @param cause the exceptional completion cause. - */ - public constructor(cause: Throwable) : this(cause, false) - - @Volatile - private var _exception: Throwable? = cause // will materialize JobCancellationException on first need - - init { - require(allowNullCause || cause != null) { "Null cause is not allowed" } - } - - /** - * Returns completion exception. - */ - public val exception: Throwable get() = - _exception ?: // atomic read volatile var or else create new - createException().also { _exception = it } - - protected open fun createException(): Throwable = error("Completion exception was not specified") - - override fun toString(): String = "$classSimpleName[$exception]" + override fun toString(): String = "$classSimpleName[$cause]" } /** * A specific subclass of [CompletedExceptionally] for cancelled jobs. * * **Note: This class cannot be used outside of internal coroutines framework**. - * + * * @param job the job that was cancelled. - * @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] - * if created on first get from [exception] property. + * @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] is created. * @suppress **This is unstable API and it is subject to change.** */ -public class Cancelled( +internal class Cancelled( private val job: Job, cause: Throwable? -) : CompletedExceptionally(cause, true) { - override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job) -} - +) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job)) diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt index dc2fd9ae31..e398fae220 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt @@ -24,6 +24,12 @@ import kotlinx.coroutines.experimental.internal.* * Installed handler should not throw any exceptions. If it does, they will get caught, * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. * + * The meaning of `cause` that is passed to the handler: + * * Cause is `null` when job has completed normally. + * * Cause is an instance of [CancellationException] when job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * * Otherwise, the job had _failed_. + * * **Note**: This type is a part of internal machinery that supports parent-child hierarchies * and allows for implementation of suspending functions that wait on the Job's state. * This type should not be used in general application code. diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt index 3f15e8a580..7f23fbda35 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt @@ -152,7 +152,7 @@ public interface Deferred : Job { * Other options can be specified via `start` parameter. See [CoroutineStart] for details. * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,, * the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start] - * function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await]. + * function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll]. * * @param context context of the coroutine. The default value is [DefaultDispatcher]. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt index e81e5a920d..5bd1668db9 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt @@ -142,7 +142,7 @@ public interface DispatchedTask : Runnable { state as T public fun getExceptionalResult(state: Any?): Throwable? = - (state as? CompletedExceptionally)?.exception + (state as? CompletedExceptionally)?.cause public override fun run() { try { diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt index 6759e8f845..8743d2bab8 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt @@ -25,7 +25,7 @@ public expect class JobCancellationException( cause: Throwable?, job: Job ) : CancellationException { - val job: Job + internal val job: Job } internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt index 7cb17a5f5b..3642688a08 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt @@ -279,6 +279,12 @@ public interface Job : CoroutineContext.Element { * with a job's exception or cancellation cause or `null`. Otherwise, handler will be invoked once when this * job is complete. * + * The meaning of `cause` that is passed to the handler: + * * Cause is `null` when job has completed normally. + * * Cause is an instance of [CancellationException] when job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * * Otherwise, the job had _failed_. + * * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the * registration of this handler and release its memory if its invocation is no longer needed. * There is no need to dispose the handler after completion of this job. The references to @@ -297,6 +303,12 @@ public interface Job : CoroutineContext.Element { * with a job's cancellation cause or `null` unless [invokeImmediately] is set to false. * Otherwise, handler will be invoked once when this job is cancelled or complete. * + * The meaning of `cause` that is passed to the handler: + * * Cause is `null` when job has completed normally. + * * Cause is an instance of [CancellationException] when job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * * Otherwise, the job had _failed_. + * * Invocation of this handler on a transition to a transient _cancelling_ state * is controlled by [onCancelling] boolean parameter. * The handler is invoked on invocation of [cancel] when @@ -641,15 +653,19 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean { if (proposedUpdate !is Cancelled) return false // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException - return proposedUpdate.cause == cancelled.cause || - proposedUpdate.cause is JobCancellationException && cancelled.cause == null + return proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException } private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled { if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled - val exception = proposedUpdate.exception - if (cancelled.exception == exception) return cancelled // that is the cancelled we need already! - cancelled.cause?.let { exception.addSuppressedThrowable(it) } + val exception = proposedUpdate.cause + if (cancelled.cause == exception) return cancelled // that is the cancelled we need already! + // todo: We need to rework this logic to keep original cancellation cause in the state and suppress other exceptions + // that could have occurred while coroutine is being cancelled. + // Do not spam with JCE in suppressed exceptions + if (cancelled.cause !is JobCancellationException) { + exception.addSuppressedThrowable(cancelled.cause) + } return Cancelled(this, exception) } @@ -750,11 +766,11 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 val state = this.state return when { state is Finishing && state.cancelled != null -> - state.cancelled.exception.toCancellationException("Job is being cancelled") + state.cancelled.cause.toCancellationException("Job is being cancelled") state is Incomplete -> error("Job was not completed or cancelled yet: $this") state is CompletedExceptionally -> - state.exception.toCancellationException("Job has failed") + state.cause.toCancellationException("Job has failed") else -> JobCancellationException("Job has completed normally", null, this) } } @@ -764,9 +780,8 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 /** * Returns the cause that signals the completion of this job -- it returns the original - * [cancel] cause or **`null` if this job had completed - * normally or was cancelled without a cause**. This function throws - * [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor + * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**. + * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor * [isCancelled] yet. */ protected fun getCompletionCause(): Throwable? { @@ -1052,7 +1067,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 } // cancel all children in list on exceptional completion if (proposedUpdate is CompletedExceptionally) - child?.cancelChildrenInternal(proposedUpdate.exception) + child?.cancelChildrenInternal(proposedUpdate.cause) // switch to completing state val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled) val completing = Finishing(list, cancelled, true) @@ -1072,7 +1087,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 } private val Any?.exceptionOrNull: Throwable? - get() = (this as? CompletedExceptionally)?.exception + get() = (this as? CompletedExceptionally)?.cause private fun firstChild(state: Incomplete) = state as? Child ?: state.list?.nextChild() @@ -1224,7 +1239,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 internal fun getCompletedInternal(): Any? { val state = this.state check(state !is Incomplete) { "This job has not completed yet" } - if (state is CompletedExceptionally) throw state.exception + if (state is CompletedExceptionally) throw state.cause return state } @@ -1237,7 +1252,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 val state = this.state if (state !is Incomplete) { // already complete -- just return result - if (state is CompletedExceptionally) throw state.exception + if (state is CompletedExceptionally) throw state.cause return state } @@ -1251,7 +1266,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 val state = this.state check(state !is Incomplete) if (state is CompletedExceptionally) - cont.resumeWithException(state.exception) + cont.resumeWithException(state.cause) else cont.resume(state) }) @@ -1270,7 +1285,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 // already complete -- select result if (select.trySelect(null)) { if (state is CompletedExceptionally) - select.resumeSelectCancellableWithException(state.exception) + select.resumeSelectCancellableWithException(state.cause) else block.startCoroutineUndispatched(state as T, select.completion) } @@ -1292,7 +1307,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 val state = this.state // Note: await is non-atomic (can be cancelled while dispatched) if (state is CompletedExceptionally) - select.resumeSelectCancellableWithException(state.exception) + select.resumeSelectCancellableWithException(state.cause) else block.startCoroutineCancellable(state as T, select.completion) } diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt index e06323d105..4c1ce77410 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt @@ -102,7 +102,7 @@ private open class TimeoutCoroutine( @Suppress("UNCHECKED_CAST") internal override fun onCompletionInternal(state: Any?, mode: Int) { if (state is CompletedExceptionally) - cont.resumeWithExceptionMode(state.exception, mode) + cont.resumeWithExceptionMode(state.cause, mode) else cont.resumeMode(state as T, mode) } @@ -171,7 +171,7 @@ private class TimeoutOrNullCoroutine( @Suppress("UNCHECKED_CAST") internal override fun onCompletionInternal(state: Any?, mode: Int) { if (state is CompletedExceptionally) { - val exception = state.exception + val exception = state.cause if (exception is TimeoutCancellationException && exception.coroutine === this) cont.resumeMode(null, mode) else cont.resumeWithExceptionMode(exception, mode) diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt index fc86b0aa7a..3c32e145dd 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt @@ -123,7 +123,8 @@ private class ProducerCoroutine( override fun onCancellationInternal(exceptionally: CompletedExceptionally?) { val cause = exceptionally?.cause val processed = when (exceptionally) { - is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel + // producer coroutine was cancelled -- cancel channel, but without cause if it was closed without cause + is Cancelled -> _channel.cancel(if (cause is CancellationException) null else cause) else -> _channel.close(cause) // producer coroutine has completed -- close channel } if (!processed && cause != null) diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt index fc8783a86c..8178d8733e 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt @@ -85,7 +85,7 @@ private inline fun AbstractCoroutine.undispatchedResult(startBlock: () -> return when { result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED makeCompletingOnce(result, MODE_IGNORE) -> { - if (result is CompletedExceptionally) throw result.exception else result + if (result is CompletedExceptionally) throw result.cause else result } else -> COROUTINE_SUSPENDED } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt new file mode 100644 index 0000000000..a63a825301 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt @@ -0,0 +1,371 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental + +import kotlin.coroutines.experimental.coroutineContext +import kotlin.test.* + +class AwaitTest : TestBase() { + + @Test + fun testAwaitAll() = runTest { + expect(1) + val d = async(coroutineContext) { + expect(3) + "OK" + } + + val d2 = async(coroutineContext) { + yield() + expect(4) + 1L + } + + expect(2) + require(d2.isActive && !d2.isCompleted) + + assertEquals(listOf("OK", 1L), awaitAll(d, d2)) + expect(5) + + require(d.isCompleted && d2.isCompleted) + require(!d.isCancelled && !d2.isCancelled) + finish(6) + } + + @Test + fun testAwaitAllLazy() = runTest { + expect(1) + val d = async( + coroutineContext, + start = CoroutineStart.LAZY + ) { expect(2); 1 } + val d2 = async( + coroutineContext, + start = CoroutineStart.LAZY + ) { expect(3); 2 } + assertEquals(listOf(1, 2), awaitAll(d, d2)) + finish(4) + } + + @Test + fun testAwaitAllTyped() = runTest { + val d1 = async(coroutineContext) { 1L } + val d2 = async(coroutineContext) { "" } + val d3 = async(coroutineContext) { } + + assertEquals(listOf(1L, ""), listOf(d1, d2).awaitAll()) + assertEquals(listOf(1L, Unit), listOf(d1, d3).awaitAll()) + assertEquals(listOf("", Unit), listOf(d2, d3).awaitAll()) + } + + @Test + fun testAwaitAllExceptionally() = runTest { + expect(1) + val d = async(coroutineContext) { + expect(3) + "OK" + } + + val d2 = async(coroutineContext) { + yield() + throw TestException() + } + + val d3 = async(coroutineContext) { + expect(4) + delay(Long.MAX_VALUE) + 1 + } + + expect(2) + try { + awaitAll(d, d2, d3) + } catch (e: TestException) { + expect(5) + } + + yield() + require(d.isCompleted && d2.isCompletedExceptionally && d3.isActive) + d3.cancel() + finish(6) + } + + @Test + fun testAwaitAllMultipleExceptions() = runTest { + val d = async(coroutineContext) { + expect(2) + throw TestException() + } + + val d2 = async(coroutineContext) { + yield() + throw TestException() + } + + val d3 = async(coroutineContext) { + yield() + } + + expect(1) + try { + awaitAll(d, d2, d3) + } catch (e: TestException) { + expect(3) + } + + finish(4) + } + + @Test + fun testAwaitAllCancellation() = runTest { + val outer = async(coroutineContext) { + + expect(1) + val inner = async(coroutineContext) { + expect(4) + delay(Long.MAX_VALUE) + } + + expect(2) + awaitAll(inner) + expectUnreached() + } + + yield() + expect(3) + yield() + require(outer.isActive) + outer.cancel() + require(outer.isCancelled) + finish(5) + } + + @Test + fun testAwaitAllPartiallyCompleted() = runTest { + val d1 = async(coroutineContext) { expect(1); 1 } + d1.await() + val d2 = async(coroutineContext) { expect(3); 2 } + expect(2) + assertEquals(listOf(1, 2), awaitAll(d1, d2)) + require(d1.isCompleted && d2.isCompleted) + finish(4) + } + + @Test + fun testAwaitAllPartiallyCompletedExceptionally() = runTest { + val d1 = async(coroutineContext) { + expect(1) + throw TestException() + } + + yield() + + // This job is called after exception propagation + val d2 = async(coroutineContext) { expect(4) } + + expect(2) + try { + awaitAll(d1, d2) + expectUnreached() + } catch (e: TestException) { + expect(3) + } + + require(d2.isActive) + d2.await() + require(d1.isCompleted && d2.isCompleted) + finish(5) + } + + @Test + fun testAwaitAllFullyCompleted() = runTest { + val d1 = CompletableDeferred(Unit) + val d2 = CompletableDeferred(Unit) + val job = async(coroutineContext) { expect(3) } + expect(1) + awaitAll(d1, d2) + expect(2) + job.await() + finish(4) + } + + @Test + fun testAwaitOnSet() = runTest { + val d1 = CompletableDeferred(Unit) + val d2 = CompletableDeferred(Unit) + val job = async(coroutineContext) { expect(2) } + expect(1) + listOf(d1, d2, job).awaitAll() + finish(3) + } + + @Test + fun testAwaitAllFullyCompletedExceptionally() = runTest { + val d1 = CompletableDeferred(parent = null) + .apply { completeExceptionally(TestException()) } + val d2 = CompletableDeferred(parent = null) + .apply { completeExceptionally(TestException()) } + val job = async(coroutineContext) { expect(3) } + expect(1) + try { + awaitAll(d1, d2) + } catch (e: TestException) { + expect(2) + } + + job.await() + finish(4) + } + + @Test + fun testAwaitAllSameJobMultipleTimes() = runTest { + val d = async(coroutineContext) { "OK" } + // Duplicates are allowed though kdoc doesn't guarantee that + assertEquals(listOf("OK", "OK", "OK"), awaitAll(d, d, d)) + } + + @Test + fun testAwaitAllSameThrowingJobMultipleTimes() = runTest { + val d1 = + async(coroutineContext) { throw TestException() } + val d2 = async(coroutineContext) { } // do nothing + + try { + expect(1) + // Duplicates are allowed though kdoc doesn't guarantee that + awaitAll(d1, d2, d1, d2) + expectUnreached() + } catch (e: TestException) { + finish(2) + } + } + + @Test + fun testAwaitAllEmpty() = runTest { + expect(1) + assertEquals(emptyList(), awaitAll()) + assertEquals(emptyList(), emptyList>().awaitAll()) + finish(2) + } + + // joinAll + + @Test + fun testJoinAll() = runTest { + val d1 = launch(coroutineContext) { expect(2) } + val d2 = async(coroutineContext) { + expect(3) + "OK" + } + val d3 = launch(coroutineContext) { expect(4) } + + expect(1) + joinAll(d1, d2, d3) + finish(5) + } + + @Test + fun testJoinAllLazy() = runTest { + expect(1) + val d = async( + coroutineContext, + start = CoroutineStart.LAZY + ) { expect(2) } + val d2 = launch( + coroutineContext, + start = CoroutineStart.LAZY + ) { expect(3) } + joinAll(d, d2) + finish(4) + } + + @Test + fun testJoinAllExceptionally() = runTest { + val d1 = launch(coroutineContext) { + expect(2) + } + val d2 = async(coroutineContext) { + expect(3) + throw TestException() + } + val d3 = async(coroutineContext) { + expect(4) + } + + expect(1) + joinAll(d1, d2, d3) + finish(5) + } + + @Test + fun testJoinAllCancellation() = runTest { + val outer = launch(coroutineContext) { + expect(2) + val inner = launch(coroutineContext) { + expect(3) + delay(Long.MAX_VALUE) + } + + joinAll(inner) + expectUnreached() + } + + expect(1) + yield() + require(outer.isActive) + yield() + outer.cancel() + outer.join() + finish(4) + } + + @Test + fun testJoinAllAlreadyCompleted() = runTest { + val job = launch(coroutineContext) { + expect(1) + } + + job.join() + expect(2) + + joinAll(job) + finish(3) + } + + @Test + fun testJoinAllEmpty() = runTest { + expect(1) + joinAll() + listOf().joinAll() + finish(2) + } + + @Test + fun testJoinAllSameJob() = runTest { + val job = launch(coroutineContext) { } + joinAll(job, job, job) + } + + @Test + fun testJoinAllSameJobExceptionally() = runTest { + val job = + async(coroutineContext) { throw TestException() } + joinAll(job, job, job) + } + + private class TestException : Exception() +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt new file mode 100644 index 0000000000..9904d3d12c --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt @@ -0,0 +1,61 @@ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines.experimental + +import kotlinx.coroutines.experimental.timeunit.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class DelayTest : TestBase() { + + @Test + fun testCancellation() = runTest(expected = {it is JobCancellationException}) { + runAndCancel(3600, TimeUnit.SECONDS) + } + + @Test + fun testMaxLongValue()= runTest(expected = {it is JobCancellationException}) { + runAndCancel(Long.MAX_VALUE) + } + + @Test + fun testMaxIntValue()= runTest(expected = {it is JobCancellationException}) { + runAndCancel(Int.MAX_VALUE.toLong()) + } + + @Test + fun testOverflowOnUnitConversion()= runTest(expected = {it is JobCancellationException}) { + runAndCancel(Long.MAX_VALUE, TimeUnit.SECONDS) + } + + @Test + fun testRegularDelay() = runTest { + val deferred = async(coroutineContext) { + expect(2) + delay(1) + expect(3) + } + + expect(1) + yield() + deferred.await() + finish(4) + } + + private suspend fun runAndCancel(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) { + expect(1) + val deferred = async(coroutineContext) { + expect(2) + delay(time, unit) + expectUnreached() + } + + yield() + expect(3) + require(deferred.isActive) + deferred.cancel() + finish(4) + deferred.await() + } +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt index 84da40fe43..b70a7a09cc 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ArrayBroadcastChannelTest : TestBase() { - @Test fun testBasic() = runTest { expect(1) @@ -161,7 +160,7 @@ class ArrayBroadcastChannelTest : TestBase() { sub.consumeEach { check(it == ++expected) if (it == 2) { - sub.close() + sub.cancel() } } check(expected == 2) @@ -172,8 +171,18 @@ class ArrayBroadcastChannelTest : TestBase() { val channel = BroadcastChannel(1) val sub = channel.openSubscription() assertFalse(sub.isClosedForReceive) - sub.close() + sub.cancel() assertTrue(sub.isClosedForReceive) sub.receive() } + + @Test + fun testCancelWithCause() = runTest({ it is TestException }) { + val channel = BroadcastChannel(1) + val subscription = channel.openSubscription() + subscription.cancel(TestException()) + subscription.receiveOrNull() + } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt index 61fdaefd83..170e579625 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ArrayChannelTest : TestBase() { - @Test fun testSimple() = runTest { val q = ArrayChannel(1) @@ -151,4 +150,13 @@ class ArrayChannelTest : TestBase() { check(q.receiveOrNull() == null) finish(12) } + + @Test + fun testCancelWithCause() = runTest({ it is TestException }) { + val channel = ArrayChannel(5) + channel.cancel(TestException()) + channel.receiveOrNull() + } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt index 1fd7413495..2b8775b448 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ConflatedChannelTest : TestBase() { - @Test fun testBasicConflationOfferPoll() { val q = ConflatedChannel() @@ -90,4 +89,13 @@ class ConflatedChannelTest : TestBase() { check(q.receiveOrNull() == null) finish(2) } -} \ No newline at end of file + + @Test + fun testCancelWithCause() = runTest({ it is TestException }) { + val channel = ConflatedChannel() + channel.cancel(TestException()) + channel.receiveOrNull() + } + + private class TestException : Exception() +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt index 897801e378..2bf376f20a 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt @@ -16,11 +16,10 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.TestBase +import kotlinx.coroutines.experimental.* import kotlin.test.* class LinkedListChannelTest : TestBase() { - @Test fun testBasic() = runTest { val c = LinkedListChannel() @@ -46,4 +45,13 @@ class LinkedListChannelTest : TestBase() { check(q.isClosedForReceive) check(q.receiveOrNull() == null) } + + @Test + fun testCancelWithCause() = runTest({ it is TestException }) { + val channel = LinkedListChannel() + channel.cancel(TestException()) + channel.receiveOrNull() + } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt index 522f6d6f6b..41ba67851d 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ProduceTest : TestBase() { - @Test fun testBasic() = runTest { val c = produce(coroutineContext) { @@ -41,7 +40,7 @@ class ProduceTest : TestBase() { } @Test - fun testCancel() = runTest { + fun testCancelWithoutCause() = runTest { val c = produce(coroutineContext) { expect(2) send(1) @@ -60,7 +59,39 @@ class ProduceTest : TestBase() { expect(4) c.cancel() expect(5) - check(c.receiveOrNull() == null) + assertNull(c.receiveOrNull()) expect(6) } + + @Test + fun testCancelWithCause() = runTest { + val c = produce(coroutineContext) { + expect(2) + send(1) + expect(3) + try { + send(2) // will get cancelled + } catch (e: Exception) { + finish(6) + check(e is JobCancellationException && e.job == coroutineContext[Job]) + check(e.cause is TestException) + throw e + } + expectUnreached() + } + + expect(1) + check(c.receive() == 1) + expect(4) + c.cancel(TestException()) + + try { + assertNull(c.receiveOrNull()) + expectUnreached() + } catch (e: TestException) { + expect(5) + } + } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt index 6e1b2c347a..2b1b98793a 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt @@ -21,7 +21,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class RendezvousChannelTest : TestBase() { - @Test fun testSimple() = runTest { val q = RendezvousChannel() @@ -289,4 +288,13 @@ class RendezvousChannelTest : TestBase() { check(q.receiveOrNull() == null) finish(12) } + + @Test + fun testCancelWithCause() = runTest({ it is TestException }) { + val channel = RendezvousChannel() + channel.cancel(TestException()) + channel.receiveOrNull() + } + + private class TestException : Exception() } diff --git a/core/kotlinx-coroutines-core/README.md b/core/kotlinx-coroutines-core/README.md index 1e1c7226a8..c32be5029d 100644 --- a/core/kotlinx-coroutines-core/README.md +++ b/core/kotlinx-coroutines-core/README.md @@ -39,13 +39,19 @@ Synchronization primitives for coroutines: Top-level suspending functions: -| **Name** | **Description** -| ------------------- | --------------- -| [delay] | Non-blocking sleep -| [yield] | Yields thread in single-threaded dispatchers -| [withContext] | Switches to a different context -| [withTimeout] | Set execution time-limit with exception on timeout -| [withTimeoutOrNull] | Set execution time-limit will null result on timeout +| **Name** | **Description** +| ------------------- | --------------- +| [delay] | Non-blocking sleep +| [yield] | Yields thread in single-threaded dispatchers +| [withContext] | Switches to a different context +| [withTimeout] | Set execution time-limit with exception on timeout +| [withTimeoutOrNull] | Set execution time-limit will null result on timeout +| [awaitAll] | Awaits for successful completion of all given jobs or exceptional completion of any +| [joinAll] | Joins on all given jobs + +Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine] +helper function. [NonCancellable] job object is provided to suppress cancellation with +`run(NonCancellable) {...}` block of code. [Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously: @@ -59,10 +65,6 @@ Top-level suspending functions: | [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none -Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine] -helper function. [NonCancellable] job object is provided to suppress cancellation with -`run(NonCancellable) {...}` block of code. - This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options) and [newCoroutineContext] function to write user-defined coroutine builders that work with these debugging facilities. @@ -113,12 +115,14 @@ Optional time unit support for multiplatform projects. [withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-context.html [withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html [withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html +[awaitAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await-all.html +[joinAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/join-all.html +[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html [Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html [Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html [Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html [Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html [Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html -[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html [newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html [kotlinx.coroutines.experimental.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt index 7d20709f4e..d4ef498589 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt @@ -94,7 +94,7 @@ private class BlockingCoroutine( timeSource.unregisterTimeLoopThread() // now return result val state = this.state - (state as? CompletedExceptionally)?.let { throw it.exception } + (state as? CompletedExceptionally)?.let { throw it.cause } return state as T } } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt index 820d87a5da..d8eeedfb26 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt @@ -41,7 +41,7 @@ public const val DEBUG_PROPERTY_VALUE_ON = "on" */ public const val DEBUG_PROPERTY_VALUE_OFF = "off" -private val DEBUG = run { +internal val DEBUG = run { val value = try { System.getProperty(DEBUG_PROPERTY_NAME) } catch (e: SecurityException) { null } when (value) { diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt index 6d6685beff..2cb2433b4d 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt @@ -18,9 +18,10 @@ package kotlinx.coroutines.experimental import kotlinx.atomicfu.* import kotlinx.coroutines.experimental.internal.* -import kotlinx.coroutines.experimental.timeunit.TimeUnit +import kotlinx.coroutines.experimental.timeunit.* import java.util.concurrent.locks.* import kotlin.coroutines.experimental.* +import kotlin.jvm.* /** * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can @@ -303,6 +304,12 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { time: Long, timeUnit: TimeUnit, private val cont: CancellableContinuation ) : DelayedTask(time, timeUnit) { + + init { + // Note that this operation isn't lock-free, but very short + cont.disposeOnCompletion(this) + } + override fun run() { with(cont) { resumeUndispatched(Unit) } } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt index 7d7c532af1..462d816d78 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt @@ -16,8 +16,6 @@ package kotlinx.coroutines.experimental -import java.util.concurrent.* - /** * This exception gets thrown if an exception is caught while processing [CompletionHandler] invocation for [Job]. */ @@ -42,12 +40,26 @@ public actual typealias CancellationException = java.util.concurrent.Cancellatio public actual class JobCancellationException public actual constructor( message: String, cause: Throwable?, - /** - * The job that was cancelled. - */ - public actual val job: Job + @JvmField internal actual val job: Job ) : CancellationException(message) { - init { if (cause != null) initCause(cause) } + + init { + if (cause != null) initCause(cause) + } + + override fun fillInStackTrace(): Throwable { + if (DEBUG) { + return super.fillInStackTrace() + } + + /* + * In non-debug mode we don't want to have a stacktrace on every cancellation/close, + * parent job reference is enough. Stacktrace of JCE is not needed most of the time (e.g., it is not logged) + * and hurts performance. + */ + return this + } + override fun toString(): String = "${super.toString()}; job=$job" override fun equals(other: Any?): Boolean = other === this || diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt new file mode 100644 index 0000000000..d5be824e4e --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt @@ -0,0 +1,136 @@ +package kotlinx.coroutines.experimental + +import org.junit.* +import org.junit.Test +import java.util.concurrent.* + +class AwaitStressTest : TestBase() { + + private class TestException : Exception() { + override fun fillInStackTrace(): Throwable = this + } + + private val iterations = 50_000 * stressTestMultiplier + private val pool = newFixedThreadPoolContext(4, "AwaitStressTest") + + @After + fun tearDown() { + pool.close() + } + + @Test + fun testMultipleExceptions() = runTest { + + repeat(iterations) { + val barrier = CyclicBarrier(4) + + val d1 = async(pool) { + barrier.await() + throw TestException() + } + + val d2 = async(pool) { + barrier.await() + throw TestException() + } + + val d3 = async(pool) { + barrier.await() + 1L + } + + try { + barrier.await() + awaitAll(d1, d2, d3) + expectUnreached() + } catch (e: TestException) { + // Expected behaviour + } + + barrier.reset() + } + } + + @Test + fun testAwaitAll() = runTest { + val barrier = CyclicBarrier(3) + + repeat(iterations) { + val d1 = async(pool) { + barrier.await() + 1L + } + + val d2 = async(pool) { + barrier.await() + 2L + } + + barrier.await() + awaitAll(d1, d2) + require(d1.isCompleted && d2.isCompleted) + barrier.reset() + } + } + + @Test + fun testConcurrentCancellation() = runTest { + var cancelledOnce = false + repeat(iterations) { + val barrier = CyclicBarrier(3) + + val d1 = async(pool) { + barrier.await() + delay(10_000) + yield() + } + + val d2 = async(pool) { + barrier.await() + d1.cancel() + } + + barrier.await() + try { + awaitAll(d1, d2) + } catch (e: JobCancellationException) { + cancelledOnce = true + } + } + + require(cancelledOnce) { "Cancellation exception wasn't properly caught" } + } + + @Test + fun testMutatingCollection() = runTest { + val barrier = CyclicBarrier(4) + + repeat(iterations) { + // thread-safe collection that we are going to modify + val deferreds = CopyOnWriteArrayList>() + + deferreds += async(pool) { + barrier.await() + 1L + } + + deferreds += async(pool) { + barrier.await() + 2L + } + + deferreds += async(pool) { + barrier.await() + deferreds.removeAt(2) + 3L + } + + val allJobs = ArrayList(deferreds) + barrier.await() + val results = deferreds.awaitAll() // shouldn't hang + check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L)) + allJobs.awaitAll() + barrier.reset() + } + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt new file mode 100644 index 0000000000..0291866175 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt @@ -0,0 +1,37 @@ +package kotlinx.coroutines.experimental + +import org.junit.* +import java.util.concurrent.* +import kotlin.coroutines.experimental.* + +class DefaultExecutorStressTest : TestBase() { + + @Test + fun testDelay() = runTest { + val iterations = 100_000 * stressTestMultiplier + + val ctx = DefaultExecutor + coroutineContext + expect(1) + var expected = 1 + repeat(iterations) { + expect(++expected) + val deferred = async(ctx) { + expect(++expected) + val largeArray = IntArray(10_000) { it } + delay(Long.MAX_VALUE, TimeUnit.NANOSECONDS) + println(largeArray) // consume to avoid DCE, actually unreachable + } + + expect(++expected) + yield() + deferred.cancel() + try { + deferred.await() + } catch (e: JobCancellationException) { + expect(++expected) + } + } + + finish(2 + iterations * 4) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt index 764f73602f..c21d4f7298 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt @@ -23,7 +23,7 @@ import java.util.concurrent.Executor import java.util.concurrent.Executors import kotlin.coroutines.experimental.* -class DelayTest : TestBase() { +class DelayJvmTest : TestBase() { /** * Test that delay works properly in contexts with custom [ContinuationInterceptor] */ diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt index 1494d2f653..268ae31409 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt @@ -20,13 +20,24 @@ import kotlinx.coroutines.experimental.* import org.hamcrest.core.* import org.junit.* import org.junit.Assert.* +import org.junit.runner.* +import org.junit.runners.* +import java.io.* import kotlin.coroutines.experimental.* -class ActorTest : TestBase() { +@RunWith(Parameterized::class) +class ActorTest(private val capacity: Int) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "Capacity: {0}") + @JvmStatic + fun params(): Collection> = listOf(0, 1, Channel.UNLIMITED, Channel.CONFLATED).map { arrayOf(it) } + } + @Test - fun testEmpty() = runBlocking { + fun testEmpty() = runBlocking { expect(1) - val actor = actor(coroutineContext) { + val actor = actor(coroutineContext, capacity) { expect(3) } actor as Job // type assertion @@ -42,9 +53,9 @@ class ActorTest : TestBase() { } @Test - fun testOne() = runBlocking { + fun testOne() = runBlocking { expect(1) - val actor = actor(coroutineContext) { + val actor = actor(coroutineContext, capacity) { expect(3) assertThat(receive(), IsEqual("OK")) expect(6) @@ -68,4 +79,71 @@ class ActorTest : TestBase() { assertThat(actor.isClosedForSend, IsEqual(true)) finish(7) } -} \ No newline at end of file + + @Test + fun testCloseWithoutCause() = runTest { + val actor = actor(coroutineContext, capacity) { + val element = channel.receiveOrNull() + expect(2) + assertEquals(42, element) + val next = channel.receiveOrNull() + assertNull(next) + expect(3) + } + + expect(1) + actor.send(42) + yield() + actor.close() + yield() + finish(4) + } + + @Test + fun testCloseWithCause() = runTest { + val actor = actor(coroutineContext, capacity) { + val element = channel.receiveOrNull() + expect(2) + require(element!! == 42) + try { + val next = channel.receiveOrNull() + } catch (e: IOException) { + expect(3) + } + } + + expect(1) + actor.send(42) + yield() + actor.close(IOException()) + yield() + finish(4) + } + + @Test + fun testCancelEnclosingJob() = runTest { + val job = async(coroutineContext) { + actor(coroutineContext, capacity) { + expect(1) + channel.receiveOrNull() + expectUnreached() + } + } + + yield() + yield() + + expect(2) + yield() + job.cancel() + + try { + job.await() + expectUnreached() + } catch (e: JobCancellationException) { + assertTrue(e.message?.contains("Job was cancelled normally") ?: false) + } + + finish(3) + } +} diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt new file mode 100644 index 0000000000..91f5c8bd23 --- /dev/null +++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt @@ -0,0 +1,140 @@ +package kotlinx.coroutines.experimental.io + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException +import org.junit.After +import org.junit.Test +import java.io.IOException +import kotlin.coroutines.experimental.coroutineContext + +class ByteChannelCloseTest : TestBase() { + + private val from = ByteChannel(true) + private val to = ByteChannel(true) + + @After + fun tearDown() { + from.close(CancellationException()) + to.close(CancellationException()) + } + + @Test + fun testCloseWithCause() = runBlocking { + expect(1) + + launch(coroutineContext) { + expect(2) + + try { + from.copyAndClose(to) // should suspend and then throw IOException + expectUnreached() + } catch (expected: IOException) { + expect(4) + } + } + + yield() + expect(3) + + from.close(IOException()) + yield() + + expect(5) + + try { + to.readInt() + expectUnreached() + } catch (expected: IOException) { + finish(6) + } + } + + @Test + fun testCancelWithCause() = runBlocking { + expect(1) + + launch(coroutineContext) { + expect(2) + + try { + from.copyAndClose(to) // should suspend and then throw IOException + expectUnreached() + } catch (expected: IOException) { + expect(4) + } + } + + yield() + expect(3) + + from.cancel(IOException()) + yield() + + expect(5) + + try { + to.readInt() + expectUnreached() + } catch (expected: IOException) { + finish(6) + } + } + + @Test + fun testCloseWithoutCause() = runBlocking { + expect(1) + + launch(coroutineContext) { + expect(2) + from.copyAndClose(to) + expect(4) + } + + yield() + expect(3) + + from.close() + yield() + + expect(5) + require(to.isClosedForWrite) + + try { + to.readInt() + expectUnreached() + } catch (expected: ClosedReceiveChannelException) { + finish(6) + } + } + + @Test + fun testCancelWithoutCause() = runBlocking { + expect(1) + + launch(coroutineContext) { + expect(2) + try { + from.copyAndClose(to) + expectUnreached() + } catch (e: CancellationException) { + expect(4) + } + } + + yield() + expect(3) + + from.cancel() + yield() + + expect(5) + require(to.isClosedForWrite) + + try { + to.readInt() + expectUnreached() + } catch (expected: CancellationException) { + finish(6) + } + } +} diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt index 504a10352b..00abc7c9f4 100644 --- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt +++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt @@ -48,39 +48,6 @@ class CopyAndCloseTest : TestBase() { finish(8) } - @Test - fun failurePropagation() = runBlocking { - expect(1) - - launch(coroutineContext) { - expect(2) - - try { - from.copyAndClose(to) // should suspend and then throw IOException - fail("Should rethrow exception") - } catch (expected: IOException) { - } - - expect(4) - } - - yield() - expect(3) - - from.close(IOException()) - yield() - - expect(5) - - try { - to.readInt() - fail("Should throw exception") - } catch (expected: IOException) { - } - - finish(6) - } - @Test fun copyLimitedTest() = runBlocking { expect(1) diff --git a/js/kotlinx-coroutines-core-js/README.md b/js/kotlinx-coroutines-core-js/README.md index 552d4e0502..4933712719 100644 --- a/js/kotlinx-coroutines-core-js/README.md +++ b/js/kotlinx-coroutines-core-js/README.md @@ -10,7 +10,6 @@ Coroutine builder functions: | [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result | [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements - Coroutine dispatchers implementing [CoroutineDispatcher]: | **Name** | **Description** @@ -32,16 +31,17 @@ Synchronization primitives for coroutines: | [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | Mutual exclusion | [Channel][kotlinx.coroutines.experimental.channels.Channel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send], [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | Communication channel (aka queue or exchanger) - Top-level suspending functions: -| **Name** | **Description** -| ------------------- | --------------- -| [delay] | Non-blocking sleep -| [yield] | Yields thread in single-threaded dispatchers -| [withContext] | Switches to a different context -| [withTimeout] | Set execution time-limit with exception on timeout -| [withTimeoutOrNull] | Set execution time-limit will null result on timeout +| **Name** | **Description** +| ------------------- | --------------- +| [delay] | Non-blocking sleep +| [yield] | Yields thread in single-threaded dispatchers +| [withContext] | Switches to a different context +| [withTimeout] | Set execution time-limit with exception on timeout +| [withTimeoutOrNull] | Set execution time-limit will null result on timeout +| [awaitAll] | Awaits for successful completion of all given jobs or exceptional completion of any +| [joinAll] | Joins on all given jobs Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine] helper function. [NonCancellable] job object is provided to suppress cancellation with @@ -59,7 +59,6 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none - # Package kotlinx.coroutines.experimental General-purpose coroutine builders, contexts, and helper functions. @@ -81,6 +80,8 @@ General-purpose coroutine builders, contexts, and helper functions. [withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-context.html [withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html [withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html +[awaitAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await-all.html +[joinAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/join-all.html [suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html [Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html [Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt index 6894ac00a5..f73467e41b 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt @@ -40,10 +40,7 @@ public actual open class CancellationException actual constructor(message: Strin public actual class JobCancellationException public actual constructor( message: String, public override val cause: Throwable?, - /** - * The job that was cancelled. - */ - public actual val job: Job + internal actual val job: Job ) : CancellationException(message.withCause(cause)) { override fun toString(): String = "${super.toString()}; job=$job" override fun equals(other: Any?): Boolean = diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt index 10e072f975..080ac466a5 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt @@ -26,7 +26,9 @@ internal class NodeDispatcher : CoroutineDispatcher(), Delay { } override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation) { - setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit)) + val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit)) + // Actually on cancellation, but clearTimeout is idempotent + continuation.invokeOnCompletion { clearTimeout(handle) } } override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle { diff --git a/knit/src/Knit.kt b/knit/src/Knit.kt index 566c80eea6..275fc286dd 100644 --- a/knit/src/Knit.kt +++ b/knit/src/Knit.kt @@ -410,8 +410,8 @@ fun writeLines(file: File, lines: List) { fun findModuleRootDir(name: String): String = moduleRoots - .map { it + "/" + name } - .firstOrNull { File(it + "/" + moduleMarker).exists() } + .map { "$it/$name" } + .firstOrNull { File("$it/$moduleMarker").exists() } ?: throw IllegalArgumentException("Module $name is not found in any of the module root dirs") data class ApiIndexKey( @@ -419,26 +419,21 @@ data class ApiIndexKey( val pkg: String ) -val apiIndexCache: MutableMap> = HashMap() +val apiIndexCache: MutableMap>> = HashMap() val REF_LINE_REGEX = Regex("([a-zA-z.]+)") val INDEX_HTML = "/index.html" val INDEX_MD = "/index.md" val FUNCTIONS_SECTION_HEADER = "### Functions" -val AMBIGUOUS = "#AMBIGUOUS: " - -fun HashMap.putUnambiguous(key: String, value: String) { +fun HashMap>.putUnambiguous(key: String, value: String) { val oldValue = this[key] - val putVal = - if (oldValue != null && oldValue != value) { - when { - oldValue.contains("[$value]") -> oldValue - oldValue.startsWith(AMBIGUOUS) -> "$oldValue; [$value]" - else -> "$AMBIGUOUS[$oldValue]; [$value]" - } - } else value - put(key, putVal) + if (oldValue != null) { + oldValue.add(value) + put(key, oldValue) + } else { + put(key, mutableListOf(value)) + } } fun loadApiIndex( @@ -446,10 +441,10 @@ fun loadApiIndex( path: String, pkg: String, namePrefix: String = "" -): Map? { +): Map>? { val fileName = docsRoot + "/" + path + INDEX_MD val visited = mutableSetOf() - val map = HashMap() + val map = HashMap>() var inFunctionsSection = false File(fileName).withLineNumberReader(::LineNumberReader) { while (true) { @@ -499,11 +494,12 @@ fun processApiIndex( while (it.hasNext()) { val refName = it.next() val refLink = map[refName] ?: continue - if (refLink.startsWith(AMBIGUOUS)) { - println("WARNING: Ambiguous reference to [$refName]: ${refLink.substring(AMBIGUOUS.length)}") - continue + if (refLink.size > 1) { + println("INFO: Ambiguous reference to [$refName]: $refLink, taking the shortest one") } - indexList += "[$refName]: $siteRoot/$refLink" + + val link = refLink.minBy { it.length } + indexList += "[$refName]: $siteRoot/$link" it.remove() } return indexList diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt index 63fa447137..ba53496047 100644 --- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt @@ -167,7 +167,7 @@ private class PublisherCoroutine( _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) val cause = getCompletionCause() try { - if (cause != null) + if (cause != null && cause !is CancellationException) subscriber.onError(cause) else subscriber.onComplete() diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt index e482de2330..f1fd15a407 100644 --- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt @@ -168,7 +168,7 @@ private class RxObservableCoroutine( _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) val cause = getCompletionCause() try { - if (cause != null) + if (cause != null && cause !is CancellationException) subscriber.onError(cause) else subscriber.onCompleted() diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt index 02514957a5..55f1e36115 100644 --- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt @@ -158,7 +158,7 @@ private class RxObservableCoroutine( _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) val cause = getCompletionCause() try { - if (cause != null) + if (cause != null && cause !is CancellationException) subscriber.onError(cause) else subscriber.onComplete()