Skip to content

Commit 638a97c

Browse files
committed
Do not use DefaultExecutor for cleanup work
Before this change, DefaultExecutor was occasionally used for executing the work of dispatchers that no longer function. This is no longer the case: instead, Dispatchers.IO is used for that on our multithreaded targets.
1 parent d3f1f23 commit 638a97c

20 files changed

+114
-96
lines changed

kotlinx-coroutines-core/common/src/Dispatchers.common.kt

+24
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,27 @@ public expect object Dispatchers {
7171
*/
7272
public val Unconfined: CoroutineDispatcher
7373
}
74+
75+
/**
76+
* If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher.
77+
*
78+
* This is required to avoid a situation where some finalizers do not run:
79+
* ```
80+
* val dispatcher = newSingleThreadContext("test")
81+
* launch(dispatcher) {
82+
* val resource = Resource()
83+
* try {
84+
* // do something `suspending` with resource
85+
* } finally {
86+
* resource.close()
87+
* }
88+
* }
89+
* dispatcher.close()
90+
* ```
91+
*
92+
* `close` needs to run somewhere, but it can't run on the closed dispatcher.
93+
*
94+
* On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`,
95+
* because an arbitrary task may well have blocking behavior.
96+
*/
97+
internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable)

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+9-8
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,6 @@ private typealias Queue<T> = LockFreeTaskQueueCore<T>
169169
internal expect abstract class EventLoopImplPlatform() : EventLoop {
170170
// Called to unpark this event loop's thread
171171
protected fun unpark()
172-
173-
// Called to reschedule to DefaultExecutor when this event loop is complete
174-
protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
175172
}
176173

177174
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
@@ -275,7 +272,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
275272
// todo: we should unpark only when this delayed task became first in the queue
276273
unpark()
277274
} else {
278-
DefaultExecutor.enqueue(task)
275+
rescheduleTaskFromClosedDispatcher(task)
279276
}
280277
}
281278

@@ -408,6 +405,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
408405
}
409406
}
410407

408+
// Called to reschedule when this event loop is complete
409+
protected open fun reschedule(now: Long, delayedTask: DelayedTask) {
410+
val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now)
411+
DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable {
412+
rescheduleTaskFromClosedDispatcher(delayedTask)
413+
}, EmptyCoroutineContext)
414+
}
415+
411416
internal abstract class DelayedTask(
412417
/**
413418
* This field can be only modified in [scheduleTask] before putting this DelayedTask
@@ -530,10 +535,6 @@ internal expect fun createEventLoop(): EventLoop
530535

531536
internal expect fun nanoTime(): Long
532537

533-
internal expect object DefaultExecutor {
534-
fun enqueue(task: Runnable)
535-
}
536-
537538
/**
538539
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
539540
* non-Darwin native targets.

kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt

+3
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,8 @@ import kotlin.coroutines.*
2020
*
2121
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
2222
* block, potentially leading to thread starvation issues.
23+
*
24+
* If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns,
25+
* they are resubmitted to [Dispatchers.IO].
2326
*/
2427
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

kotlinx-coroutines-core/concurrent/src/Dispatchers.kt renamed to kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt

+23-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,26 @@ package kotlinx.coroutines
3939
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
4040
public expect val Dispatchers.IO: CoroutineDispatcher
4141

42-
42+
internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) {
43+
/**
44+
* We do not create a separate view of [Dispatchers.IO] for the cleanup needs.
45+
*
46+
* If [Dispatchers.IO] is not flooded with other tasks + the cleanup view does not have more threads than
47+
* [Dispatchers.IO], there is no difference between sending cleanup tasks to [Dispatchers.IO] and creating
48+
* a separate view of [Dispatchers.IO] for cleanup.
49+
*
50+
* If [Dispatchers.IO] is flooded with other tasks, we are at a crossroads:
51+
* - On the one hand, we do not want to create too many threads.
52+
* Some clients are carefully monitoring the number of threads and are relying on it not being larger than
53+
* the system property + the sum of explicit `limitedParallelism` arguments in the system.
54+
* - On the other hand, we don't want to delay productive tasks in favor of cleanup tasks.
55+
*
56+
* The first consideration wins on two accounts:
57+
* - As of writing this, [Dispatchers.IO] has been in use as the cleanup dispatcher for dispatchers obtained
58+
* from JVM executors for years, and this has not caused any issues that we know of.
59+
* - On the other hand, some people likely rely on the number of threads not exceeding the number they control.
60+
* If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which
61+
* is a regression.
62+
*/
63+
Dispatchers.IO.dispatch(Dispatchers.IO, task)
64+
}

kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ internal class UnconfinedEventLoop : EventLoop() {
1212

1313
internal actual abstract class EventLoopImplPlatform : EventLoop() {
1414
protected actual fun unpark(): Unit = unsupported()
15-
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit = unsupported()
16-
}
17-
18-
internal actual object DefaultExecutor {
19-
public actual fun enqueue(task: Runnable): Unit = unsupported()
2015
}
2116

2217
private fun unsupported(): Nothing =
2318
throw UnsupportedOperationException("runBlocking event loop is not supported")
2419

2520
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
21+
22+
internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) {
23+
Dispatchers.Default.dispatch(Dispatchers.Default, task)
24+
}

kotlinx-coroutines-core/jvm/src/Builders.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package kotlinx.coroutines
77

8-
import java.util.concurrent.locks.*
98
import kotlin.contracts.*
109
import kotlin.coroutines.*
1110

@@ -39,6 +38,9 @@ import kotlin.coroutines.*
3938
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
4039
* this `runBlocking` invocation throws [InterruptedException].
4140
*
41+
* If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns,
42+
* they are resubmitted to [Dispatchers.IO].
43+
*
4244
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
4345
* for a newly created coroutine.
4446
*

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+15-6
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,27 @@ internal actual val DefaultDelay: Delay = initializeDefaultDelay()
1111

1212
private fun initializeDefaultDelay(): Delay {
1313
// Opt-out flag
14-
if (!defaultMainDelayOptIn) return DefaultExecutor
14+
if (!defaultMainDelayOptIn) return DefaultDelayImpl
1515
val main = Dispatchers.Main
1616
/*
1717
* When we already are working with UI and Main threads, it makes
1818
* no sense to create a separate thread with timer that cannot be controller
1919
* by the UI runtime.
2020
*/
21-
return if (main.isMissing() || main !is Delay) DefaultExecutor else main
21+
return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main
2222
}
2323

24-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
25-
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
24+
internal object DefaultExecutor {
25+
fun shutdown() = DefaultDelayImpl.shutdown()
26+
27+
fun ensureStarted() = DefaultDelayImpl.ensureStarted()
28+
29+
fun shutdownForTests(timeout: Long) = DefaultDelayImpl.shutdownForTests(timeout)
30+
31+
val isThreadPresent: Boolean get() = DefaultDelayImpl.isThreadPresent
32+
}
33+
34+
private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
2635
const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
2736

2837
init {
@@ -61,7 +70,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
6170
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
6271
}
6372

64-
actual override fun enqueue(task: Runnable) {
73+
override fun enqueue(task: Runnable) {
6574
if (isShutDown) shutdownError()
6675
super.enqueue(task)
6776
}
@@ -137,7 +146,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
137146
* the singleton itself instead of using parent' thread one
138147
* in order not to accidentally capture temporary application classloader.
139148
*/
140-
contextClassLoader = this@DefaultExecutor.javaClass.classLoader
149+
contextClassLoader = this@DefaultDelayImpl.javaClass.classLoader
141150
isDaemon = true
142151
start()
143152
}

kotlinx-coroutines-core/jvm/src/Dispatchers.kt

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.coroutines
22

33
import kotlinx.coroutines.internal.*
44
import kotlinx.coroutines.scheduling.*
5-
import kotlin.coroutines.*
65

