Skip to content

Commit ed150dc

Browse files
committed
Basic resource transfer support
Fixes #1191
1 parent aafa17a commit ed150dc

23 files changed

+416
-104
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+13-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
4646
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
4747
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
4848
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
49+
public abstract fun tryResumeAtomic (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
4950
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
5051
}
5152

@@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
5657

5758
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
5859
public fun <init> (Lkotlin/coroutines/Continuation;I)V
60+
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
61+
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
5962
public fun cancel (Ljava/lang/Throwable;)Z
6063
public fun completeResume (Ljava/lang/Object;)V
6164
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
@@ -75,6 +78,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
7578
public fun resumeWith (Ljava/lang/Object;)V
7679
public fun toString ()Ljava/lang/String;
7780
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
81+
public fun tryResumeAtomic (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
7882
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
7983
}
8084

@@ -268,7 +272,8 @@ public final class kotlinx/coroutines/DelayKt {
268272
}
269273

270274
public final class kotlinx/coroutines/DispatchedContinuationKt {
271-
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
275+
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
276+
public static synthetic fun resumeCancellableWith$default (Lkotlin/coroutines/Continuation;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
272277
}
273278

274279
public final class kotlinx/coroutines/Dispatchers {
@@ -482,6 +487,13 @@ public final class kotlinx/coroutines/ParentJob$DefaultImpls {
482487
public static fun plus (Lkotlinx/coroutines/ParentJob;Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
483488
}
484489

490+
public final class kotlinx/coroutines/Resource {
491+
public fun <init> (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
492+
public final fun cancel ()V
493+
public final fun getValue ()Ljava/lang/Object;
494+
public final fun isCancelled ()Z
495+
}
496+
485497
public final class kotlinx/coroutines/RunnableKt {
486498
public static final fun Runnable (Lkotlin/jvm/functions/Function0;)Ljava/lang/Runnable;
487499
}

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
8484
* guaranteed can be provided by having a cancellation fallback.
8585
*/
8686
@InternalCoroutinesApi
87-
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?
87+
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
8888

8989
/**
9090
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,

kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt

+44-14
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ internal open class CancellableContinuationImpl<in T>(
148148

149149
override fun takeState(): Any? = state
150150

151-
override fun cancelCompletedResult(cause: Throwable): Unit = _state.loop { state ->
151+
// Note: takeState does not clear the state so we don't use takenState
152+
// and we use the actual current state where in CAS-loop
153+
override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
152154
when (state) {
153155
is NotCompleted -> error("Not completed")
154156
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
@@ -162,7 +164,10 @@ internal open class CancellableContinuationImpl<in T>(
162164
}
163165
else -> {
164166
// completed normally without marker class, promote to CompletedContinuation to synchronize cancellation
165-
if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) return
167+
if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
168+
cancelResourceIfNeeded(state) { context }
169+
return // done
170+
}
166171
}
167172
}
168173
}
@@ -183,7 +188,7 @@ internal open class CancellableContinuationImpl<in T>(
183188
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
184189
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
185190
// Invoke cancel handler if it was present
186-
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
191+
(state as? CancelHandler)?.let { callCancelHandler(it, cause) }
187192
// Complete state update
188193
detachChildIfNonResuable()
189194
dispatchResume(mode = MODE_ATOMIC) // no need for additional cancellation checks
@@ -198,14 +203,36 @@ internal open class CancellableContinuationImpl<in T>(
198203
detachChildIfNonResuable()
199204
}
200205

201-
internal inline fun invokeHandlerSafely(block: () -> Unit) {
206+
private inline fun callCancelHandlerSafely(block: () -> Unit) {
207+
try {
208+
block()
209+
} catch (ex: Throwable) {
210+
// Handler should never fail, if it does -- it is an unhandled exception
211+
handleCoroutineException(
212+
context,
213+
CompletionHandlerException("Exception in invokeOnCancellation handler for $this", ex)
214+
)
215+
}
216+
}
217+
218+
private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) =
219+
/*
220+
* :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
221+
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
222+
*/
223+
callCancelHandlerSafely { handler.invokeIt(cause) }
224+
225+
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
226+
callCancelHandlerSafely { handler.invoke(cause) }
227+
228+
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
202229
try {
203-
block()
230+
onCancellation.invoke(cause)
204231
} catch (ex: Throwable) {
205232
// Handler should never fail, if it does -- it is an unhandled exception
206233
handleCoroutineException(
207234
context,
208-
CompletionHandlerException("Exception in cancellation handler for $this", ex)
235+
CompletionHandlerException("Exception in resume onCancellation handler for $this", ex)
209236
)
210237
}
211238
}
@@ -251,7 +278,7 @@ internal open class CancellableContinuationImpl<in T>(
251278
val job = context[Job]
252279
if (job != null && !job.isActive) {
253280
val cause = job.getCancellationException()
254-
cancelCompletedResult(cause)
281+
cancelCompletedResult(state, cause)
255282
throw recoverStackTrace(cause, this)
256283
}
257284
}
@@ -285,7 +312,7 @@ internal open class CancellableContinuationImpl<in T>(
285312
* because we play type tricks on Kotlin/JS and handler is not necessarily a function there
286313
*/
287314
if (state is CancelledContinuation) {
288-
invokeHandlerSafely { handler.invokeIt((state as? CompletedExceptionally)?.cause) }
315+
callCancelHandler(handler, (state as? CompletedExceptionally)?.cause)
289316
}
290317
return
291318
}
@@ -298,7 +325,7 @@ internal open class CancellableContinuationImpl<in T>(
298325
// todo: extra layer of protection against the second invokeOnCancellation
299326
// if (!state.makeHandled()) multipleHandlersError(handler, state)
300327
// Was already cancelled while being dispatched -- invoke the handler directly
301-
invokeHandlerSafely { handler.invokeIt(state.cancelCause) }
328+
callCancelHandler(handler, state.cancelCause)
302329
return
303330
}
304331
val update = state.copy(cancelHandler = cancelHandler)
@@ -370,7 +397,9 @@ internal open class CancellableContinuationImpl<in T>(
370397
*/
371398
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
372399
// call onCancellation
373-
onCancellation?.let { invokeHandlerSafely { it(state.cause) } }
400+
onCancellation?.let { callOnCancellation(it, state.cause) }
401+
// cancel resource
402+
cancelResourceIfNeeded(state) { context }
374403
return // done
375404
}
376405
}
@@ -398,7 +427,7 @@ internal open class CancellableContinuationImpl<in T>(
398427
}
399428
is CompletedContinuation -> {
400429
return if (idempotent != null && state.idempotentResume === idempotent) {
401-
assert { state.result === proposedUpdate } // "Non-idempotent resume"
430+
assert { state.result == proposedUpdate } // "Non-idempotent resume"
402431
RESUME_TOKEN // resumed with the same token -- ok
403432
} else {
404433
null // resumed with a different token or non-idempotent -- too late
@@ -433,7 +462,7 @@ internal open class CancellableContinuationImpl<in T>(
433462
override fun tryResume(value: T, idempotent: Any?): Any? =
434463
tryResumeImpl(value, idempotent = idempotent, onCancellation = null)
435464

436-
override fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any? =
465+
override fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
437466
tryResumeImpl(value, idempotent, onCancellation)
438467

439468
override fun tryResumeWithException(exception: Throwable): Any? =
@@ -501,7 +530,8 @@ private data class CompletedContinuation(
501530
val cancelled: Boolean get() = cancelCause != null
502531

503532
fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
504-
cancelHandler?.let { cont.invokeHandlerSafely { it.invoke(cause) } }
505-
onCancellation?.let { cont.invokeHandlerSafely { it.invoke(cause) } }
533+
cancelHandler?.let { cont.callCancelHandler(it, cause) }
534+
onCancellation?.let { cont.callOnCancellation(it, cause) }
535+
cancelResourceIfNeeded(result) { cont.context }
506536
}
507537
}

kotlinx-coroutines-core/common/src/CompletedExceptionally.kt

+25-3
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,27 @@ import kotlinx.coroutines.internal.*
99
import kotlin.coroutines.*
1010
import kotlin.jvm.*
1111

12-
internal fun <T> Result<T>.toState(): Any? = fold({ it }, { CompletedExceptionally(it) })
12+
internal fun <T> Result<T>.toState(
13+
onCancellation: ((cause: Throwable) -> Unit)? = null
14+
): Any? = fold(
15+
onSuccess = { if (onCancellation != null) CompletedWithCancellation(it, onCancellation) else it },
16+
onFailure = { CompletedExceptionally(it) }
17+
)
1318

14-
internal fun <T> Result<T>.toState(caller: CancellableContinuation<*>): Any? = fold({ it },
15-
{ CompletedExceptionally(recoverStackTrace(it, caller)) })
19+
internal fun <T> Result<T>.toState(caller: CancellableContinuation<*>): Any? = fold(
20+
onSuccess = { it },
21+
onFailure = { CompletedExceptionally(recoverStackTrace(it, caller)) }
22+
)
23+
24+
internal fun DispatchedContinuation<*>.cancelState(state: Any?, cause: Throwable) {
25+
when (state) {
26+
is CompletedWithCancellation -> {
27+
cancelResourceIfNeeded(state.result) { context }
28+
state.onCancellation(cause)
29+
}
30+
else -> cancelResourceIfNeeded(state) { context }
31+
}
32+
}
1633

1734
@Suppress("RESULT_CLASS_IN_RETURN_TYPE", "UNCHECKED_CAST")
1835
internal fun <T> recoverResult(state: Any?, uCont: Continuation<T>): Result<T> =
@@ -21,6 +38,11 @@ internal fun <T> recoverResult(state: Any?, uCont: Continuation<T>): Result<T> =
2138
else
2239
Result.success(state as T)
2340

41+
internal data class CompletedWithCancellation(
42+
@JvmField val result: Any?,
43+
@JvmField val onCancellation: ((cause: Throwable) -> Unit)
44+
)
45+
2446
/**
2547
* Class for an internal state of a job that was cancelled (completed exceptionally).
2648
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.atomicfu.*
4+
import kotlin.coroutines.*
5+
6+
@ExperimentalCoroutinesApi
7+
public class Resource<T>(
8+
public val value: T,
9+
private val onCancellation: (value: T) -> Unit
10+
) {
11+
private val _cancelled = atomic(false)
12+
13+
public val isCancelled: Boolean
14+
get() = _cancelled.value
15+
16+
public fun cancel() {
17+
if (!_cancelled.getAndSet(true)) onCancellation(value)
18+
}
19+
}
20+
21+
internal fun callCancelResourceSafely(resource: Resource<*>, resourceException: ResourceCancellationException? = null): ResourceCancellationException? {
22+
try {
23+
resource.cancel()
24+
} catch (ex: Throwable) {
25+
if (resourceException != null) {
26+
resourceException.addSuppressedThrowable(ex)
27+
} else {
28+
return ResourceCancellationException("Exception in resource cancellation: ${resource.value}", ex)
29+
}
30+
}
31+
return resourceException
32+
}
33+
34+
internal inline fun callCancelResource(resource: Resource<*>, context: () -> CoroutineContext) {
35+
callCancelResourceSafely(resource)?.let { ex ->
36+
handleCoroutineException(context(), ex)
37+
}
38+
}
39+
40+
internal inline fun cancelResourceIfNeeded(resource: Any?, context: () -> CoroutineContext) {
41+
(resource as? Resource<*>)?.let { callCancelResource(it, context) }
42+
}
43+
44+
internal class ResourceCancellationException(message: String, cause: Throwable) : RuntimeException(message, cause)

0 commit comments

Comments
 (0)