-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathDefaultExecutor.kt
146 lines (126 loc) · 4.92 KB
/
DefaultExecutor.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import java.util.concurrent.*
/**
 * Platform (`actual`) binding of the default [Delay] implementation: on this target it is the
 * [DefaultExecutor] singleton declared below in this file.
 */
internal actual val DefaultDelay: Delay = DefaultExecutor
/**
 * Singleton event loop that runs on a lazily created daemon thread named [THREAD_NAME].
 *
 * The worker thread is created on the first access to [thread] and terminates itself after the loop has had
 * nothing to process for the keep-alive period. The period is read from the
 * `kotlinx.coroutines.DefaultExecutor.keepAlive` system property (milliseconds) and defaults to
 * [DEFAULT_KEEP_ALIVE]. If the thread exits while the queues are not empty, a new thread is created right away.
 *
 * The `debugStatus` state machine (`FRESH` -> `ACTIVE` -> `SHUTDOWN_REQ` -> `SHUTDOWN_ACK`) exists for the
 * test-only [ensureStarted] / [shutdown] handshake; all transitions happen under this object's monitor.
 */
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
    /** Name given to every worker thread created by this executor. */
    const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

    init {
        incrementUseCount() // this event loop is never completed
    }

    private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds

    // Idle time (in nanoseconds) after which the worker thread shuts itself down.
    // Falls back to the default when reading the system property is denied by a security manager.
    private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
        try {
            java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
        } catch (e: SecurityException) {
            DEFAULT_KEEP_ALIVE
        })

    // Current worker thread, or null when no worker is alive. Written in createThreadSync() and in run()'s finally.
    @Suppress("ObjectPropertyName")
    @Volatile
    private var _thread: Thread? = null

    /** The current worker thread; creates and starts one (see [createThreadSync]) when none is present. */
    override val thread: Thread
        get() = _thread ?: createThreadSync()

    // Values of debugStatus.
    private const val FRESH = 0
    private const val ACTIVE = 1
    private const val SHUTDOWN_REQ = 2
    private const val SHUTDOWN_ACK = 3

    @Volatile
    private var debugStatus: Int = FRESH

    // True once shutdown() has requested a shutdown, whether or not the worker has acknowledged it yet.
    private val isShutdownRequested: Boolean get() {
        val debugStatus = debugStatus // single volatile read so both comparisons see the same value
        return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
    }

    /**
     * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
     * ```
     * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
     * ```
     *
     * Livelock is possible only if `runBlocking` is called on internal default executor (which is used by default [delay]),
     * but it's not exposed as public API.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
        scheduleInvokeOnTimeout(timeMillis, block)

    /**
     * Body of the worker thread: processes events until either a shutdown is requested or the loop
     * stays idle for [KEEP_ALIVE_NANOS].
     */
    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            // Long.MAX_VALUE means "no idle deadline armed"
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                Thread.interrupted() // just reset interruption flag
                var parkNanos = processNextEvent()
                if (parkNanos == Long.MAX_VALUE) {
                    // nothing to do, initialize shutdown timeout
                    val now = nanoTime()
                    if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                    val tillShutdown = shutdownNanos - now
                    if (tillShutdown <= 0) return // shut thread down
                    parkNanos = parkNanos.coerceAtMost(tillShutdown)
                } else
                    shutdownNanos = Long.MAX_VALUE // there is pending work: disarm the idle deadline
                if (parkNanos > 0) {
                    // check if shutdown was requested and bail out in this case
                    if (isShutdownRequested) return
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }

    /**
     * Returns the existing worker thread or, when there is none, creates a daemon thread running this
     * executor, publishes it in [_thread] and starts it. Synchronized so that only one thread is created.
     */
    @Synchronized
    private fun createThreadSync(): Thread {
        return _thread ?: Thread(this, THREAD_NAME).apply {
            _thread = this // publish before start()
            isDaemon = true
            start()
        }
    }

    // used for tests
    /**
     * Starts a fresh worker thread and blocks until it reports startup via [notifyStartup]
     * (i.e. until [debugStatus] leaves `FRESH`).
     */
    @Synchronized
    internal fun ensureStarted() {
        assert { _thread == null } // ensure we are at a clean state
        assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
        debugStatus = FRESH
        createThreadSync() // create fresh thread
        while (debugStatus == FRESH) (this as Object).wait()
    }

    /**
     * Called by the worker thread at the start of [run]. Marks the executor `ACTIVE` and wakes up
     * waiters on this monitor.
     *
     * @return `false` when a shutdown was already requested, in which case the worker must exit immediately.
     */
    @Synchronized
    private fun notifyStartup(): Boolean {
        if (isShutdownRequested) return false
        debugStatus = ACTIVE
        (this as Object).notifyAll()
        return true
    }

    // used for tests
    /**
     * Requests the worker thread to shut down and waits until it acknowledges the request, there is no
     * worker thread, or [timeout] milliseconds have elapsed. The status is reset to `FRESH` on return.
     *
     * @param timeout maximum time to wait, in milliseconds; a non-positive value does not wait at all.
     */
    @Synchronized
    fun shutdown(timeout: Long) {
        val deadline = System.currentTimeMillis() + timeout
        if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
        // loop while there is anything to do immediately or deadline passes
        while (debugStatus != SHUTDOWN_ACK && _thread != null) {
            _thread?.let { unpark(it) } // wake up thread if present
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) break
            // Wait only for the time left until the deadline: waiting for the full `timeout` on every
            // iteration could overshoot the deadline after an early or spurious wake-up.
            (this as Object).wait(remaining)
        }
        // restore fresh status
        debugStatus = FRESH
    }

    /**
     * Called by the exiting worker thread. When a shutdown was requested, acknowledges it, clears the
     * queues and wakes up the thread blocked in [shutdown]; otherwise does nothing.
     */
    @Synchronized
    private fun acknowledgeShutdownIfNeeded() {
        if (!isShutdownRequested) return
        debugStatus = SHUTDOWN_ACK
        resetAll() // clear queues
        (this as Object).notifyAll()
    }

    /** `true` while a worker thread reference is currently published in [_thread]. */
    internal val isThreadPresent
        get() = _thread != null
}