-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathEventLoop.common.kt
131 lines (106 loc) · 4.33 KB
/
EventLoop.common.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks.
* It is created or pigged back onto (see [ThreadLocalEventLoop])
* by [runBlocking] and by [Dispatchers.Unconfined].
*
* @suppress **This an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* * `<= 0` -- there are potentially more events for immediate processing;
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
*/
public abstract fun processNextEvent(): Long
public abstract val isEmpty: Boolean
/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
public fun dispatchUnconfined(task: DispatchedTask<*>) {
task.isUnconfinedTask = true
check(enqueue(task)) { "Attempting to dispatchUnconfined into the EventLoop that was shut down"}
queuedUnconfinedTasks++
}
public override fun dispatch(context: CoroutineContext, block: Runnable) {
if (block is DispatchedTask<*>) block.isUnconfinedTask = false
enqueue(block)
}
// returns true if it was successfully enqueued for execution in this event loop, false if got to default executor
public abstract fun enqueue(task: Runnable): Boolean
protected fun runBlock(block: Runnable) {
try {
block.run()
} finally {
if (block is DispatchedTask<*> && block.isUnconfinedTask) {
check(--queuedUnconfinedTasks >= 0) { "queuedUnconfinedTasks underflow" }
}
}
}
/**
* Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop.
*/
private var useCount = 0L
/**
* Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
*/
private var shared = false
/**
* Counts a number of currently enqueued (but not executed yet) unconfined tasks.
*/
private var queuedUnconfinedTasks = 0
public val isActive: Boolean
get() = useCount > 0
public val isUnconfinedLoopActive: Boolean
get() = useCount >= increment(unconfined = true)
public val isEmptyUnconfinedQueue: Boolean
get() = queuedUnconfinedTasks == 0
private fun increment(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L
fun incrementUseCount(unconfined: Boolean = false) {
useCount += increment(unconfined)
if (!unconfined) shared = true
}
fun decrementUseCount(unconfined: Boolean = false) {
useCount -= increment(unconfined)
if (useCount > 0) return
check(useCount == 0L) { "Extra decrementUseCount" }
if (shared) {
// shut it down and remove from ThreadLocalEventLoop
shutdown()
} else {
// it was not shared, so it could not have accumulated any other tasks
check(isEmpty) { "EventLoop that was used only by unconfined tasks should be empty" }
}
}
protected open fun shutdown() {}
}
@NativeThreadLocal
internal object ThreadLocalEventLoop {
private val ref = CommonThreadLocal<EventLoop?>()
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal fun currentOrNull(): EventLoop? =
ref.get()
internal fun resetEventLoop() {
ref.set(null)
}
internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}
internal expect fun createEventLoop(): EventLoop