Skip to content

Commit adf3a11

Browse files
committed
Use a Dispatchers.IO thread for DefaultDelay on the JVM
1 parent c523a97 commit adf3a11

File tree

14 files changed

+131
-231
lines changed

14 files changed

+131
-231
lines changed

Diff for: docs/topics/coroutine-context-and-dispatchers.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ Produces the output:
115115
```text
116116
Unconfined : I'm working in thread main
117117
main runBlocking: I'm working in thread main
118-
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
118+
Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay
119119
main runBlocking: After delay in thread main
120120
```
121121

Diff for: kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+74-143
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package kotlinx.coroutines
22

3+
import kotlinx.atomicfu.*
34
import kotlinx.coroutines.internal.*
4-
import java.util.concurrent.*
55
import kotlin.coroutines.*
6+
import kotlin.time.Duration
67

78
private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false)
89

@@ -21,78 +22,38 @@ private fun initializeDefaultDelay(): Delay {
2122
return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main
2223
}
2324

24-
internal object DefaultExecutor {
25-
fun shutdown() = DefaultDelayImpl.shutdown()
26-
27-
fun ensureStarted() = DefaultDelayImpl.ensureStarted()
28-
29-
fun shutdownForTests(timeout: Long) = DefaultDelayImpl.shutdownForTests(timeout)
30-
31-
val isThreadPresent: Boolean get() = DefaultDelayImpl.isThreadPresent
25+
/**
26+
* This method can be invoked after all coroutines are completed to wait for the default delay executor to shut down
27+
* in response to the lack of tasks.
28+
*
29+
* This is only useful in tests to ensure that setting a fresh virtual time source will not confuse the default delay
30+
* still running the previous test.
31+
*
32+
* Does nothing if the default delay executor is not in use.
33+
*
34+
* @throws IllegalStateException if the shutdown process notices new tasks entering the system
35+
* @throws IllegalStateException if the shutdown process times out
36+
*/
37+
internal fun ensureDefaultDelayDeinitialized(timeout: Duration) {
38+
(DefaultDelay as? DefaultDelayImpl)?.shutdownForTests(timeout)
3239
}
3340

3441
private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
35-
const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
42+
const val THREAD_NAME = "kotlinx.coroutines.DefaultDelay"
3643

3744
init {
3845
incrementUseCount() // this event loop is never completed
3946
}
4047

41-
private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds
42-
43-
private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
44-
try {
45-
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
46-
} catch (e: SecurityException) {
47-
DEFAULT_KEEP_ALIVE_MS
48-
})
49-
50-
@Suppress("ObjectPropertyName")
51-
@Volatile
52-
private var _thread: Thread? = null
53-
54-
override val thread: Thread
55-
get() = _thread ?: createThreadSync()
56-
57-
private const val FRESH = 0
58-
private const val ACTIVE = 1
59-
private const val SHUTDOWN_REQ = 2
60-
private const val SHUTDOWN_ACK = 3
61-
private const val SHUTDOWN = 4
62-
63-
@Volatile
64-
private var debugStatus: Int = FRESH
65-
66-
private val isShutDown: Boolean get() = debugStatus == SHUTDOWN
67-
68-
private val isShutdownRequested: Boolean get() {
69-
val debugStatus = debugStatus
70-
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
71-
}
72-
73-
override fun enqueue(task: Runnable) {
74-
if (isShutDown) shutdownError()
75-
super.enqueue(task)
76-
}
77-
78-
override fun reschedule(now: Long, delayedTask: DelayedTask) {
79-
// Reschedule on default executor can only be invoked after Dispatchers.shutdown
80-
shutdownError()
81-
}
48+
private val _thread = atomic<Thread?>(null)
8249

