|
16 | 16 |
|
17 | 17 | package kotlinx.coroutines.experimental
|
18 | 18 |
|
19 |
| -import java.util.concurrent.ScheduledExecutorService |
20 |
| -import java.util.concurrent.ScheduledThreadPoolExecutor |
21 | 19 | import java.util.concurrent.TimeUnit
|
22 | 20 |
|
23 |
| -private const val DEFAULT_KEEP_ALIVE = 1000L |
| 21 | +internal object DefaultExecutor : EventLoopBase(), Runnable { |
24 | 22 |
|
25 |
| -private val KEEP_ALIVE = |
26 |
| - try { java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) } |
27 |
| - catch (e: SecurityException) { DEFAULT_KEEP_ALIVE } |
| 23 | + override val canComplete: Boolean get() = false |
| 24 | + override val isCompleted: Boolean get() = false |
28 | 25 |
|
29 |
| -@Volatile |
30 |
| -private var _executor: ScheduledExecutorService? = null |
| 26 | + private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds |
31 | 27 |
|
32 |
| -internal val defaultExecutor: ScheduledExecutorService |
33 |
| - get() = _executor ?: getOrCreateExecutorSync() |
| 28 | + private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( |
| 29 | + try { |
| 30 | + java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) |
| 31 | + } catch (e: SecurityException) { |
| 32 | + DEFAULT_KEEP_ALIVE |
| 33 | + }) |
| 34 | + |
| 35 | + @Volatile |
| 36 | + private var _thread: Thread? = null |
| 37 | + |
| 38 | + private const val FRESH = 0 |
| 39 | + private const val ACTIVE = 1 |
| 40 | + private const val SHUTDOWN_REQ = 2 |
| 41 | + private const val SHUTDOWN_ACK = 3 |
| 42 | + |
| 43 | + @Volatile |
| 44 | + private var debugStatus: Int = FRESH |
34 | 45 |
|
35 |
| -@Synchronized |
36 |
| -private fun getOrCreateExecutorSync(): ScheduledExecutorService = |
37 |
| - _executor ?: ScheduledThreadPoolExecutor(1) { r -> |
38 |
| - Thread(r, "kotlinx.coroutines.DefaultExecutor").apply { isDaemon = true } |
39 |
| - }.apply { |
40 |
| - setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS) |
41 |
| - allowCoreThreadTimeOut(true) |
42 |
| - executeExistingDelayedTasksAfterShutdownPolicy = false |
43 |
| - // "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection |
| 46 | + override fun run() { |
| 47 | + var shutdownNanos = Long.MAX_VALUE |
| 48 | + timeSource.registerTimeLoopThread() |
| 49 | + notifyStartup() |
44 | 50 | try {
|
45 |
| - val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType) |
46 |
| - m.invoke(this, true) |
47 |
| - } catch (ex: Throwable) { /* ignore */ } |
48 |
| - _executor = this |
| 51 | + runLoop@ while (true) { |
| 52 | + Thread.interrupted() // just reset interruption flag |
| 53 | + var parkNanos = processNextEvent() |
| 54 | + if (parkNanos == Long.MAX_VALUE) { |
| 55 | + // nothing to do, initialize shutdown timeout |
| 56 | + if (shutdownNanos == Long.MAX_VALUE) { |
| 57 | + val now = timeSource.nanoTime() |
| 58 | + if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS |
| 59 | + val tillShutdown = shutdownNanos - now |
| 60 | + if (tillShutdown <= 0) break@runLoop // shut thread down |
| 61 | + parkNanos = parkNanos.coerceAtMost(tillShutdown) |
| 62 | + } else |
| 63 | + parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway |
| 64 | + } |
| 65 | + if (parkNanos > 0) { |
| 66 | + // check if shutdown was requested and bail out in this case |
| 67 | + if (debugStatus == SHUTDOWN_REQ) { |
| 68 | + acknowledgeShutdown() |
| 69 | + break@runLoop |
| 70 | + } else { |
| 71 | + timeSource.parkNanos(this, parkNanos) |
| 72 | + } |
| 73 | + } |
| 74 | + } |
| 75 | + } finally { |
| 76 | + _thread = null // this thread is dead |
| 77 | + timeSource.unregisterTimeLoopThread() |
| 78 | + // recheck if queues are empty after _thread reference was set to null (!!!) |
| 79 | + if (!isEmpty) thread() // recreate thread if it is needed |
| 80 | + } |
49 | 81 | }
|
50 | 82 |
|
51 |
| -// used for tests |
52 |
| -@Synchronized |
53 |
| -internal fun shutdownDefaultExecutor(timeout: Long) { |
54 |
| - _executor?.apply { |
55 |
| - shutdown() |
56 |
| - awaitTermination(timeout, TimeUnit.MILLISECONDS) |
57 |
| - shutdownNow() // ignore all remaining |
58 |
| - _executor = null |
| 83 | + // ensure that thread is there |
| 84 | + private fun thread(): Thread = _thread ?: createThreadSync() |
| 85 | + |
| 86 | + @Synchronized |
| 87 | + private fun createThreadSync() = _thread ?: |
| 88 | + Thread(this, "kotlinx.coroutines.DefaultExecutor").apply { |
| 89 | + _thread = this |
| 90 | + isDaemon = true |
| 91 | + start() |
| 92 | + } |
| 93 | + |
| 94 | + override fun unpark() { |
| 95 | + timeSource.unpark(thread()) // as a side effect creates thread if it is not there |
59 | 96 | }
|
60 |
| -} |
61 | 97 |
|
| 98 | + override fun isCorrectThread(): Boolean = true |
| 99 | + |
| 100 | + // used for tests |
| 101 | + @Synchronized |
| 102 | + internal fun ensureStarted() { |
| 103 | + assert(_thread == null) // ensure we are at a clean state |
| 104 | + debugStatus = FRESH |
| 105 | + createThreadSync() // create fresh thread |
| 106 | + while (debugStatus == FRESH) (this as Object).wait() |
| 107 | + } |
| 108 | + |
| 109 | + @Synchronized |
| 110 | + private fun notifyStartup() { |
| 111 | + debugStatus = ACTIVE |
| 112 | + (this as Object).notifyAll() |
| 113 | + } |
| 114 | + |
| 115 | + // used for tests |
| 116 | + @Synchronized |
| 117 | + internal fun shutdown(timeout: Long) { |
| 118 | + if (_thread != null) { |
| 119 | + val deadline = System.currentTimeMillis() + timeout |
| 120 | + if (debugStatus == ACTIVE) debugStatus = SHUTDOWN_REQ |
| 121 | + unpark() |
| 122 | + // loop while there is anything to do immediately or deadline passes |
| 123 | + while (debugStatus != SHUTDOWN_ACK && _thread != null) { |
| 124 | + val remaining = deadline - System.currentTimeMillis() |
| 125 | + if (remaining <= 0) break |
| 126 | + (this as Object).wait(timeout) |
| 127 | + } |
| 128 | + } |
| 129 | + // restore fresh status |
| 130 | + debugStatus = FRESH |
| 131 | + } |
| 132 | + |
| 133 | + @Synchronized |
| 134 | + private fun acknowledgeShutdown() { |
| 135 | + debugStatus = SHUTDOWN_ACK |
| 136 | + clearAll() // clear queues |
| 137 | + (this as Object).notifyAll() |
| 138 | + } |
| 139 | +} |
0 commit comments