Skip to content

Dispatchers.Default backed by worker #2853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions kotlinx-coroutines-core/common/test/EmptyContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*

suspend fun <T> withEmptyContext(block: suspend () -> T): T {
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
var result: Result<T> = baseline
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
while (result == baseline) yield()
return result.getOrThrow()
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
}
expectUnreached()
} catch (e: IllegalStateException) {
assertTrue(e.message!!.contains("Flow invariant is violated"))
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
finish(2)
}
}
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/SingleThread.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): SingleThreadDispatcher

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public expect abstract class SingleThreadDispatcher : CoroutineDispatcher {

/**
* Closes this coroutine dispatcher and shuts down its thread.
*/
public abstract fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

abstract class AbstractDispatcherConcurrencyTest : TestBase() {

public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDispatcherIsActuallyMultithreaded() {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDefaultDispatcher() {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
delay(100)
expect(3)
}
runBlocking { job.join() }
finish(4)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.exceptions.*
import kotlin.test.*

class ConcurrentExceptionsStressTest : TestBase() {
private val nWorkers = 4
private val nRepeat = 1000 * stressTestMultiplier

private val workers = Array(nWorkers) { index ->
newSingleThreadContext("JobExceptionsStressTest-$index")
}

@AfterTest
fun tearDown() {
workers.forEach {
it.close()
}
}

@Test
fun testStress() = runTest {
repeat(nRepeat) {
testOnce()
}
}

@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
private suspend fun CoroutineScope.testOnce() {
val deferred = async(NonCancellable) {
repeat(nWorkers) { index ->
// Always launch a coroutine even if parent job was already cancelled (atomic start)
launch(workers[index], start = CoroutineStart.ATOMIC) {
randomWait()
throw StressException(index)
}
}
}
deferred.join()
assertTrue(deferred.isCancelled)
val completionException = deferred.getCompletionExceptionOrNull()
val cause = completionException as? StressException
?: unexpectedException("completion", completionException)
val suppressed = cause.suppressed
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
}
repeat(nWorkers) { index ->
assertTrue(index in indices, "Exception $index is missing: $indices")
}
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
}

private fun unexpectedException(msg: String, e: Throwable?): Nothing {
e?.printStackTrace()
throw IllegalStateException("Unexpected $msg exception", e)
}

private class StressException(val index: Int) : SuppressSupportingThrowable()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.exceptions

internal expect open class SuppressSupportingThrowable() : Throwable
expect val Throwable.suppressed: Array<Throwable>
expect fun Throwable.printStackTrace()

expect fun randomWait()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kotlinx.coroutines

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kotlinx.coroutines

import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

class SingleThreadDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = newSingleThreadContext("SingleThreadDispatcherConcurrencyTest")

@AfterTest
fun shutDown() = (dispatcher as SingleThreadDispatcher).close()
}
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger
* @param name the base name of the created thread.
*/
@ObsoleteCoroutinesApi
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
newFixedThreadPoolContext(1, name)

/**
* A coroutine dispatcher that is confined to a single thread.
*/
public actual typealias SingleThreadDispatcher = ExecutorCoroutineDispatcher

/**
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
Expand Down
29 changes: 29 additions & 0 deletions kotlinx-coroutines-core/jvm/test/ConcurrentTestUtilities.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.exceptions

import kotlin.random.*

actual fun randomWait() {
val n = Random.nextInt(1000)
if (n < 500) return // no wait 50% of time
repeat(n) {
BlackHole.sink *= 3
}
if (n > 900) Thread.yield()
}

private object BlackHole {
@Volatile
var sink = 1
}


@Suppress("ACTUAL_WITHOUT_EXPECT")
internal actual typealias SuppressSupportingThrowable = Throwable

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
actual fun Throwable.printStackTrace() = printStackTrace()

2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/exceptions/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import kotlin.test.*
* but run only under JDK 1.8
*/
@Suppress("ConflictingExtensionProperty")
val Throwable.suppressed: Array<Throwable> get() {
actual val Throwable.suppressed: Array<Throwable> get() {
val method = this::class.java.getMethod("getSuppressed") ?: error("This test can only be run using JDK 1.7")
@Suppress("UNCHECKED_CAST")
return method.invoke(this) as Array<Throwable>
Expand Down
12 changes: 5 additions & 7 deletions kotlinx-coroutines-core/native/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines
import kotlinx.cinterop.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
Expand All @@ -33,7 +34,7 @@ import kotlin.coroutines.*
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
var newContext: CoroutineContext = context // todo: kludge for data flow analysis error
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
Expand All @@ -57,24 +58,21 @@ private class BlockingCoroutine<T>(
override val isScopedCoroutine: Boolean get() = true

@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T = memScoped {
fun joinBlocking(): T {
try {
eventLoop?.incrementUseCount()
val timespec = alloc<timespec>()
while (true) {
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
timespec.tv_sec = (parkNanos / 1000000000L).convert() // 1e9 ns -> sec
timespec.tv_nsec = (parkNanos % 1000000000L).convert() // % 1e9
nanosleep(timespec.ptr, null)
Worker.current.park(parkNanos / 1000L)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
// now return result
val state = state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
state as T
return state as T
}
}
18 changes: 9 additions & 9 deletions kotlinx-coroutines-core/native/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

private fun takeEventLoop(): EventLoopImpl =
ThreadLocalEventLoop.currentOrNull() as? EventLoopImpl ?:
error("There is no event loop. Use runBlocking { ... } to start one.")

internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {

private val delegate = SingleThreadDispatcherImpl(name = "Dispatchers.Default")

override fun dispatch(context: CoroutineContext, block: Runnable) =
takeEventLoop().dispatch(context, block)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
takeEventLoop().invokeOnTimeout(timeMillis, block, context)
delegate.dispatch(context, block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) = delegate.scheduleResumeAfterDelay(timeMillis, continuation)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = delegate.invokeOnTimeout(timeMillis, block, context)

actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
}

internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")
Expand Down
19 changes: 15 additions & 4 deletions kotlinx-coroutines-core/native/src/EventLoop.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@

package kotlinx.coroutines

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*
import kotlin.system.*

internal actual abstract class EventLoopImplPlatform: EventLoop() {
protected actual fun unpark() { /* does nothing */ }
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
loopWasShutDown()
internal actual abstract class EventLoopImplPlatform : EventLoop() {

private val current = Worker.current

protected actual fun unpark() {
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
}

protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit {
DefaultExecutor.invokeOnTimeout(now, delayedTask, EmptyCoroutineContext)
Unit
}
}

internal class EventLoopImpl: EventLoopImplBase() {
Expand Down
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/native/src/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.internal.SuppressSupportingThrowableImpl

/**
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
* It indicates _normal_ cancellation of a coroutine.
Expand Down Expand Up @@ -31,7 +34,9 @@ internal actual class JobCancellationException public actual constructor(
}

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) { /* empty */ }
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) {
if (this is SuppressSupportingThrowableImpl) addSuppressed(other)
}

// For use in tests
internal actual val RECOVER_STACK_TRACES: Boolean = false
Loading