-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathExecutors.kt
210 lines (188 loc) · 9.16 KB
/
Executors.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import java.io.Closeable
import java.util.concurrent.*
import kotlin.coroutines.*
import kotlin.AutoCloseable
/**
* [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
* Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
*
* This class is generally used as a bridge between coroutine-based API and
* asynchronous API that requires an instance of the [Executor].
*/
public abstract class ExecutorCoroutineDispatcher : CoroutineDispatcher(), Closeable, AutoCloseable {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
CoroutineDispatcher,
{ it as? ExecutorCoroutineDispatcher })
/**
* Underlying executor of current [CoroutineDispatcher].
*/
public abstract val executor: Executor
/**
* Closes this coroutine dispatcher and shuts down its executor.
*
* It may throw an exception if this dispatcher is global and cannot be closed.
*/
public abstract override fun close()
}
@ExperimentalCoroutinesApi
public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor service is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
ExecutorCoroutineDispatcherImpl(this)
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
(this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)
/**
* Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
*
* It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
*/
public fun CoroutineDispatcher.asExecutor(): Executor =
(this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)
private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
override fun execute(block: Runnable) {
if (dispatcher.safeIsDispatchNeeded(EmptyCoroutineContext)) {
dispatcher.safeDispatch(EmptyCoroutineContext, block)
} else {
block.run()
}
}
override fun toString(): String = dispatcher.toString()
}
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
init {
/* Attempt to invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to clean up
* the internal scheduler queue on cancellation. */
if (executor is ScheduledThreadPoolExecutor) {
executor.removeOnCancelPolicy = true
}
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
cancelJobOnRejection(context, e)
Dispatchers.IO.dispatch(context, block)
}
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.invokeOnCancellation(CancelFutureOnCancel(future))
return
}
// Otherwise fallback to default executor
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
}
}
private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
context.cancel(CancellationException("The task was rejected", exception))
}
override fun close() {
(executor as? ExecutorService)?.shutdown()
}
override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}
/**
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
* @suppress **This is unstable API and it is subject to change.**
*/
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
override fun dispose() {
future.cancel(false)
}
override fun toString(): String = "DisposableFutureHandle[$future]"
}
private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
}
override fun toString() = "CancelFutureOnCancel[$future]"
}