Skip to content

Commit b7e1803

Browse files
committed
Dispatchers.Default backed by worker
1 parent ca3e621 commit b7e1803

File tree

6 files changed

+89
-36
lines changed

6 files changed

+89
-36
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.coroutines.channels.*
4+
import kotlin.test.*
5+
6+
/*
7+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
8+
*/
9+
10+
class DefaultDispatcherConcurrencyTest : TestBase() {
11+
12+
@Test
13+
fun testLaunchAndJoin() {
14+
expect(1)
15+
var capturedMutableState = 0
16+
val job = GlobalScope.launch {
17+
++capturedMutableState
18+
expect(2)
19+
}
20+
runBlocking { job.join() }
21+
assertEquals(1, capturedMutableState)
22+
finish(3)
23+
}
24+
25+
@Test
26+
fun testDefaultDispatcherIsActuallyMultithreaded() {
27+
val channel = Channel<Int>()
28+
GlobalScope.launch {
29+
channel.send(42)
30+
}
31+
32+
var result = ChannelResult.failure<Int>()
33+
while (!result.isSuccess) {
34+
result = channel.tryReceive()
35+
// Block the thread, wait
36+
}
37+
// Delivery was successful, let's check it
38+
assertEquals(42, result.getOrThrow())
39+
}
40+
41+
@Test
42+
fun testDelayInDefaultDispatcher() {
43+
expect(1)
44+
val job = GlobalScope.launch {
45+
expect(2)
46+
delay(100)
47+
println("Actually resumed")
48+
expect(3)
49+
}
50+
runBlocking { job.join() }
51+
finish(4)
52+
}
53+
}

kotlinx-coroutines-core/native/src/Builders.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines
77
import kotlinx.cinterop.*
88
import platform.posix.*
99
import kotlin.coroutines.*
10+
import kotlin.native.concurrent.*
1011

1112
/**
1213
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
@@ -33,7 +34,7 @@ import kotlin.coroutines.*
3334
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
3435
val contextInterceptor = context[ContinuationInterceptor]
3536
val eventLoop: EventLoop?
36-
var newContext: CoroutineContext = context // todo: kludge for data flow analysis error
37+
val newContext: CoroutineContext
3738
if (contextInterceptor == null) {
3839
// create or use private event loop if no dispatcher is specified
3940
eventLoop = ThreadLocalEventLoop.eventLoop
@@ -57,24 +58,21 @@ private class BlockingCoroutine<T>(
5758
override val isScopedCoroutine: Boolean get() = true
5859

5960
@Suppress("UNCHECKED_CAST")
60-
fun joinBlocking(): T = memScoped {
61+
fun joinBlocking(): T {
6162
try {
6263
eventLoop?.incrementUseCount()
63-
val timespec = alloc<timespec>()
6464
while (true) {
6565
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
6666
// note: process next even may loose unpark flag, so check if completed before parking
6767
if (isCompleted) break
68-
timespec.tv_sec = (parkNanos / 1000000000L).convert() // 1e9 ns -> sec
69-
timespec.tv_nsec = (parkNanos % 1000000000L).convert() // % 1e9
70-
nanosleep(timespec.ptr, null)
68+
Worker.current.park(parkNanos / 1000L)
7169
}
7270
} finally { // paranoia
7371
eventLoop?.decrementUseCount()
7472
}
7573
// now return result
7674
val state = state.unboxState()
7775
(state as? CompletedExceptionally)?.let { throw it.cause }
78-
state as T
76+
return state as T
7977
}
8078
}

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+17-9
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,27 @@ import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
99
import kotlin.native.concurrent.*
1010

11-
private fun takeEventLoop(): EventLoopImpl =
12-
ThreadLocalEventLoop.currentOrNull() as? EventLoopImpl ?:
13-
error("There is no event loop. Use runBlocking { ... } to start one.")
1411

1512
internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
13+
14+
private val worker = Worker.start(name = "Dispatchers.Default")
15+
1616
override fun dispatch(context: CoroutineContext, block: Runnable) =
17-
takeEventLoop().dispatch(context, block)
18-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
19-
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
20-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
21-
takeEventLoop().invokeOnTimeout(timeMillis, block, context)
17+
worker.executeAfter(0L) { block.run() }
18+
19+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
20+
// TODO proper toMicros
21+
worker.executeAfter(timeMillis * 1000)
22+
{ with(continuation) { resumeUndispatched(Unit) } }
23+
}
24+
25+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
26+
// No API to cancel on timeout
27+
worker.executeAfter(timeMillis * 1000) { block.run() }
28+
return NonDisposableHandle
29+
}
2230

23-
actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
31+
actual fun enqueue(task: Runnable): Unit = worker.executeAfter(0L) { task.run() }
2432
}
2533

2634
internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")

kotlinx-coroutines-core/native/src/EventLoop.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,21 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.cinterop.*
8+
import platform.posix.*
79
import kotlin.coroutines.*
810
import kotlin.system.*
11+
import kotlin.native.concurrent.*
912

10-
internal actual abstract class EventLoopImplPlatform: EventLoop() {
11-
protected actual fun unpark() { /* does nothing */ }
13+
internal actual abstract class EventLoopImplPlatform : EventLoop() {
14+
15+
private val current = Worker.current
16+
17+
protected actual fun unpark() {
18+
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
19+
}
20+
21+
// TODO verify loop was shut down is an okay behaviour
1222
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
1323
loopWasShutDown()
1424
}

kotlinx-coroutines-core/native/test/DelayExceptionTest.kt

-16
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,6 @@ import kotlin.coroutines.*
88
import kotlin.test.*
99

1010
class DelayExceptionTest : TestBase() {
11-
private object Dispatcher : CoroutineDispatcher() {
12-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
13-
override fun dispatch(context: CoroutineContext, block: Runnable) { block.run() }
14-
}
15-
16-
private lateinit var exception: Throwable
17-
18-
19-
@Test
20-
fun testThrowsTce() {
21-
CoroutineScope(Dispatcher + CoroutineExceptionHandler { _, e -> exception = e }).launch {
22-
delay(10)
23-
}
24-
25-
assertTrue(exception is IllegalStateException)
26-
}
2711

2812
@Test
2913
fun testMaxDelay() = runBlocking {

kotlinx-coroutines-core/native/test/WorkerTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ class WorkerTest : TestBase() {
2323
}
2424

2525
@Test
26-
fun testLaunchInWorkerTroughGlobalScope() {
26+
fun testLaunchInWorkerThroughGlobalScope() {
2727
val worker = Worker.start()
2828
worker.execute(TransferMode.SAFE, { }) {
2929
runBlocking {
3030
CoroutineScope(EmptyCoroutineContext).launch {
31-
delay(1)
31+
delay(10)
3232
}.join()
3333
}
3434
}.result

0 commit comments

Comments
 (0)