Skip to content

Commit ee62268

Browse files
qwwdfsadTrol
authored andcommitted
Support thread interrupting blocking functions (Kotlin#1972)
* Support thread interrupting blocking functions (Kotlin#1947) This is the implementation of issue Kotlin#1947 Signed-off-by: Trol <[email protected]> Co-authored-by: Trol <[email protected]>
1 parent 8e5041f commit ee62268

File tree

4 files changed

+290
-0
lines changed

4 files changed

+290
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+5
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ public final class kotlinx/coroutines/GlobalScope : kotlinx/coroutines/Coroutine
328328
public abstract interface annotation class kotlinx/coroutines/InternalCoroutinesApi : java/lang/annotation/Annotation {
329329
}
330330

331+
public final class kotlinx/coroutines/InterruptibleKt {
332+
public static final fun runInterruptible (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
333+
public static synthetic fun runInterruptible$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
334+
}
335+
331336
public abstract interface class kotlinx/coroutines/Job : kotlin/coroutines/CoroutineContext$Element {
332337
public static final field Key Lkotlinx/coroutines/Job$Key;
333338
public abstract fun attachChild (Lkotlinx/coroutines/ChildJob;)Lkotlinx/coroutines/ChildHandle;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.atomicfu.*
8+
import kotlin.coroutines.*
9+
10+
/**
11+
* Calls the specified [block] with a given coroutine context in a interruptible manner.
12+
* The blocking code block will be interrupted and this function will throw [CancellationException]
13+
* if the coroutine is cancelled.
14+
* The specified [coroutineContext] directly translates into [withContext] argument.
15+
*
16+
* Example:
17+
* ```
18+
* val blockingJob = launch {
19+
* // This function will throw CancellationException
20+
* runInterruptible(Dispatchers.IO) {
21+
* // This blocking procedure will be interrupted when this coroutine is canceled
22+
* doSomethingElseUsefulInterruptible()
23+
* }
24+
* }
25+
*
26+
* delay(500L)
27+
* blockingJob.cancel() // Interrupt blocking call
28+
* }
29+
* ```
30+
*
31+
* There is also an optional context parameter to this function to enable single-call conversion of
32+
* interruptible Java methods into suspending functions like this:
33+
* ```
34+
* // With one call here we are moving the call to Dispatchers.IO and supporting interruption.
35+
* suspend fun <T> BlockingQueue<T>.awaitTake(): T =
36+
* runInterruptible(Dispatchers.IO) { queue.take() }
37+
* ```
38+
*/
39+
public suspend fun <T> runInterruptible(
40+
context: CoroutineContext = EmptyCoroutineContext,
41+
block: () -> T
42+
): T = withContext(context) {
43+
runInterruptibleInExpectedContext(block)
44+
}
45+
46+
private suspend fun <T> runInterruptibleInExpectedContext(block: () -> T): T {
47+
try {
48+
// No job -> no cancellation
49+
val job = coroutineContext[Job] ?: return block()
50+
val threadState = ThreadState(job)
51+
try {
52+
return block()
53+
} finally {
54+
threadState.clearInterrupt()
55+
}
56+
} catch (e: InterruptedException) {
57+
throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
58+
}
59+
}
60+
61+
private const val WORKING = 0
62+
private const val FINISHED = 1
63+
private const val INTERRUPTING = 2
64+
private const val INTERRUPTED = 3
65+
66+
private class ThreadState : CompletionHandler {
67+
/*
68+
=== States ===
69+
70+
WORKING: running normally
71+
FINISH: complete normally
72+
INTERRUPTING: canceled, going to interrupt this thread
73+
INTERRUPTED: this thread is interrupted
74+
75+
=== Possible Transitions ===
76+
77+
+----------------+ register job +-------------------------+
78+
| WORKING | cancellation listener | WORKING |
79+
| (thread, null) | -------------------------> | (thread, cancel handle) |
80+
+----------------+ +-------------------------+
81+
| | |
82+
| cancel cancel | | complete
83+
| | |
84+
V | |
85+
+---------------+ | |
86+
| INTERRUPTING | <--------------------------------------+ |
87+
+---------------+ |
88+
| |
89+
| interrupt |
90+
| |
91+
V V
92+
+---------------+ +-------------------------+
93+
| INTERRUPTED | | FINISHED |
94+
+---------------+ +-------------------------+
95+
*/
96+
private val state: AtomicRef<State> = atomic(State(WORKING, null))
97+
private val targetThread = Thread.currentThread()
98+
99+
private data class State(@JvmField val state: Int, @JvmField val cancelHandle: DisposableHandle?)
100+
101+
// We're using a non-primary constructor instead of init block of a primary constructor here, because
102+
// we need to `return`.
103+
constructor(job: Job) {
104+
// Register cancellation handler
105+
val cancelHandle =
106+
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
107+
// Either we successfully stored it or it was immediately cancelled
108+
state.loop { s ->
109+
when (s.state) {
110+
// Happy-path, move forward
111+
WORKING -> if (state.compareAndSet(s, State(WORKING, cancelHandle))) return
112+
// Immediately cancelled, just continue
113+
INTERRUPTING, INTERRUPTED -> return
114+
else -> throw IllegalStateException("Illegal state $s")
115+
}
116+
}
117+
}
118+
119+
fun clearInterrupt() {
120+
/*
121+
* Do not allow to untriggered interrupt to leak
122+
*/
123+
state.loop { s ->
124+
when (s.state) {
125+
WORKING -> if (state.compareAndSet(s, State(FINISHED, null))) {
126+
s.cancelHandle?.dispose()
127+
return
128+
}
129+
INTERRUPTING -> {
130+
/*
131+
* Spin, cancellation mechanism is interrupting our thread right now
132+
* and we have to wait it and then clear interrupt status
133+
*/
134+
}
135+
INTERRUPTED -> {
136+
// Clear it and bail out
137+
Thread.interrupted();
138+
return
139+
}
140+
else -> error("Illegal state: $s")
141+
}
142+
}
143+
}
144+
145+
// Cancellation handler
146+
override fun invoke(cause: Throwable?) {
147+
state.loop { s ->
148+
when (s.state) {
149+
// Working -> try to transite state and interrupt the thread
150+
WORKING -> {
151+
if (state.compareAndSet(s, State(INTERRUPTING, null))) {
152+
targetThread.interrupt()
153+
state.value = State(INTERRUPTED, null)
154+
return
155+
}
156+
}
157+
// Finished -- runInterruptible is already complete
158+
FINISHED -> return
159+
INTERRUPTING, INTERRUPTED -> return
160+
else -> error("Illegal state: $s")
161+
}
162+
}
163+
}
164+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import java.util.concurrent.atomic.*
8+
import kotlin.test.*
9+
10+
class RunInterruptibleStressTest : TestBase() {
11+
12+
private val dispatcher = Dispatchers.IO
13+
private val REPEAT_TIMES = 1000 * stressTestMultiplier
14+
15+
@Test
16+
fun testStress() = runBlocking {
17+
val interruptLeak = AtomicBoolean(false)
18+
val enterCount = AtomicInteger(0)
19+
val interruptedCount = AtomicInteger(0)
20+
val otherExceptionCount = AtomicInteger(0)
21+
22+
repeat(REPEAT_TIMES) { repeat ->
23+
val job = launch(dispatcher, start = CoroutineStart.LAZY) {
24+
try {
25+
runInterruptible {
26+
enterCount.incrementAndGet()
27+
try {
28+
Thread.sleep(Long.MAX_VALUE)
29+
} catch (e: InterruptedException) {
30+
interruptedCount.incrementAndGet()
31+
throw e
32+
}
33+
}
34+
} catch (e: CancellationException) {
35+
} catch (e: Throwable) {
36+
otherExceptionCount.incrementAndGet()
37+
} finally {
38+
interruptLeak.set(interruptLeak.get() || Thread.currentThread().isInterrupted)
39+
}
40+
}
41+
42+
val cancelJob = launch(dispatcher, start = CoroutineStart.LAZY) {
43+
job.cancel()
44+
}
45+
46+
job.start()
47+
val canceller = launch(dispatcher) {
48+
cancelJob.start()
49+
}
50+
51+
joinAll(job, cancelJob, canceller)
52+
}
53+
54+
assertFalse(interruptLeak.get())
55+
assertEquals(enterCount.get(), interruptedCount.get())
56+
assertEquals(0, otherExceptionCount.get())
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.channels.*
8+
import java.io.*
9+
import java.util.concurrent.*
10+
import java.util.concurrent.atomic.*
11+
import kotlin.test.*
12+
13+
class RunInterruptibleTest : TestBase() {
14+
15+
@Test
16+
fun testNormalRun() = runTest {
17+
val result = runInterruptible {
18+
val x = 1
19+
val y = 2
20+
Thread.sleep(1)
21+
x + y
22+
}
23+
assertEquals(3, result)
24+
}
25+
26+
@Test
27+
fun testExceptionalRun() = runTest {
28+
try {
29+
runInterruptible {
30+
expect(1)
31+
throw TestException()
32+
}
33+
} catch (e: TestException) {
34+
finish(2)
35+
}
36+
}
37+
38+
@Test
39+
fun testInterrupt() = runTest {
40+
val latch = Channel<Unit>(1)
41+
val job = launch {
42+
runInterruptible(Dispatchers.IO) {
43+
expect(2)
44+
latch.offer(Unit)
45+
try {
46+
Thread.sleep(10_000L)
47+
expectUnreached()
48+
} catch (e: InterruptedException) {
49+
expect(4)
50+
assertFalse { Thread.currentThread().isInterrupted }
51+
}
52+
}
53+
}
54+
55+
launch(start = CoroutineStart.UNDISPATCHED) {
56+
expect(1)
57+
latch.receive()
58+
expect(3)
59+
job.cancelAndJoin()
60+
}.join()
61+
finish(5)
62+
}
63+
}

0 commit comments

Comments
 (0)