1
- @file:OptIn(ExperimentalContracts ::class , ObsoleteWorkersApi ::class )
2
- @file:Suppress(" LEAKED_IN_PLACE_LAMBDA" , " WRONG_INVOCATION_KIND" )
1
+ @file:OptIn(ObsoleteWorkersApi ::class )
3
2
4
3
package kotlinx.coroutines
5
4
6
- import kotlinx.cinterop.*
7
- import kotlin.contracts.*
8
5
import kotlin.coroutines.*
9
6
import kotlin.native.concurrent.*
10
7
11
- /* *
12
- * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
13
- *
14
- * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
15
- * `main` functions and in tests.
16
- *
17
- * Calling [runBlocking] from a suspend function is redundant.
18
- * For example, the following code is incorrect:
19
- * ```
20
- * suspend fun loadConfiguration() {
21
- * // DO NOT DO THIS:
22
- * val data = runBlocking { // <- redundant and blocks the thread, do not do that
23
- * fetchConfigurationData() // suspending function
24
- * }
25
- * ```
26
- *
27
- * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
28
- * block, potentially leading to thread starvation issues.
29
- *
30
- * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
31
- * in this blocked thread until the completion of this coroutine.
32
- * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
33
- *
34
- * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
35
- * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
36
- * then this invocation uses the outer event loop.
37
- *
38
- * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
39
- * this `runBlocking` invocation throws [InterruptedException].
40
- *
41
- * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
42
- * for a newly created coroutine.
43
- *
44
- * @param context the context of the coroutine. The default value is an event loop on the current thread.
45
- * @param block the coroutine code.
46
- */
47
- public actual fun <T > runBlocking (context : CoroutineContext , block : suspend CoroutineScope .() -> T ): T {
48
- contract {
49
- callsInPlace(block, InvocationKind .EXACTLY_ONCE )
50
- }
51
- val contextInterceptor = context[ContinuationInterceptor ]
52
- val eventLoop: EventLoop ?
53
- val newContext: CoroutineContext
54
- if (contextInterceptor == null ) {
55
- // create or use private event loop if no dispatcher is specified
56
- eventLoop = ThreadLocalEventLoop .eventLoop
57
- newContext = GlobalScope .newCoroutineContext(context + eventLoop)
58
- } else {
59
- // See if context's interceptor is an event loop that we shall use (to support TestContext)
60
- // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
61
- eventLoop = (contextInterceptor as ? EventLoop )?.takeIf { it.shouldBeProcessedFromContext() }
62
- ? : ThreadLocalEventLoop .currentOrNull()
63
- newContext = GlobalScope .newCoroutineContext(context)
64
- }
65
- val coroutine = BlockingCoroutine <T >(newContext, eventLoop)
66
- var completed = false
67
- ThreadLocalKeepAlive .addCheck { ! completed }
8
+ internal actual fun <T > runBlockingImpl (
9
+ newContext : CoroutineContext , eventLoop : EventLoop ? , block : suspend CoroutineScope .() -> T
10
+ ): T {
11
+ val coroutine = BlockingCoroutine <T >(newContext, Worker .current, eventLoop)
12
+ ThreadLocalKeepAlive .registerUsage()
68
13
try {
69
14
coroutine.start(CoroutineStart .DEFAULT , coroutine, block)
70
15
return coroutine.joinBlocking()
71
16
} finally {
72
- completed = true
17
+ ThreadLocalKeepAlive .unregisterUsage()
73
18
}
74
19
}
75
20
76
21
@ThreadLocal
77
22
private object ThreadLocalKeepAlive {
78
- /* * If any of these checks passes , this means this [Worker] is still used. */
79
- private var checks = mutableListOf < () -> Boolean > ()
23
+ /* * If larger than 0 , this means this [Worker] is still used. */
24
+ private var usages = 0
80
25
81
26
/* * Whether the worker currently tries to keep itself alive. */
82
27
private var keepAliveLoopActive = false
83
28
84
- /* * Adds another stopgap that must be passed before the [Worker] can be terminated . */
85
- fun addCheck ( terminationForbidden : () -> Boolean ) {
86
- checks.add(terminationForbidden)
29
+ /* * Ensure that the worker is kept alive until the matching [unregisterUsage] is called . */
30
+ fun registerUsage ( ) {
31
+ usages ++
87
32
if (! keepAliveLoopActive) keepAlive()
88
33
}
89
34
35
+ /* * Undo [registerUsage]. */
36
+ fun unregisterUsage () {
37
+ usages--
38
+ }
39
+
90
40
/* *
91
41
* Send a ping to the worker to prevent it from terminating while this coroutine is running,
92
42
* ensuring that continuations don't get dropped and forgotten.
93
43
*/
94
44
private fun keepAlive () {
95
- // only keep the checks that still forbid the termination
96
- checks = checks.filter { it() }.toMutableList()
97
45
// if there are no checks left, we no longer keep the worker alive, it can be terminated
98
- keepAliveLoopActive = checks.isNotEmpty()
46
+ keepAliveLoopActive = usages > 0
99
47
if (keepAliveLoopActive) {
100
48
Worker .current.executeAfter(afterMicroseconds = 100_000 ) {
101
49
keepAlive()
@@ -106,9 +54,9 @@ private object ThreadLocalKeepAlive {
106
54
107
55
private class BlockingCoroutine <T >(
108
56
parentContext : CoroutineContext ,
57
+ private val joinWorker : Worker ,
109
58
private val eventLoop : EventLoop ?
110
59
) : AbstractCoroutine<T>(parentContext, true , true ) {
111
- private val joinWorker = Worker .current
112
60
113
61
override val isScopedCoroutine: Boolean get() = true
114
62
0 commit comments