83-
private fun shutdownError() {
84-
throw RejectedExecutionException("DefaultExecutor was shut down. " +
85-
"This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
86-
"Please refer to Dispatchers.shutdown documentation for more details")
87-
}
88-
89-
override fun shutdown() {
90-
debugStatus = SHUTDOWN
91-
super.shutdown()
50+
/** Can only happen when tests close the default executor */
51+
override fun reschedule(now: Long, delayedTask: DelayedTask) {
52+
throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown")
9253
}
9354

9455
/**
95-
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
56+
* All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on
9657
* ```
9758
* runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
9859
* ```
@@ -104,100 +65,70 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
10465
scheduleInvokeOnTimeout(timeMillis, block)
10566

10667
override fun run() {
107-
ThreadLocalEventLoop.setEventLoop(this)
108-
registerTimeLoopThread()
68+
val currentThread = Thread.currentThread()
69+
if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread
70+
val oldName = currentThread.name
71+
currentThread.name = THREAD_NAME
10972
try {
110-
var shutdownNanos = Long.MAX_VALUE
111-
if (!notifyStartup()) return
112-
while (true) {
113-
Thread.interrupted() // just reset interruption flag
114-
var parkNanos = processNextEvent()
115-
if (parkNanos == Long.MAX_VALUE) {
116-
// nothing to do, initialize shutdown timeout
117-
val now = nanoTime()
118-
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
119-
val tillShutdown = shutdownNanos - now
120-
if (tillShutdown <= 0) return // shut thread down
121-
parkNanos = parkNanos.coerceAtMost(tillShutdown)
122-
} else
123-
shutdownNanos = Long.MAX_VALUE
124-
if (parkNanos > 0) {
125-
// check if shutdown was requested and bail out in this case
126-
if (isShutdownRequested) return
127-
parkNanos(this, parkNanos)
73+
ThreadLocalEventLoop.setEventLoop(DefaultDelayImpl)
74+
registerTimeLoopThread()
75+
try {
76+
while (true) {
77+
Thread.interrupted() // just reset interruption flag
78+
val parkNanos = processNextEvent()
79+
if (parkNanos == Long.MAX_VALUE) break // no more events
80+
parkNanos(DefaultDelayImpl, parkNanos)
81+
}
82+
} finally {
83+
_thread.value = null
84+
unregisterTimeLoopThread()
85+
// recheck if queues are empty after _thread reference was set to null (!!!)
86+
if (isEmpty) {
87+
notifyAboutThreadExiting()
88+
} else {
89+
/* recreate the thread, as there is still work to do,
90+
and `unpark` could have awoken the thread we're currently running on */
91+
startThreadOrObtainSleepingThread()
12892
}
12993
}
13094
} finally {
131-
_thread = null // this thread is dead
132-
acknowledgeShutdownIfNeeded()
133-
unregisterTimeLoopThread()
134-
// recheck if queues are empty after _thread reference was set to null (!!!)
135-
if (!isEmpty) thread // recreate thread if it is needed
95+
currentThread.name = oldName
13696
}
13797
}
13898

