Skip to content

Commit b517f05

Browse files
qwwdfsadelizarov
authored andcommitted
Introduce ExecutorCoroutineDispatcher instead of CloseableCoroutineDispatcher
Fixes #385
1 parent cdf8468 commit b517f05

File tree

8 files changed

+105
-46
lines changed

8 files changed

+105
-46
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+13-10
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,12 @@ public abstract class kotlinx/coroutines/experimental/CloseableCoroutineDispatch
8686
public fun <init> ()V
8787
}
8888

89-
public final class kotlinx/coroutines/experimental/CommonPool : kotlinx/coroutines/experimental/CoroutineDispatcher {
89+
public final class kotlinx/coroutines/experimental/CommonPool : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcher {
9090
public static final field DEFAULT_PARALLELISM_PROPERTY_NAME Ljava/lang/String;
9191
public static final field INSTANCE Lkotlinx/coroutines/experimental/CommonPool;
92+
public fun close ()V
9293
public fun dispatch (Lkotlin/coroutines/experimental/CoroutineContext;Ljava/lang/Runnable;)V
94+
public fun getExecutor ()Ljava/util/concurrent/Executor;
9395
public fun toString ()Ljava/lang/String;
9496
}
9597

@@ -271,12 +273,6 @@ public final class kotlinx/coroutines/experimental/DispatchedTask$DefaultImpls {
271273
public static fun run (Lkotlinx/coroutines/experimental/DispatchedTask;)V
272274
}
273275

274-
public final class kotlinx/coroutines/experimental/DisposableFutureHandle : kotlinx/coroutines/experimental/DisposableHandle {
275-
public fun <init> (Ljava/util/concurrent/Future;)V
276-
public fun dispose ()V
277-
public fun toString ()Ljava/lang/String;
278-
}
279-
280276
public abstract interface class kotlinx/coroutines/experimental/DisposableHandle {
281277
public abstract fun dispose ()V
282278
}
@@ -305,7 +301,12 @@ public final class kotlinx/coroutines/experimental/EventLoopKt {
305301
public static synthetic fun EventLoop$default (Ljava/lang/Thread;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/EventLoop;
306302
}
307303

308-
public abstract class kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase : kotlinx/coroutines/experimental/CloseableCoroutineDispatcher, kotlinx/coroutines/experimental/Delay {
304+
public abstract class kotlinx/coroutines/experimental/ExecutorCoroutineDispatcher : kotlinx/coroutines/experimental/CloseableCoroutineDispatcher, java/io/Closeable {
305+
public fun <init> ()V
306+
public abstract fun getExecutor ()Ljava/util/concurrent/Executor;
307+
}
308+
309+
public abstract class kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcher, kotlinx/coroutines/experimental/Delay {
309310
public fun <init> ()V
310311
public fun close ()V
311312
public fun delay (JLjava/util/concurrent/TimeUnit;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
@@ -319,7 +320,8 @@ public abstract class kotlinx/coroutines/experimental/ExecutorCoroutineDispatche
319320

320321
public final class kotlinx/coroutines/experimental/ExecutorsKt {
321322
public static final fun asCoroutineDispatcher (Ljava/util/concurrent/Executor;)Lkotlinx/coroutines/experimental/CoroutineDispatcher;
322-
public static final fun asCoroutineDispatcher (Ljava/util/concurrent/ExecutorService;)Lkotlinx/coroutines/experimental/CloseableCoroutineDispatcher;
323+
public static final synthetic fun asCoroutineDispatcher (Ljava/util/concurrent/ExecutorService;)Lkotlinx/coroutines/experimental/CloseableCoroutineDispatcher;
324+
public static final fun asCoroutineDispatcher (Ljava/util/concurrent/ExecutorService;)Lkotlinx/coroutines/experimental/ExecutorCoroutineDispatcher;
323325
public static final fun toCoroutineDispatcher (Ljava/util/concurrent/Executor;)Lkotlinx/coroutines/experimental/CoroutineDispatcher;
324326
}
325327

@@ -434,8 +436,9 @@ public final class kotlinx/coroutines/experimental/ScheduledKt {
434436
public static synthetic fun withTimeoutOrNull$default (JLjava/util/concurrent/TimeUnit;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/experimental/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
435437
}
436438

437-
public final class kotlinx/coroutines/experimental/ThreadPoolDispatcher : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase, java/io/Closeable {
439+
public final class kotlinx/coroutines/experimental/ThreadPoolDispatcher : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase {
438440
public fun close ()V
441+
public fun getExecutor ()Ljava/util/concurrent/Executor;
439442
public fun toString ()Ljava/lang/String;
440443
}
441444

core/kotlinx-coroutines-core/src/CommonPool.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import kotlin.coroutines.experimental.*
2121
* pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
2222
* privileged actions.
2323
*/
24-
object CommonPool : CoroutineDispatcher() {
24+
object CommonPool : ExecutorCoroutineDispatcher() {
2525

2626
/**
2727
* Name of the property that controls default parallelism level of [CommonPool].
@@ -31,6 +31,9 @@ object CommonPool : CoroutineDispatcher() {
3131
*/
3232
public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
3333

34+
override val executor: Executor
35+
get() = pool ?: getOrCreatePoolSync()
36+
3437
// Equals to -1 if not explicitly specified
3538
private val requestedParallelism = run<Int> {
3639
val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
@@ -132,4 +135,6 @@ object CommonPool : CoroutineDispatcher() {
132135
}
133136

134137
override fun toString(): String = "CommonPool"
138+
139+
override fun close(): Unit = error("Close cannot be invoked on CommonPool")
135140
}

core/kotlinx-coroutines-core/src/Executors.kt

+46-23
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,66 @@ import java.util.concurrent.*
1010
import kotlin.coroutines.experimental.*
1111

1212
/**
13-
* [CoroutineDispatcher] that implements [Closeable]
13+
* [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
14+
* Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
15+
*
16+
* This class is generally used as a bridge between coroutine-based API and
17+
* asynchronous API which requires instance of the [Executor].
1418
*/
15-
abstract class CloseableCoroutineDispatcher: CoroutineDispatcher(), Closeable
19+
public abstract class ExecutorCoroutineDispatcher: CloseableCoroutineDispatcher(), Closeable {
20+
21+
/**
22+
* Underlying executor of current [CoroutineDispatcher].
23+
*/
24+
public abstract val executor: Executor
25+
}
1626

1727
/**
18-
* Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
28+
* [CoroutineDispatcher] that implements [Closeable].
29+
*
30+
* @suppress **Deprecated**: Use [ExecutorCoroutineDispatcher].
1931
*/
20-
public fun ExecutorService.asCoroutineDispatcher(): CloseableCoroutineDispatcher =
32+
@Deprecated("Use ExecutorCoroutineDispatcher instead", replaceWith = ReplaceWith("ExecutorCoroutineDispatcher"))
33+
public abstract class CloseableCoroutineDispatcher: CoroutineDispatcher(), Closeable
34+
35+
/**
36+
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
37+
*/
38+
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
2139
// we know that an implementation of Executor.asCoroutineDispatcher actually returns a closeable one
22-
(this as Executor).asCoroutineDispatcher() as CloseableCoroutineDispatcher
40+
(this as Executor).asCoroutineDispatcher() as ExecutorCoroutineDispatcher
2341

2442
/**
2543
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
26-
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
2744
*/
28-
@Deprecated("Renamed to `asCoroutineDispatcher`",
29-
replaceWith = ReplaceWith("asCoroutineDispatcher()"))
30-
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
31-
ExecutorCoroutineDispatcher(this)
45+
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
46+
ExecutorCoroutineDispatcherImpl(this)
47+
48+
/**
49+
* Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
50+
* @suppress **Deprecated**: Return type changed to [ExecutorCoroutineDispatcher].
51+
*/
52+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Return type changed to ExecutorCoroutineDispatcher")
53+
@JvmName("asCoroutineDispatcher") // for binary compatibility
54+
public fun ExecutorService.asCoroutineDispatcher_Deprecated(): CloseableCoroutineDispatcher =
55+
asCoroutineDispatcher()
3256

3357
/**
3458
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
59+
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
3560
*/
36-
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
37-
ExecutorCoroutineDispatcher(this)
61+
@Deprecated("Renamed to `asCoroutineDispatcher`",
62+
replaceWith = ReplaceWith("asCoroutineDispatcher()"))
63+
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
64+
asCoroutineDispatcher()
3865

39-
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
66+
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
4067

4168
/**
4269
* @suppress **This is unstable API and it is subject to change.**
4370
*/
44-
public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispatcher(), Delay {
45-
/**
46-
* @suppress **This is unstable API and it is subject to change.**
47-
*/
48-
internal abstract val executor: Executor
49-
71+
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
72+
5073
override fun dispatch(context: CoroutineContext, block: Runnable) =
5174
try { executor.execute(timeSource.trackTask(block)) }
5275
catch (e: RejectedExecutionException) {
@@ -70,10 +93,10 @@ public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispat
7093
try { (executor as? ScheduledExecutorService)
7194
?.schedule(block, time, unit) }
7295
catch (e: RejectedExecutionException) { null }
73-
if (timeout != null)
74-
return DisposableFutureHandle(timeout)
96+
return if (timeout != null)
97+
DisposableFutureHandle(timeout)
7598
else
76-
return DefaultExecutor.invokeOnTimeout(time, unit, block)
99+
DefaultExecutor.invokeOnTimeout(time, unit, block)
77100
}
78101

79102
override fun close() {
@@ -98,7 +121,7 @@ private class ResumeUndispatchedRunnable(
98121
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
99122
* @suppress **This is unstable API and it is subject to change.**
100123
*/
101-
public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
124+
private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
102125
override fun dispose() {
103126
future.cancel(false)
104127
}

core/kotlinx-coroutines-core/src/ThreadPoolDispatcher.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@
44

55
package kotlinx.coroutines.experimental
66

7-
import java.io.Closeable
8-
import java.util.concurrent.Executors
9-
import java.util.concurrent.ScheduledExecutorService
7+
import java.util.concurrent.*
108
import java.util.concurrent.atomic.AtomicInteger
11-
import kotlin.coroutines.experimental.CoroutineContext
9+
import kotlin.coroutines.experimental.*
1210

1311
/**
1412
* Creates a new coroutine execution context using a single thread with built-in [yield] and [delay] support.
@@ -63,18 +61,18 @@ internal class PoolThread(
6361
public class ThreadPoolDispatcher internal constructor(
6462
private val nThreads: Int,
6563
private val name: String
66-
) : ExecutorCoroutineDispatcherBase(), Closeable {
64+
) : ExecutorCoroutineDispatcherBase() {
6765
private val threadNo = AtomicInteger()
6866

69-
internal override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
67+
override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
7068
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
7169
}
7270

7371
/**
7472
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
7573
*/
7674
public override fun close() {
77-
executor.shutdown()
75+
(executor as ExecutorService).shutdown()
7876
}
7977

8078
override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ internal class CoroutineScheduler(
6262
private val maxPoolSize: Int,
6363
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
6464
private val schedulerName: String = DEFAULT_SCHEDULER_NAME
65-
) : Closeable {
65+
) : Executor, Closeable {
6666
init {
6767
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
6868
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
@@ -290,6 +290,8 @@ internal class CoroutineScheduler(
290290
private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
291291
}
292292

293+
override fun execute(command: Runnable) = dispatch(command)
294+
293295
override fun close() = shutdown(1000L)
294296

295297
/*

core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt

+13-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package kotlinx.coroutines.experimental.scheduling
66

77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.experimental.*
9-
import java.io.*
109
import java.util.concurrent.*
1110
import kotlin.coroutines.experimental.*
1211

@@ -18,7 +17,7 @@ class ExperimentalCoroutineDispatcher(
1817
private val corePoolSize: Int,
1918
private val maxPoolSize: Int,
2019
private val idleWorkerKeepAliveNs: Long
21-
) : CoroutineDispatcher(), Delay, Closeable {
20+
) : ExecutorCoroutineDispatcher(), Delay {
2221
constructor(
2322
corePoolSize: Int = CORE_POOL_SIZE,
2423
maxPoolSize: Int = MAX_POOL_SIZE
@@ -28,6 +27,9 @@ class ExperimentalCoroutineDispatcher(
2827
IDLE_WORKER_KEEP_ALIVE_NS
2928
)
3029

30+
override val executor: Executor
31+
get() = coroutineScheduler
32+
3133
// This is variable for test purposes, so that we can reinitialize from clean state
3234
private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
3335

@@ -40,6 +42,7 @@ class ExperimentalCoroutineDispatcher(
4042
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
4143
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
4244

45+
// TODO throw error when this API becomes public and close it in tests via another method
4346
override fun close() = coroutineScheduler.close()
4447

4548
override fun toString(): String {
@@ -90,11 +93,18 @@ private class LimitingDispatcher(
9093
val dispatcher: ExperimentalCoroutineDispatcher,
9194
val parallelism: Int,
9295
override val taskMode: TaskMode
93-
) : CoroutineDispatcher(), Delay, TaskContext {
96+
) : ExecutorCoroutineDispatcher(), Delay, TaskContext, Executor {
9497

9598
private val queue = ConcurrentLinkedQueue<Runnable>()
9699
private val inFlightTasks = atomic(0)
97100

101+
override val executor: Executor
102+
get() = this
103+
104+
override fun execute(command: Runnable) = dispatch(command, false)
105+
106+
override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
107+
98108
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
99109

100110
private fun dispatch(block: Runnable, fair: Boolean) {

core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private fun shutdownDispatcherPools(timeout: Long) {
100100
for (i in 0 until n) {
101101
val thread = threads[i]
102102
if (thread is PoolThread)
103-
thread.dispatcher.executor.apply {
103+
(thread.dispatcher.executor as ExecutorService).apply {
104104
shutdown()
105105
awaitTermination(timeout, TimeUnit.MILLISECONDS)
106106
shutdownNow().forEach { DefaultExecutor.execute(it) }

integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt

+18
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.junit.Assert.*
1212
import java.util.concurrent.*
1313
import java.util.concurrent.atomic.*
1414
import java.util.concurrent.locks.*
15+
import java.util.function.Supplier
1516
import kotlin.concurrent.*
1617
import kotlin.coroutines.experimental.*
1718

@@ -314,6 +315,23 @@ class FutureTest : TestBase() {
314315
}
315316
}
316317

318+
private val threadLocal = ThreadLocal<String>()
319+
320+
@Test
321+
fun testApiBridge() = runTest {
322+
val result = newSingleThreadContext("ctx").use {
323+
val future = CompletableFuture.supplyAsync(Supplier { threadLocal.set("value") }, it.executor)
324+
val job = async(it) {
325+
future.await()
326+
threadLocal.get()
327+
}
328+
329+
job.await()
330+
}
331+
332+
assertEquals("value", result)
333+
}
334+
317335
class TestException(message: String) : Exception(message)
318336

319337
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {

0 commit comments

Comments
 (0)