Skip to content

Commit b21cb62

Browse files
committed
Cancel current Job on RejectedExecutionException
When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and so the executing current Job should be cancelled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code. Fixes #2003
1 parent 32feba8 commit b21cb62

File tree

28 files changed

+175
-42
lines changed

28 files changed

+175
-42
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2-2
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {
256256

257257
public abstract interface class kotlinx/coroutines/Delay {
258258
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
259-
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
259+
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
260260
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
261261
}
262262

263263
public final class kotlinx/coroutines/Delay$DefaultImpls {
264264
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
265-
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
265+
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
266266
}
267267

268268
public final class kotlinx/coroutines/DelayKt {

kotlinx-coroutines-core/common/src/Delay.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public interface Delay {
5454
*
5555
* This implementation uses a built-in single-threaded scheduled executor service.
5656
*/
57-
public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
58-
DefaultDelay.invokeOnTimeout(timeMillis, block)
57+
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
58+
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
5959
}
6060

6161
/**

kotlinx-coroutines-core/common/src/Timeout.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private fun <U, T: U> setupTimeout(
105105
// schedule cancellation of this coroutine on time
106106
val cont = coroutine.uCont
107107
val context = cont.context
108-
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
108+
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
109109
// restart the block using a new coroutine with a new job,
110110
// however, start it undispatched, because we already are in the proper context
111111
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)

kotlinx-coroutines-core/common/src/selects/Select.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ internal class SelectBuilderImpl<in R>(
649649
if (trySelect())
650650
block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
651651
}
652-
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
652+
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context))
653653
}
654654

655655
private class DisposeNode(

kotlinx-coroutines-core/common/test/flow/VirtualTime.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
5050
@ExperimentalCoroutinesApi
5151
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)
5252

53-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
53+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
5454
val task = TimedTask(block, currentTime + timeMillis)
5555
heap += task
5656
return task

kotlinx-coroutines-core/jvm/src/CommonPool.kt

+2
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
103103
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
104104
} catch (e: RejectedExecutionException) {
105105
unTrackTask()
106+
// CommonPool only rejects execution when it is being closed and this behavior is reserved
107+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
106108
DefaultExecutor.enqueue(block)
107109
}
108110
}

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import java.util.concurrent.*
8+
import kotlin.coroutines.*
89

910
internal actual val DefaultDelay: Delay = DefaultExecutor
1011

@@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
5455
* Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
5556
* but it's not exposed as public API.
5657
*/
57-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
58+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
5859
scheduleInvokeOnTimeout(timeMillis, block)
5960

6061
override fun run() {

kotlinx-coroutines-core/jvm/src/Executors.kt

+15-7
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
8282
executor.execute(wrapTask(block))
8383
} catch (e: RejectedExecutionException) {
8484
unTrackTask()
85+
cancelJobOnRejection(context, e)
8586
DefaultExecutor.enqueue(block)
8687
}
8788
}
@@ -93,7 +94,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
9394
*/
9495
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
9596
val future = if (removesFutureOnCancellation) {
96-
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
97+
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
9798
} else {
9899
null
99100
}
@@ -106,24 +107,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
106107
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
107108
}
108109

109-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
110+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
110111
val future = if (removesFutureOnCancellation) {
111-
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
112+
scheduleBlock(block, context, timeMillis)
112113
} else {
113114
null
114115
}
115-
116-
return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
116+
return when {
117+
future != null -> DisposableFutureHandle(future)
118+
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
119+
}
117120
}
118121

119-
private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
122+
private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
120123
return try {
121-
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
124+
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
122125
} catch (e: RejectedExecutionException) {
126+
cancelJobOnRejection(context, e)
123127
null
124128
}
125129
}
126130

131+
private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
132+
context[Job]?.cancel(CancellationException("The task was rejected", exception))
133+
}
134+
127135
override fun close() {
128136
(executor as? ExecutorService)?.shutdown()
129137
}

kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher(
8787

8888
override val immediate: MainCoroutineDispatcher get() = this
8989

90-
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
90+
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
9191
missing()
92-
}
9392

94-
override suspend fun delay(time: Long) {
93+
override suspend fun delay(time: Long) =
9594
missing()
96-
}
9795

98-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
96+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
9997
missing()
100-
}
10198

10299
override fun dispatch(context: CoroutineContext, block: Runnable) =
103100
missing()

kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,17 @@ public open class ExperimentalCoroutineDispatcher(
6060
try {
6161
coroutineScheduler.dispatch(block)
6262
} catch (e: RejectedExecutionException) {
63+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
64+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
6365
DefaultExecutor.dispatch(context, block)
6466
}
6567

6668
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
6769
try {
6870
coroutineScheduler.dispatch(block, tailDispatch = true)
6971
} catch (e: RejectedExecutionException) {
72+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
73+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
7074
DefaultExecutor.dispatchYield(context, block)
7175
}
7276

@@ -105,7 +109,9 @@ public open class ExperimentalCoroutineDispatcher(
105109
try {
106110
coroutineScheduler.dispatch(block, context, tailDispatch)
107111
} catch (e: RejectedExecutionException) {
108-
// Context shouldn't be lost here to properly invoke before/after task
112+
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
113+
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
114+
// TaskContext shouldn't be lost here to properly invoke before/after task
109115
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
110116
}
111117
}

kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC
230230
}, timeMillis)
231231
}
232232

