Skip to content

Dispatchers.Default backed by worker #2853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
}
expectUnreached()
} catch (e: IllegalStateException) {
assertTrue(e.message!!.contains("Flow invariant is violated"))
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
finish(2)
}
}
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/SingleThread.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): SingleThreadDispatcher

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public expect abstract class SingleThreadDispatcher : CoroutineDispatcher {

/**
* Closes this coroutine dispatcher and shuts down its thread.
*/
public abstract fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

abstract class AbstractDispatcherConcurrencyTest : TestBase() {

public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDispatcherIsActuallyMultithreaded() {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDefaultDispatcher() {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
delay(100)
expect(3)
}
runBlocking { job.join() }
finish(4)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kotlinx.coroutines

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kotlinx.coroutines

import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class SingleThreadDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = newSingleThreadContext("SingleThreadDispatcherConcurrencyTest")

@AfterTest
fun shutDown() = (dispatcher as SingleThreadDispatcher).close()
}
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger
* @param name the base name of the created thread.
*/
@ObsoleteCoroutinesApi
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
newFixedThreadPoolContext(1, name)

/**
* A coroutine dispatcher that is confined to a single thread.
*/
public actual typealias SingleThreadDispatcher = ExecutorCoroutineDispatcher

/**
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
Expand Down
12 changes: 5 additions & 7 deletions kotlinx-coroutines-core/native/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines
import kotlinx.cinterop.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
Expand All @@ -33,7 +34,7 @@ import kotlin.coroutines.*
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
var newContext: CoroutineContext = context // todo: kludge for data flow analysis error
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
Expand All @@ -57,24 +58,21 @@ private class BlockingCoroutine<T>(
override val isScopedCoroutine: Boolean get() = true

@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T = memScoped {
fun joinBlocking(): T {
try {
eventLoop?.incrementUseCount()
val timespec = alloc<timespec>()
while (true) {
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
timespec.tv_sec = (parkNanos / 1000000000L).convert() // 1e9 ns -> sec
timespec.tv_nsec = (parkNanos % 1000000000L).convert() // % 1e9
nanosleep(timespec.ptr, null)
Worker.current.park(parkNanos / 1000L)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
// now return result
val state = state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
state as T
return state as T
}
}
18 changes: 9 additions & 9 deletions kotlinx-coroutines-core/native/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

private fun takeEventLoop(): EventLoopImpl =
ThreadLocalEventLoop.currentOrNull() as? EventLoopImpl ?:
error("There is no event loop. Use runBlocking { ... } to start one.")

internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {

private val delegate = SingleThreadDispatcherImpl(name = "Dispatchers.Default")

override fun dispatch(context: CoroutineContext, block: Runnable) =
takeEventLoop().dispatch(context, block)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
takeEventLoop().invokeOnTimeout(timeMillis, block, context)
delegate.dispatch(context, block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) = delegate.scheduleResumeAfterDelay(timeMillis, continuation)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = delegate.invokeOnTimeout(timeMillis, block, context)

actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
}

internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")
Expand Down
14 changes: 12 additions & 2 deletions kotlinx-coroutines-core/native/src/EventLoop.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@

package kotlinx.coroutines

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.system.*
import kotlin.native.concurrent.*

internal actual abstract class EventLoopImplPlatform: EventLoop() {
protected actual fun unpark() { /* does nothing */ }
internal actual abstract class EventLoopImplPlatform : EventLoop() {

private val current = Worker.current

protected actual fun unpark() {
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
}

// TODO actually reschedule
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
loopWasShutDown()
}
Expand Down
42 changes: 42 additions & 0 deletions kotlinx-coroutines-core/native/src/SingleThread.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.native.concurrent.*

@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): SingleThreadDispatcher = SingleThreadDispatcherImpl(name)

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public actual abstract class SingleThreadDispatcher : CoroutineDispatcher() {
public actual abstract fun close()
}

internal class SingleThreadDispatcherImpl(name: String) : SingleThreadDispatcher(), Delay {
private val worker = Worker.start(name = name)

override fun dispatch(context: CoroutineContext, block: Runnable) =
worker.executeAfter(0L) { block.run() }

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// TODO proper toMicros
worker.executeAfter(timeMillis * 1000)
{ with(continuation) { resumeUndispatched(Unit) } }
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
// No API to cancel on timeout
worker.executeAfter(timeMillis * 1000) { block.run() }
return NonDisposableHandle
}

override fun close() {
worker.requestTermination().result // Note: calling "result" blocks
}
}
16 changes: 0 additions & 16 deletions kotlinx-coroutines-core/native/test/DelayExceptionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,6 @@ import kotlin.coroutines.*
import kotlin.test.*

class DelayExceptionTest : TestBase() {
private object Dispatcher : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
override fun dispatch(context: CoroutineContext, block: Runnable) { block.run() }
}

private lateinit var exception: Throwable


@Test
fun testThrowsTce() {
CoroutineScope(Dispatcher + CoroutineExceptionHandler { _, e -> exception = e }).launch {
delay(10)
}

assertTrue(exception is IllegalStateException)
}

@Test
fun testMaxDelay() = runBlocking {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/native/test/WorkerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class WorkerTest : TestBase() {
}

@Test
fun testLaunchInWorkerTroughGlobalScope() {
fun testLaunchInWorkerThroughGlobalScope() {
val worker = Worker.start()
worker.execute(TransferMode.SAFE, { }) {
runBlocking {
CoroutineScope(EmptyCoroutineContext).launch {
delay(1)
delay(10)
}.join()
}
}.result
Expand Down