Skip to content

Dispatchers.Default backed by worker #2853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
}
expectUnreached()
} catch (e: IllegalStateException) {
assertTrue(e.message!!.contains("Flow invariant is violated"))
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
finish(2)
}
}
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/SingleThread.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): SingleThreadDispatcher

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public expect abstract class SingleThreadDispatcher : CoroutineDispatcher {

/**
* Closes this coroutine dispatcher and shuts down its thread.
*/
public abstract fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

abstract class AbstractDispatcherConcurrencyTest : TestBase() {

public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDispatcherIsActuallyMultithreaded() {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDefaultDispatcher() {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
delay(100)
expect(3)
}
runBlocking { job.join() }
finish(4)
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,9 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class DefaultDispatcherConcurrencyTest : TestBase() {

@Test
fun testLaunchAndJoin() {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDefaultDispatcherIsActuallyMultithreaded() {
val channel = Channel<Int>()
GlobalScope.launch {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDefaultDispatcher() {
expect(1)
val job = GlobalScope.launch {
expect(2)
delay(100)
println("Actually resumed")
expect(3)
}
runBlocking { job.join() }
finish(4)
}
class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kotlinx.coroutines

import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class SingleThreadDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = newSingleThreadContext("SingleThreadDispatcherConcurrencyTest")

@AfterTest
fun shutDown() = (dispatcher as SingleThreadDispatcher).close()
}
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger
* @param name the base name of the created thread.
*/
@ObsoleteCoroutinesApi
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
newFixedThreadPoolContext(1, name)

/**
* A coroutine dispatcher that is confined to a single thread.
*/
public actual typealias SingleThreadDispatcher = ExecutorCoroutineDispatcher

/**
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
Expand Down
18 changes: 5 additions & 13 deletions kotlinx-coroutines-core/native/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,16 @@ import kotlin.native.concurrent.*

internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {

private val worker = Worker.start(name = "Dispatchers.Default")
private val delegate = SingleThreadDispatcherImpl(name = "Dispatchers.Default")

override fun dispatch(context: CoroutineContext, block: Runnable) =
worker.executeAfter(0L) { block.run() }
delegate.dispatch(context, block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// TODO proper toMicros
worker.executeAfter(timeMillis * 1000)
{ with(continuation) { resumeUndispatched(Unit) } }
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) = delegate.scheduleResumeAfterDelay(timeMillis, continuation)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
// No API to cancel on timeout
worker.executeAfter(timeMillis * 1000) { block.run() }
return NonDisposableHandle
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = delegate.invokeOnTimeout(timeMillis, block, context)

actual fun enqueue(task: Runnable): Unit = worker.executeAfter(0L) { task.run() }
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
}

internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/native/src/EventLoop.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() {
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
}

// TODO verify loop was shut down is an okay behaviour
// TODO actually reschedule
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
loopWasShutDown()
}
Expand Down
42 changes: 42 additions & 0 deletions kotlinx-coroutines-core/native/src/SingleThread.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.native.concurrent.*

@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): SingleThreadDispatcher = SingleThreadDispatcherImpl(name)

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public actual abstract class SingleThreadDispatcher : CoroutineDispatcher() {
public actual abstract fun close()
}

internal class SingleThreadDispatcherImpl(name: String) : SingleThreadDispatcher(), Delay {
private val worker = Worker.start(name = name)

override fun dispatch(context: CoroutineContext, block: Runnable) =
worker.executeAfter(0L) { block.run() }

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// TODO proper toMicros
worker.executeAfter(timeMillis * 1000)
{ with(continuation) { resumeUndispatched(Unit) } }
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
// No API to cancel on timeout
worker.executeAfter(timeMillis * 1000) { block.run() }
return NonDisposableHandle
}

override fun close() {
worker.requestTermination().result // Note: calling "result" blocks
}
}