233-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
233+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
234234
val node = postDelayed(block, timeMillis)
235235
return object : DisposableHandle {
236236
override fun dispose() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import org.junit.*
8+
import org.junit.Test
9+
import java.util.concurrent.*
10+
import kotlin.test.*
11+
12+
class RejectedExecutionTest : TestBase() {
13+
private val threadName = "RejectedExecutionTest"
14+
private val executor = RejectingExecutor()
15+
16+
@After
17+
fun tearDown() {
18+
executor.shutdown()
19+
executor.awaitTermination(10, TimeUnit.SECONDS)
20+
}
21+
22+
@Test
23+
fun testRejectOnLaunch() = runTest {
24+
expect(1)
25+
val job = launch(executor.asCoroutineDispatcher()) {
26+
expectUnreached()
27+
}
28+
assertEquals(1, executor.submittedTasks)
29+
assertTrue(job.isCancelled)
30+
finish(2)
31+
}
32+
33+
@Test
34+
fun testRejectOnLaunchAtomic() = runTest {
35+
expect(1)
36+
val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) {
37+
expect(2)
38+
assertEquals(true, coroutineContext[Job]?.isCancelled)
39+
assertNotSame(threadName, Thread.currentThread().name) // should have got dispatched on the DefaultExecutor
40+
}
41+
assertEquals(1, executor.submittedTasks)
42+
job.join()
43+
finish(3)
44+
}
45+
46+
@Test
47+
fun testRejectOnWithContext() = runTest {
48+
expect(1)
49+
assertFailsWith<CancellationException> {
50+
withContext(executor.asCoroutineDispatcher()) {
51+
expectUnreached()
52+
}
53+
}
54+
assertEquals(1, executor.submittedTasks)
55+
finish(2)
56+
}
57+
58+
@Test
59+
fun testRejectOnResumeInContext() = runTest {
60+
expect(1)
61+
executor.acceptTasks = 1 // accept one task
62+
assertFailsWith<CancellationException> {
63+
withContext(executor.asCoroutineDispatcher()) {
64+
expect(2)
65+
withContext(Dispatchers.Default) {
66+
expect(3)
67+
}
68+
// cancelled on resume back
69+
expectUnreached()
70+
}
71+
}
72+
assertEquals(2, executor.submittedTasks)
73+
finish(4)
74+
}
75+
76+
@Test
77+
fun testRejectOnDelay() = runTest {
78+
expect(1)
79+
executor.acceptTasks = 1 // accept one task
80+
assertFailsWith<CancellationException> {
81+
withContext(executor.asCoroutineDispatcher()) {
82+
expect(2)
83+
delay(10) // cancelled
84+
expectUnreached()
85+
}
86+
}
87+
assertEquals(2, executor.submittedTasks)
88+
finish(3)
89+
}
90+
91+
@Test
92+
fun testRejectWithTimeout() = runTest {
93+
expect(1)
94+
executor.acceptTasks = 1 // accept one task
95+
assertFailsWith<CancellationException> {
96+
withContext(executor.asCoroutineDispatcher()) {
97+
expect(2)
98+
withTimeout(1000) {
99+
expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way)
100+
assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled
101+
}
102+
expectUnreached()
103+
}
104+
}
105+
assertEquals(2, executor.submittedTasks)
106+
finish(4)
107+
}
108+
109+
private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
110+
var acceptTasks = 0
111+
var submittedTasks = 0
112+
113+
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
114+
submittedTasks++
115+
if (submittedTasks > acceptTasks) throw RejectedExecutionException()
116+
return super.schedule(command, delay, unit)
117+
}
118+
}
119+
}

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
1616
takeEventLoop().dispatch(context, block)
1717
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
1818
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
19-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
20-
takeEventLoop().invokeOnTimeout(timeMillis, block)
19+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
20+
takeEventLoop().invokeOnTimeout(timeMillis, block, context)
2121

2222
actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
2323
}

kotlinx-coroutines-test/api/kotlinx-coroutines-test.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public final class kotlinx/coroutines/test/TestCoroutineDispatcher : kotlinx/cor
2525
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
2626
public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
2727
public fun getCurrentTime ()J
28-
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
28+
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
2929
public fun pauseDispatcher ()V
3030
public fun pauseDispatcher (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3131
public fun resumeDispatcher ()V

kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
6565
}
6666

6767
/** @suppress */
68-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
68+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
6969
val node = postDelayed(block, timeMillis)
7070
return object : DisposableHandle {
7171
override fun dispose() {

kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ internal class TestMainDispatcher(private val mainFactory: MainDispatcherFactory
4646
delay.delay(time)
4747
}
4848

49-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
50-
return delay.invokeOnTimeout(timeMillis, block)
49+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
50+
return delay.invokeOnTimeout(timeMillis, block, context)
5151
}
5252

5353
public fun setDispatcher(dispatcher: CoroutineDispatcher) {

reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kot
4747
public fun equals (Ljava/lang/Object;)Z
4848
public final fun getScheduler ()Lreactor/core/scheduler/Scheduler;
4949
public fun hashCode ()I
50-
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
50+
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
5151
public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
5252
public fun toString ()Ljava/lang/String;
5353
}

reactive/kotlinx-coroutines-reactor/src/Scheduler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class SchedulerCoroutineDispatcher(
3939
}
4040

4141
/** @suppress */
42-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
42+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
4343
scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle()
4444

4545
/** @suppress */

reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx
7676
public fun equals (Ljava/lang/Object;)Z
7777
public final fun getScheduler ()Lio/reactivex/Scheduler;
7878
public fun hashCode ()I
79-
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
79+
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
8080
public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
8181
public fun toString ()Ljava/lang/String;
8282
}

reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher(
3838
}
3939

4040
/** @suppress */
41-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
41+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
4242
val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
4343
return DisposableHandle { disposable.dispose() }
4444
}

0 commit comments

Comments
 (0)