Skip to content

Commit 24ef1b1

Browse files
committed
Introduce CancellableContinuation.resume with onCancelling lambda
* Allows safe return of closeable resources from suspending functions, as it provides a way to close a resource if the corresponding job was cancelled. * Documentation on the context and expected behavior of CompletionHandler implementations is updated. Fixes #1044
1 parent aad393b commit 24ef1b1

File tree

6 files changed

+206
-12
lines changed

6 files changed

+206
-12
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
4040
public abstract fun isActive ()Z
4141
public abstract fun isCancelled ()Z
4242
public abstract fun isCompleted ()Z
43+
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
4344
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
4445
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
4546
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
@@ -54,6 +55,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5455
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
5556
public fun <init> (Lkotlin/coroutines/Continuation;I)V
5657
public fun cancel (Ljava/lang/Throwable;)Z
58+
public fun cancelResult (Ljava/lang/Object;Ljava/lang/Throwable;)V
5759
public fun completeResume (Ljava/lang/Object;)V
5860
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
5961
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
@@ -68,6 +70,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
6870
public fun isCancelled ()Z
6971
public fun isCompleted ()Z
7072
protected fun nameString ()Ljava/lang/String;
73+
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
7174
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
7275
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
7376
public fun resumeWith (Ljava/lang/Object;)V

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+34-2
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,15 @@ public interface CancellableContinuation<in T> : Continuation<T> {
123123
* with cancellation exception. Otherwise, the handler will be invoked once on cancellation if this
124124
* continuation is cancelled.
125125
*
126-
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
127-
* wrapped into [CompletionHandlerException], and rethrown, potentially causing the crash of unrelated code.
126+
* Installed [handler] should not throw any exceptions.
127+
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
128+
* processed as uncaught exception in the context of the current coroutine
129+
* (see [CoroutineExceptionHandler]).
128130
*
129131
* At most one [handler] can be installed on one continuation.
132+
*
133+
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
134+
* There is no guarantee on the execution context in which the [handler] is invoked.
130135
*/
131136
public fun invokeOnCancellation(handler: CompletionHandler)
132137

@@ -151,6 +156,33 @@ public interface CancellableContinuation<in T> : Continuation<T> {
151156
*/
152157
@ExperimentalCoroutinesApi
153158
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
159+
160+
/**
161+
* Resumes this continuation with a given [value] and calls the specified [onCancellation]
162+
* handler when resumed too late (when continuation was already cancelled) or when resumed
163+
* successfully (before cancellation), but coroutine's job was cancelled before it had a
164+
* chance to run in its dispatcher, so that suspended function threw an exception
165+
* instead of returning this value.
166+
*
167+
* Installed [onCancellation] handler should not throw any exceptions.
168+
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
169+
* processed as uncaught exception in the context of the current coroutine
170+
* (see [CoroutineExceptionHandler]).
171+
*
172+
* This function shall be used when resuming with a resource that must be closed by the
173+
* code that had called the corresponding suspending function, e.g.:
174+
*
175+
* ```
176+
* continuation.resume(resource) {
177+
* resource.close()
178+
* }
179+
* ```
180+
*
181+
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
182+
* There is no guarantee on the execution context in which the [onCancellation] handler is invoked.
183+
*/
184+
@ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
185+
public fun resume(value: T, onCancellation: CompletionHandler)
154186
}
155187

156188
/**

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+36-5
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ internal open class CancellableContinuationImpl<in T>(
102102

103103
override fun takeState(): Any? = state
104104

105+
override fun cancelResult(state: Any?, cause: Throwable) {
106+
if (state is CompletedWithCancellation) {
107+
invokeHandlerSafely {
108+
state.onCancellation.invokeIt(cause)
109+
}
110+
}
111+
}
112+
105113
public override fun cancel(cause: Throwable?): Boolean {
106114
_state.loop { state ->
107115
if (state !is NotCompleted) return false // false if already complete or cancelling
@@ -165,8 +173,18 @@ internal open class CancellableContinuationImpl<in T>(
165173
return getSuccessfulResult(state)
166174
}
167175

168-
override fun resumeWith(result: Result<T>) =
176+
override fun resumeWith(result: Result<T>) {
169177
resumeImpl(result.toState(), resumeMode)
178+
}
179+
180+
override fun resume(value: T, onCancellation: CompletionHandler) {
181+
if (!resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)) {
182+
// too late to resume (was cancelled) -- call handler
183+
invokeHandlerSafely {
184+
onCancellation.invokeIt((state as? CompletedExceptionally)?.cause)
185+
}
186+
}
187+
}
170188

171189
internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
172190
resumeImpl(CompletedExceptionally(exception), mode)
@@ -219,22 +237,23 @@ internal open class CancellableContinuationImpl<in T>(
219237
dispatch(mode)
220238
}
221239

222-
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
240+
// returns true when successfully dispatched resumed, false if too late
241+
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): Boolean {
223242
_state.loop { state ->
224243
when (state) {
225244
is NotCompleted -> {
226245
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
227246
disposeParentHandle()
228247
dispatchResume(resumeMode)
229-
return
248+
return true
230249
}
231250
is CancelledContinuation -> {
232251
/*
233252
* If continuation was cancelled, then resume attempt must be ignored,
234253
* because cancellation is asynchronous and may race with resume.
235254
* Racy exceptions will be lost, too.
236255
*/
237-
if (state.makeResumed()) return // ok -- resumed just once
256+
if (state.makeResumed()) return false // ok -- tried to resume just once
238257
}
239258
}
240259
alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
@@ -307,7 +326,11 @@ internal open class CancellableContinuationImpl<in T>(
307326

308327
@Suppress("UNCHECKED_CAST")
309328
override fun <T> getSuccessfulResult(state: Any?): T =
310-
if (state is CompletedIdempotentResult) state.result as T else state as T
329+
when (state) {
330+
is CompletedIdempotentResult -> state.result as T
331+
is CompletedWithCancellation -> state.result as T
332+
else -> state as T
333+
}
311334

312335
// For nicer debugging
313336
public override fun toString(): String =
@@ -344,3 +367,11 @@ private class CompletedIdempotentResult(
344367
) {
345368
override fun toString(): String = "CompletedIdempotentResult[$result]"
346369
}
370+
371+
private class CompletedWithCancellation(
372+
@JvmField val result: Any?,
373+
@JvmField val onCancellation: CompletionHandler
374+
) {
375+
override fun toString(): String = "CompletedWithCancellation[$result]"
376+
}
377+

kotlinx-coroutines-core/common/src/Dispatched.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ internal abstract class DispatchedTask<in T>(
207207

208208
public abstract fun takeState(): Any?
209209

210+
public open fun cancelResult(state: Any?, cause: Throwable) {}
211+
210212
@Suppress("UNCHECKED_CAST")
211213
public open fun <T> getSuccessfulResult(state: Any?): T =
212214
state as T
@@ -224,9 +226,11 @@ internal abstract class DispatchedTask<in T>(
224226
val job = if (resumeMode.isCancellableMode) context[Job] else null
225227
val state = takeState() // NOTE: Must take state in any case, even if cancelled
226228
withCoroutineContext(context, delegate.countOrElement) {
227-
if (job != null && !job.isActive)
228-
continuation.resumeWithException(job.getCancellationException())
229-
else {
229+
if (job != null && !job.isActive) {
230+
val cause = job.getCancellationException()
231+
cancelResult(state, cause)
232+
continuation.resumeWithException(cause)
233+
} else {
230234
val exception = getExceptionalResult(state)
231235
if (exception != null)
232236
continuation.resumeWithStackTrace(exception)

kotlinx-coroutines-core/common/src/Job.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ public interface Job : CoroutineContext.Element {
273273
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
274274
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
275275
*
276-
* **Note**: Implementations of `CompletionHandler` must be fast and _lock-free_.
276+
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
277+
* There is no guarantee on the execution context in which the [handler] is invoked.
277278
*/
278279
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
279280

@@ -304,7 +305,8 @@ public interface Job : CoroutineContext.Element {
304305
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
305306
* and allows for implementation of suspending functions that wait on the Job's state.
306307
* This function should not be used in general application code.
307-
* Implementations of `CompletionHandler` must be fast and _lock-free_.
308+
* Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
309+
* There is no guarantee on the execution context in which the [handler] is invoked.
308310
*
309311
* @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
310312
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
6+
7+
package kotlinx.coroutines
8+
9+
import kotlin.test.*
10+
11+
/**
12+
* Test for [CancellableContinuation.resume] with `onCancellation` parameter.
13+
*/
14+
class CancellableResumeTest : TestBase() {
15+
@Test
16+
fun testResumeImmediateNormally() = runTest {
17+
expect(1)
18+
val ok = suspendCancellableCoroutine<String> { cont ->
19+
expect(2)
20+
cont.invokeOnCancellation { expectUnreached() }
21+
cont.resume("OK") { expectUnreached() }
22+
expect(3)
23+
}
24+
assertEquals("OK", ok)
25+
finish(4)
26+
}
27+
28+
@Test
29+
fun testResumeImmediateAfterCancel() = runTest(
30+
expected = { it is TestException }
31+
) {
32+
expect(1)
33+
val ok = suspendCancellableCoroutine<String> { cont ->
34+
expect(2)
35+
cont.invokeOnCancellation { expect(3) }
36+
cont.cancel(TestException("FAIL"))
37+
expect(4)
38+
cont.resume("OK") { cause ->
39+
expect(5)
40+
assertTrue(cause is TestException)
41+
}
42+
finish(6)
43+
}
44+
expectUnreached()
45+
}
46+
47+
@Test
48+
fun testResumeLaterNormally() = runTest {
49+
expect(1)
50+
lateinit var cc: CancellableContinuation<String>
51+
launch(start = CoroutineStart.UNDISPATCHED) {
52+
expect(2)
53+
val ok = suspendCancellableCoroutine<String> { cont ->
54+
expect(3)
55+
cont.invokeOnCancellation { expectUnreached() }
56+
cc = cont
57+
}
58+
assertEquals("OK", ok)
59+
finish(6)
60+
}
61+
expect(4)
62+
cc.resume("OK") { expectUnreached() }
63+
expect(5)
64+
}
65+
66+
@Test
67+
fun testResumeLaterAfterCancel() = runTest {
68+
expect(1)
69+
lateinit var cc: CancellableContinuation<String>
70+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
71+
expect(2)
72+
try {
73+
suspendCancellableCoroutine<String> { cont ->
74+
expect(3)
75+
cont.invokeOnCancellation { expect(5) }
76+
cc = cont
77+
}
78+
expectUnreached()
79+
} catch (e: CancellationException) {
80+
finish(9)
81+
}
82+
}
83+
expect(4)
84+
job.cancel(TestCancellationException())
85+
expect(6)
86+
cc.resume("OK") { cause ->
87+
expect(7)
88+
assertTrue(cause is TestCancellationException)
89+
}
90+
expect(8)
91+
}
92+
93+
@Test
94+
fun testResumeCancelWhileDispatched() = runTest {
95+
expect(1)
96+
lateinit var cc: CancellableContinuation<String>
97+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
98+
expect(2)
99+
try {
100+
suspendCancellableCoroutine<String> { cont ->
101+
expect(3)
102+
// resumed first, then cancelled, so no invokeOnCancellation call
103+
cont.invokeOnCancellation { expectUnreached() }
104+
cc = cont
105+
}
106+
expectUnreached()
107+
} catch (e: CancellationException) {
108+
expect(8)
109+
}
110+
}
111+
expect(4)
112+
cc.resume("OK") { cause ->
113+
expect(7)
114+
assertTrue(cause is TestCancellationException)
115+
}
116+
expect(5)
117+
job.cancel(TestCancellationException()) // cancel while execution is dispatched
118+
expect(6)
119+
yield() // to coroutine -- throws cancellation exception
120+
finish(9)
121+
}
122+
}

0 commit comments

Comments
 (0)