diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 8ff82c508c..ab51a2dd72 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -381,7 +381,6 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun getCancellationException ()Ljava/util/concurrent/CancellationException; - protected fun getCancelsParent ()Z public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException; public final fun getChildren ()Lkotlin/sequences/Sequence; protected final fun getCompletionCause ()Ljava/lang/Throwable; @@ -396,6 +395,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public final fun isCancelled ()Z public final fun isCompleted ()Z public final fun isCompletedExceptionally ()Z + protected fun isScopedCoroutine ()Z public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; protected fun onCancelling (Ljava/lang/Throwable;)V diff --git a/kotlinx-coroutines-core/common/src/Exceptions.common.kt b/kotlinx-coroutines-core/common/src/Exceptions.common.kt index e8c2f5e1db..62b6ba4d51 100644 --- a/kotlinx-coroutines-core/common/src/Exceptions.common.kt +++ b/kotlinx-coroutines-core/common/src/Exceptions.common.kt @@ -23,7 +23,7 @@ internal expect class JobCancellationException( internal val job: Job } -internal expect class CoroutinesInternalError(message: String, cause: Throwable) : Error +internal class CoroutinesInternalError(message: String, cause: Throwable) : Error(message, cause) internal expect fun Throwable.addSuppressedThrowable(other: Throwable) // For use in tests diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 4963c37bed..d8b6b92d62 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -319,6 +319,31 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren cancelParent(cause) // tentative cancellation -- does not matter if there is no parent } + /** + * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent. + * Returns `true` if the parent is responsible for handling the exception, `false` otherwise. + * + * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception + * may leak to the [CoroutineExceptionHandler]. + */ + private fun cancelParent(cause: Throwable): Boolean { + /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it. + * This allow parent to cancel its children (normally) without being cancelled itself, unless + * child crashes and produce some other exception during its completion. + */ + val isCancellation = cause is CancellationException + val parent = parentHandle + // No parent -- ignore CE, report other exceptions. + if (parent === null || parent === NonDisposableHandle) { + return isCancellation + } + + // Is scoped coroutine -- don't propagate, will be rethrown + if (isScopedCoroutine) return isCancellation + // Notify parent but don't forget to check cancellation + return parent.childCancelled(cause) || isCancellation + } + private fun NodeList.notifyCompletion(cause: Throwable?) = notifyHandlers>(this, cause) @@ -594,21 +619,29 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren cancelImpl(parentJob) } - // Child was cancelled with cause - // It is overridden in supervisor implementations to ignore child cancellation - public open fun childCancelled(cause: Throwable): Boolean = - cancelImpl(cause) && handlesException + /** + * Child was cancelled with a cause. + * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child. + * It is overridden in supervisor implementations to completely ignore any child cancellation. + * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception) + * + * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception + * may leak to the [CoroutineExceptionHandler]. + */ + public open fun childCancelled(cause: Throwable): Boolean { + if (cause is CancellationException) return true + return cancelImpl(cause) && handlesException + } /** * Makes this [Job] cancelled with a specified [cause]. * It is used in [AbstractCoroutine]-derived classes when there is an internal failure. */ - public fun cancelCoroutine(cause: Throwable?) = - cancelImpl(cause) + public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause) // cause is Throwable or ParentJob when cancelChild was invoked // returns true is exception was handled, false otherwise - private fun cancelImpl(cause: Any?): Boolean { + internal fun cancelImpl(cause: Any?): Boolean { if (onCancelComplete) { // make sure it is completing, if cancelMakeCompleting returns true it means it had make it // completing and had recorded exception @@ -912,14 +945,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren protected open fun onCancelling(cause: Throwable?) {} /** - * When this function returns `true` the parent is cancelled on cancellation of this job. - * Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it. - * This allows parent to cancel its children (normally) without being cancelled itself, unless - * child crashes and produce some other exception during its completion. - * - * @suppress **This is unstable API and it is subject to change.* + * Returns `true` for scoped coroutines. + * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency. + * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope. + * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`. */ - protected open val cancelsParent: Boolean get() = true + protected open val isScopedCoroutine: Boolean get() = false /** * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal]. @@ -939,20 +970,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * * This method is invoked **exactly once** when the final exception of the job is determined * and before it becomes complete. At the moment of invocation the job and all its children are complete. - * - * @suppress **This is unstable API and it is subject to change.* */ protected open fun handleJobException(exception: Throwable): Boolean = false - private fun cancelParent(cause: Throwable): Boolean { - // CancellationException is considered "normal" and parent is not cancelled when child produces it. - // This allow parent to cancel its children (normally) without being cancelled itself, unless - // child crashes and produce some other exception during its completion. - if (cause is CancellationException) return true - if (!cancelsParent) return false - return parentHandle?.childCancelled(cause) == true - } - /** * Override for completion actions that need to update some external object depending on job's state, * right before all the waiters for coroutine's completion are notified. diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 3c902db9a8..8bfaf336fe 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -85,9 +85,7 @@ private open class TimeoutCoroutine( override val defaultResumeMode: Int get() = MODE_DIRECT override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame) override fun getStackTraceElement(): StackTraceElement? = null - - override val cancelsParent: Boolean - get() = false // it throws exception to parent instead of cancelling it + override val isScopedCoroutine: Boolean get() = true @Suppress("LeakingThis", "Deprecation") override fun run() { diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index d7e01aba0b..9e34773c57 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -126,7 +126,7 @@ public fun CoroutineScope.produce( return coroutine } -private class ProducerCoroutine( +internal open class ProducerCoroutine( parentContext: CoroutineContext, channel: Channel ) : ChannelCoroutine(parentContext, channel, active = true), ProducerScope { override val isActive: Boolean diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 6147b65202..b4ff26de80 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -313,7 +313,7 @@ public fun channelFlow(@BuilderInference block: suspend ProducerScope.() public inline fun callbackFlow(@BuilderInference noinline block: suspend ProducerScope.() -> Unit): Flow = channelFlow(block) -// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow +// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow private class ChannelFlowBuilder( private val block: suspend ProducerScope.() -> Unit, context: CoroutineContext = EmptyCoroutineContext, diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 77beb3779c..bf20d2f2a2 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -118,7 +118,6 @@ public fun Flow.onErrorResume(fallback: Flow): Flow = error("Should @Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR) public fun FlowCollector.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called") - /** * `subscribe` is Rx-specific API that has no direct match in flows. * One can use `launch` instead, for example the following: diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index 30005afa5d..57a0132ff5 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -63,10 +63,10 @@ internal abstract class ChannelFlow( scope.broadcast(context, produceCapacity, start, block = collectToFun) fun produceImpl(scope: CoroutineScope): ReceiveChannel = - scope.produce(context, produceCapacity, block = collectToFun) + scope.flowProduce(context, produceCapacity, block = collectToFun) override suspend fun collect(collector: FlowCollector) = - coroutineScope { // todo: flowScope + coroutineScope { val channel = produceImpl(this) channel.consumeEach { collector.emit(it) } } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt new file mode 100644 index 0000000000..98f5cec597 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.intrinsics.* +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* +import kotlinx.coroutines.flow.unsafeFlow as flow + +/** + * Creates a [CoroutineScope] and calls the specified suspend block with this scope. + * This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children + * and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled. + * + * For example: + * ``` + * flowScope { + * launch { + * throw CancellationException() + * } + * } // <- CE will be rethrown here + * ``` + */ +internal suspend fun flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R = + suspendCoroutineUninterceptedOrReturn { uCont -> + val coroutine = FlowCoroutine(uCont.context, uCont) + coroutine.startUndispatchedOrReturn(coroutine, block) + } + +/** + * Creates a flow that also provides a [CoroutineScope] for each collector + * Shorthand for: + * ``` + * flow { + * flowScope { + * ... + * } + * } + * ``` + * with additional constraint on cancellation. + * To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used. + */ +internal fun scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector) -> Unit): Flow = + flow { + val collector = this + flowScope { block(collector) } + } + +/* + * Shortcut for produce { flowScope {block() } } + */ +internal fun CoroutineScope.flowProduce( + context: CoroutineContext, + capacity: Int = 0, @BuilderInference block: suspend ProducerScope.() -> Unit +): ReceiveChannel { + val channel = Channel(capacity) + val newContext = newCoroutineContext(context) + val coroutine = FlowProduceCoroutine(newContext, channel) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) + return coroutine +} + +private class FlowCoroutine( + context: CoroutineContext, + uCont: Continuation +) : ScopeCoroutine(context, uCont) { + + public override fun childCancelled(cause: Throwable): Boolean { + if (cause is ChildCancelledException) return true + return cancelImpl(cause) + } +} + +private class FlowProduceCoroutine( + parentContext: CoroutineContext, + channel: Channel +) : ProducerCoroutine(parentContext, channel) { + + public override fun childCancelled(cause: Throwable): Boolean { + if (cause is ChildCancelledException) return true + return cancelImpl(cause) + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt similarity index 71% rename from kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt rename to kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt index 6d5a4b4d4d..6c675b33eb 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt @@ -11,3 +11,8 @@ import kotlinx.coroutines.* * This exception should never escape outside of operator's implementation. */ internal expect class AbortFlowException() : CancellationException + +/** + * Exception used to cancel child of [scopedFlow] without cancelling the whole scope. + */ +internal expect class ChildCancelledException() : CancellationException diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 32e9b3f3b7..4db30440f6 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -60,34 +60,33 @@ public fun Flow.delayEach(timeMillis: Long): Flow = flow { */ public fun Flow.debounce(timeoutMillis: Long): Flow { require(timeoutMillis > 0) { "Debounce timeout should be positive" } - return flow { - coroutineScope { - val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796 - // Channel is not closed deliberately as there is no close with value - val collector = async { - collect { value -> values.send(value ?: NULL) } - } + return scopedFlow { downstream -> + val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796 + // Channel is not closed deliberately as there is no close with value + val collector = async { + collect { value -> values.send(value ?: NULL) } + } - var isDone = false - var lastValue: Any? = null - while (!isDone) { - select { - values.onReceive { - lastValue = it - } + var isDone = false + var lastValue: Any? = null + while (!isDone) { + select { + values.onReceive { + lastValue = it + } - lastValue?.let { value -> // set timeout when lastValue != null - onTimeout(timeoutMillis) { - lastValue = null // Consume the value - emit(NULL.unbox(value)) - } + lastValue?.let { value -> + // set timeout when lastValue != null + onTimeout(timeoutMillis) { + lastValue = null // Consume the value + downstream.emit(NULL.unbox(value)) } + } - // Close with value 'idiom' - collector.onAwait { - if (lastValue != null) emit(NULL.unbox(lastValue)) - isDone = true - } + // Close with value 'idiom' + collector.onAwait { + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + isDone = true } } } @@ -112,32 +111,31 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { */ public fun Flow.sample(periodMillis: Long): Flow { require(periodMillis > 0) { "Sample period should be positive" } - return flow { - coroutineScope { - val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 - collect { value -> send(value ?: NULL) } - } + return scopedFlow { downstream -> + val values = produce(capacity = Channel.CONFLATED) { + // Actually Any, KT-30796 + collect { value -> send(value ?: NULL) } + } - var isDone = false - var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMillis) - while (!isDone) { - select { - values.onReceiveOrNull { - if (it == null) { - ticker.cancel() - isDone = true - } else { - lastValue = it - } + var isDone = false + var lastValue: Any? = null + val ticker = fixedPeriodTicker(periodMillis) + while (!isDone) { + select { + values.onReceiveOrNull { + if (it == null) { + ticker.cancel(ChildCancelledException()) + isDone = true + } else { + lastValue = it } + } - // todo: shall be start sampling only when an element arrives or sample aways as here? - ticker.onReceive { - val value = lastValue ?: return@onReceive - lastValue = null // Consume the value - emit(NULL.unbox(value)) - } + // todo: shall be start sampling only when an element arrives or sample aways as here? + ticker.onReceive { + val value = lastValue ?: return@onReceive + lastValue = null // Consume the value + downstream.emit(NULL.unbox(value)) } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index f7a644710f..0fa6e8abd4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -129,17 +129,16 @@ public fun Flow>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY * produces `aa bb b_last` */ @FlowPreview -public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = flow { - coroutineScope { - var previousFlow: Job? = null - collect { value -> - // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels. - previousFlow?.cancelAndJoin() - // Undispatched to have better user experience in case of synchronous flows - previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { - transform(value).collect { innerValue -> - emit(innerValue) - } +public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = scopedFlow { downstream -> + var previousFlow: Job? = null + collect { value -> + // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels. + previousFlow?.cancel(ChildCancelledException()) + previousFlow?.join() + // Undispatched to have better user experience in case of synchronous flows + previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { + transform(value).collect { innerValue -> + downstream.emit(innerValue) } } } @@ -175,7 +174,7 @@ private class ChannelFlowMerge( override suspend fun flowCollect(collector: FlowCollector) { // this function should not have been invoked when channel was explicitly requested check(capacity == OPTIONAL_CHANNEL) - coroutineScope { // todo: flowScope + flowScope { mergeImpl(this, collector.asConcurrentFlowCollector()) } } diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 3361694481..9197ec83c0 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -17,13 +17,12 @@ internal open class ScopeCoroutine( ) : AbstractCoroutine(context, true), CoroutineStackFrame { final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame? final override fun getStackTraceElement(): StackTraceElement? = null + final override val isScopedCoroutine: Boolean get() = true + override val defaultResumeMode: Int get() = MODE_DIRECT internal val parent: Job? get() = parentContext[Job] - override val cancelsParent: Boolean - get() = false // it throws exception to parent instead of cancelling it - @Suppress("UNCHECKED_CAST") override fun afterCompletionInternal(state: Any?, mode: Int) { if (state is CompletedExceptionally) { diff --git a/kotlinx-coroutines-core/common/test/SupervisorTest.kt b/kotlinx-coroutines-core/common/test/SupervisorTest.kt index fae7091851..535073e046 100644 --- a/kotlinx-coroutines-core/common/test/SupervisorTest.kt +++ b/kotlinx-coroutines-core/common/test/SupervisorTest.kt @@ -219,4 +219,22 @@ class SupervisorTest : TestBase() { yield() // to coroutineScope finish(7) } + + @Test + fun testSupervisorJobCancellationException() = runTest { + val job = SupervisorJob() + val child = launch(job + CoroutineExceptionHandler { _, _ -> expectUnreached() }) { + expect(1) + hang { + expect(3) + } + } + + yield() + expect(2) + child.cancelAndJoin() + job.complete() + job.join() + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 0ae30e80ce..a77f8fafe5 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -81,4 +81,63 @@ class ChannelFlowTest : TestBase() { assertFailsWith(flow) finish(4) } + + @Test + fun testMergeOneCoroutineWithCancellation() = runTest { + val flow = flowOf(1, 2, 3) + val f = flow.mergeOneCoroutine(flow).take(2) + assertEquals(listOf(1, 1), f.toList()) + } + + @Test + fun testMergeTwoCoroutinesWithCancellation() = runTest { + val flow = flowOf(1, 2, 3) + val f = flow.mergeTwoCoroutines(flow).take(2) + assertEquals(listOf(1, 1), f.toList()) + } + + private fun Flow.mergeTwoCoroutines(other: Flow): Flow = channelFlow { + launch { + collect { send(it); yield() } + } + launch { + other.collect { send(it) } + } + } + + private fun Flow.mergeOneCoroutine(other: Flow): Flow = channelFlow { + launch { + collect { send(it); yield() } + } + + other.collect { send(it); yield() } + } + + @Test + fun testBufferWithTimeout() = runTest { + fun Flow.bufferWithTimeout(): Flow = channelFlow { + expect(2) + launch { + expect(3) + hang { + expect(5) + } + } + launch { + expect(4) + collect { + withTimeout(-1) { + send(it) + } + expectUnreached() + } + expectUnreached() + } + } + + val flow = flowOf(1, 2, 3).bufferWithTimeout() + expect(1) + assertFailsWith(flow) + finish(6) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt index d992d06e48..a6b5340555 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt @@ -45,4 +45,3 @@ class FlowCallbackTest : TestBase() { finish(3) } } - diff --git a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt new file mode 100644 index 0000000000..d41ab8893f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlin.test.* + +class FlowScopeTest : TestBase() { + + @Test + fun testCancellation() = runTest { + assertFailsWith { + flowScope { + expect(1) + val child = launch { + expect(3) + hang { expect(5) } + } + expect(2) + yield() + expect(4) + child.cancel() + } + } + finish(6) + } + + @Test + fun testCancellationWithChildCancelled() = runTest { + flowScope { + expect(1) + val child = launch { + expect(3) + hang { expect(5) } + } + expect(2) + yield() + expect(4) + child.cancel(ChildCancelledException()) + } + finish(6) + } + + @Test + fun testCancellationWithSuspensionPoint() = runTest { + assertFailsWith { + flowScope { + expect(1) + val child = launch { + expect(3) + hang { expect(6) } + } + expect(2) + yield() + expect(4) + child.cancel() + hang { expect(5) } + } + } + finish(7) + } + + @Test + fun testNestedScopes() = runTest { + assertFailsWith { + flowScope { + flowScope { + launch { + throw CancellationException(null) + } + } + } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt index bda9927c79..54244f05db 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt @@ -197,6 +197,46 @@ abstract class CombineLatestTestBase : TestBase() { assertFailsWith(flow) finish(2) } + + @Test + fun testCancellationExceptionUpstream() = runTest { + val f1 = flow { + expect(1) + emit(1) + throw CancellationException("") + } + val f2 = flow { + emit(1) + hang { expect(3) } + } + + val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) } + assertFailsWith(flow) + finish(4) + } + + @Test + fun testCancellationExceptionDownstream() = runTest { + val f1 = flow { + emit(1) + expect(2) + hang { expect(5) } + } + val f2 = flow { + emit(1) + expect(3) + hang { expect(6) } + } + + val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { + expect(1) + yield() + expect(4) + throw CancellationException("") + } + assertFailsWith(flow) + finish(7) + } } class CombineLatestTest : CombineLatestTestBase() { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt index 607d4cd661..2a6e9c1238 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt @@ -95,20 +95,25 @@ class DebounceTest : TestBase() { } @Test - fun testUpstreamError() = runTest { + fun testUpstreamError()= testUpstreamError(TimeoutCancellationException("")) + + @Test + fun testUpstreamErrorCancellation() = testUpstreamError(TimeoutCancellationException("")) + + private inline fun testUpstreamError(cause: T) = runTest { val latch = Channel() val flow = flow { expect(1) emit(1) expect(2) latch.receive() - throw TestException() + throw cause }.debounce(1).map { latch.send(Unit) hang { expect(3) } } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt index 5d007c33a7..6069ae6d2a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt @@ -36,4 +36,41 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() { consumer.cancelAndJoin() finish(3) } + + @Test + fun testCancellationExceptionDownstream() = runTest { + val flow = flow { + emit(1) + hang { expect(2) } + }.flatMapMerge { + flow { + emit(it) + expect(1) + throw CancellationException("") + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testCancellationExceptionUpstream() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + yield() + throw CancellationException("") + }.flatMapMerge { + flow { + expect(3) + emit(it) + hang { expect(4) } + } + } + + assertFailsWith(flow) + finish(5) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt index 49df21d576..4adc35415e 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt @@ -234,6 +234,33 @@ class FlowOnTest : TestBase() { finish(6) } + @Test + fun testTimeoutExceptionUpstream() = runTest { + val flow = flow { + emit(1) + yield() + withTimeout(-1) {} + emit(42) + }.flowOn(NamedDispatchers("foo")).onEach { + expect(1) + } + assertFailsWith(flow) + finish(2) + } + + @Test + fun testTimeoutExceptionDownstream() = runTest { + val flow = flow { + emit(1) + hang { expect(2) } + }.flowOn(NamedDispatchers("foo")).onEach { + expect(1) + withTimeout(-1) {} + } + assertFailsWith(flow) + finish(3) + } + private inner class Source(private val value: Int) { public var contextName: String = "unknown" diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt index 055f84741c..a785814206 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt @@ -199,4 +199,33 @@ class FlowWithTest : TestBase() { ensureActive() finish(5) } + + @Test + fun testTimeoutException() = runTest { + val flow = flow { + emit(1) + yield() + withTimeout(-1) {} + emit(42) + }.flowWith(NamedDispatchers("foo")) { + onEach { expect(1) } + } + assertFailsWith(flow) + finish(2) + } + + @Test + fun testTimeoutExceptionDownstream() = runTest { + val flow = flow { + emit(1) + hang { expect(2) } + }.flowWith(NamedDispatchers("foo")) { + onEach { + expect(1) + withTimeout(-1) {} + } + } + assertFailsWith(flow) + finish(3) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index e77b128f76..9c96352df2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -169,20 +169,25 @@ class SampleTest : TestBase() { } @Test - fun testUpstreamError() = runTest { + fun testUpstreamError() = testUpstreamError(TestException()) + + @Test + fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException("")) + + private inline fun testUpstreamError(cause: T) = runTest { val latch = Channel() val flow = flow { expect(1) emit(1) expect(2) latch.receive() - throw TestException() + throw cause }.sample(1).map { latch.send(Unit) hang { expect(3) } } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } @@ -219,7 +224,6 @@ class SampleTest : TestBase() { finish(3) } - @Test fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest { val flow = flow { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt index 933bb1628e..fabca72c70 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt @@ -113,4 +113,10 @@ class SwitchMapTest : TestBase() { assertFailsWith(flow) finish(5) } + + @Test + fun testTake() = runTest { + val flow = flowOf(1, 2, 3, 4, 5).switchMap { flowOf(it) } + assertEquals(listOf(1), flow.take(1).toList()) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt index decd2307c6..b28320c391 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt @@ -93,7 +93,7 @@ class ZipTest : TestBase() { } @Test - fun testCancesWhenFlowIsDone2() = runTest { + fun testCancelWhenFlowIsDone2() = runTest { val f1 = flow { emit("1") emit("2") @@ -189,4 +189,52 @@ class ZipTest : TestBase() { assertFailsWith(flow) finish(2) } + + @Test + fun testCancellationUpstream() = runTest { + val f1 = flow { + expect(1) + emit(1) + yield() + expect(4) + throw CancellationException("") + } + + val f2 = flow { + expect(2) + emit(1) + expect(5) + hang { expect(6) } + } + + val flow = f1.zip(f2, { _, _ -> 1 }).onEach { expect(3) } + assertFailsWith(flow) + finish(7) + } + + @Test + fun testCancellationDownstream() = runTest { + val f1 = flow { + expect(1) + emit(1) + yield() + expect(4) + hang { expect(6) } + } + + val f2 = flow { + expect(2) + emit(1) + expect(5) + hang { expect(7) } + } + + val flow = f1.zip(f2, { _, _ -> 1 }).onEach { + expect(3) + yield() + throw CancellationException("") + } + assertFailsWith(flow) + finish(8) + } } diff --git a/kotlinx-coroutines-core/js/src/Exceptions.kt b/kotlinx-coroutines-core/js/src/Exceptions.kt index 83a0cdaf90..f42704107b 100644 --- a/kotlinx-coroutines-core/js/src/Exceptions.kt +++ b/kotlinx-coroutines-core/js/src/Exceptions.kt @@ -48,8 +48,6 @@ internal actual class JobCancellationException public actual constructor( (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0) } -internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause)) - @Suppress("FunctionName") internal fun IllegalStateException(message: String, cause: Throwable?) = IllegalStateException(message.withCause(cause)) diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt similarity index 72% rename from kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt rename to kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt index d6a9c31eaa..8422f2bf33 100644 --- a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt +++ b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt @@ -7,3 +7,4 @@ package kotlinx.coroutines.flow.internal import kotlinx.coroutines.* internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") +internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index d8f8ee33e1..52841cd2b2 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -59,8 +59,7 @@ private class BlockingCoroutine( private val blockedThread: Thread, private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true) { - override val cancelsParent: Boolean - get() = false // it throws exception to parent instead of cancelling it + override val isScopedCoroutine: Boolean get() = true override fun afterCompletionInternal(state: Any?, mode: Int) { // wake up blocked thread diff --git a/kotlinx-coroutines-core/jvm/src/Exceptions.kt b/kotlinx-coroutines-core/jvm/src/Exceptions.kt index bc7e92cadf..7a8f385e64 100644 --- a/kotlinx-coroutines-core/jvm/src/Exceptions.kt +++ b/kotlinx-coroutines-core/jvm/src/Exceptions.kt @@ -80,8 +80,6 @@ internal actual class JobCancellationException public actual constructor( (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0) } -internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message, cause) - @Suppress("NOTHING_TO_INLINE") internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) = - addSuppressed(other) + addSuppressed(other) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index ee41a0ac1a..ffabb99dec 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -127,7 +127,6 @@ private open class ActorCoroutine( channel: Channel, active: Boolean ) : ChannelCoroutine(parentContext, channel, active), ActorScope { - override val cancelsParent: Boolean get() = true override fun onCancelling(cause: Throwable?) { _channel.cancel(cause?.let { diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt deleted file mode 100644 index 7ff34e735b..0000000000 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow.internal - -import kotlinx.coroutines.* - -internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") { - override fun fillInStackTrace(): Throwable = this -} diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt new file mode 100644 index 0000000000..d8d4d21e6f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* + +internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") { + override fun fillInStackTrace(): Throwable { + if (DEBUG) super.fillInStackTrace() + return this + } +} + +internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") { + override fun fillInStackTrace(): Throwable { + if (DEBUG) super.fillInStackTrace() + return this + } +} diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 82fd81a08f..0dc90d556a 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -54,8 +54,7 @@ private class BlockingCoroutine( parentContext: CoroutineContext, private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true) { - override val cancelsParent: Boolean - get() = false // it throws exception to parent instead of cancelling it + override val isScopedCoroutine: Boolean get() = true @Suppress("UNCHECKED_CAST") fun joinBlocking(): T = memScoped { diff --git a/kotlinx-coroutines-core/native/src/Exceptions.kt b/kotlinx-coroutines-core/native/src/Exceptions.kt index 29c3ce5135..109b9100cb 100644 --- a/kotlinx-coroutines-core/native/src/Exceptions.kt +++ b/kotlinx-coroutines-core/native/src/Exceptions.kt @@ -48,8 +48,6 @@ internal actual class JobCancellationException public actual constructor( (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0) } -internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause)) - @Suppress("FunctionName") internal fun IllegalStateException(message: String, cause: Throwable?) = IllegalStateException(message.withCause(cause)) diff --git a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt similarity index 72% rename from kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt rename to kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt index d6a9c31eaa..4a291ea27e 100644 --- a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt +++ b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt @@ -7,3 +7,5 @@ package kotlinx.coroutines.flow.internal import kotlinx.coroutines.* internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") +internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") +