Skip to content

Introduce CancellableContinuation.resume with onCancelling lambda #1084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun isActive ()Z
public abstract fun isCancelled ()Z
public abstract fun isCompleted ()Z
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
Expand All @@ -58,20 +59,18 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
public final fun getDelegate ()Lkotlin/coroutines/Continuation;
public final fun getResult ()Ljava/lang/Object;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
public synthetic fun initCancellability ()V
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public fun isActive ()Z
public fun isCancelled ()Z
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun takeState ()Ljava/lang/Object;
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
Expand Down
38 changes: 36 additions & 2 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,16 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* with cancellation exception. Otherwise, the handler will be invoked once on cancellation if this
* continuation is cancelled.
*
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing the crash of unrelated code.
* Installed [handler] should not throw any exceptions.
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
* processed as uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* At most one [handler] can be installed on one continuation.
*
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This handler can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context in which the [handler] is invoked.
*/
public fun invokeOnCancellation(handler: CompletionHandler)

Expand All @@ -151,6 +157,34 @@ public interface CancellableContinuation<in T> : Continuation<T> {
*/
@ExperimentalCoroutinesApi
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)

/**
* Resumes this continuation with a given [value] and calls the specified [onCancellation]
* handler when resumed too late (when continuation was already cancelled) or when resumed
* successfully (before cancellation), but coroutine's job was cancelled before it had a
* chance to run in its dispatcher, so that suspended function threw an exception
* instead of returning this value.
*
* Installed [onCancellation] handler should not throw any exceptions.
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
* processed as uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* This function shall be used when resuming with a resource that must be closed by the
* code that had called the corresponding suspending function, e.g.:
*
* ```
* continuation.resume(resource) {
* resource.close()
* }
* ```
*
* **Note**: Implementation of [onCancellation] handler must be fast, non-blocking, and thread-safe.
* This handler can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context in which the [onCancellation] handler is invoked.
*/
@ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
}

