Skip to content

Commit 1786eb0

Browse files
authored
Introduce Dispatchers.shutdown (#2915)
Fixes #2558
1 parent 81d7780 commit 1786eb0

File tree

9 files changed

+73
-16
lines changed

9 files changed

+73
-16
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+1
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers {
279279
public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher;
280280
public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher;
281281
public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher;
282+
public final fun shutdown ()V
282283
}
283284

284285
public final class kotlinx/coroutines/DispatchersKt {

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
115115
}
116116
}
117117

118-
protected open fun shutdown() {}
118+
open fun shutdown() {}
119119
}
120120

121121
@ThreadLocal
@@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
279279

280280
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
281281

282-
public fun enqueue(task: Runnable) {
282+
open fun enqueue(task: Runnable) {
283283
if (enqueueImpl(task)) {
284284
// todo: we should unpark only when this delayed task became first in the queue
285285
unpark()

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+29-6
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
1717
incrementUseCount() // this event loop is never completed
1818
}
1919

20-
private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
20+
private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds
2121

2222
private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
2323
try {
24-
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
24+
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
2525
} catch (e: SecurityException) {
26-
DEFAULT_KEEP_ALIVE
26+
DEFAULT_KEEP_ALIVE_MS
2727
})
2828

2929
@Suppress("ObjectPropertyName")
@@ -37,15 +37,39 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
3737
private const val ACTIVE = 1
3838
private const val SHUTDOWN_REQ = 2
3939
private const val SHUTDOWN_ACK = 3
40+
private const val SHUTDOWN = 4
4041

4142
@Volatile
4243
private var debugStatus: Int = FRESH
4344

45+
val isShutDown: Boolean get() = debugStatus == SHUTDOWN
46+
4447
private val isShutdownRequested: Boolean get() {
4548
val debugStatus = debugStatus
4649
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
4750
}
4851

52+
actual override fun enqueue(task: Runnable) {
53+
if (isShutDown) shutdownError()
54+
super.enqueue(task)
55+
}
56+
57+
override fun reschedule(now: Long, delayedTask: DelayedTask) {
58+
// Reschedule on default executor can only be invoked after Dispatchers.shutdown
59+
shutdownError()
60+
}
61+
62+
private fun shutdownError() {
63+
throw RejectedExecutionException("DefaultExecutor was shut down. " +
64+
"This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
65+
"Please refer to Dispatchers.shutdown documentation for more details")
66+
}
67+
68+
override fun shutdown() {
69+
debugStatus = SHUTDOWN
70+
super.shutdown()
71+
}
72+
4973
/**
5074
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
5175
* ```
@@ -118,9 +142,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
118142
return true
119143
}
120144

121-
// used for tests
122-
@Synchronized
123-
fun shutdown(timeout: Long) {
145+
@Synchronized // used _only_ for tests
146+
fun shutdownForTests(timeout: Long) {
124147
val deadline = System.currentTimeMillis() + timeout
125148
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
126149
// loop while there is anything to do immediately or deadline passes

kotlinx-coroutines-core/jvm/src/Dispatchers.kt

+28-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p
2121
public actual object Dispatchers {
2222
/**
2323
* The default [CoroutineDispatcher] that is used by all standard builders like
24-
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
24+
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc.
2525
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
2626
*
2727
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
@@ -116,4 +116,31 @@ public actual object Dispatchers {
116116
*/
117117
@JvmStatic
118118
public val IO: CoroutineDispatcher = DefaultScheduler.IO
119+
120+
/**
121+
* Shuts down built-in dispatchers, such as [Default] and [IO],
122+
* stopping all the threads associated with them and making them reject all new tasks.
123+
* Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`)
124+
* and to handle rejected tasks from other dispatchers is also shut down.
125+
*
126+
* This is a **delicate** API. It is not supposed to be called from a general
127+
* application-level code and its invocation is irreversible.
128+
* The invocation of shutdown affects most of the coroutines machinery and
129+
* leaves the coroutines framework in an inoperable state.
130+
* The shutdown method should only be invoked when there are no pending tasks or active coroutines.
131+
* Otherwise, the behavior is unspecified: the call to `shutdown` may throw an exception without completing
132+
* the shutdown, or it may finish successfully, but the remaining jobs will be in a permanent dormant state,
133+
* never completing nor executing.
134+
*
135+
* The main goal of the shutdown is to stop all background threads associated with the coroutines
136+
* framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine.
137+
* It is only recommended to be used in containerized environments (OSGi, Gradle plugins system,
138+
* IDEA plugins) at the end of the container lifecycle.
139+
*/
140+
@DelicateCoroutinesApi
141+
public fun shutdown() {
142+
DefaultExecutor.shutdown()
143+
// Also shuts down Dispatchers.IO
144+
DefaultScheduler.shutdown()
145+
}
119146
}

kotlinx-coroutines-core/jvm/src/EventLoop.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() {
1313
unpark(thread)
1414
}
1515

16-
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
17-
assert { this !== DefaultExecutor } // otherwise default execution was shutdown with tasks in it (cannot be)
16+
protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
1817
DefaultExecutor.schedule(now, delayedTask)
1918
}
2019
}

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+7
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,20 @@ import kotlin.coroutines.*
1414
* Default instance of coroutine dispatcher.
1515
*/
1616
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
17+
@JvmField
1718
val IO: CoroutineDispatcher = LimitingDispatcher(
1819
this,
1920
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
2021
"Dispatchers.IO",
2122
TASK_PROBABLY_BLOCKING
2223
)
2324

25+
// Shuts down the dispatcher, used only by Dispatchers.shutdown()
26+
internal fun shutdown() {
27+
super.close()
28+
}
29+
30+
// Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
2431
override fun close() {
2532
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
2633
}

kotlinx-coroutines-core/jvm/test/TestBase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
204204

205205
fun shutdownPoolsAfterTest() {
206206
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
207-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
207+
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
208208
DefaultScheduler.restore()
209209
}
210210

kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import java.util.concurrent.locks.*
1111
private const val SHUTDOWN_TIMEOUT = 1000L
1212

1313
internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) {
14-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
14+
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
1515
val testTimeSource = VirtualTimeSource(log)
1616
timeSource = testTimeSource
1717
DefaultExecutor.ensureStarted() // should start with new time source
1818
try {
1919
block()
2020
} finally {
21-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
21+
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
2222
testTimeSource.shutdown()
2323
timeSource = null // restore time source
2424
}

kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
2727
try {
2828
captureOutput(name, stdoutEnabled = OUT_ENABLED) { log ->
2929
DefaultScheduler.usePrivateScheduler()
30-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
30+
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
3131
resetCoroutineId()
3232
val threadsBefore = currentThreads()
3333
try {
@@ -40,7 +40,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
4040
log.println("--- shutting down")
4141
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
4242
shutdownDispatcherPools(SHUTDOWN_TIMEOUT)
43-
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
43+
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
4444
}
4545
checkTestThreads(threadsBefore) // check thread if the main completed successfully
4646
}

0 commit comments

Comments
 (0)