Skip to content

Commit 20341f2

Browse files
authored
Cancel current Job on RejectedExecutionException (#2012)
When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its capacity and so the executing current Job should be canceled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code. Fixes #2003
1 parent b82439e commit 20341f2

33 files changed

+282
-60
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {
257257

258258
public abstract interface class kotlinx/coroutines/Delay {
259259
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
260-
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
260+
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
261261
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
262262
}
263263

264264
public final class kotlinx/coroutines/Delay$DefaultImpls {
265265
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
266-
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
266+
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
267267
}
268268

269269
public final class kotlinx/coroutines/DelayKt {

kotlinx-coroutines-core/common/src/Delay.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public interface Delay {
5454
*
5555
* This implementation uses a built-in single-threaded scheduled executor service.
5656
*/
57-
public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
58-
DefaultDelay.invokeOnTimeout(timeMillis, block)
57+
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
58+
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
5959
}
6060

6161
/**

kotlinx-coroutines-core/common/src/Timeout.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private fun <U, T: U> setupTimeout(
142142
// schedule cancellation of this coroutine on time
143143
val cont = coroutine.uCont
144144
val context = cont.context
145-
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
145+
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
146146
// restart the block using a new coroutine with a new job,
147147
// however, start it undispatched, because we already are in the proper context
148148
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)

kotlinx-coroutines-core/common/src/selects/Select.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ internal class SelectBuilderImpl<in R>(
655655
if (trySelect())
656656
block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
657657
}
658-
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
658+
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context))
659659
}
660660

661661
private class DisposeNode(

kotlinx-coroutines-core/common/test/flow/VirtualTime.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
5050
@ExperimentalCoroutinesApi
5151
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)
5252

53-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
53+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
5454
val task = TimedTask(block, currentTime + timeMillis)
5555
heap += task
5656
return task

kotlinx-coroutines-core/js/src/JSDispatcher.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
3535
messageQueue.enqueue(block)
3636
}
3737

38-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
38+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
3939
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
4040
return ClearTimeout(handle)
4141
}
@@ -81,7 +81,7 @@ internal class WindowDispatcher(private val window: Window) : CoroutineDispatche
8181
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
8282
}
8383

84-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
84+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
8585
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
8686
return object : DisposableHandle {
8787
override fun dispose() {

kotlinx-coroutines-core/jvm/src/CommonPool.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
103103
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
104104
} catch (e: RejectedExecutionException) {
105105
unTrackTask()
106+
// CommonPool only rejects execution when it is being closed and this behavior is reserved
107+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
106108
DefaultExecutor.enqueue(block)
107109
}
108110
}

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import java.util.concurrent.*
8+
import kotlin.coroutines.*
89

910
internal actual val DefaultDelay: Delay = DefaultExecutor
1011

@@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
5455
* Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
5556
* but it's not exposed as public API.
5657
*/
57-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
58+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
5859
scheduleInvokeOnTimeout(timeMillis, block)
5960

6061
override fun run() {

kotlinx-coroutines-core/jvm/src/Executors.kt

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,25 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
3838

3939
/**
4040
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
41+
*
42+
* If the underlying executor throws [RejectedExecutionException] on
43+
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
44+
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
45+
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
46+
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
4147
*/
4248
@JvmName("from") // this is for a nice Java API, see issue #255
4349
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
4450
ExecutorCoroutineDispatcherImpl(this)
4551

4652
/**
4753
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
54+
*
55+
* If the underlying executor throws [RejectedExecutionException] on
56+
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
57+
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
58+
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
59+
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
4860
*/
4961
@JvmName("from") // this is for a nice Java API, see issue #255
5062
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
@@ -82,7 +94,8 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
8294
executor.execute(wrapTask(block))
8395
} catch (e: RejectedExecutionException) {
8496
unTrackTask()
85-
DefaultExecutor.enqueue(block)
97+
cancelJobOnRejection(context, e)
98+
Dispatchers.IO.dispatch(context, block)
8699
}
87100
}
88101

@@ -93,7 +106,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
93106
*/
94107
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
95108
val future = if (removesFutureOnCancellation) {
96-
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
109+
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
97110
} else {
98111
null
99112
}
@@ -106,24 +119,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
106119
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
107120
}
108121

109-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
122+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
110123
val future = if (removesFutureOnCancellation) {
111-
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
124+
scheduleBlock(block, context, timeMillis)
112125
} else {
113126
null
114127
}
115-
116-
return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
128+
return when {
129+
future != null -> DisposableFutureHandle(future)
130+
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
131+
}
117132
}
118133

