From 0a251f50ed9f0945c2798a7b6d06b4e7a28ffd25 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 3 Jul 2018 11:11:21 +0300 Subject: [PATCH] Change order of handlers in JobSupport and CancellableContinuation 1) Invoke handlers before dispatching in CancellableContinuation to make behaviour timing-independent and Unconfined doesn't produce unexpected results 2) Invoke onCancellation -> handlers -> onCompletion in Job to make behaviour timing-independent Fixes #415 --- .../src/AbstractContinuation.kt | 8 +++-- .../src/JobSupport.kt | 15 ++++++++-- .../test/AbstractCoroutineTest.kt | 12 ++++---- .../test/channels/ProduceTest.kt | 30 +++++++++++++++++++ 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt index 5e81b2b090..758ffa706d 100644 --- a/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt +++ b/common/kotlinx-coroutines-core-common/src/AbstractContinuation.kt @@ -188,7 +188,7 @@ internal abstract class AbstractContinuation( return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT) } - private fun onCompletionInternal(mode: Int) { + private fun dispatchResume(mode: Int) { if (tryResume()) return // completed before getResult invocation -- bail out // otherwise, getResult has already commenced, i.e. completed later or in other thread dispatch(mode) @@ -326,9 +326,7 @@ internal abstract class AbstractContinuation( protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) { val exceptionally = update as? CompletedExceptionally - onCompletionInternal(mode) - // Invoke cancellation handlers only if necessary if (update is CancelledContinuation && expect is CancelHandler) { try { expect.invoke(exceptionally?.cause) @@ -336,6 +334,10 @@ internal abstract class AbstractContinuation( handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex)) } } + + // Notify all handlers before dispatching, otherwise behaviour will be timing-dependent + // and confusing with Unconfined + dispatchResume(mode) } private fun handleException(exception: Throwable) { diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt index 3ebd95e354..b231820966 100644 --- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt +++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt @@ -207,8 +207,17 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 private fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) { val exceptionally = update as? CompletedExceptionally // Do overridable processing before completion handlers - if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before - onCompletionInternal(update, mode) + + /* + * 1) Invoke onCancellationInternal: exception handling, parent/resource cancellation etc. + * 2) Invoke completion handlers: .join(), callbacks etc. It's important to invoke them only AFTER exception handling, see #208 + * 3) Invoke onCompletionInternal: onNext(), timeout deregistration etc. I should be last so all callbacks observe consistent state + * of the job which doesn't depend on callback scheduling + * + * Only notify on cancellation once (expect.isCancelling) + */ + if (!expect.isCancelling) onCancellationInternal(exceptionally) + // Invoke completion handlers val cause = exceptionally?.cause if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case) @@ -220,6 +229,8 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 } else { expect.list?.notifyCompletion(cause) } + + onCompletionInternal(update, mode) } private inline fun > notifyHandlers(list: NodeList, cause: Throwable?) { diff --git a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt index 49e48ee031..dc4f6c683c 100644 --- a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt +++ b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt @@ -24,20 +24,22 @@ class AbstractCoroutineTest : TestBase() { override fun onCompleted(value: String) { assertEquals("OK", value) - expect(6) + expect(8) } override fun onCompletedExceptionally(exception: Throwable) { expectUnreached() } } + coroutine.invokeOnCompletion(onCancelling = true) { assertTrue(it == null) - expect(7) + expect(6) } + coroutine.invokeOnCompletion { assertTrue(it == null) - expect(8) + expect(7) } expect(2) coroutine.start() @@ -66,7 +68,7 @@ class AbstractCoroutineTest : TestBase() { override fun onCompletedExceptionally(exception: Throwable) { assertTrue(exception is TestException1) - expect(8) + expect(9) } } coroutine.invokeOnCompletion(onCancelling = true) { @@ -75,7 +77,7 @@ class AbstractCoroutineTest : TestBase() { } coroutine.invokeOnCompletion { assertTrue(it is TestException1) - expect(9) + expect(8) } expect(2) coroutine.start() diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt index 3fb344a3da..70252a7646 100644 --- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt +++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt @@ -9,6 +9,7 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ProduceTest : TestBase() { + @Test fun testBasic() = runTest { val c = produce(coroutineContext) { @@ -81,5 +82,34 @@ class ProduceTest : TestBase() { } } + @Test + fun testCancelOnCompletionUnconfined() = runTest { + cancelOnCompletion(Unconfined) + } + + @Test + fun testCancelOnCompletion() = runTest { + cancelOnCompletion(coroutineContext) + } + + private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) { + val source = Channel() + expect(1) + val produced = produce(coroutineContext, onCompletion = source.consumes()) { + expect(2) + source.receive() + } + + yield() + expect(3) + produced.cancel() + try { + source.receive() + // TODO shouldn't it be ClosedReceiveChannelException ? + } catch (e: JobCancellationException) { + finish(4) + } + } + private class TestException : Exception() }