Skip to content

Stricter checking for CompletableContinuation state machine #906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ internal open class CancellableContinuationImpl<in T>(
_state.loop { state ->
if (state !is NotCompleted) return false // false if already complete or cancelling
// Active -- update to final state
if (!_state.compareAndSet(state, CancelledContinuation(this, cause))) return@loop // retry on cas failure
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
// Invoke cancel handler if it was present
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
// Complete state update
Expand Down Expand Up @@ -177,28 +178,37 @@ internal open class CancellableContinuationImpl<in T>(
val node = handleCache ?: makeHandler(handler).also { handleCache = it }
if (_state.compareAndSet(state, node)) return // quit on cas success
}
is CancelHandler -> {
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
}
is CancelHandler -> multipleHandlersError(handler, state)
is CancelledContinuation -> {
/*
* Continuation was already cancelled, invoke directly.
* NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
* It's inconsistent with running continuation, but currently, we have no mechanism to check
* whether any handler was registered during continuation lifecycle without additional overhead.
* This may be changed in the future.
*
* NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
* so we check to make sure that handler was installed just once.
*/
if (!state.makeHandled()) multipleHandlersError(handler, state)
/*
* :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
*/
invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
return
}
else -> return
else -> {
/*
* Continuation was already completed, do nothing.
* NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
* but we have no way to check that it was installed just once in this case.
*/
return
}
}
}
}

private fun multipleHandlersError(handler: CompletionHandler, state: Any?) {
error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
}

private fun makeHandler(handler: CompletionHandler): CancelHandler =
if (handler is CancelHandler) handler else InvokeOnCancel(handler)

Expand All @@ -219,20 +229,21 @@ internal open class CancellableContinuationImpl<in T>(
}
is CancelledContinuation -> {
/*
* If continuation was cancelled, then all further resumes must be
* ignored, because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too. There does not see to be a safe way to
* handle them without producing spurious crashes.
*
* :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
* If continuation was cancelled, then resume attempt must be ignored,
* because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too.
*/
return
if (state.makeResumed()) return // ok -- resumed just once
}
else -> error("Already resumed, but proposed with update $proposedUpdate")
}
alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
}
}

private fun alreadyResumedError(proposedUpdate: Any?) {
error("Already resumed, but proposed with update $proposedUpdate")
}

// Unregister from parent job
private fun disposeParentHandle() {
parentHandle?.let { // volatile read parentHandle (once)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlin.coroutines.*
import kotlin.jvm.*

Expand Down Expand Up @@ -31,5 +32,12 @@ internal open class CompletedExceptionally(
*/
internal class CancelledContinuation(
continuation: Continuation<*>,
cause: Throwable?
) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally"))
cause: Throwable?,
handled: Boolean
) : CompletedExceptionally(cause ?: CancellationException("Continuation $continuation was cancelled normally")) {
private val resumed = atomic(false)
private val handled = atomic(handled)

fun makeResumed(): Boolean = resumed.compareAndSet(false, true)
fun makeHandled(): Boolean = handled.compareAndSet(false, true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CancellableContinuationHandlersTest : TestBase() {
c.resume(Unit)
// Nothing happened
c.invokeOnCancellation { expectUnreached() }
// Cannot validate after completion
c.invokeOnCancellation { expectUnreached() }
}
}
Expand All @@ -36,13 +37,10 @@ class CancellableContinuationHandlersTest : TestBase() {
assertTrue(it is CancellationException)
expect(1)
}
c.invokeOnCancellation {
assertTrue(it is CancellationException)
expect(2)
}
assertFailsWith<IllegalStateException> { c.invokeOnCancellation { expectUnreached() } }
}
} catch (e: CancellationException) {
finish(3)
finish(2)
}
}

Expand All @@ -55,13 +53,10 @@ class CancellableContinuationHandlersTest : TestBase() {
require(it is AssertionError)
expect(1)
}
c.invokeOnCancellation {
require(it is AssertionError)
expect(2)
}
assertFailsWith<IllegalStateException> { c.invokeOnCancellation { expectUnreached() } }
}
} catch (e: AssertionError) {
finish(3)
finish(2)
}
}

Expand All @@ -73,15 +68,11 @@ class CancellableContinuationHandlersTest : TestBase() {
require(it is IndexOutOfBoundsException)
expect(1)
}

c.cancel(IndexOutOfBoundsException())
c.invokeOnCancellation {
require(it is IndexOutOfBoundsException)
expect(2)
}
assertFailsWith<IllegalStateException> { c.invokeOnCancellation { expectUnreached() } }
}
} catch (e: IndexOutOfBoundsException) {
finish(3)
finish(2)
}
}

Expand Down