139-
@Synchronized
140-
private fun createThreadSync(): Thread {
141-
return _thread ?: Thread(this, THREAD_NAME).apply {
142-
_thread = this
143-
/*
144-
* `DefaultExecutor` is a global singleton that creates its thread lazily.
145-
* To isolate the classloaders properly, we are inherting the context classloader from
146-
* the singleton itself instead of using parent' thread one
147-
* in order not to accidentally capture temporary application classloader.
148-
*/
149-
contextClassLoader = this@DefaultDelayImpl.javaClass.classLoader
150-
isDaemon = true
151-
start()
152-
}
153-
}
154-
155-
// used for tests
156-
@Synchronized
157-
internal fun ensureStarted() {
158-
assert { _thread == null } // ensure we are at a clean state
159-
assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
160-
debugStatus = FRESH
161-
createThreadSync() // create fresh thread
162-
while (debugStatus == FRESH) (this as Object).wait()
99+
override fun startThreadOrObtainSleepingThread(): Thread? {
100+
// Check if the thread is already running
101+
_thread.value?.let { return it }
102+
/* Now we know that at the moment of this call the thread was not initially running.
103+
This means that whatever thread is going to be running by the end of this function,
104+
it's going to notice the tasks it's supposed to run.
105+
We can return `null` unconditionally. */
106+
ioView.dispatch(ioView, this)
107+
return null
163108
}
164109

165-
@Synchronized
166-
private fun notifyStartup(): Boolean {
167-
if (isShutdownRequested) return false
168-
debugStatus = ACTIVE
169-
(this as Object).notifyAll()
170-
return true
171-
}
172-
173-
@Synchronized // used _only_ for tests
174-
fun shutdownForTests(timeout: Long) {
175-
val deadline = System.currentTimeMillis() + timeout
176-
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
177-
// loop while there is anything to do immediately or deadline passes
178-
while (debugStatus != SHUTDOWN_ACK && _thread != null) {
179-
_thread?.let { unpark(it) } // wake up thread if present
180-
val remaining = deadline - System.currentTimeMillis()
181-
if (remaining <= 0) break
182-
(this as Object).wait(timeout)
110+
fun shutdownForTests(timeout: Duration) {
111+
if (_thread.value != null) {
112+
val end = System.currentTimeMillis() + timeout.inWholeMilliseconds
113+
while (true) {
114+
check(isEmpty) { "There are tasks in the DefaultExecutor" }
115+
synchronized(this) {
116+
unpark(_thread.value ?: return)
117+
val toWait = end - System.currentTimeMillis()
118+
check(toWait > 0) { "Timeout waiting for DefaultExecutor to shutdown" }
119+
(this as Object).wait(toWait)
120+
}
121+
}
183122
}
184-
// restore fresh status
185-
debugStatus = FRESH
186123
}
187124

188-
@Synchronized
189-
private fun acknowledgeShutdownIfNeeded() {
190-
if (!isShutdownRequested) return
191-
debugStatus = SHUTDOWN_ACK
192-
resetAll() // clear queues
193-
(this as Object).notifyAll()
125+
private fun notifyAboutThreadExiting() {
126+
synchronized(this) { (this as Object).notifyAll() }
194127
}
195128

196-
// User only for testing and nothing else
197-
internal val isThreadPresent
198-
get() = _thread != null
199-
200-
override fun toString(): String {
201-
return "DefaultExecutor"
202-
}
129+
override fun toString(): String = "DefaultDelay"
203130
}
131+
132+
/** A view separate from [Dispatchers.IO].
133+
* [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */
134+
private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)

Diff for: kotlinx-coroutines-core/jvm/src/Dispatchers.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ public actual object Dispatchers {
6666
/**
6767
* Shuts down built-in dispatchers, such as [Default] and [IO],
6868
* stopping all the threads associated with them and making them reject all new tasks.
69-
* Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`)
70-
* and to handle rejected tasks from other dispatchers is also shut down.
69+
* Dispatchers used as fallbacks for time-related operations (`delay`, `withTimeout`)
70+
* and to handle rejected tasks from other dispatchers are also shut down.
7171
*
7272
* This is a **delicate** API. It is not supposed to be called from a general
7373
* application-level code and its invocation is irreversible.
@@ -85,7 +85,6 @@ public actual object Dispatchers {
8585
*/
8686
@DelicateCoroutinesApi
8787
public fun shutdown() {
88-
DefaultExecutor.shutdown()
8988
// Also shuts down Dispatchers.IO
9089
DefaultScheduler.shutdown()
9190
}

Diff for: kotlinx-coroutines-core/jvm/src/EventLoop.kt

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
package kotlinx.coroutines
22

3-
import kotlinx.coroutines.Runnable
4-
import kotlinx.coroutines.scheduling.*
53
import kotlinx.coroutines.scheduling.CoroutineScheduler
6-
import kotlin.coroutines.EmptyCoroutineContext
74

85
internal actual abstract class EventLoopImplPlatform: EventLoop() {
9-
10-
protected abstract val thread: Thread
6+
/** Returns `null` if a thread was created and doesn't need to be awoken.
7+
* Returns a thread to awaken if the thread already existed when this method was called. */
8+
protected abstract fun startThreadOrObtainSleepingThread(): Thread?
119

1210
protected actual fun unpark() {
13-
val thread = thread // atomic read
14-
if (Thread.currentThread() !== thread)
15-
unpark(thread)
11+
startThreadOrObtainSleepingThread()?.let(::unpark)
1612
}
1713

1814
}
1915

2016
internal class BlockingEventLoop(
21-
override val thread: Thread
22-
) : EventLoopImplBase()
17+
private val thread: Thread
18+
) : EventLoopImplBase() {
19+
override fun startThreadOrObtainSleepingThread(): Thread? =
20+
if (Thread.currentThread() !== thread) thread else null
21+
22+
}
2323

2424
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
2525

Diff for: kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt

-26
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,4 @@ class DefaultExecutorStressTest : TestBase() {
3333
}
3434
finish(2 + iterations * 4)
3535
}
36-
37-
@Test
38-
fun testWorkerShutdown() = withVirtualTimeSource {
39-
val iterations = 1_000 * stressTestMultiplier
40-
// wait for the worker to shut down
41-
suspend fun awaitWorkerShutdown() {
42-
val executorTimeoutMs = 1000L
43-
delay(executorTimeoutMs)
44-
while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop
45-
assertFalse(DefaultExecutor.isThreadPresent) // just to make sure
46-
}
47-
runTest {
48-
awaitWorkerShutdown() // so that the worker shuts down after the initial launch
49-
repeat (iterations) {
50-
val job = launch(Dispatchers.Unconfined) {
51-
// this line runs in the main thread
52-
delay(1)
53-
// this line runs in the DefaultExecutor worker
54-
}
55-
delay(100) // yield the execution, allow the worker to spawn
56-
assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned
57-
job.join()
58-
awaitWorkerShutdown()
59-
}
60-
}
61-
}
6236
}

Diff for: kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ class DispatchersToStringTest {
2929
)
3030
}
3131
// Not overridden at all, limited parallelism returns `this`
32-
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
32+
assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
3333

3434
assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString())
3535
assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString())
3636
assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString())
37-
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
37+
assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
3838

3939
val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited")
4040
assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString())
@@ -53,4 +53,4 @@ class DispatchersToStringTest {
5353
assertEquals("Named", named.toString())
5454
}
5555
}
56-
}
56+
}

0 commit comments

Comments
 (0)