Skip to content

Commit 4de3654

Browse files
committed
Introduce newSingleThreadDispatcher
1 parent b7e1803 commit 4de3654

10 files changed

+145
-63
lines changed

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
235235
}
236236
expectUnreached()
237237
} catch (e: IllegalStateException) {
238-
assertTrue(e.message!!.contains("Flow invariant is violated"))
238+
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
239239
finish(2)
240240
}
241241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
@ExperimentalCoroutinesApi
8+
public expect fun newSingleThreadContext(name: String): SingleThreadDispatcher
9+
10+
/**
11+
* A coroutine dispatcher that is confined to a single thread.
12+
*/
13+
@ExperimentalCoroutinesApi
14+
public expect abstract class SingleThreadDispatcher : CoroutineDispatcher {
15+
16+
/**
17+
* Closes this coroutine dispatcher and shuts down its thread.
18+
*/
19+
public abstract fun close()
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.coroutines.channels.*
4+
import kotlin.test.*
5+
6+
/*
7+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
8+
*/
9+
10+
abstract class AbstractDispatcherConcurrencyTest : TestBase() {
11+
12+
public abstract val dispatcher: CoroutineDispatcher
13+
14+
@Test
15+
fun testLaunchAndJoin() {
16+
expect(1)
17+
var capturedMutableState = 0
18+
val job = GlobalScope.launch(dispatcher) {
19+
++capturedMutableState
20+
expect(2)
21+
}
22+
runBlocking { job.join() }
23+
assertEquals(1, capturedMutableState)
24+
finish(3)
25+
}
26+
27+
@Test
28+
fun testDispatcherIsActuallyMultithreaded() {
29+
val channel = Channel<Int>()
30+
GlobalScope.launch(dispatcher) {
31+
channel.send(42)
32+
}
33+
34+
var result = ChannelResult.failure<Int>()
35+
while (!result.isSuccess) {
36+
result = channel.tryReceive()
37+
// Block the thread, wait
38+
}
39+
// Delivery was successful, let's check it
40+
assertEquals(42, result.getOrThrow())
41+
}
42+
43+
@Test
44+
fun testDelayInDefaultDispatcher() {
45+
expect(1)
46+
val job = GlobalScope.launch(dispatcher) {
47+
expect(2)
48+
delay(100)
49+
expect(3)
50+
}
51+
runBlocking { job.join() }
52+
finish(4)
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,9 @@
11
package kotlinx.coroutines
22

3-
import kotlinx.coroutines.channels.*
4-
import kotlin.test.*
5-
63
/*
74
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
85
*/
96

10-
class DefaultDispatcherConcurrencyTest : TestBase() {
11-
12-
@Test
13-
fun testLaunchAndJoin() {
14-
expect(1)
15-
var capturedMutableState = 0
16-
val job = GlobalScope.launch {
17-
++capturedMutableState
18-
expect(2)
19-
}
20-
runBlocking { job.join() }
21-
assertEquals(1, capturedMutableState)
22-
finish(3)
23-
}
24-
25-
@Test
26-
fun testDefaultDispatcherIsActuallyMultithreaded() {
27-
val channel = Channel<Int>()
28-
GlobalScope.launch {
29-
channel.send(42)
30-
}
31-
32-
var result = ChannelResult.failure<Int>()
33-
while (!result.isSuccess) {
34-
result = channel.tryReceive()
35-
// Block the thread, wait
36-
}
37-
// Delivery was successful, let's check it
38-
assertEquals(42, result.getOrThrow())
39-
}
40-
41-
@Test
42-
fun testDelayInDefaultDispatcher() {
43-
expect(1)
44-
val job = GlobalScope.launch {
45-
expect(2)
46-
delay(100)
47-
println("Actually resumed")
48-
expect(3)
49-
}
50-
runBlocking { job.join() }
51-
finish(4)
52-
}
7+
class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
8+
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
539
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package kotlinx.coroutines
2+
3+
import kotlin.test.*
4+
5+
/*
6+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
7+
*/
8+
9+
class SingleThreadDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
10+
override val dispatcher: CoroutineDispatcher = newSingleThreadContext("SingleThreadDispatcherConcurrencyTest")
11+
12+
@AfterTest
13+
fun shutDown() = (dispatcher as SingleThreadDispatcher).close()
14+
}

kotlinx-coroutines-core/js/src/EventLoop.kt

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() {
2020
}
2121

2222
internal actual object DefaultExecutor {
23-
public actual fun enqueue(task: Runnable): Unit = unsupported()
2423
}
2524

2625
private fun unsupported(): Nothing =

kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger
3030
* @param name the base name of the created thread.
3131
*/
3232
@ObsoleteCoroutinesApi
33-
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
33+
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
3434
newFixedThreadPoolContext(1, name)
3535

36+
/**
37+
* A coroutine dispatcher that is confined to a single thread.
38+
*/
39+
public actual typealias SingleThreadDispatcher = ExecutorCoroutineDispatcher
40+
3641
/**
3742
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
3843
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+5-13
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,16 @@ import kotlin.native.concurrent.*
1111

1212
internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
1313

14-
private val worker = Worker.start(name = "Dispatchers.Default")
14+
private val delegate = SingleThreadDispatcherImpl(name = "Dispatchers.Default")
1515

1616
override fun dispatch(context: CoroutineContext, block: Runnable) =
17-
worker.executeAfter(0L) { block.run() }
17+
delegate.dispatch(context, block)
1818

19-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
20-
// TODO proper toMicros
21-
worker.executeAfter(timeMillis * 1000)
22-
{ with(continuation) { resumeUndispatched(Unit) } }
23-
}
19+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) = delegate.scheduleResumeAfterDelay(timeMillis, continuation)
2420

25-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
26-
// No API to cancel on timeout
27-
worker.executeAfter(timeMillis * 1000) { block.run() }
28-
return NonDisposableHandle
29-
}
21+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = delegate.invokeOnTimeout(timeMillis, block, context)
3022

31-
actual fun enqueue(task: Runnable): Unit = worker.executeAfter(0L) { task.run() }
23+
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
3224
}
3325

3426
internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")

kotlinx-coroutines-core/native/src/EventLoop.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() {
1818
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
1919
}
2020

21-
// TODO verify loop was shut down is an okay behaviour
21+
// TODO actually reschedule
2222
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
2323
loopWasShutDown()
2424
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.native.concurrent.*
9+
10+
@ExperimentalCoroutinesApi
11+
public actual fun newSingleThreadContext(name: String): SingleThreadDispatcher = SingleThreadDispatcherImpl(name)
12+
13+
/**
14+
* A coroutine dispatcher that is confined to a single thread.
15+
*/
16+
@ExperimentalCoroutinesApi
17+
public actual abstract class SingleThreadDispatcher : CoroutineDispatcher() {
18+
public actual abstract fun close()
19+
}
20+
21+
internal class SingleThreadDispatcherImpl(name: String) : SingleThreadDispatcher(), Delay {
22+
private val worker = Worker.start(name = name)
23+
24+
override fun dispatch(context: CoroutineContext, block: Runnable) =
25+
worker.executeAfter(0L) { block.run() }
26+
27+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
28+
// TODO proper toMicros
29+
worker.executeAfter(timeMillis * 1000)
30+
{ with(continuation) { resumeUndispatched(Unit) } }
31+
}
32+
33+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
34+
// No API to cancel on timeout
35+
worker.executeAfter(timeMillis * 1000) { block.run() }
36+
return NonDisposableHandle
37+
}
38+
39+
override fun close() {
40+
worker.requestTermination().result // Note: calling "result" blocks
41+
}
42+
}

0 commit comments

Comments
 (0)