119-
private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
134+
private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
120135
return try {
121-
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
136+
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
122137
} catch (e: RejectedExecutionException) {
138+
cancelJobOnRejection(context, e)
123139
null
124140
}
125141
}
126142

143+
private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
144+
context.cancel(CancellationException("The task was rejected", exception))
145+
}
146+
127147
override fun close() {
128148
(executor as? ExecutorService)?.shutdown()
129149
}

kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import kotlin.coroutines.*
1414
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread).
1515
* Resources are reclaimed by [ExecutorCoroutineDispatcher.close].**
1616
*
17+
* If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and
18+
* attempt to submit a continuation task is made,
19+
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
20+
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
21+
*
1722
* **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools
1823
* that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed
1924
* will be provided, thus avoiding potential thread leaks and also significantly improving performance, due
@@ -35,6 +40,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
3540
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
3641
* Resources are reclaimed by [ExecutorCoroutineDispatcher.close].**
3742
*
43+
* If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and
44+
* attempt to submit a continuation task is made,
45+
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
46+
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
47+
*
3848
* **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools
3949
* that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed
4050
* will be provided, thus avoiding potential thread leaks and also significantly improving performance, due

kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher(
8787

8888
override val immediate: MainCoroutineDispatcher get() = this
8989

90-
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
90+
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
9191
missing()
92-
}
9392

94-
override suspend fun delay(time: Long) {
93+
override suspend fun delay(time: Long) =
9594
missing()
96-
}
9795

98-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
96+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
9997
missing()
100-
}
10198

10299
override fun dispatch(context: CoroutineContext, block: Runnable) =
103100
missing()

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,17 @@ public open class ExperimentalCoroutineDispatcher(
6565
try {
6666
coroutineScheduler.dispatch(block)
6767
} catch (e: RejectedExecutionException) {
68+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
69+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
6870
DefaultExecutor.dispatch(context, block)
6971
}
7072

7173
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
7274
try {
7375
coroutineScheduler.dispatch(block, tailDispatch = true)
7476
} catch (e: RejectedExecutionException) {
77+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
78+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
7579
DefaultExecutor.dispatchYield(context, block)
7680
}
7781

@@ -110,7 +114,9 @@ public open class ExperimentalCoroutineDispatcher(
110114
try {
111115
coroutineScheduler.dispatch(block, context, tailDispatch)
112116
} catch (e: RejectedExecutionException) {
113-
// Context shouldn't be lost here to properly invoke before/after task
117+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
118+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
119+
// TaskContext shouldn't be lost here to properly invoke before/after task
114120
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
115121
}
116122
}

kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC
230230
}, timeMillis)
231231
}
232232

