Skip to content

Commit 37a2d73

Browse files
committed
EventLoop integration and reuse for runBlocking and Unconfined dispatchers
- Event loop that is created by runBlocking or by Unconfined dispatcher is reused across the same thread to prevent blocking of the loop - Semantics of runBlocking and Unconfined are fully retained - DefaultExecutor also registers itself as running event loop and thus cannot be blocked by runBlocking - Consolidates thread-local handling for native - Also fixes thread-local memory leak on JVM (does not use custom class) Fixes #860
1 parent b733f7d commit 37a2d73

File tree

23 files changed

+537
-342
lines changed

23 files changed

+537
-342
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

+75-64
Original file line numberDiff line numberDiff line change
@@ -12,70 +12,64 @@ import kotlin.jvm.*
1212
@SharedImmutable
1313
private val UNDEFINED = Symbol("UNDEFINED")
1414

15-
@NativeThreadLocal
16-
internal object UndispatchedEventLoop {
17-
data class EventLoop(
18-
@JvmField var isActive: Boolean = false,
19-
@JvmField val queue: ArrayQueue<Runnable> = ArrayQueue()
20-
)
21-
22-
@JvmField
23-
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }
24-
25-
/**
26-
* Executes given [block] as part of current event loop, updating related to block [continuation]
27-
* mode and state if continuation is not resumed immediately.
28-
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
29-
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
30-
*/
31-
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
32-
doYield: Boolean = false, block: () -> Unit) : Boolean {
33-
val eventLoop = threadLocalEventLoop.get()
34-
if (eventLoop.isActive) {
35-
// If we are yielding and queue is empty, we can bail out as part of fast path
36-
if (doYield && eventLoop.queue.isEmpty) {
37-
return false
38-
}
39-
40-
continuation._state = contState
41-
continuation.resumeMode = mode
42-
eventLoop.queue.addLast(continuation)
43-
return true
44-
}
45-
46-
runEventLoop(eventLoop, block)
47-
return false
15+
/**
16+
* Executes given [block] as part of current event loop, updating related to block [continuation]
17+
* mode and state if continuation is not resumed immediately.
18+
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
19+
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
20+
*/
21+
private inline fun executeUndispatched(
22+
continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
23+
doYield: Boolean = false, block: () -> Unit
24+
) : Boolean {
25+
val eventLoop = ThreadLocalEventLoop.eventLoop
26+
// If we are yielding and undispatched queue is empty, we can bail out as part of fast path
27+
if (doYield && eventLoop.isEmptyUnconfinedQueue) return false
28+
return if (eventLoop.isUnconfinedLoopActive) {
29+
// When undispatched loop is active -- dispatch continuation for execution to avoid stack overflow
30+
continuation._state = contState
31+
continuation.resumeMode = mode
32+
eventLoop.dispatchUnconfined(continuation)
33+
true // queued into the active loop
34+
} else {
35+
// Was not active -- run event loop until undispatched tasks are executed
36+
runUndispatchedEventLoop(eventLoop, block = block)
37+
false
4838
}
39+
}
4940

50-
fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
51-
val eventLoop = threadLocalEventLoop.get()
52-
if (eventLoop.isActive) {
53-
eventLoop.queue.addLast(task)
54-
return true
41+
private fun resumeUndispatched(task: DispatchedTask<*>) {
42+
val eventLoop = ThreadLocalEventLoop.eventLoop
43+
if (eventLoop.isUnconfinedLoopActive) {
44+
// When undispatched loop is active -- dispatch continuation for execution to avoid stack overflow
45+
eventLoop.dispatchUnconfined(task)
46+
} else {
47+
// Was not active -- run event loop until undispatched tasks are executed
48+
runUndispatchedEventLoop(eventLoop) {
49+
task.resume(task.delegate, MODE_UNDISPATCHED)
5550
}
56-
57-
runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
58-
return false
5951
}
52+
}
6053

61-
inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
62-
try {
63-
eventLoop.isActive = true
64-
block()
65-
while (true) {
66-
val nextEvent = eventLoop.queue.removeFirstOrNull() ?: return
67-
nextEvent.run()
68-
}
69-
} catch (e: Throwable) {
70-
/*
71-
* This exception doesn't happen normally, only if user either submitted throwing runnable
72-
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
73-
*/
74-
eventLoop.queue.clear()
75-
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
76-
} finally {
77-
eventLoop.isActive = false
54+
private inline fun runUndispatchedEventLoop(
55+
eventLoop: EventLoop,
56+
block: () -> Unit
57+
) {
58+
eventLoop.incrementUseCount(unconfined = true)
59+
try {
60+
block()
61+
while (eventLoop.processNextEvent() <= 0) {
62+
// break when all undispatched continuations where executed
63+
if (eventLoop.isEmptyUnconfinedQueue) break
7864
}
65+
} catch (e: Throwable) {
66+
/*
67+
* This exception doesn't happen normally, only if user either submitted throwing runnable
68+
* or if we have a bug in implementation. Throw an exception that better explains the problem.
69+
*/
70+
throw DispatchException("Unexpected exception in undispatched event loop", e)
71+
} finally {
72+
eventLoop.decrementUseCount(unconfined = true)
7973
}
8074
}
8175

@@ -107,7 +101,7 @@ internal class DispatchedContinuation<in T>(
107101
resumeMode = MODE_ATOMIC_DEFAULT
108102
dispatcher.dispatch(context, this)
109103
} else {
110-
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
104+
executeUndispatched(this, state, MODE_ATOMIC_DEFAULT) {
111105
withCoroutineContext(this.context, countOrElement) {
112106
continuation.resumeWith(result)
113107
}
@@ -122,7 +116,7 @@ internal class DispatchedContinuation<in T>(
122116
resumeMode = MODE_CANCELLABLE
123117
dispatcher.dispatch(context, this)
124118
} else {
125-
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
119+
executeUndispatched(this, value, MODE_CANCELLABLE) {
126120
if (!resumeCancelled()) {
127121
resumeUndispatched(value)
128122
}
@@ -139,7 +133,7 @@ internal class DispatchedContinuation<in T>(
139133
resumeMode = MODE_CANCELLABLE
140134
dispatcher.dispatch(context, this)
141135
} else {
142-
UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
136+
executeUndispatched(this, state, MODE_CANCELLABLE) {
143137
if (!resumeCancelled()) {
144138
resumeUndispatchedWithException(exception)
145139
}
@@ -204,9 +198,26 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
204198
else -> resumeWithException(exception)
205199
}
206200

201+
private const val UNCONFINED_TASK_BIT = 1 shl 31
202+
207203
internal abstract class DispatchedTask<in T>(
208-
@JvmField var resumeMode: Int
204+
resumeMode: Int
209205
) : SchedulerTask() {
206+
private var _resumeMode: Int = resumeMode // can have UNCONFINED_TASK_BIT set
207+
208+
public var resumeMode: Int
209+
get() = _resumeMode and UNCONFINED_TASK_BIT.inv()
210+
set(value) { _resumeMode = value }
211+
212+
/**
213+
* Set to `true` when this task comes from [Dispatchers.Unconfined] or from another dispatcher
214+
* that returned `false` from [CoroutineDispatcher.isDispatchNeeded],
215+
* but there was event loop running, so it was submitted into that event loop.
216+
*/
217+
public var isUnconfinedTask: Boolean
218+
get() = _resumeMode and UNCONFINED_TASK_BIT != 0
219+
set(value) { _resumeMode = if (value) resumeMode or UNCONFINED_TASK_BIT else resumeMode }
220+
210221
public abstract val delegate: Continuation<T>
211222

212223
public abstract fun takeState(): Any?
@@ -246,7 +257,7 @@ internal abstract class DispatchedTask<in T>(
246257
}
247258

248259
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
249-
UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
260+
executeUndispatched(this, Unit, MODE_CANCELLABLE, doYield = true) {
250261
run()
251262
}
252263

@@ -259,7 +270,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
259270
if (dispatcher.isDispatchNeeded(context)) {
260271
dispatcher.dispatch(context, this)
261272
} else {
262-
UndispatchedEventLoop.resumeUndispatched(this)
273+
resumeUndispatched(this)
263274
}
264275
} else {
265276
resume(delegate, mode)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.*
8+
import kotlin.coroutines.*
9+
10+
/**
11+
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
12+
* be asked to process next event from their event queue.
13+
*
14+
* It may optionally implement [Delay] interface and support time-scheduled tasks.
15+
* It is created or pigged back onto (see [ThreadLocalEventLoop])
16+
* by [runBlocking] and by [Dispatchers.Unconfined].
17+
*
18+
* @suppress **This an internal API and should not be used from general code.**
19+
*/
20+
internal abstract class EventLoop : CoroutineDispatcher() {
21+
/**
22+
* Processes next event in this event loop.
23+
*
24+
* The result of this function is to be interpreted like this:
25+
* * `<= 0` -- there are potentially more events for immediate processing;
26+
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
27+
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
28+
*/
29+
public abstract fun processNextEvent(): Long
30+
31+
public abstract val isEmpty: Boolean
32+
33+
/**
34+
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
35+
* into the current event loop.
36+
*/
37+
public fun dispatchUnconfined(task: DispatchedTask<*>) {
38+
task.isUnconfinedTask = true
39+
check(enqueue(task)) { "Attempting to dispatchUnconfined into the EventLoop that was shut down"}
40+
queuedUnconfinedTasks++
41+
}
42+
43+
public override fun dispatch(context: CoroutineContext, block: Runnable) {
44+
if (block is DispatchedTask<*>) block.isUnconfinedTask = false
45+
enqueue(block)
46+
}
47+
48+
// returns true if it was successfully enqueued for execution in this event loop, false if got to default executor
49+
public abstract fun enqueue(task: Runnable): Boolean
50+
51+
protected fun runBlock(block: Runnable) {
52+
try {
53+
block.run()
54+
} finally {
55+
if (block is DispatchedTask<*> && block.isUnconfinedTask) {
56+
check(--queuedUnconfinedTasks >= 0) { "queuedUnconfinedTasks underflow" }
57+
}
58+
}
59+
}
60+
61+
/**
62+
* Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop.
63+
*/
64+
private var useCount = 0L
65+
66+
/**
67+
* Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so
68+
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
69+
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
70+
*/
71+
private var shared = false
72+
73+
/**
74+
* Counts a number of currently enqueued (but not executed yet) unconfined tasks.
75+
*/
76+
private var queuedUnconfinedTasks = 0
77+
78+
public val isActive: Boolean
79+
get() = useCount > 0
80+
81+
public val isUnconfinedLoopActive: Boolean
82+
get() = useCount >= increment(unconfined = true)
83+
84+
public val isEmptyUnconfinedQueue: Boolean
85+
get() = queuedUnconfinedTasks == 0
86+
87+
private fun increment(unconfined: Boolean) =
88+
if (unconfined) (1L shl 32) else 1L
89+
90+
fun incrementUseCount(unconfined: Boolean = false) {
91+
useCount += increment(unconfined)
92+
if (!unconfined) shared = true
93+
}
94+
95+
fun decrementUseCount(unconfined: Boolean = false) {
96+
useCount -= increment(unconfined)
97+
if (useCount > 0) return
98+
check(useCount == 0L) { "Extra decrementUseCount" }
99+
if (shared) {
100+
// shut it down and remove from ThreadLocalEventLoop
101+
shutdown()
102+
} else {
103+
// it was not shared, so it could not have accumulated any other tasks
104+
check(isEmpty) { "EventLoop that was used only by unconfined tasks should be empty" }
105+
}
106+
}
107+
108+
protected open fun shutdown() {}
109+
}
110+
111+
@NativeThreadLocal
112+
internal object ThreadLocalEventLoop {
113+
private val ref = CommonThreadLocal<EventLoop?>()
114+
115+
internal val eventLoop: EventLoop
116+
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
117+
118+
internal fun currentOrNull(): EventLoop? =
119+
ref.get()
120+
121+
internal fun resetEventLoop() {
122+
ref.set(null)
123+
}
124+
125+
internal fun setEventLoop(eventLoop: EventLoop) {
126+
ref.set(eventLoop)
127+
}
128+
}
129+
130+
internal expect fun createEventLoop(): EventLoop
131+

common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kotlinx.coroutines.internal
88
@UseExperimental(ExperimentalMultiplatform::class)
99
internal expect annotation class NativeThreadLocal()
1010

11-
internal expect class CommonThreadLocal<T>(supplier: () -> T) {
11+
internal expect class CommonThreadLocal<T>() {
1212
fun get(): T
13+
fun set(value: T)
1314
}

0 commit comments

Comments
 (0)