Skip to content

Commit 2b8218a

Browse files
committed
Introduce CancellableContinuation.resume with onCancelling lambda
* Allows safe return of closeable resources from suspending functions, as it provides a way to close a resource if the corresponding job was cancelled. * Documentation on the context and expected behavior of CompletionHandler implementations is updated. Fixes #1044
1 parent 0808ddd commit 2b8218a

File tree

6 files changed

+215
-20
lines changed

6 files changed

+215
-20
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
4040
public abstract fun isActive ()Z
4141
public abstract fun isCancelled ()Z
4242
public abstract fun isCompleted ()Z
43+
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
4344
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
4445
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
4546
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
@@ -58,20 +59,18 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
5859
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
5960
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
6061
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
61-
public final fun getDelegate ()Lkotlin/coroutines/Continuation;
6262
public final fun getResult ()Ljava/lang/Object;
6363
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
64-
public fun getSuccessfulResult (Ljava/lang/Object;)Ljava/lang/Object;
6564
public synthetic fun initCancellability ()V
6665
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
6766
public fun isActive ()Z
6867
public fun isCancelled ()Z
6968
public fun isCompleted ()Z
7069
protected fun nameString ()Ljava/lang/String;
70+
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
7171
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
7272
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
7373
public fun resumeWith (Ljava/lang/Object;)V
74-
public fun takeState ()Ljava/lang/Object;
7574
public fun toString ()Ljava/lang/String;
7675
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
7776
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+36-2
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,16 @@ public interface CancellableContinuation<in T> : Continuation<T> {
123123
* with cancellation exception. Otherwise, the handler will be invoked once on cancellation if this
124124
* continuation is cancelled.
125125
*
126-
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
127-
* wrapped into [CompletionHandlerException], and rethrown, potentially causing the crash of unrelated code.
126+
* Installed [handler] should not throw any exceptions.
127+
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
128+
* processed as uncaught exception in the context of the current coroutine
129+
* (see [CoroutineExceptionHandler]).
128130
*
129131
* At most one [handler] can be installed on one continuation.
132+
*
133+
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
134+
* This handler can be invoked concurrently with the surrounding code.
135+
* There is no guarantee on the execution context in which the [handler] is invoked.
130136
*/
131137
public fun invokeOnCancellation(handler: CompletionHandler)
132138

@@ -151,6 +157,34 @@ public interface CancellableContinuation<in T> : Continuation<T> {
151157
*/
152158
@ExperimentalCoroutinesApi
153159
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
160+
161+
/**
162+
* Resumes this continuation with a given [value] and calls the specified [onCancellation]
163+
* handler when resumed too late (when continuation was already cancelled) or when resumed
164+
* successfully (before cancellation), but coroutine's job was cancelled before it had a
165+
* chance to run in its dispatcher, so that suspended function threw an exception
166+
* instead of returning this value.
167+
*
168+
* Installed [onCancellation] handler should not throw any exceptions.
169+
* If it does, they will get caught, wrapped into [CompletionHandlerException] and
170+
* processed as uncaught exception in the context of the current coroutine
171+
* (see [CoroutineExceptionHandler]).
172+
*
173+
* This function shall be used when resuming with a resource that must be closed by the
174+
* code that had called the corresponding suspending function, e.g.:
175+
*
176+
* ```
177+
* continuation.resume(resource) {
178+
* resource.close()
179+
* }
180+
* ```
181+
*
182+
* **Note**: Implementation of [onCancellation] handler must be fast, non-blocking, and thread-safe.
183+
* This handler can be invoked concurrently with the surrounding code.
184+
* There is no guarantee on the execution context in which the [onCancellation] handler is invoked.
185+
*/
186+
@ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
187+
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
154188
}
155189

156190
/**

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+38-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ private const val RESUMED = 2
1919
*/
2020
@PublishedApi
2121
internal open class CancellableContinuationImpl<in T>(
22-
public final override val delegate: Continuation<T>,
22+
final override val delegate: Continuation<T>,
2323
resumeMode: Int
2424
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
2525
public override val context: CoroutineContext = delegate.context
@@ -102,6 +102,14 @@ internal open class CancellableContinuationImpl<in T>(
102102

103103
override fun takeState(): Any? = state
104104

105+
override fun cancelResult(state: Any?, cause: Throwable) {
106+
if (state is CompletedWithCancellation) {
107+
invokeHandlerSafely {
108+
state.onCancellation(cause)
109+
}
110+
}
111+
}
112+
105113
public override fun cancel(cause: Throwable?): Boolean {
106114
_state.loop { state ->
107115
if (state !is NotCompleted) return false // false if already complete or cancelling
@@ -165,8 +173,19 @@ internal open class CancellableContinuationImpl<in T>(
165173
return getSuccessfulResult(state)
166174
}
167175

168-
override fun resumeWith(result: Result<T>) =
176+
override fun resumeWith(result: Result<T>) {
169177
resumeImpl(result.toState(), resumeMode)
178+
}
179+
180+
override fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) {
181+
val cancelled = resumeImpl(CompletedWithCancellation(value, onCancellation), resumeMode)
182+
if (cancelled != null) {
183+
// too late to resume (was cancelled) -- call handler
184+
invokeHandlerSafely {
185+
onCancellation(cancelled.cause)
186+
}
187+
}
188+
}
170189

171190
internal fun resumeWithExceptionMode(exception: Throwable, mode: Int) =
172191
resumeImpl(CompletedExceptionally(exception), mode)
@@ -219,22 +238,23 @@ internal open class CancellableContinuationImpl<in T>(
219238
dispatch(mode)
220239
}
221240

222-
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
241+
// returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
242+
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
223243
_state.loop { state ->
224244
when (state) {
225245
is NotCompleted -> {
226246
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
227247
disposeParentHandle()
228248
dispatchResume(resumeMode)
229-
return
249+
return null
230250
}
231251
is CancelledContinuation -> {
232252
/*
233253
* If continuation was cancelled, then resume attempt must be ignored,
234254
* because cancellation is asynchronous and may race with resume.
235255
* Racy exceptions will be lost, too.
236256
*/
237-
if (state.makeResumed()) return // ok -- resumed just once
257+
if (state.makeResumed()) return state // tried to resume just once, but was cancelled
238258
}
239259
}
240260
alreadyResumedError(proposedUpdate) // otherwise -- an error (second resume attempt)
@@ -307,7 +327,11 @@ internal open class CancellableContinuationImpl<in T>(
307327

308328
@Suppress("UNCHECKED_CAST")
309329
override fun <T> getSuccessfulResult(state: Any?): T =
310-
if (state is CompletedIdempotentResult) state.result as T else state as T
330+
when (state) {
331+
is CompletedIdempotentResult -> state.result as T
332+
is CompletedWithCancellation -> state.result as T
333+
else -> state as T
334+
}
311335

312336
// For nicer debugging
313337
public override fun toString(): String =
@@ -344,3 +368,11 @@ private class CompletedIdempotentResult(
344368
) {
345369
override fun toString(): String = "CompletedIdempotentResult[$result]"
346370
}
371+
372+
private class CompletedWithCancellation(
373+
@JvmField val result: Any?,
374+
@JvmField val onCancellation: (cause: Throwable) -> Unit
375+
) {
376+
override fun toString(): String = "CompletedWithCancellation[$result]"
377+
}
378+

kotlinx-coroutines-core/common/src/Dispatched.kt

+11-7
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,17 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
203203
internal abstract class DispatchedTask<in T>(
204204
@JvmField public var resumeMode: Int
205205
) : SchedulerTask() {
206-
public abstract val delegate: Continuation<T>
206+
internal abstract val delegate: Continuation<T>
207207

208-
public abstract fun takeState(): Any?
208+
internal abstract fun takeState(): Any?
209+
210+
internal open fun cancelResult(state: Any?, cause: Throwable) {}
209211

210212
@Suppress("UNCHECKED_CAST")
211-
public open fun <T> getSuccessfulResult(state: Any?): T =
213+
internal open fun <T> getSuccessfulResult(state: Any?): T =
212214
state as T
213215

214-
public fun getExceptionalResult(state: Any?): Throwable? =
216+
internal fun getExceptionalResult(state: Any?): Throwable? =
215217
(state as? CompletedExceptionally)?.cause
216218

217219
public final override fun run() {
@@ -224,9 +226,11 @@ internal abstract class DispatchedTask<in T>(
224226
val job = if (resumeMode.isCancellableMode) context[Job] else null
225227
val state = takeState() // NOTE: Must take state in any case, even if cancelled
226228
withCoroutineContext(context, delegate.countOrElement) {
227-
if (job != null && !job.isActive)
228-
continuation.resumeWithException(job.getCancellationException())
229-
else {
229+
if (job != null && !job.isActive) {
230+
val cause = job.getCancellationException()
231+
cancelResult(state, cause)
232+
continuation.resumeWithException(cause)
233+
} else {
230234
val exception = getExceptionalResult(state)
231235
if (exception != null)
232236
continuation.resumeWithStackTrace(exception)

kotlinx-coroutines-core/common/src/Job.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,9 @@ public interface Job : CoroutineContext.Element {
273273
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
274274
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
275275
*
276-
* **Note**: Implementations of `CompletionHandler` must be fast and _lock-free_.
276+
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
277+
* This handler can be invoked concurrently with the surrounding code.
278+
* There is no guarantee on the execution context in which the [handler] is invoked.
277279
*/
278280
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
279281

@@ -304,7 +306,9 @@ public interface Job : CoroutineContext.Element {
304306
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
305307
* and allows for implementation of suspending functions that wait on the Job's state.
306308
* This function should not be used in general application code.
307-
* Implementations of `CompletionHandler` must be fast and _lock-free_.
309+
* Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
310+
* This handler can be invoked concurrently with the surrounding code.
311+
* There is no guarantee on the execution context in which the [handler] is invoked.
308312
*
309313
* @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
310314
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
6+
7+
package kotlinx.coroutines
8+
9+
import kotlin.test.*
10+
11+
/**
12+
* Test for [CancellableContinuation.resume] with `onCancellation` parameter.
13+
*/
14+
class CancellableResumeTest : TestBase() {
15+
@Test
16+
fun testResumeImmediateNormally() = runTest {
17+
expect(1)
18+
val ok = suspendCancellableCoroutine<String> { cont ->
19+
expect(2)
20+
cont.invokeOnCancellation { expectUnreached() }
21+
cont.resume("OK") { expectUnreached() }
22+
expect(3)
23+
}
24+
assertEquals("OK", ok)
25+
finish(4)
26+
}
27+
28+
@Test
29+
fun testResumeImmediateAfterCancel() = runTest(
30+
expected = { it is TestException }
31+
) {
32+
expect(1)
33+
val ok = suspendCancellableCoroutine<String> { cont ->
34+
expect(2)
35+
cont.invokeOnCancellation { expect(3) }
36+
cont.cancel(TestException("FAIL"))
37+
expect(4)
38+
cont.resume("OK") { cause ->
39+
expect(5)
40+
assertTrue(cause is TestException)
41+
}
42+
finish(6)
43+
}
44+
expectUnreached()
45+
}
46+
47+
@Test
48+
fun testResumeLaterNormally() = runTest {
49+
expect(1)
50+
lateinit var cc: CancellableContinuation<String>
51+
launch(start = CoroutineStart.UNDISPATCHED) {
52+
expect(2)
53+
val ok = suspendCancellableCoroutine<String> { cont ->
54+
expect(3)
55+
cont.invokeOnCancellation { expectUnreached() }
56+
cc = cont
57+
}
58+
assertEquals("OK", ok)
59+
finish(6)
60+
}
61+
expect(4)
62+
cc.resume("OK") { expectUnreached() }
63+
expect(5)
64+
}
65+
66+
@Test
67+
fun testResumeLaterAfterCancel() = runTest {
68+
expect(1)
69+
lateinit var cc: CancellableContinuation<String>
70+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
71+
expect(2)
72+
try {
73+
suspendCancellableCoroutine<String> { cont ->
74+
expect(3)
75+
cont.invokeOnCancellation { expect(5) }
76+
cc = cont
77+
}
78+
expectUnreached()
79+
} catch (e: CancellationException) {
80+
finish(9)
81+
}
82+
}
83+
expect(4)
84+
job.cancel(TestCancellationException())
85+
expect(6)
86+
cc.resume("OK") { cause ->
87+
expect(7)
88+
assertTrue(cause is TestCancellationException)
89+
}
90+
expect(8)
91+
}
92+
93+
@Test
94+
fun testResumeCancelWhileDispatched() = runTest {
95+
expect(1)
96+
lateinit var cc: CancellableContinuation<String>
97+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
98+
expect(2)
99+
try {
100+
suspendCancellableCoroutine<String> { cont ->
101+
expect(3)
102+
// resumed first, then cancelled, so no invokeOnCancellation call
103+
cont.invokeOnCancellation { expectUnreached() }
104+
cc = cont
105+
}
106+
expectUnreached()
107+
} catch (e: CancellationException) {
108+
expect(8)
109+
}
110+
}
111+
expect(4)
112+
cc.resume("OK") { cause ->
113+
expect(7)
114+
assertTrue(cause is TestCancellationException)
115+
}
116+
expect(5)
117+
job.cancel(TestCancellationException()) // cancel while execution is dispatched
118+
expect(6)
119+
yield() // to coroutine -- throws cancellation exception
120+
finish(9)
121+
}
122+
}

0 commit comments

Comments
 (0)