233-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
233+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
234234
val node = postDelayed(block, timeMillis)
235235
return object : DisposableHandle {
236236
override fun dispose() {

kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -29,6 +29,8 @@ class ExecutorsTest : TestBase() {
2929
val context = newFixedThreadPoolContext(2, "TestPool")
3030
runBlocking(context) {
3131
checkThreadName("TestPool")
32+
delay(10)
33+
checkThreadName("TestPool") // should dispatch on the right thread
3234
}
3335
context.close()
3436
}
@@ -38,6 +40,8 @@ class ExecutorsTest : TestBase() {
3840
val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
3941
runBlocking(executor.asCoroutineDispatcher()) {
4042
checkThreadName("TestExecutor")
43+
delay(10)
44+
checkThreadName("TestExecutor") // should dispatch on the right thread
4145
}
4246
executor.shutdown()
4347
}

kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -15,8 +15,21 @@ import kotlin.test.*
1515
@RunWith(Parameterized::class)
1616
class FailingCoroutinesMachineryTest(
1717
private val element: CoroutineContext.Element,
18-
private val dispatcher: CoroutineDispatcher
18+
private val dispatcher: TestDispatcher
1919
) : TestBase() {
20+
class TestDispatcher(val name: String, val block: () -> CoroutineDispatcher) {
21+
private var _value: CoroutineDispatcher? = null
22+
23+
val value: CoroutineDispatcher
24+
get() = _value ?: block().also { _value = it }
25+
26+
override fun toString(): String = name
27+
28+
fun reset() {
29+
runCatching { (_value as? ExecutorCoroutineDispatcher)?.close() }
30+
_value = null
31+
}
32+
}
2033

2134
private var caught: Throwable? = null
2235
private val latch = CountDownLatch(1)
@@ -75,7 +88,7 @@ class FailingCoroutinesMachineryTest(
7588

7689
@After
7790
fun tearDown() {
78-
runCatching { (dispatcher as? ExecutorCoroutineDispatcher)?.close() }
91+
dispatcher.reset()
7992
if (lazyOuterDispatcher.isInitialized()) lazyOuterDispatcher.value.close()
8093
}
8194

@@ -84,14 +97,14 @@ class FailingCoroutinesMachineryTest(
8497
@Parameterized.Parameters(name = "Element: {0}, dispatcher: {1}")
8598
fun dispatchers(): List<Array<Any>> {
8699
val elements = listOf<Any>(FailingRestore, FailingUpdate)
87-
val dispatchers = listOf<Any>(
88-
Dispatchers.Unconfined,
89-
Dispatchers.Default,
90-
Executors.newFixedThreadPool(1).asCoroutineDispatcher(),
91-
Executors.newScheduledThreadPool(1).asCoroutineDispatcher(),
92-
ThrowingDispatcher, ThrowingDispatcher2
100+
val dispatchers = listOf<TestDispatcher>(
101+
TestDispatcher("Dispatchers.Unconfined") { Dispatchers.Unconfined },
102+
TestDispatcher("Dispatchers.Default") { Dispatchers.Default },
103+
TestDispatcher("Executors.newFixedThreadPool(1)") { Executors.newFixedThreadPool(1).asCoroutineDispatcher() },
104+
TestDispatcher("Executors.newScheduledThreadPool(1)") { Executors.newScheduledThreadPool(1).asCoroutineDispatcher() },
105+
TestDispatcher("ThrowingDispatcher") { ThrowingDispatcher },
106+
TestDispatcher("ThrowingDispatcher2") { ThrowingDispatcher2 }
93107
)
94-
95108
return elements.flatMap { element ->
96109
dispatchers.map { dispatcher ->
97110
arrayOf(element, dispatcher)
@@ -102,13 +115,13 @@ class FailingCoroutinesMachineryTest(
102115

103116
@Test
104117
fun testElement() = runTest {
105-
launch(NonCancellable + dispatcher + exceptionHandler + element) {}
118+
launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
106119
checkException()
107120
}
108121

109122
@Test
110123
fun testNestedElement() = runTest {
111-
launch(NonCancellable + dispatcher + exceptionHandler) {
124+
launch(NonCancellable + dispatcher.value + exceptionHandler) {
112125
launch(element) { }
113126
}
114127
checkException()
@@ -117,7 +130,7 @@ class FailingCoroutinesMachineryTest(
117130
@Test
118131
fun testNestedDispatcherAndElement() = runTest {
119132
launch(lazyOuterDispatcher.value + NonCancellable + exceptionHandler) {
120-
launch(element + dispatcher) { }
133+
launch(element + dispatcher.value) { }
121134
}
122135
checkException()
123136
}

0 commit comments

Comments
 (0)