From 9b19e7fb85f35a396451bd00d285641bee4d94ea Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 27 Sep 2018 19:17:16 +0300 Subject: [PATCH] async and async-like builders cancel parent on failure * Affects async, CompletableDeferred, and all Rx integration builders. * This makes all coroutine builders totally consistent. They all cancel parent on failure, but they all consider "CancellationException" to be the case of "normal cancellation" that does not propagate to parent. The only missing case is Job() that should be fixed together with introduction of SupervisorJob(). * Note that "scoping" builders don't "cancel the parent", but rethrow the corresponding exception instead, so it that is how it gets propagated up the stack. * This makes parallel decomposition exception-safe. You cannot loose an exception as along as default (child async) behavior is not overridden. Fixes #552 --- .../kotlinx-coroutines-core.txt | 3 +- .../src/AbstractContinuation.kt | 20 ++- .../src/Builders.common.kt | 1 + .../src/CancellableContinuation.kt | 4 +- .../src/CompletableDeferred.kt | 1 + .../src/JobSupport.kt | 61 ++++++-- .../src/channels/Broadcast.kt | 2 +- .../src/channels/ChannelCoroutine.kt | 2 + .../test/AbstractCoroutineTest.kt | 9 +- .../test/AsyncLazyTest.kt | 2 +- .../test/AsyncTest.kt | 58 ++++---- .../test/AwaitTest.kt | 14 +- .../test/CompletableDeferredTest.kt | 5 +- .../test/CoroutineScopeTest.kt | 96 ++++++++----- .../test/CoroutinesTest.kt | 2 +- .../test/NonCancellableTest.kt | 9 +- .../test/ParentCancellationTest.kt | 132 ++++++++++++++++++ .../test/WithTimeoutOrNullTest.kt | 15 ++ .../test/WithTimeoutTest.kt | 15 ++ .../test/channels/BasicOperationsTest.kt | 14 +- .../test/channels/ProduceTest.kt | 19 +-- .../test/AwaitStressTest.kt | 15 +- .../test/JobActivationStressTest.kt | 2 +- .../test/JoinStressTest.kt | 6 +- .../test/channels/ChannelsConsumeTest.kt | 7 +- .../ConflatedChannelCloseStressTest.kt | 2 +- .../exceptions/JobBasicCancellationTest.kt | 10 +- .../test/exceptions/SuppresionTests.kt | 2 +- .../test/ListenableFutureTest.kt | 2 +- .../src/Publish.kt | 1 + .../test/PublishTest.kt | 12 +- .../kotlinx-coroutines-reactor/src/Mono.kt | 2 + .../test/ConvertTest.kt | 2 +- .../test/FluxTest.kt | 15 +- .../test/MonoTest.kt | 15 +- .../src/RxCompletable.kt | 1 + .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 1 + .../src/RxObservable.kt | 1 + .../kotlinx-coroutines-rx2/src/RxSingle.kt | 1 + .../test/CompletableTest.kt | 18 ++- .../test/FlowableTest.kt | 16 ++- .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 15 +- .../test/ObservableTest.kt | 15 +- .../kotlinx-coroutines-rx2/test/SingleTest.kt | 15 +- 44 files changed, 507 insertions(+), 153 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/test/ParentCancellationTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index b1c6765145..bfb88f2538 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -71,13 +71,14 @@ public final class kotlinx/coroutines/experimental/CancellableContinuation$Defau public static synthetic fun tryResume$default (Lkotlinx/coroutines/experimental/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object; } -public final class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation { +public class kotlinx/coroutines/experimental/CancellableContinuationImpl : java/lang/Runnable, kotlinx/coroutines/experimental/CancellableContinuation { public fun (Lkotlin/coroutines/experimental/Continuation;I)V public fun completeResume (Ljava/lang/Object;)V public fun getContext ()Lkotlin/coroutines/experimental/CoroutineContext; public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object; public fun initCancellability ()V public fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/experimental/DisposableHandle; + protected fun nameString ()Ljava/lang/String; public fun resumeUndispatched (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Object;)V public fun resumeUndispatchedWithException (Lkotlinx/coroutines/experimental/CoroutineDispatcher;Ljava/lang/Throwable;)V public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; diff --git a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt index 14330b20fe..41035ebf45 100644 --- a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt @@ -91,13 +91,23 @@ internal abstract class AbstractContinuation( override fun takeState(): Any? = state - public fun cancel(cause: Throwable?): Boolean { + public fun cancel(cause: Throwable?): Boolean = + cancelImpl(cause) + + fun cancelImpl(cause: Throwable?): Boolean { loopOnState { state -> if (state !is NotCompleted) return false // quit if already complete - if (tryCancel(state, cause)) return true + val update = CancelledContinuation(this, cause) + if (updateStateToFinal(state, update, mode = MODE_ATOMIC_DEFAULT)) return true } } + /** + * It is used when parent is cancelled to get the cancellation cause for this continuation. + */ + open fun getParentCancellationCause(parent: Job): Throwable = + parent.getCancellationException() + private fun trySuspend(): Boolean { _decision.loop { decision -> when (decision) { @@ -133,6 +143,9 @@ internal abstract class AbstractContinuation( override fun resumeWithException(exception: Throwable) = resumeImpl(CompletedExceptionally(exception), resumeMode) + internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) = + resumeImpl(CompletedExceptionally(exception), mode) + public fun invokeOnCancellation(handler: CompletionHandler) { var handleCache: CancelHandler? = null loopOnState { state -> @@ -166,9 +179,6 @@ internal abstract class AbstractContinuation( private fun makeHandler(handler: CompletionHandler): CancelHandler = if (handler is CancelHandler) handler else InvokeOnCancel(handler) - private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean = - updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT) - private fun dispatchResume(mode: Int) { if (tryResume()) return // completed before getResult invocation -- bail out // otherwise, getResult has already commenced, i.e. completed later or in other thread diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt index cc77b1a1fb..f16679c03c 100644 --- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt +++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt @@ -157,6 +157,7 @@ private open class DeferredCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine(parentContext, active), Deferred, SelectClause1 { + override val cancelsParent: Boolean get() = true override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T override val onAwait: SelectClause1 get() = this diff --git a/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt b/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt index deef82cb24..aa3679a673 100644 --- a/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/CancellableContinuation.kt @@ -278,7 +278,7 @@ private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHand } @PublishedApi -internal class CancellableContinuationImpl( +internal open class CancellableContinuationImpl( delegate: Continuation, resumeMode: Int ) : AbstractContinuation(delegate, resumeMode), CancellableContinuation, Runnable { @@ -317,7 +317,7 @@ internal class CancellableContinuationImpl( override fun tryResumeWithException(exception: Throwable): Any? { loopOnState { state -> - when (state) { + when (state) { is NotCompleted -> { if (tryUpdateStateToFinal(state, CompletedExceptionally(exception))) return state } diff --git a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt index ba872861ae..dd66847cbd 100644 --- a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt +++ b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt @@ -63,6 +63,7 @@ private class CompletableDeferredImpl( parent: Job? ) : JobSupport(true), CompletableDeferred, SelectClause1 { init { initParentJobInternal(parent) } + override val cancelsParent: Boolean get() = true override val onCancelComplete get() = true override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt index 63bc6bb4fb..6858670d2d 100644 --- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt +++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.experimental.internal.* import kotlinx.coroutines.experimental.intrinsics.* import kotlinx.coroutines.experimental.selects.* import kotlin.coroutines.experimental.* +import kotlin.coroutines.experimental.intrinsics.* /** * A concrete implementation of [Job]. It is optionally a child to a parent job. @@ -1051,6 +1052,25 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel "ChildCompletion[$child, $proposedUpdate]" } + private class AwaitContinuation( + delegate: Continuation, + private val job: JobSupport + ) : CancellableContinuationImpl(delegate, MODE_CANCELLABLE) { + override fun getParentCancellationCause(parent: Job): Throwable { + val state = job.state + /* + * When the job we are waiting for had already completely completed exceptionally or + * is failing, we shall use its root/completion cause for await's result. + */ + if (state is Finishing) state.rootCause?.let { return it } + if (state is CompletedExceptionally) return state.cause + return parent.getCancellationException() + } + + protected override fun nameString(): String = + "AwaitContinuation(${delegate.toDebugString()})" + } + /* * ================================================================================================= * This is ready-to-use implementation for Deferred interface. @@ -1095,16 +1115,16 @@ internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, Sel return awaitSuspend() // slow-path } - private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont -> - // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers - cont.disposeOnCancellation(invokeOnCompletion { - val state = this.state - check(state !is Incomplete) - if (state is CompletedExceptionally) - cont.resumeWithException(state.cause) - else - cont.resume(state) - }) + private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> + /* + * Custom code here, so that parent coroutine that is using await + * on its child deferred (async) coroutine would throw the exception that this child had + * thrown and not a JobCancellationException. + */ + val cont = AwaitContinuation(uCont.intercepted(), this) + cont.initCancellability() + invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler) + cont.getResult() } /** @@ -1232,6 +1252,25 @@ private class ResumeOnCompletion( override fun toString() = "ResumeOnCompletion[$continuation]" } +private class ResumeAwaitOnCompletion( + job: JobSupport, + private val continuation: AbstractContinuation +) : JobNode(job) { + override fun invoke(cause: Throwable?) { + val state = job.state + check(state !is Incomplete) + if (state is CompletedExceptionally) { + // Resume with exception in atomic way to preserve exception + continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT) + } else { + // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode). + @Suppress("UNCHECKED_CAST") + continuation.resume(state as T) + } + } + override fun toString() = "ResumeAwaitOnCompletion[$continuation]" +} + internal class DisposeOnCompletion( job: Job, private val handle: DisposableHandle @@ -1299,7 +1338,7 @@ internal class ChildContinuation( @JvmField val child: AbstractContinuation<*> ) : JobCancellingNode(parent) { override fun invoke(cause: Throwable?) { - child.cancel(job.getCancellationException()) + child.cancelImpl(child.getParentCancellationCause(job)) } override fun toString(): String = "ChildContinuation[$child]" diff --git a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt index 3da7fd653f..8b46a51d92 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt @@ -105,7 +105,7 @@ private open class BroadcastCoroutine( protected val _channel: BroadcastChannel, active: Boolean ) : AbstractCoroutine(parentContext, active), ProducerScope, BroadcastChannel by _channel { - + override val cancelsParent: Boolean get() = true override val isActive: Boolean get() = super.isActive override val channel: SendChannel diff --git a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt index dfb511291b..07e5798597 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt @@ -12,6 +12,8 @@ internal open class ChannelCoroutine( protected val _channel: Channel, active: Boolean ) : AbstractCoroutine(parentContext, active), Channel by _channel { + override val cancelsParent: Boolean get() = true + val channel: Channel get() = this override fun cancel() = cancel(null) diff --git a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt index 06e45f54d3..e9a141ecf9 100644 --- a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt +++ b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.experimental import kotlin.test.* class AbstractCoroutineTest : TestBase() { - @Test fun testNotifications() = runTest { expect(1) @@ -18,7 +17,7 @@ class AbstractCoroutineTest : TestBase() { } override fun onCancellation(cause: Throwable?) { - assertTrue(cause == null) + assertEquals(null, cause) expect(5) } @@ -33,12 +32,12 @@ class AbstractCoroutineTest : TestBase() { } coroutine.invokeOnCompletion(onCancelling = true) { - assertTrue(it == null) + assertEquals(null, it) expect(6) } coroutine.invokeOnCompletion { - assertTrue(it == null) + assertEquals(null, it) expect(7) } expect(2) @@ -52,7 +51,7 @@ class AbstractCoroutineTest : TestBase() { fun testNotificationsWithException() = runTest { expect(1) val coroutineContext = coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine(coroutineContext, false) { + val coroutine = object : AbstractCoroutine(coroutineContext + NonCancellable, false) { override fun onStart() { expect(3) } diff --git a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt index 83be9e603b..79fea2c551 100644 --- a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt +++ b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt @@ -104,7 +104,7 @@ class AsyncLazyTest : TestBase() { @Test fun testCatchException() = runTest { expect(1) - val d = async(start = CoroutineStart.LAZY) { + val d = async(NonCancellable, start = CoroutineStart.LAZY) { expect(3) throw TestException() } diff --git a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt index 5ccb97f901..3e598375c1 100644 --- a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt +++ b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt @@ -51,15 +51,15 @@ class AsyncTest : TestBase() { } @Test - fun testCancellationWithCause() = runTest(expected = { it is AssertionError }) { + fun testCancellationWithCause() = runTest(expected = { it is TestException }) { expect(1) - val d = async(start = CoroutineStart.ATOMIC) { + val d = async(NonCancellable, start = CoroutineStart.ATOMIC) { finish(3) yield() } expect(2) - d.cancel(AssertionError()) + d.cancel(TestException()) d.await() } @@ -78,46 +78,50 @@ class AsyncTest : TestBase() { @Test fun testParallelDecompositionCaughtException() = runTest { - val deferred = async(Job()) { - val decomposed = async { - throw AssertionError() + val deferred = async(NonCancellable) { + val decomposed = async(NonCancellable) { + throw TestException() 1 } - try { decomposed.await() - } catch (e: AssertionError) { + } catch (e: TestException) { 42 } } - assertEquals(42, deferred.await()) } - @Test fun testParallelDecompositionCaughtExceptionWithInheritedParent() = runTest { - val deferred = async { - val decomposed = async { - throw AssertionError() + expect(1) + val deferred = async(NonCancellable) { + expect(2) + val decomposed = async { // inherits parent job! + expect(3) + throw TestException() 1 } - try { decomposed.await() - } catch (e: AssertionError) { + } catch (e: TestException) { + expect(4) // Should catch this exception, but parent is already cancelled 42 } } - - assertEquals(42, deferred.await()) + try { + // This will fail + assertEquals(42, deferred.await()) + } catch (e: TestException) { + finish(5) + } } @Test - fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is AssertionError }) { - val deferred = async { + fun testParallelDecompositionUncaughtExceptionWithInheritedParent() = runTest(expected = { it is TestException }) { + val deferred = async(NonCancellable) { val decomposed = async { - throw AssertionError() + throw TestException() 1 } @@ -129,10 +133,10 @@ class AsyncTest : TestBase() { } @Test - fun testParallelDecompositionUncaughtException() = runTest(expected = { it is AssertionError }) { - val deferred = async(Job()) { + fun testParallelDecompositionUncaughtException() = runTest(expected = { it is TestException }) { + val deferred = async(NonCancellable) { val decomposed = async { - throw AssertionError() + throw TestException() 1 } @@ -145,17 +149,15 @@ class AsyncTest : TestBase() { @Test fun testCancellationTransparency() = runTest { - val deferred = async(kotlin.coroutines.experimental.coroutineContext, CoroutineStart.ATOMIC) { + val deferred = async(NonCancellable, start = CoroutineStart.ATOMIC) { expect(2) throw TestException() } - expect(1) - deferred.cancel(UnsupportedOperationException()) - + deferred.cancel(TestException()) try { deferred.await() - } catch (e: UnsupportedOperationException) { + } catch (e: TestException) { finish(3) } } diff --git a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt index 8a56fb1ddc..810f00b998 100644 --- a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt +++ b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt @@ -67,7 +67,7 @@ class AwaitTest : TestBase() { "OK" } - val d2 = async { + val d2 = async(NonCancellable) { yield() throw TestException() } @@ -93,12 +93,12 @@ class AwaitTest : TestBase() { @Test fun testAwaitAllMultipleExceptions() = runTest { - val d = async { + val d = async(NonCancellable) { expect(2) throw TestException() } - val d2 = async { + val d2 = async(NonCancellable) { yield() throw TestException() } @@ -154,7 +154,7 @@ class AwaitTest : TestBase() { @Test fun testAwaitAllPartiallyCompletedExceptionally() = runTest { - val d1 = async { + val d1 = async(NonCancellable) { expect(1) throw TestException() } @@ -228,7 +228,7 @@ class AwaitTest : TestBase() { @Test fun testAwaitAllSameThrowingJobMultipleTimes() = runTest { val d1 = - async { throw TestException() } + async(NonCancellable) { throw TestException() } val d2 = async { } // do nothing try { @@ -283,7 +283,7 @@ class AwaitTest : TestBase() { val d1 = launch { expect(2) } - val d2 = async { + val d2 = async(NonCancellable) { expect(3) throw TestException() } @@ -348,7 +348,7 @@ class AwaitTest : TestBase() { @Test fun testJoinAllSameJobExceptionally() = runTest { val job = - async { throw TestException() } + async(NonCancellable) { throw TestException() } joinAll(job, job, job) } diff --git a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt index f9a71b84b9..b7cc7ab55a 100644 --- a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt +++ b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt @@ -104,14 +104,15 @@ class CompletableDeferredTest : TestBase() { } @Test - fun testParentActiveOnChildException() { + fun testParentCancelledOnChildException() { val parent = Job() val c = CompletableDeferred(parent) checkFresh(c) assertEquals(true, parent.isActive) assertEquals(true, c.cancel(TestException())) checkCancelWithException(c) - assertEquals(true, parent.isActive) + assertEquals(false, parent.isActive) + assertEquals(true, parent.isCancelled) } @Test diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt index dd0e4882dc..949dfce6a3 100644 --- a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt +++ b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt @@ -9,16 +9,13 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class CoroutineScopeTest : TestBase() { - @Test fun testScope() = runTest { suspend fun callJobScoped() = coroutineScope { expect(2) - launch { expect(4) } - launch { expect(5) @@ -29,12 +26,9 @@ class CoroutineScopeTest : TestBase() { expect(6) } - expect(3) 42 } - - expect(1) val result = callJobScoped() assertEquals(42, result) @@ -46,59 +40,65 @@ class CoroutineScopeTest : TestBase() { fun testScopeCancelledFromWithin() = runTest { expect(1) suspend fun callJobScoped() = coroutineScope { - launch { expect(2) delay(Long.MAX_VALUE) } - launch { expect(3) - throw IllegalArgumentException() + throw TestException2() } } try { callJobScoped() expectUnreached() - } catch (e: IllegalArgumentException) { + } catch (e: TestException2) { expect(4) } - yield() // Check we're not cancelled finish(5) } + @Test + fun testExceptionFromWithin() = runTest { + expect(1) + try { + expect(2) + coroutineScope { + expect(3) + throw TestException1() + } + expectUnreached() + } catch (e: TestException1) { + finish(4) + } + } + @Test fun testScopeBlockThrows() = runTest { expect(1) suspend fun callJobScoped(): Unit = coroutineScope { - launch { expect(2) delay(Long.MAX_VALUE) } - yield() // let launch sleep - throw NotImplementedError() + throw TestException1() } - try { callJobScoped() expectUnreached() - } catch (e: NotImplementedError) { + } catch (e: TestException1) { expect(3) } - yield() // Check we're not cancelled finish(4) } @Test fun testOuterJobIsCancelled() = runTest { - suspend fun callJobScoped() = coroutineScope { - launch { expect(3) try { @@ -113,7 +113,6 @@ class CoroutineScopeTest : TestBase() { 42 } - val outerJob = launch(NonCancellable) { expect(1) try { @@ -124,7 +123,6 @@ class CoroutineScopeTest : TestBase() { assertNull(e.cause) } } - repeat(3) { yield() } // let everything to start properly outerJob.cancel() outerJob.join() @@ -132,21 +130,21 @@ class CoroutineScopeTest : TestBase() { } @Test - fun testAsyncCancellation() = runTest { + fun testAsyncCancellationFirst() = runTest { try { expect(1) - failedConcurrentSum() + failedConcurrentSumFirst() expectUnreached() - } catch (e: IndexOutOfBoundsException) { - finish(5) + } catch (e: TestException1) { + finish(6) } } - private suspend fun failedConcurrentSum(): Int = coroutineScope { + // First async child fails -> second is cancelled + private suspend fun failedConcurrentSumFirst(): Int = coroutineScope { val one = async { - println("First child throws an exception") expect(3) - throw IndexOutOfBoundsException() + throw TestException1() } val two = async(start = CoroutineStart.ATOMIC) { try { @@ -154,10 +152,39 @@ class CoroutineScopeTest : TestBase() { delay(Long.MAX_VALUE) // Emulates very long computation 42 } finally { - println("Second child was cancelled") + expect(5) } } + expect(2) + one.await() + two.await() + } + + @Test + fun testAsyncCancellationSecond() = runTest { + try { + expect(1) + failedConcurrentSumSecond() + expectUnreached() + } catch (e: TestException1) { + finish(6) + } + } + // Second async child fails -> fist is cancelled + private suspend fun failedConcurrentSumSecond(): Int = coroutineScope { + val one = async { + try { + expect(3) + delay(Long.MAX_VALUE) // Emulates very long computation + 42 + } finally { + expect(5) + } + } + val two = async(start = CoroutineStart.ATOMIC) { + expect(4) + throw TestException1() + } expect(2) one.await() + two.await() } @@ -174,13 +201,11 @@ class CoroutineScopeTest : TestBase() { expect(3) } } - yield() - // UI updater withContext(coroutineContext) { expect(2) - throw AssertionError() + throw TestException1() data.await() // Actually unreached expectUnreached() } @@ -189,11 +214,11 @@ class CoroutineScopeTest : TestBase() { try { loadData() expectUnreached() - } catch (e: AssertionError) { + } catch (e: TestException1) { finish(4) } } - + @Test fun testScopePlusContext() { assertSame(EmptyCoroutineContext, scopePlusContext(EmptyCoroutineContext, EmptyCoroutineContext)) @@ -207,4 +232,7 @@ class CoroutineScopeTest : TestBase() { private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) = (ContextScope(c1) + c2).coroutineContext + + private class TestException1 : Exception() + private class TestException2 : Exception() } diff --git a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt index db59aa5b98..05da2d2583 100644 --- a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt +++ b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt @@ -317,7 +317,7 @@ class CoroutinesTest : TestBase() { fun testNotCancellableChildWithExceptionCancelled() = runTest(expected = { it is IllegalArgumentException }) { expect(1) // CoroutineStart.ATOMIC makes sure it will not get cancelled for it starts executing - val d = async(start = CoroutineStart.ATOMIC) { + val d = async(NonCancellable, start = CoroutineStart.ATOMIC) { finish(4) throwTestException() // will throw expectUnreached() diff --git a/common/kotlinx-coroutines-core-common/test/NonCancellableTest.kt b/common/kotlinx-coroutines-core-common/test/NonCancellableTest.kt index 94050ee2e9..3c161941d3 100644 --- a/common/kotlinx-coroutines-core-common/test/NonCancellableTest.kt +++ b/common/kotlinx-coroutines-core-common/test/NonCancellableTest.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.experimental import kotlin.test.* class NonCancellableTest : TestBase() { - @Test fun testNonCancellable() = runTest { expect(1) @@ -39,7 +38,7 @@ class NonCancellableTest : TestBase() { @Test fun testNonCancellableWithException() = runTest { expect(1) - val deferred = async { + val deferred = async(NonCancellable) { withContext(NonCancellable) { expect(2) yield() @@ -52,13 +51,13 @@ class NonCancellableTest : TestBase() { } yield() - deferred.cancel(NumberFormatException()) + deferred.cancel(TestException()) expect(3) assertTrue(deferred.isCancelled) try { deferred.await() expectUnreached() - } catch (e: NumberFormatException) { + } catch (e: TestException) { finish(6) } } @@ -124,4 +123,6 @@ class NonCancellableTest : TestBase() { finish(7) } } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/test/ParentCancellationTest.kt b/common/kotlinx-coroutines-core-common/test/ParentCancellationTest.kt new file mode 100644 index 0000000000..bb34ad7360 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/test/ParentCancellationTest.kt @@ -0,0 +1,132 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines.experimental + +import kotlinx.coroutines.experimental.channels.* +import kotlin.test.* + +/** + * Systematically tests that various builders cancel parent on failure. + */ +class ParentCancellationTest : TestBase() { + @Test + @Ignore // todo: shall be passing in Supervisor branch + fun testJobChild() = runTest { + testParentCancellation { fail -> + val child = Job(coroutineContext[Job]) + CoroutineScope(coroutineContext + child).fail() + } + } + + @Test + fun testCompletableDeferredChild() = runTest { + testParentCancellation { fail -> + val child = CompletableDeferred(coroutineContext[Job]) + CoroutineScope(coroutineContext + child).fail() + } + } + + @Test + fun testLaunchChild() = runTest { + testParentCancellation { fail -> + launch { fail() } + } + } + + @Test + fun testAsyncChild() = runTest { + testParentCancellation { fail -> + async { fail() } + } + } + + @Test + fun testProduceChild() = runTest { + testParentCancellation { fail -> + produce { fail() } + } + } + + @Test + fun testBroadcastChild() = runTest { + testParentCancellation { fail -> + broadcast { fail() }.openSubscription() + } + } + + @Test + fun testCoroutineScopeChild() = runTest { + testParentCancellation(expectRethrows = true) { fail -> + coroutineScope { fail() } + } + } + + @Test + fun testWithContextChild() = runTest { + testParentCancellation(expectRethrows = true) { fail -> + withContext(CoroutineName("fail")) { fail() } + } + } + + @Test + fun testWithTimeoutChild() = runTest { + testParentCancellation(expectRethrows = true) { fail -> + withTimeout(1000) { fail() } + } + } + + private suspend fun CoroutineScope.testParentCancellation( + expectRethrows: Boolean = false, + child: suspend CoroutineScope.(block: suspend CoroutineScope.() -> Unit) -> Unit + ) { + testWithException(expectRethrows, TestException(), child) + testWithException(expectRethrows, CancellationException("Test"), child) + } + + private suspend fun CoroutineScope.testWithException( + expectRethrows: Boolean, + throwException: Throwable, + child: suspend CoroutineScope.(block: suspend CoroutineScope.() -> Unit) -> Unit + ) { + reset() + expect(1) + val parent = CompletableDeferred() // parent that handles exception (!) + val scope = CoroutineScope(coroutineContext + parent) + try { + scope.child { + // launch failing grandchild + val grandchild = launch { + throw throwException + } + grandchild.join() + } + if (expectRethrows && throwException !is CancellationException) { + expectUnreached() + } else { + expect(2) + } + } catch (e: Throwable) { + if (expectRethrows) { + expect(2) + assertSame(throwException, e) + } else { + expectUnreached() + } + } + if (expectRethrows || throwException is CancellationException) { + // Note: parent is not cancelled on CancellationException or when primitive rethrows it + assertTrue(parent.isActive) + } else { + parent.join() + assertFalse(parent.isActive) + assertTrue(parent.isCancelled) + } + finish(3) + } + + private class TestException : Exception() +} \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/test/WithTimeoutOrNullTest.kt b/common/kotlinx-coroutines-core-common/test/WithTimeoutOrNullTest.kt index a22b0bbe25..f7760b80b6 100644 --- a/common/kotlinx-coroutines-core-common/test/WithTimeoutOrNullTest.kt +++ b/common/kotlinx-coroutines-core-common/test/WithTimeoutOrNullTest.kt @@ -225,4 +225,19 @@ class WithTimeoutOrNullTest : TestBase() { assertNull(result) finish(2) } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeoutOrNull(1000) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } } \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt index 8f126d557c..639672c74b 100644 --- a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt +++ b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt @@ -182,5 +182,20 @@ class WithTimeoutTest : TestBase() { finish(2) } } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeout(1000) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } } diff --git a/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt b/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt index e8ce316776..ad78940d47 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt +++ b/common/kotlinx-coroutines-core-common/test/channels/BasicOperationsTest.kt @@ -71,7 +71,7 @@ class BasicOperationsTest : TestBase() { private suspend fun testReceiveOrNull(kind: TestChannelKind) = coroutineScope { val channel = kind.create() - val d = async { + val d = async(NonCancellable) { channel.receive() } @@ -88,24 +88,24 @@ class BasicOperationsTest : TestBase() { private suspend fun testReceiveOrNullException(kind: TestChannelKind) = coroutineScope { val channel = kind.create() - val d = async { + val d = async(NonCancellable) { channel.receive() } yield() - channel.close(IndexOutOfBoundsException()) + channel.close(TestException()) assertTrue(channel.isClosedForReceive) - assertFailsWith { channel.poll() } + assertFailsWith { channel.poll() } try { channel.receiveOrNull() fail() - } catch (e: IndexOutOfBoundsException) { + } catch (e: TestException) { // Expected } d.join() - assertTrue(d.getCancellationException().cause is IndexOutOfBoundsException) + assertTrue(d.getCancellationException().cause is TestException) } @@ -147,4 +147,6 @@ class BasicOperationsTest : TestBase() { assertEquals(iterations, expected) } } + + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt index 770077a1c0..4c0540907e 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt +++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt @@ -9,7 +9,6 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ProduceTest : TestBase() { - @Test fun testBasic() = runTest { val c = produce { @@ -30,14 +29,15 @@ class ProduceTest : TestBase() { @Test fun testCancelWithoutCause() = runTest { - val c = produce { + val c = produce(NonCancellable) { expect(2) send(1) expect(3) try { send(2) // will get cancelled + expectUnreached() } catch (e: Throwable) { - finish(7) + expect(7) check(e is ClosedSendChannelException) throw e } @@ -50,35 +50,38 @@ class ProduceTest : TestBase() { expect(5) assertNull(c.receiveOrNull()) expect(6) + yield() // to produce + finish(8) } @Test fun testCancelWithCause() = runTest { - val c = produce { + val c = produce(NonCancellable) { expect(2) send(1) expect(3) try { send(2) // will get cancelled - } catch (e: Exception) { - finish(6) + expectUnreached() + } catch (e: Throwable) { + expect(6) check(e is TestException) throw e } expectUnreached() } - expect(1) check(c.receive() == 1) expect(4) c.cancel(TestException()) - try { assertNull(c.receiveOrNull()) expectUnreached() } catch (e: TestException) { expect(5) } + yield() // to produce + finish(7) } @Test diff --git a/core/kotlinx-coroutines-core/test/AwaitStressTest.kt b/core/kotlinx-coroutines-core/test/AwaitStressTest.kt index b2683fe028..a50ccd0aa6 100644 --- a/core/kotlinx-coroutines-core/test/AwaitStressTest.kt +++ b/core/kotlinx-coroutines-core/test/AwaitStressTest.kt @@ -24,25 +24,21 @@ class AwaitStressTest : TestBase() { @Test fun testMultipleExceptions() = runTest { - + val ctx = pool + NonCancellable repeat(iterations) { val barrier = CyclicBarrier(4) - - val d1 = async(pool) { + val d1 = async(ctx) { barrier.await() throw TestException() } - - val d2 = async(pool) { + val d2 = async(ctx) { barrier.await() throw TestException() } - - val d3 = async(pool) { + val d3 = async(ctx) { barrier.await() 1L } - try { barrier.await() awaitAll(d1, d2, d3) @@ -58,18 +54,15 @@ class AwaitStressTest : TestBase() { @Test fun testAwaitAll() = runTest { val barrier = CyclicBarrier(3) - repeat(iterations) { val d1 = async(pool) { barrier.await() 1L } - val d2 = async(pool) { barrier.await() 2L } - barrier.await() awaitAll(d1, d2) require(d1.isCompleted && d2.isCompleted) diff --git a/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt b/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt index fa1927cd95..01b9a8bf85 100644 --- a/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt +++ b/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt @@ -28,7 +28,7 @@ class JobActivationStressTest : TestBase() { val scope = CoroutineScope(pool) repeat(N_ITERATIONS) { var wasStarted = false - val d = scope.async(start = CoroutineStart.LAZY) { + val d = scope.async(NonCancellable, start = CoroutineStart.LAZY) { wasStarted = true throw TestException() } diff --git a/core/kotlinx-coroutines-core/test/JoinStressTest.kt b/core/kotlinx-coroutines-core/test/JoinStressTest.kt index 32a105da5f..b4055611c6 100644 --- a/core/kotlinx-coroutines-core/test/JoinStressTest.kt +++ b/core/kotlinx-coroutines-core/test/JoinStressTest.kt @@ -30,7 +30,7 @@ class JoinStressTest : TestBase() { repeat(iterations) { val barrier = CyclicBarrier(3) - val exceptionalJob = async(pool) { + val exceptionalJob = async(pool + NonCancellable) { barrier.await() throw TestException() } @@ -65,7 +65,7 @@ class JoinStressTest : TestBase() { repeat(iterations) { val barrier = CyclicBarrier(4) - val exceptionalJob = async(pool) { + val exceptionalJob = async(pool + NonCancellable) { barrier.await() throw TestException() } @@ -81,7 +81,7 @@ class JoinStressTest : TestBase() { } } - val canceller = async(pool) { + val canceller = async(pool + NonCancellable) { barrier.await() exceptionalJob.cancel(IOException()) } diff --git a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt index 9875b78ad6..73557cce1b 100644 --- a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt +++ b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* import kotlin.test.* /** @@ -14,7 +15,7 @@ class ChannelsConsumeTest { private val sourceList = (1..10).toList() // test source with numbers 1..10 - private fun CoroutineScope.testSource() = produce { + private fun CoroutineScope.testSource() = produce(NonCancellable) { for (i in sourceList) { send(i) } @@ -478,7 +479,7 @@ class ChannelsConsumeTest { fun testFlatMap() { checkTransform(sourceList.flatMap { listOf("A$it", "B$it") }) { flatMap { - GlobalScope.produce { + GlobalScope.produce(coroutineContext) { send("A$it") send("B$it") } @@ -846,7 +847,7 @@ class ChannelsConsumeTest { val src = runBlocking { val src = testSource() // terminal operation in a separate async context started until the first suspension - val d = async(start = CoroutineStart.UNDISPATCHED) { + val d = async(NonCancellable, start = CoroutineStart.UNDISPATCHED) { terminal(src) } // then cancel it diff --git a/core/kotlinx-coroutines-core/test/channels/ConflatedChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/test/channels/ConflatedChannelCloseStressTest.kt index a06224a2b8..d031e51eb4 100644 --- a/core/kotlinx-coroutines-core/test/channels/ConflatedChannelCloseStressTest.kt +++ b/core/kotlinx-coroutines-core/test/channels/ConflatedChannelCloseStressTest.kt @@ -59,7 +59,7 @@ class ConflatedChannelCloseStressTest : TestBase() { closerJob.cancel() } } - val receiver = async(pool) { + val receiver = async(pool + NonCancellable) { while (isActive) { curChannel.get().receiveOrNull() received.incrementAndGet() diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt index 8421d5e83b..c9dc270d72 100644 --- a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt +++ b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt @@ -95,8 +95,8 @@ class JobBasicCancellationTest : TestBase() { @Test fun testNestedAsyncFailure() = runTest { - val deferred = async { - val nested = async { + val deferred = async(NonCancellable) { + val nested = async(NonCancellable) { expect(3) throw IOException() } @@ -121,9 +121,8 @@ class JobBasicCancellationTest : TestBase() { expect(1) val child = Job(coroutineContext[Job]) expect(2) - assertFalse(child.cancel(IOException())) + assertFalse(child.cancel()) // cancel without cause -- should not cancel us (parent) child.join() - assertTrue(child.getCancellationException().cause is IOException) expect(3) } @@ -137,9 +136,8 @@ class JobBasicCancellationTest : TestBase() { expect(1) val child = CompletableDeferred(coroutineContext[Job]) expect(2) - assertTrue(child.cancel(IOException())) + assertTrue(child.cancel()) // cancel without cause -- should not cancel us (parent) child.join() - assertTrue(child.getCancellationException().cause is IOException) expect(3) } diff --git a/core/kotlinx-coroutines-core/test/exceptions/SuppresionTests.kt b/core/kotlinx-coroutines-core/test/exceptions/SuppresionTests.kt index abbd56cb5e..1708a5e040 100644 --- a/core/kotlinx-coroutines-core/test/exceptions/SuppresionTests.kt +++ b/core/kotlinx-coroutines-core/test/exceptions/SuppresionTests.kt @@ -15,7 +15,7 @@ class SuppresionTests : TestBase() { @Test fun testCancellationTransparency() = runTest { - val deferred = async(kotlin.coroutines.experimental.coroutineContext, CoroutineStart.ATOMIC) { + val deferred = async(NonCancellable, start = CoroutineStart.ATOMIC) { expect(2) throw ArithmeticException() } diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 817d72e352..79ba609116 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -245,7 +245,7 @@ class ListenableFutureTest : TestBase() { fun testThrowingFutureAsDeferred() = runTest { val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) val future = executor.submit(Callable { throw TestException() }) - val deferred = async { + val deferred = GlobalScope.async { future.asDeferred().await() } diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index f86b5d8afc..0012981dcf 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -72,6 +72,7 @@ private class PublisherCoroutine( private val subscriber: Subscriber ) : AbstractCoroutine(parentContext, true), ProducerScope, Subscription, SelectClause2> { override val channel: SendChannel get() = this + override val cancelsParent: Boolean get() = true // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked private val mutex = Mutex(locked = true) diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index a55dad061c..edecb2fde0 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -58,7 +58,7 @@ class PublishTest : TestBase() { @Test fun testBasicError() = runBlocking { expect(1) - val publisher = publish { + val publisher = publish(NonCancellable) { expect(5) throw RuntimeException("OK") } @@ -80,4 +80,14 @@ class PublishTest : TestBase() { yield() // to publish coroutine finish(7) } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + publish { + throw RuntimeException("OK") + }.openSubscription() + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 65e2c747c7..afb86fec00 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -57,6 +57,8 @@ private class MonoCoroutine( parentContext: CoroutineContext, private val sink: MonoSink ) : AbstractCoroutine(parentContext, true), Disposable { + override val cancelsParent: Boolean get() = true + var disposed = false override fun onCompleted(value: T) { diff --git a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt index f22446e67a..8d36779f6a 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt @@ -35,7 +35,7 @@ class ConvertTest : TestBase() { expect(3) throw RuntimeException("OK") } - val mono = job.asMono(coroutineContext) + val mono = job.asMono(coroutineContext + NonCancellable) mono.subscribe( { fail("no item should be emitted") }, { expect(4) } diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index 7ebd80db5c..97196fd8f8 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -29,7 +29,7 @@ class FluxTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val flux = flux { + val flux = flux(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -67,4 +67,17 @@ class FluxTest : TestBase() { yield() finish(6) } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + flux { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index e840a328dd..9655fb995e 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -38,7 +38,7 @@ class MonoTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val mono = mono { + val mono = mono(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -191,4 +191,17 @@ class MonoTest : TestBase() { assertEquals("OK", it.message) } } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + mono { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index 159bc3ca65..0c975a09d6 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -56,6 +56,7 @@ private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter ) : AbstractCoroutine(parentContext, true) { + override val cancelsParent: Boolean get() = true override fun onCompleted(value: Unit) { if (!subscriber.isDisposed) subscriber.onComplete() } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index 4b9496bc1b..a4cb0e9e0d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -57,6 +57,7 @@ private class RxMaybeCoroutine( parentContext: CoroutineContext, private val subscriber: MaybeEmitter ) : AbstractCoroutine(parentContext, true) { + override val cancelsParent: Boolean get() = true override fun onCompleted(value: T) { if (!subscriber.isDisposed) { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index bb26bbf7e5..6588962716 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -73,6 +73,7 @@ private class RxObservableCoroutine( private val subscriber: ObservableEmitter ) : AbstractCoroutine(parentContext, true), ProducerScope, SelectClause2> { override val channel: SendChannel get() = this + override val cancelsParent: Boolean get() = true // Mutex is locked when while subscriber.onXXX is being invoked private val mutex = Mutex() diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index 1fb2799c75..67d1994919 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -56,6 +56,7 @@ private class RxSingleCoroutine( parentContext: CoroutineContext, private val subscriber: SingleEmitter ) : AbstractCoroutine(parentContext, true) { + override val cancelsParent: Boolean get() = true override fun onCompleted(value: T) { if (!subscriber.isDisposed) subscriber.onSuccess(value) } diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index e2914a7342..cfb5475367 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.experimental.rx2 import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.reactive.* import org.hamcrest.core.* import org.junit.* import org.junit.Assert.* @@ -28,7 +29,7 @@ class CompletableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val completable = rxCompletable { + val completable = rxCompletable(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -82,7 +83,7 @@ class CompletableTest : TestBase() { @Test fun testAwaitFailure() = runBlocking { expect(1) - val completable = rxCompletable { + val completable = rxCompletable(NonCancellable) { expect(3) throw RuntimeException("OK") } @@ -95,4 +96,17 @@ class CompletableTest : TestBase() { assertThat(e.message, IsEqual("OK")) } } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + rxCompletable { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt index 415a65bf47..a73c3276a8 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.experimental.rx2 import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.reactive.* import org.hamcrest.core.* import org.junit.* @@ -29,7 +30,7 @@ class FlowableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val observable = rxFlowable { + val observable = rxFlowable(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -67,4 +68,17 @@ class FlowableTest : TestBase() { yield() finish(6) } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + rxFlowable { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index b5ca5b93d7..07b0feb514 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -55,7 +55,7 @@ class MaybeTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val maybe = rxMaybe { + val maybe = rxMaybe(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -198,4 +198,17 @@ class MaybeTest : TestBase() { assertEquals("OK", it.message) } } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + rxMaybe { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index 0e2575bc64..a91dcc3d01 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -29,7 +29,7 @@ class ObservableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val observable = rxObservable { + val observable = rxObservable(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -67,4 +67,17 @@ class ObservableTest : TestBase() { yield() finish(6) } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + rxObservable { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index fe04727ec7..fdb66c7864 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -37,7 +37,7 @@ class SingleTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val single = rxSingle { + val single = rxSingle(NonCancellable) { expect(4) throw RuntimeException("OK") } @@ -175,4 +175,17 @@ class SingleTest : TestBase() { assertEquals("OK", it.message) } } + + @Test + fun testCancelsParentOnFailure() = runTest( + expected = { it is RuntimeException && it.message == "OK" } + ) { + // has parent, so should cancel it on failure + rxSingle { + throw RuntimeException("OK") + }.subscribe( + { expectUnreached() }, + { assert(it is RuntimeException) } + ) + } }