/**
Expand Down
44 changes: 38 additions & 6 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private const val RESUMED = 2
*/
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
public final override val delegate: Continuation<T>,
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
public override val context: CoroutineContext = delegate.context
Expand Down Expand Up @@ -102,6 +102,14 @@ internal open class CancellableContinuationImpl<in T>(

override fun takeState(): Any? = state

override fun cancelResult(state: Any?, cause: Throwable) {
if (state is CompletedWithCancellation) {
invokeHandlerSafely {
state.onCancellation(cause)
}
}
}

public override fun cancel(cause: Throwable?): Boolean {
_state.loop { state ->
if (state !is NotCompleted) return false // false if already complete or cancelling
Expand Down Expand Up @@ -165,8 +173,19 @@ internal open class CancellableContinuationImpl<in T>(
return getSuccessfulResult(state)
}

override fun resumeWith(result: Result<T>) =
override fun resumeWith(result: Result<T>) {
resumeImpl(result.toState(), resumeMode)
}

override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
if (cancelled != null) {
// too late to resume (was cancelled) -- call handler
invokeHandlerSafely {
onCancellation(cancelled.cause)
}
}
}

internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
resumeImpl(CompletedExceptionally(exception), mode)
Expand Down Expand Up @@ -219,22 +238,23 @@ internal open class CancellableContinuationImpl<in T>(
dispatch(mode)
}

private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
// returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
_state.loop { state ->
when (state) {
is NotCompleted -> {
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
disposeParentHandle()
dispatchResume(resumeMode)
return
return null
}
is CancelledContinuation -> {
/*
* If continuation was cancelled, then resume attempt must be ignored,
* because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too.
*/
if (state.makeResumed()) return // ok -- resumed just once
if (state.makeResumed()) return state // tried to resume just once, but was cancelled
}
}
alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
Expand Down Expand Up @@ -307,7 +327,11 @@ internal open class CancellableContinuationImpl<in T>(

@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
if (state is CompletedIdempotentResult) state.result as T else state as T
when (state) {
is CompletedIdempotentResult -> state.result as T
is CompletedWithCancellation -> state.result as T
else -> state as T
}

// For nicer debugging
public override fun toString(): String =
Expand Down Expand Up @@ -344,3 +368,11 @@ private class CompletedIdempotentResult(
) {
override fun toString(): String = "CompletedIdempotentResult[$result]"
}

private class CompletedWithCancellation(
@JvmField val result: Any?,
@JvmField val onCancellation: (cause: Throwable) -> Unit
) {
override fun toString(): String = "CompletedWithCancellation[$result]"
}

18 changes: 11 additions & 7 deletions kotlinx-coroutines-core/common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,17 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public abstract val delegate: Continuation<T>
internal abstract val delegate: Continuation<T>

public abstract fun takeState(): Any?
internal abstract fun takeState(): Any?

internal open fun cancelResult(state: Any?, cause: Throwable) {}

@Suppress("UNCHECKED_CAST")
public open fun <T> getSuccessfulResult(state: Any?): T =
internal open fun <T> getSuccessfulResult(state: Any?): T =
state as T

public fun getExceptionalResult(state: Any?): Throwable? =
internal fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause

public final override fun run() {
Expand All @@ -224,9 +226,11 @@ internal abstract class DispatchedTask<in T>(
val job = if (resumeMode.isCancellableMode) context[Job] else null
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
if (job != null && !job.isActive)
continuation.resumeWithException(job.getCancellationException())
else {
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelResult(state, cause)
continuation.resumeWithException(cause)
} else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithStackTrace(exception)
Expand Down
8 changes: 6 additions & 2 deletions kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ public interface Job : CoroutineContext.Element {
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
*
* **Note**: Implementations of `CompletionHandler` must be fast and _lock-free_.
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This handler can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context in which the [handler] is invoked.
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

Expand Down Expand Up @@ -304,7 +306,9 @@ public interface Job : CoroutineContext.Element {
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
* Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This handler can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context in which the [handler] is invoked.
*
* @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
Expand Down
122 changes: 122 additions & 0 deletions kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913

package kotlinx.coroutines

import kotlin.test.*

/**
* Test for [CancellableContinuation.resume] with `onCancellation` parameter.
*/
class CancellableResumeTest : TestBase() {
@Test
fun testResumeImmediateNormally() = runTest {
expect(1)
val ok = suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expectUnreached() }
cont.resume("OK") { expectUnreached() }
expect(3)
}
assertEquals("OK", ok)
finish(4)
}

@Test
fun testResumeImmediateAfterCancel() = runTest(
expected = { it is TestException }
) {
expect(1)
val ok = suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
cont.cancel(TestException("FAIL"))
expect(4)
cont.resume("OK") { cause ->
expect(5)
assertTrue(cause is TestException)
}
finish(6)
}
expectUnreached()
}

@Test
fun testResumeLaterNormally() = runTest {
expect(1)
lateinit var cc: CancellableContinuation<String>
launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
val ok = suspendCancellableCoroutine<String> { cont ->
expect(3)
cont.invokeOnCancellation { expectUnreached() }
cc = cont
}
assertEquals("OK", ok)
finish(6)
}
expect(4)
cc.resume("OK") { expectUnreached() }
expect(5)
}

@Test
fun testResumeLaterAfterCancel() = runTest {
expect(1)
lateinit var cc: CancellableContinuation<String>
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
try {
suspendCancellableCoroutine<String> { cont ->
expect(3)
cont.invokeOnCancellation { expect(5) }
cc = cont
}
expectUnreached()
} catch (e: CancellationException) {
finish(9)
}
}
expect(4)
job.cancel(TestCancellationException())
expect(6)
cc.resume("OK") { cause ->
expect(7)
assertTrue(cause is TestCancellationException)
}
expect(8)
}

@Test
fun testResumeCancelWhileDispatched() = runTest {
expect(1)
lateinit var cc: CancellableContinuation<String>
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
try {
suspendCancellableCoroutine<String> { cont ->
expect(3)
// resumed first, then cancelled, so no invokeOnCancellation call
cont.invokeOnCancellation { expectUnreached() }
cc = cont
}
expectUnreached()
} catch (e: CancellationException) {
expect(8)
}
}
expect(4)
cc.resume("OK") { cause ->
expect(7)
assertTrue(cause is TestCancellationException)
}
expect(5)
job.cancel(TestCancellationException()) // cancel while execution is dispatched
expect(6)
yield() // to coroutine -- throws cancellation exception
finish(9)
}
}