Skip to content

Commit bfbb33e

Browse files
committed
Dispatchers.Default backed by worker (#2853)
* Dispatchers.Default backed by worker * Introduce newSingleThreadDispatcher * Use proper reentrant locking and CoW arrays on new memory model, make TestBase _almost_ race-free * More thread-safety in Native counterpart and one more test from native-mt * Properly reschedule closed event loop in K/N
1 parent f1f66ef commit bfbb33e

23 files changed

+389
-123
lines changed

kotlinx-coroutines-core/common/test/EmptyContext.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ package kotlinx.coroutines
77
import kotlinx.coroutines.intrinsics.*
88
import kotlin.coroutines.*
99

10-
suspend fun <T> withEmptyContext(block: suspend () -> T): T {
11-
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
12-
var result: Result<T> = baseline
13-
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
14-
while (result == baseline) yield()
15-
return result.getOrThrow()
10+
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
11+
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
1612
}

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
235235
}
236236
expectUnreached()
237237
} catch (e: IllegalStateException) {
238-
assertTrue(e.message!!.contains("Flow invariant is violated"))
238+
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
239239
finish(2)
240240
}
241241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
@ExperimentalCoroutinesApi
8+
public expect fun newSingleThreadContext(name: String): SingleThreadDispatcher
9+
10+
/**
11+
* A coroutine dispatcher that is confined to a single thread.
12+
*/
13+
@ExperimentalCoroutinesApi
14+
public expect abstract class SingleThreadDispatcher : CoroutineDispatcher {
15+
16+
/**
17+
* Closes this coroutine dispatcher and shuts down its thread.
18+
*/
19+
public abstract fun close()
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.coroutines.channels.*
4+
import kotlin.test.*
5+
6+
/*
7+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
8+
*/
9+
10+
abstract class AbstractDispatcherConcurrencyTest : TestBase() {
11+
12+
public abstract val dispatcher: CoroutineDispatcher
13+
14+
@Test
15+
fun testLaunchAndJoin() {
16+
expect(1)
17+
var capturedMutableState = 0
18+
val job = GlobalScope.launch(dispatcher) {
19+
++capturedMutableState
20+
expect(2)
21+
}
22+
runBlocking { job.join() }
23+
assertEquals(1, capturedMutableState)
24+
finish(3)
25+
}
26+
27+
@Test
28+
fun testDispatcherIsActuallyMultithreaded() {
29+
val channel = Channel<Int>()
30+
GlobalScope.launch(dispatcher) {
31+
channel.send(42)
32+
}
33+
34+
var result = ChannelResult.failure<Int>()
35+
while (!result.isSuccess) {
36+
result = channel.tryReceive()
37+
// Block the thread, wait
38+
}
39+
// Delivery was successful, let's check it
40+
assertEquals(42, result.getOrThrow())
41+
}
42+
43+
@Test
44+
fun testDelayInDefaultDispatcher() {
45+
expect(1)
46+
val job = GlobalScope.launch(dispatcher) {
47+
expect(2)
48+
delay(100)
49+
expect(3)
50+
}
51+
runBlocking { job.join() }
52+
finish(4)
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.exceptions.*
8+
import kotlin.test.*
9+
10+
class ConcurrentExceptionsStressTest : TestBase() {
11+
private val nWorkers = 4
12+
private val nRepeat = 1000 * stressTestMultiplier
13+
14+
private val workers = Array(nWorkers) { index ->
15+
newSingleThreadContext("JobExceptionsStressTest-$index")
16+
}
17+
18+
@AfterTest
19+
fun tearDown() {
20+
workers.forEach {
21+
it.close()
22+
}
23+
}
24+
25+
@Test
26+
fun testStress() = runTest {
27+
repeat(nRepeat) {
28+
testOnce()
29+
}
30+
}
31+
32+
@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
33+
private suspend fun CoroutineScope.testOnce() {
34+
val deferred = async(NonCancellable) {
35+
repeat(nWorkers) { index ->
36+
// Always launch a coroutine even if parent job was already cancelled (atomic start)
37+
launch(workers[index], start = CoroutineStart.ATOMIC) {
38+
randomWait()
39+
throw StressException(index)
40+
}
41+
}
42+
}
43+
deferred.join()
44+
assertTrue(deferred.isCancelled)
45+
val completionException = deferred.getCompletionExceptionOrNull()
46+
val cause = completionException as? StressException
47+
?: unexpectedException("completion", completionException)
48+
val suppressed = cause.suppressed
49+
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
50+
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
51+
}
52+
repeat(nWorkers) { index ->
53+
assertTrue(index in indices, "Exception $index is missing: $indices")
54+
}
55+
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
56+
}
57+
58+
private fun unexpectedException(msg: String, e: Throwable?): Nothing {
59+
e?.printStackTrace()
60+
throw IllegalStateException("Unexpected $msg exception", e)
61+
}
62+
63+
private class StressException(val index: Int) : SuppressSupportingThrowable()
64+
}
65+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
internal expect open class SuppressSupportingThrowable() : Throwable
8+
expect val Throwable.suppressed: Array<Throwable>
9+
expect fun Throwable.printStackTrace()
10+
11+
expect fun randomWait()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package kotlinx.coroutines
2+
3+
/*
4+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
5+
*/
6+
7+
class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
8+
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package kotlinx.coroutines
2+
3+
import kotlin.test.*
4+
5+
/*
6+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
7+
*/
8+
9+
class SingleThreadDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
10+
override val dispatcher: CoroutineDispatcher = newSingleThreadContext("SingleThreadDispatcherConcurrencyTest")
11+
12+
@AfterTest
13+
fun shutDown() = (dispatcher as SingleThreadDispatcher).close()
14+
}

kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@ import java.util.concurrent.atomic.AtomicInteger
3030
* @param name the base name of the created thread.
3131
*/
3232
@ObsoleteCoroutinesApi
33-
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
33+
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
3434
newFixedThreadPoolContext(1, name)
3535

36+
/**
37+
* A coroutine dispatcher that is confined to a single thread.
38+
*/
39+
public actual typealias SingleThreadDispatcher = ExecutorCoroutineDispatcher
40+
3641
/**
3742
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
3843
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
import kotlin.random.*
8+
9+
actual fun randomWait() {
10+
val n = Random.nextInt(1000)
11+
if (n < 500) return // no wait 50% of time
12+
repeat(n) {
13+
BlackHole.sink *= 3
14+
}
15+
if (n > 900) Thread.yield()
16+
}
17+
18+
private object BlackHole {
19+
@Volatile
20+
var sink = 1
21+
}
22+
23+
24+
@Suppress("ACTUAL_WITHOUT_EXPECT")
25+
internal actual typealias SuppressSupportingThrowable = Throwable
26+
27+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
28+
actual fun Throwable.printStackTrace() = printStackTrace()
29+

kotlinx-coroutines-core/jvm/test/exceptions/Exceptions.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import kotlin.test.*
1515
* but run only under JDK 1.8
1616
*/
1717
@Suppress("ConflictingExtensionProperty")
18-
val Throwable.suppressed: Array<Throwable> get() {
18+
actual val Throwable.suppressed: Array<Throwable> get() {
1919
val method = this::class.java.getMethod("getSuppressed") ?: error("This test can only be run using JDK 1.7")
2020
@Suppress("UNCHECKED_CAST")
2121
return method.invoke(this) as Array<Throwable>

kotlinx-coroutines-core/native/src/Builders.kt

+5-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines
77
import kotlinx.cinterop.*
88
import platform.posix.*
99
import kotlin.coroutines.*
10+
import kotlin.native.concurrent.*
1011

1112
/**
1213
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
@@ -33,7 +34,7 @@ import kotlin.coroutines.*
3334
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
3435
val contextInterceptor = context[ContinuationInterceptor]
3536
val eventLoop: EventLoop?
36-
var newContext: CoroutineContext = context // todo: kludge for data flow analysis error
37+
val newContext: CoroutineContext
3738
if (contextInterceptor == null) {
3839
// create or use private event loop if no dispatcher is specified
3940
eventLoop = ThreadLocalEventLoop.eventLoop
@@ -57,24 +58,21 @@ private class BlockingCoroutine<T>(
5758
override val isScopedCoroutine: Boolean get() = true
5859

5960
@Suppress("UNCHECKED_CAST")
60-
fun joinBlocking(): T = memScoped {
61+
fun joinBlocking(): T {
6162
try {
6263
eventLoop?.incrementUseCount()
63-
val timespec = alloc<timespec>()
6464
while (true) {
6565
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
6666
// note: process next even may loose unpark flag, so check if completed before parking
6767
if (isCompleted) break
68-
timespec.tv_sec = (parkNanos / 1000000000L).convert() // 1e9 ns -> sec
69-
timespec.tv_nsec = (parkNanos % 1000000000L).convert() // % 1e9
70-
nanosleep(timespec.ptr, null)
68+
Worker.current.park(parkNanos / 1000L)
7169
}
7270
} finally { // paranoia
7371
eventLoop?.decrementUseCount()
7472
}
7573
// now return result
7674
val state = state.unboxState()
7775
(state as? CompletedExceptionally)?.let { throw it.cause }
78-
state as T
76+
return state as T
7977
}
8078
}

kotlinx-coroutines-core/native/src/CoroutineContext.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,19 @@ import kotlinx.coroutines.internal.*
88
import kotlin.coroutines.*
99
import kotlin.native.concurrent.*
1010

11-
private fun takeEventLoop(): EventLoopImpl =
12-
ThreadLocalEventLoop.currentOrNull() as? EventLoopImpl ?:
13-
error("There is no event loop. Use runBlocking { ... } to start one.")
1411

1512
internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
13+
14+
private val delegate = SingleThreadDispatcherImpl(name = "Dispatchers.Default")
15+
1616
override fun dispatch(context: CoroutineContext, block: Runnable) =
17-
takeEventLoop().dispatch(context, block)
18-
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
19-
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
20-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
21-
takeEventLoop().invokeOnTimeout(timeMillis, block, context)
17+
delegate.dispatch(context, block)
18+
19+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) = delegate.scheduleResumeAfterDelay(timeMillis, continuation)
20+
21+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = delegate.invokeOnTimeout(timeMillis, block, context)
2222

23-
actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
23+
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
2424
}
2525

2626
internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")

kotlinx-coroutines-core/native/src/EventLoop.kt

+15-4
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,24 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.cinterop.*
8+
import platform.posix.*
79
import kotlin.coroutines.*
10+
import kotlin.native.concurrent.*
811
import kotlin.system.*
912

10-
internal actual abstract class EventLoopImplPlatform: EventLoop() {
11-
protected actual fun unpark() { /* does nothing */ }
12-
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit =
13-
loopWasShutDown()
13+
internal actual abstract class EventLoopImplPlatform : EventLoop() {
14+
15+
private val current = Worker.current
16+
17+
protected actual fun unpark() {
18+
current.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
19+
}
20+
21+
protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit {
22+
DefaultExecutor.invokeOnTimeout(now, delayedTask, EmptyCoroutineContext)
23+
Unit
24+
}
1425
}
1526

1627
internal class EventLoopImpl: EventLoopImplBase() {

kotlinx-coroutines-core/native/src/Exceptions.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
8+
import kotlinx.coroutines.internal.SuppressSupportingThrowableImpl
9+
710
/**
811
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
912
* It indicates _normal_ cancellation of a coroutine.
@@ -31,7 +34,9 @@ internal actual class JobCancellationException public actual constructor(
3134
}
3235

3336
@Suppress("NOTHING_TO_INLINE")
34-
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) { /* empty */ }
37+
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) {
38+
if (this is SuppressSupportingThrowableImpl) addSuppressed(other)
39+
}
3540

3641
// For use in tests
3742
internal actual val RECOVER_STACK_TRACES: Boolean = false

0 commit comments

Comments
 (0)