Skip to content

Commit 692570d

Browse files
author
Trol
committed
Implement optional thread interrupt on coroutine cancellation (Kotlin#57)
See issue Kotlin#57 for details Signed-off-by: Trol <[email protected]>
1 parent 5eaf83c commit 692570d

File tree

6 files changed

+421
-2
lines changed

6 files changed

+421
-2
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+25
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
44
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
55
protected fun afterResume (Ljava/lang/Object;)V
66
protected fun cancellationExceptionMessage ()Ljava/lang/String;
7+
public final fun completeCoroutine (Ljava/lang/Object;)Ljava/lang/Object;
78
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
89
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
910
public fun isActive ()Z
@@ -187,6 +188,30 @@ public final class kotlinx/coroutines/CoroutineExceptionHandlerKt {
187188
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
188189
}
189190

191+
public abstract class kotlinx/coroutines/CoroutineInterruptController : kotlin/coroutines/AbstractCoroutineContextElement {
192+
public static final field Key Lkotlinx/coroutines/CoroutineInterruptController$Key;
193+
public fun <init> ()V
194+
public abstract fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
195+
}
196+
197+
public final class kotlinx/coroutines/CoroutineInterruptController$Key : kotlin/coroutines/CoroutineContext$Key {
198+
}
199+
200+
public final class kotlinx/coroutines/CoroutineInterruptible : kotlinx/coroutines/CoroutineInterruptController, kotlinx/coroutines/ThreadContextElement {
201+
public static final field INSTANCE Lkotlinx/coroutines/CoroutineInterruptible;
202+
public synthetic fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V
203+
public fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;)V
204+
public fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
205+
public synthetic fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;
206+
public fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;
207+
}
208+
209+
public final class kotlinx/coroutines/CoroutineInterruptible$ThreadState {
210+
public fun <init> ()V
211+
public final fun clearInterrupt ()V
212+
public final fun initInterrupt (Lkotlinx/coroutines/Job;)V
213+
}
214+
190215
public final class kotlinx/coroutines/CoroutineName : kotlin/coroutines/AbstractCoroutineContextElement {
191216
public static final field Key Lkotlinx/coroutines/CoroutineName$Key;
192217
public fun <init> (Ljava/lang/String;)V

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+10-1
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,20 @@ public abstract class AbstractCoroutine<in T>(
104104
onCompleted(state as T)
105105
}
106106

107+
/**
108+
* Completes execution of this coroutine with the specified state.
109+
*/
110+
public fun completeCoroutine(state: Any?): Any? {
111+
var completeState = state
112+
context[CoroutineInterruptController]?.let { completeState = it.updateCoroutineCompleteState(completeState) }
113+
return makeCompletingOnce(completeState)
114+
}
115+
107116
/**
108117
* Completes execution of this with coroutine with the specified result.
109118
*/
110119
public final override fun resumeWith(result: Result<T>) {
111-
val state = makeCompletingOnce(result.toState())
120+
val state = completeCoroutine(result.toState())
112121
if (state === COMPLETING_WAITING_CHILDREN) return
113122
afterResume(state)
114123
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.AbstractCoroutineContextElement
8+
import kotlin.coroutines.CoroutineContext
9+
10+
/**
11+
* This [CoroutineContext] element makes a coroutine interruptible.
12+
*
13+
* With this element, the thread executing the coroutine is interrupted when the coroutine is canceled, making
14+
* blocking procedures stop. Exceptions that indicate an interrupted procedure, eg., InterruptedException on JVM
15+
* are transformed into [CancellationException] at the end of the coroutine. Thus, everything else goes as if this
16+
* element is not present. In particular, the parent coroutine won't be canceled by those exceptions.
17+
*
18+
* This is an abstract element and will be implemented by each individual platform (or won't be implemented).
19+
* The JVM implementation is named CoroutineInterruptible.
20+
*
21+
* Example:
22+
* ```
23+
* GlobalScope.launch(Dispatchers.IO + CoroutineInterruptible) {
24+
* async {
25+
* // This block will throw [CancellationException] instead of an exception indicating
26+
* // interruption, such as InterruptedException on JVM.
27+
* withContext(CoroutineName) {
28+
* doSomethingUseful()
29+
* // This blocking procedure will be interrupted when this coroutine is canceled
30+
* // by Exception thrown by the below async block.
31+
* doSomethingElseUsefulInterruptible()
32+
* }
33+
* }
34+
*
35+
* async {
36+
* delay(500L)
37+
* throw Exception()
38+
* }
39+
* }
40+
* ```
41+
*/
42+
abstract class CoroutineInterruptController : AbstractCoroutineContextElement(Key) {
43+
/**
44+
* Key for [CoroutineInterruptController] instance in the coroutine context.
45+
*/
46+
@InternalCoroutinesApi
47+
companion object Key : CoroutineContext.Key<CoroutineInterruptController>
48+
49+
/**
50+
* Update the complete state of a coroutine, mainly for exception transformation.
51+
*/
52+
@InternalCoroutinesApi
53+
abstract fun updateCoroutineCompleteState(completeState: Any?): Any?
54+
}

kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
125125
* not a timeout exception.
126126
*/
127127
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
128-
val state = makeCompletingOnce(result)
128+
val state = completeCoroutine(result)
129129
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
130130
return if (state is CompletedExceptionally) { // (3)
131131
when {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.atomicfu.AtomicRef
8+
import kotlinx.atomicfu.atomic
9+
import kotlinx.atomicfu.loop
10+
import kotlin.coroutines.CoroutineContext
11+
12+
/**
13+
* This is the [CoroutineInterruptController] implementation on JVM. See [CoroutineInterruptController] for detailed
14+
* description and examples.
15+
*/
16+
object CoroutineInterruptible :
17+
CoroutineInterruptController(), ThreadContextElement<CoroutineInterruptible.ThreadState?> {
18+
19+
/**
20+
* Update the complete state of a coroutine on JVM.
21+
* Transforms [InterruptedException] into [CancellationException] for coroutines with this context element.
22+
*/
23+
@InternalCoroutinesApi
24+
override fun updateCoroutineCompleteState(completeState: Any?): Any? =
25+
if (completeState is CompletedExceptionally && completeState.cause is InterruptedException)
26+
CompletedExceptionally(CancellationException())
27+
else
28+
completeState
29+
30+
/**
31+
* Updates context of the current thread.
32+
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread.
33+
* Prepares interruption for this execution, watching the [Job] for cancellation and interrupt this executing
34+
* thread on cancellation.
35+
*/
36+
@InternalCoroutinesApi
37+
override fun updateThreadContext(context: CoroutineContext): ThreadState? {
38+
// Fast path: no Job in this context
39+
val job = context[Job] ?: return null
40+
// Slow path
41+
val threadState = ThreadState()
42+
threadState.initInterrupt(job)
43+
return threadState
44+
}
45+
46+
/**
47+
* Restores context of the current thread.
48+
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread.
49+
* Stops watching the [Job] for cancellation and do clean-up work.
50+
*/
51+
@InternalCoroutinesApi
52+
override fun restoreThreadContext(context: CoroutineContext, oldState: ThreadState?) {
53+
// Fast path: no Job in this context
54+
val threadState = oldState ?: return
55+
// Slow path
56+
threadState.clearInterrupt()
57+
}
58+
59+
/**
60+
* Holds the state of executions for interruption.
61+
*/
62+
@InternalCoroutinesApi
63+
class ThreadState {
64+
fun initInterrupt(job: Job) {
65+
initInvokeOnCancel(job)
66+
initThread()
67+
}
68+
69+
fun clearInterrupt() {
70+
state.loop { s ->
71+
when {
72+
s is Working -> {
73+
if (state.compareAndSet(s, Finish)) {
74+
s.cancelHandle?.let { it.dispose() } // no more watching
75+
return
76+
}
77+
}
78+
s === Interrupting -> Thread.yield() // eases the thread
79+
s === Interrupted -> { Thread.interrupted(); return } // no interrupt leak
80+
s === Init || s === Finish -> throw IllegalStateException("impossible state")
81+
else -> throw IllegalStateException("unknown state")
82+
}
83+
}
84+
}
85+
86+
private fun initInvokeOnCancel(job: Job) {
87+
// watches the job for cancellation
88+
val cancelHandle =
89+
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = CancelHandler())
90+
// remembers the cancel handle or drops it
91+
state.loop { s ->
92+
when {
93+
s === Init -> if (state.compareAndSet(s, Working(null, cancelHandle))) return
94+
s is Working -> if (state.compareAndSet(s, Working(s.thread, cancelHandle))) return
95+
s === Finish -> { cancelHandle.dispose(); return } // no more watching needed
96+
s === Interrupting || s === Interrupted -> return
97+
else -> throw IllegalStateException("unknown state")
98+
}
99+
}
100+
}
101+
102+
private fun initThread() {
103+
val thread = Thread.currentThread()
104+
state.loop { s ->
105+
when {
106+
s === Init -> if (state.compareAndSet(s, Working(thread, null))) return
107+
s is Working -> if (state.compareAndSet(s, Working(thread, s.cancelHandle))) return
108+
s === Interrupted -> { thread.interrupt(); return } // interrupted before the thread is set
109+
s === Finish || s === Interrupting -> throw IllegalStateException("impossible state")
110+
else -> throw IllegalStateException("unknown state")
111+
}
112+
}
113+
}
114+
115+
private inner class CancelHandler : CompletionHandler {
116+
override fun invoke(cause: Throwable?) {
117+
state.loop { s ->
118+
when {
119+
s === Init || (s is Working && s.thread === null) -> {
120+
if (state.compareAndSet(s, Interrupted))
121+
return
122+
}
123+
s is Working -> {
124+
if (state.compareAndSet(s, Interrupting)) {
125+
s.thread!!.interrupt()
126+
state.value = Interrupted
127+
return
128+
}
129+
}
130+
s === Finish -> return
131+
s === Interrupting || s === Interrupted -> return
132+
else -> throw IllegalStateException("unknown state")
133+
}
134+
}
135+
}
136+
}
137+
138+
private val state: AtomicRef<State> = atomic(Init)
139+
140+
private interface State
141+
// initial state
142+
private object Init : State
143+
// cancellation watching is setup and/or the continuation is running
144+
private data class Working(val thread: Thread?, val cancelHandle: DisposableHandle?) : State
145+
// the continuation done running without interruption
146+
private object Finish : State
147+
// interrupting this thread
148+
private object Interrupting: State
149+
// done interrupting
150+
private object Interrupted: State
151+
}
152+
}

0 commit comments

Comments
 (0)