76
/**
87
* Name of the property that defines the maximal number of threads that are used by [Dispatchers.IO] coroutines dispatcher.

kotlinx-coroutines-core/jvm/src/EventLoop.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.Runnable
44
import kotlinx.coroutines.scheduling.*
55
import kotlinx.coroutines.scheduling.CoroutineScheduler
6+
import kotlin.coroutines.EmptyCoroutineContext
67

78
internal actual abstract class EventLoopImplPlatform: EventLoop() {
89

@@ -14,9 +15,6 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() {
1415
unpark(thread)
1516
}
1617

17-
protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
18-
DefaultExecutor.schedule(now, delayedTask)
19-
}
2018
}
2119

2220
internal class BlockingEventLoop(
@@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean {
122120
if (this !is CoroutineScheduler.Worker) return false
123121
return isIo()
124122
}
125-

kotlinx-coroutines-core/jvm/src/Executors.kt

+12-4
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
131131
} catch (e: RejectedExecutionException) {
132132
unTrackTask()
133133
cancelJobOnRejection(context, e)
134-
Dispatchers.IO.dispatch(context, block)
134+
rescheduleTaskFromClosedDispatcher(block)
135135
}
136136
}
137137

@@ -146,15 +146,15 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
146146
continuation.invokeOnCancellation(CancelFutureOnCancel(future))
147147
return
148148
}
149-
// Otherwise fallback to default executor
150-
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
149+
// Otherwise fallback to default delay
150+
DefaultDelay.scheduleResumeAfterDelay(timeMillis, continuation)
151151
}
152152

153153
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
154154
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
155155
return when {
156156
future != null -> DisposableFutureHandle(future)
157-
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
157+
else -> DefaultDelay.invokeOnTimeout(timeMillis, block, context)
158158
}
159159
}
160160

@@ -189,6 +189,14 @@ private class ResumeUndispatchedRunnable(
189189
}
190190
}
191191

192+
private class ResumeDispatchedRunnable(
193+
private val continuation: CancellableContinuation<Unit>
194+
) : Runnable {
195+
override fun run() {
196+
continuation.resume(Unit)
197+
}
198+
}
199+
192200
/**
193201
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
194202
* @suppress **This is unstable API and it is subject to change.**

kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class DefaultExecutorStressTest : TestBase() {
88
@Test
99
fun testDelay() = runTest {
1010
val iterations = 100_000 * stressTestMultiplier
11-
withContext(DefaultExecutor) {
11+
withContext(DefaultDelay as CoroutineDispatcher) {
1212
expect(1)
1313
var expected = 1
1414
repeat(iterations) {

kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt

+1-20
Original file line numberDiff line numberDiff line change
@@ -49,25 +49,6 @@ class EventLoopsTest : TestBase() {
4949
finish(5)
5050
}
5151

52-
@Test
53-
fun testEventLoopInDefaultExecutor() = runTest {
54-
expect(1)
55-
withContext(Dispatchers.Unconfined) {
56-
delay(1)
57-
assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME))
58-
expect(2)
59-
// now runBlocking inside default executor thread --> should use outer event loop
60-
DefaultExecutor.enqueue(Runnable {
61-
expect(4) // will execute when runBlocking runs loop
62-
})
63-
expect(3)
64-
runBlocking {
65-
expect(5)
66-
}
67-
}
68-
finish(6)
69-
}
70-
7152
/**
7253
* Simple test for [processNextEventInCurrentThread] API use-case.
7354
*/
@@ -159,4 +140,4 @@ class EventLoopsTest : TestBase() {
159140
waitingThread.value = null
160141
}
161142
}
162-
}
143+
}

kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt

-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,3 @@ class RunBlockingJvmTest : TestBase() {
1313
rb.hashCode() // unused
1414
}
1515
}
16-

kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
3535
} finally {
3636
// the shutdown
3737
log.println("--- shutting down")
38-
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
3938
shutdownDispatcherPools(SHUTDOWN_TIMEOUT)
40-
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
39+
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
4140
}
4241
checkTestThreads(threadsBefore) // check thread if the main completed successfully
4342
}
@@ -55,7 +54,7 @@ private fun shutdownDispatcherPools(timeout: Long) {
5554
(thread.dispatcher.executor as ExecutorService).apply {
5655
shutdown()
5756
awaitTermination(timeout, TimeUnit.MILLISECONDS)
58-
shutdownNow().forEach { DefaultExecutor.enqueue(it) }
57+
shutdownNow().forEach { rescheduleTaskFromClosedDispatcher(it) }
5958
}
6059
}
6160
}

kotlinx-coroutines-core/native/src/Builders.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ import kotlin.native.concurrent.*
3535
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
3636
* then this invocation uses the outer event loop.
3737
*
38-
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
39-
* this `runBlocking` invocation throws [InterruptedException].
38+
* If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns,
39+
* they are resubmitted to [Dispatchers.IO].
4040
*
4141
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
4242
* for a newly created coroutine.

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+2-23
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,11 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.internal.*
44
import kotlin.coroutines.*
55

6-
internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
7-
8-
private val delegate = WorkerDispatcher(name = "DefaultExecutor")
9-
10-
override fun dispatch(context: CoroutineContext, block: Runnable) {
11-
delegate.dispatch(context, block)
12-
}
13-
14-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
15-
delegate.scheduleResumeAfterDelay(timeMillis, continuation)
16-
}
17-
18-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
19-
return delegate.invokeOnTimeout(timeMillis, block, context)
20-
}
21-
22-
actual fun enqueue(task: Runnable): Unit {
23-
delegate.dispatch(EmptyCoroutineContext, task)
24-
}
25-
}
6+
@PublishedApi
7+
internal actual val DefaultDelay: Delay = WorkerDispatcher(name = "DefaultDelay")
268

279
internal expect fun createDefaultDispatcher(): CoroutineDispatcher
2810

29-
@PublishedApi
30-
internal actual val DefaultDelay: Delay = DefaultExecutor
31-
3211
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
3312
val combined = coroutineContext + context
3413
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)

kotlinx-coroutines-core/native/src/Dispatchers.kt

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package kotlinx.coroutines
22

3-
import kotlinx.coroutines.internal.*
43
import kotlin.coroutines.*
54

65

0 commit comments

Comments
 (0)