Skip to content

Commit eb21974

Browse files
authored
Prevent runBlocking failure when the Worker it runs on terminates (#3585)
1 parent 71125e3 commit eb21974

File tree

2 files changed

+67
-2
lines changed

2 files changed

+67
-2
lines changed

kotlinx-coroutines-core/native/src/Builders.kt

+39-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,45 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
5252
newContext = GlobalScope.newCoroutineContext(context)
5353
}
5454
val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
55-
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
56-
return coroutine.joinBlocking()
55+
var completed = false
56+
ThreadLocalKeepAlive.addCheck { !completed }
57+
try {
58+
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
59+
return coroutine.joinBlocking()
60+
} finally {
61+
completed = true
62+
}
63+
}
64+
65+
@ThreadLocal
66+
private object ThreadLocalKeepAlive {
67+
/** If any of these checks passes, this means this [Worker] is still used. */
68+
private var checks = mutableListOf<() -> Boolean>()
69+
70+
/** Whether the worker currently tries to keep itself alive. */
71+
private var keepAliveLoopActive = false
72+
73+
/** Adds another stopgap that must be passed before the [Worker] can be terminated. */
74+
fun addCheck(terminationForbidden: () -> Boolean) {
75+
checks.add(terminationForbidden)
76+
if (!keepAliveLoopActive) keepAlive()
77+
}
78+
79+
/**
80+
* Send a ping to the worker to prevent it from terminating while this coroutine is running,
81+
* ensuring that continuations don't get dropped and forgotten.
82+
*/
83+
private fun keepAlive() {
84+
// only keep the checks that still forbid the termination
85+
checks = checks.filter { it() }.toMutableList()
86+
// if there are no checks left, we no longer keep the worker alive, it can be terminated
87+
keepAliveLoopActive = checks.isNotEmpty()
88+
if (keepAliveLoopActive) {
89+
Worker.current.executeAfter(afterMicroseconds = 100_000) {
90+
keepAlive()
91+
}
92+
}
93+
}
5794
}
5895

5996
private class BlockingCoroutine<T>(

kotlinx-coroutines-core/native/test/WorkerTest.kt

+28
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.channels.*
78
import kotlin.coroutines.*
89
import kotlin.native.concurrent.*
910
import kotlin.test.*
@@ -34,4 +35,31 @@ class WorkerTest : TestBase() {
3435
}.result
3536
worker.requestTermination()
3637
}
38+
39+
/**
40+
* Test that [runBlocking] does not crash after [Worker.requestTermination] is called on the worker that runs it.
41+
*/
42+
@Test
43+
fun testRunBlockingInTerminatedWorker() {
44+
val workerInRunBlocking = Channel<Unit>()
45+
val workerTerminated = Channel<Unit>()
46+
val checkResumption = Channel<Unit>()
47+
val finished = Channel<Unit>()
48+
val worker = Worker.start()
49+
worker.executeAfter(0) {
50+
runBlocking {
51+
workerInRunBlocking.send(Unit)
52+
workerTerminated.receive()
53+
checkResumption.receive()
54+
finished.send(Unit)
55+
}
56+
}
57+
runBlocking {
58+
workerInRunBlocking.receive()
59+
worker.requestTermination()
60+
workerTerminated.send(Unit)
61+
checkResumption.send(Unit)
62+
finished.receive()
63+
}
64+
}
3765
}

0 commit comments

Comments
 (0)