Skip to content

Commit dbca4c1

Browse files
authored
Reliably run finalizers even if runBlocking got interrupted. (#4399)
Also, execute the initial portion of `runBlocking`'s block even if the thread was interrupted at the start. Fixes #4384
1 parent 8627cc3 commit dbca4c1

File tree

3 files changed

+179
-79
lines changed

3 files changed

+179
-79
lines changed

kotlinx-coroutines-core/jvm/src/Builders.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,11 @@ private class BlockingCoroutine<T>(
9191
eventLoop?.incrementUseCount()
9292
try {
9393
while (true) {
94-
@Suppress("DEPRECATION")
95-
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
9694
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
9795
// note: process next even may loose unpark flag, so check if completed before parking
9896
if (isCompleted) break
9997
parkNanos(this, parkNanos)
98+
if (Thread.interrupted()) cancelCoroutine(InterruptedException())
10099
}
101100
} finally { // paranoia
102101
eventLoop?.decrementUseCount()

kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt

-75
Original file line numberDiff line numberDiff line change
@@ -120,54 +120,6 @@ class ExecutorsTest : TestBase() {
120120
check(executorService.isShutdown)
121121
}
122122

123-
@Test
124-
fun testEarlyExecutorShutdown() {
125-
runTestExceptionInDispatch(6, { it is RejectedExecutionException }) {
126-
expect(1)
127-
val dispatcher = newSingleThreadContext("Ctx")
128-
launch(dispatcher) {
129-
withContext(Dispatchers.Default) {
130-
expect(2)
131-
delay(100)
132-
expect(4)
133-
}
134-
}
135-
136-
delay(50)
137-
expect(3)
138-
139-
dispatcher.close()
140-
}
141-
}
142-
143-
@Test
144-
fun testExceptionInDispatch() {
145-
runTestExceptionInDispatch(5, { it is TestException }) {
146-
val dispatcher = object : CoroutineDispatcher() {
147-
private var closed = false
148-
override fun dispatch(context: CoroutineContext, block: Runnable) {
149-
if (closed) throw TestException()
150-
Dispatchers.Default.dispatch(context, block)
151-
}
152-
153-
fun close() {
154-
closed = true
155-
}
156-
}
157-
launch(dispatcher) {
158-
withContext(Dispatchers.Default) {
159-
expect(1)
160-
delay(100)
161-
expect(3)
162-
}
163-
}
164-
165-
delay(50)
166-
expect(2)
167-
dispatcher.close()
168-
}
169-
}
170-
171123
@Test
172124
fun testExceptionInIsDispatchNeeded() {
173125
val dispatcher = object : CoroutineDispatcher() {
@@ -194,31 +146,4 @@ class ExecutorsTest : TestBase() {
194146
finish(4)
195147
}
196148
}
197-
198-
private fun runTestExceptionInDispatch(
199-
totalSteps: Int,
200-
isExpectedException: (Throwable) -> Boolean,
201-
block: suspend CoroutineScope.() -> Unit,
202-
) {
203-
var mainThread: Thread? = null
204-
val exceptionHandler = CoroutineExceptionHandler { _, e ->
205-
if (isExpectedException(e)) {
206-
expect(totalSteps - 1)
207-
mainThread!!.run {
208-
interrupt()
209-
unpark(this)
210-
}
211-
} else {
212-
expectUnreached()
213-
}
214-
}
215-
try {
216-
runBlocking(exceptionHandler) {
217-
block()
218-
mainThread = Thread.currentThread()
219-
}
220-
} catch (_: InterruptedException) {
221-
finish(totalSteps)
222-
}
223-
}
224149
}

kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt

+178-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package kotlinx.coroutines
22

33
import kotlinx.coroutines.testing.*
4-
import org.junit.*
4+
import java.util.concurrent.CountDownLatch
5+
import java.util.concurrent.atomic.AtomicReference
6+
import kotlin.concurrent.thread
7+
import kotlin.test.*
8+
import kotlin.time.Duration
59

610
class RunBlockingJvmTest : TestBase() {
711
@Test
@@ -12,5 +16,177 @@ class RunBlockingJvmTest : TestBase() {
1216
}
1317
rb.hashCode() // unused
1418
}
15-
}
1619

20+
/** Tests that the [runBlocking] coroutine runs to completion even it was interrupted. */
21+
@Test
22+
fun testFinishingWhenInterrupted() {
23+
startInSeparateThreadAndInterrupt { mayInterrupt ->
24+
expect(1)
25+
try {
26+
runBlocking {
27+
try {
28+
mayInterrupt()
29+
expect(2)
30+
delay(Duration.INFINITE)
31+
} finally {
32+
withContext(NonCancellable) {
33+
expect(3)
34+
repeat(10) { yield() }
35+
expect(4)
36+
}
37+
}
38+
}
39+
} catch (_: InterruptedException) {
40+
expect(5)
41+
}
42+
}
43+
finish(6)
44+
}
45+
46+
/** Tests that [runBlocking] will exit if it gets interrupted. */
47+
@Test
48+
fun testCancellingWhenInterrupted() {
49+
startInSeparateThreadAndInterrupt { mayInterrupt ->
50+
expect(1)
51+
try {
52+
runBlocking {
53+
try {
54+
mayInterrupt()
55+
expect(2)
56+
delay(Duration.INFINITE)
57+
} catch (_: CancellationException) {
58+
expect(3)
59+
}
60+
}
61+
} catch (_: InterruptedException) {
62+
expect(4)
63+
}
64+
}
65+
finish(5)
66+
}
67+
68+
/** Tests that [runBlocking] does not check for interruptions before the first attempt to suspend,
69+
* as no blocking actually happens. */
70+
@Test
71+
fun testInitialPortionRunningDespiteInterruptions() {
72+
Thread.currentThread().interrupt()
73+
runBlocking {
74+
expect(1)
75+
try {
76+
Thread.sleep(Long.MAX_VALUE)
77+
} catch (_: InterruptedException) {
78+
expect(2)
79+
}
80+
}
81+
assertFalse(Thread.interrupted())
82+
finish(3)
83+
}
84+
85+
/**
86+
* Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted
87+
* or if thread switches occur.
88+
*/
89+
@Test
90+
fun testNonInterruptibleRunBlocking() {
91+
startInSeparateThreadAndInterrupt { mayInterrupt ->
92+
val v = runBlockingNonInterruptible {
93+
mayInterrupt()
94+
repeat(10) {
95+
expect(it + 1)
96+
delay(1)
97+
}
98+
42
99+
}
100+
assertTrue(Thread.interrupted())
101+
assertEquals(42, v)
102+
expect(11)
103+
}
104+
finish(12)
105+
}
106+
107+
/**
108+
* Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted
109+
* or if thread switches occur, and then will rethrow the exception thrown by the job.
110+
*/
111+
@Test
112+
fun testNonInterruptibleRunBlockingFailure() {
113+
val exception = AssertionError()
114+
startInSeparateThreadAndInterrupt { mayInterrupt ->
115+
val exception2 = assertFailsWith<AssertionError> {
116+
runBlockingNonInterruptible {
117+
mayInterrupt()
118+
repeat(10) {
119+
expect(it + 1)
120+
// even thread switches should not be a problem
121+
withContext(Dispatchers.IO) {
122+
delay(1)
123+
}
124+
}
125+
throw exception
126+
}
127+
}
128+
assertTrue(Thread.interrupted())
129+
assertSame(exception, exception2)
130+
expect(11)
131+
}
132+
finish(12)
133+
}
134+
135+
136+
/**
137+
* Tests that [runBlockingNonInterruptible] is going to run its job to completion even if it gets interrupted
138+
* or if thread switches occur.
139+
*/
140+
@Test
141+
fun testNonInterruptibleRunBlockingPropagatingInterruptions() {
142+
val exception = AssertionError()
143+
startInSeparateThreadAndInterrupt { mayInterrupt ->
144+
runBlockingNonInterruptible {
145+
mayInterrupt()
146+
try {
147+
Thread.sleep(Long.MAX_VALUE)
148+
} catch (_: InterruptedException) {
149+
expect(1)
150+
}
151+
}
152+
expect(2)
153+
assertFalse(Thread.interrupted())
154+
}
155+
finish(3)
156+
}
157+
158+
/**
159+
* Tests that starting [runBlockingNonInterruptible] in an interrupted thread does not affect the result.
160+
*/
161+
@Test
162+
fun testNonInterruptibleRunBlockingStartingInterrupted() {
163+
Thread.currentThread().interrupt()
164+
val v = runBlockingNonInterruptible { 42 }
165+
assertEquals(42, v)
166+
assertTrue(Thread.interrupted())
167+
}
168+
169+
private fun startInSeparateThreadAndInterrupt(action: (mayInterrupt: () -> Unit) -> Unit) {
170+
val latch = CountDownLatch(1)
171+
val thread = thread {
172+
action { latch.countDown() }
173+
}
174+
latch.await()
175+
thread.interrupt()
176+
thread.join()
177+
}
178+
179+
private fun <T> runBlockingNonInterruptible(action: suspend () -> T): T {
180+
val result = AtomicReference<Result<T>>()
181+
try {
182+
runBlocking {
183+
withContext(NonCancellable) {
184+
result.set(runCatching { action() })
185+
}
186+
}
187+
} catch (_: InterruptedException) {
188+
Thread.currentThread().interrupt() // restore the interrupted flag
189+
}
190+
return result.get().getOrThrow()
191+
}
192+
}

0 commit comments

Comments
 (0)