Skip to content

Commit 50b510e

Browse files
committed
Tests and proper implementation of Native runBlocking (#2855)
1 parent 83eb7ea commit 50b510e

11 files changed

+105
-74
lines changed

kotlinx-coroutines-core/concurrent/test/ConcurrentTestUtilities.common.kt

+2
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ expect val Throwable.suppressed: Array<Throwable>
99
expect fun Throwable.printStackTrace()
1010

1111
expect fun randomWait()
12+
13+
expect fun currentThreadName(): String

kotlinx-coroutines-core/concurrent/test/DefaultDispatcherConcurrencyTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
package kotlinx.coroutines
2-
31
/*
42
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
53
*/
4+
package kotlinx.coroutines
65

76
class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
87
override val dispatcher: CoroutineDispatcher = Dispatchers.Default

kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt renamed to kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt

+41-45
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
/*
22
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
4-
54
package kotlinx.coroutines
65

6+
import kotlinx.coroutines.exceptions.*
77
import kotlin.coroutines.*
88
import kotlin.test.*
99

@@ -59,19 +59,19 @@ class RunBlockingTest : TestBase() {
5959
runBlocking(thread) {
6060
expect(2)
6161
assertSame(coroutineContext[ContinuationInterceptor], thread)
62-
assertTrue(Thread.currentThread().name.contains(name))
62+
assertTrue(currentThreadName().contains(name))
6363
yield() // should work
6464
expect(3)
6565
}
6666
finish(4)
6767
thread.close()
6868
}
6969

70-
7170
@Test
72-
fun testCancellation() = newFixedThreadPoolContext(2, "testCancellation").use {
73-
val job = GlobalScope.launch(it) {
74-
runBlocking(coroutineContext) {
71+
fun testCancellation() {
72+
val ctx = newSingleThreadContext("testCancellation")
73+
val job = GlobalScope.launch {
74+
runBlocking(coroutineContext + ctx) {
7575
while (true) {
7676
yield()
7777
}
@@ -81,6 +81,7 @@ class RunBlockingTest : TestBase() {
8181
runBlocking {
8282
job.cancelAndJoin()
8383
}
84+
ctx.close()
8485
}
8586

8687
@Test
@@ -104,40 +105,44 @@ class RunBlockingTest : TestBase() {
104105
}
105106
}
106107

107-
@Test(expected = CancellationException::class)
108-
fun testDispatchOnShutdown() = runBlocking<Unit> {
109-
expect(1)
110-
val job = launch(NonCancellable) {
111-
try {
112-
expect(2)
113-
delay(Long.MAX_VALUE)
114-
} finally {
115-
finish(4)
108+
@Test
109+
fun testDispatchOnShutdown(): Unit = assertFailsWith<CancellationException> {
110+
runBlocking<Unit> {
111+
expect(1)
112+
val job = launch(NonCancellable) {
113+
try {
114+
expect(2)
115+
delay(Long.MAX_VALUE)
116+
} finally {
117+
finish(4)
118+
}
116119
}
117-
}
118120

119-
yield()
120-
expect(3)
121-
coroutineContext.cancel()
122-
job.cancel()
123-
}
121+
yield()
122+
expect(3)
123+
coroutineContext.cancel()
124+
job.cancel()
125+
}
126+
}.let { }
124127

125-
@Test(expected = CancellationException::class)
126-
fun testDispatchOnShutdown2() = runBlocking<Unit> {
127-
coroutineContext.cancel()
128-
expect(1)
129-
val job = launch(NonCancellable, start = CoroutineStart.UNDISPATCHED) {
130-
try {
131-
expect(2)
132-
delay(Long.MAX_VALUE)
133-
} finally {
134-
finish(4)
128+
@Test
129+
fun testDispatchOnShutdown2(): Unit = assertFailsWith<CancellationException> {
130+
runBlocking<Unit> {
131+
coroutineContext.cancel()
132+
expect(1)
133+
val job = launch(NonCancellable, start = CoroutineStart.UNDISPATCHED) {
134+
try {
135+
expect(2)
136+
delay(Long.MAX_VALUE)
137+
} finally {
138+
finish(4)
139+
}
135140
}
136-
}
137141

138-
expect(3)
139-
job.cancel()
140-
}
142+
expect(3)
143+
job.cancel()
144+
}
145+
}.let { }
141146

142147
@Test
143148
fun testNestedRunBlocking() = runBlocking {
@@ -157,21 +162,12 @@ class RunBlockingTest : TestBase() {
157162
fun testIncompleteState() {
158163
val handle = runBlocking {
159164
// See #835
160-
coroutineContext[Job]!!.invokeOnCompletion { }
165+
coroutineContext[Job]!!.invokeOnCompletion { }
161166
}
162167

163168
handle.dispose()
164169
}
165170

166-
@Test
167-
fun testContract() {
168-
val rb: Int
169-
runBlocking {
170-
rb = 42
171-
}
172-
rb.hashCode() // unused
173-
}
174-
175171
@Test
176172
fun testCancelledParent() {
177173
val job = Job()

kotlinx-coroutines-core/jvm/test/ConcurrentTestUtilities.kt

+1
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ internal actual typealias SuppressSupportingThrowable = Throwable
2727
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
2828
actual fun Throwable.printStackTrace() = printStackTrace()
2929

30+
actual fun currentThreadName(): String = Thread.currentThread().name
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.coroutines
5+
6+
import org.junit.*
7+
8+
class RunBlockingJvmTest : TestBase() {
9+
@Test
10+
fun testContract() {
11+
val rb: Int
12+
runBlocking {
13+
rb = 42
14+
}
15+
rb.hashCode() // unused
16+
}
17+
18+
@Test
19+
fun testCancellation() = newFixedThreadPoolContext(2, "testCancellation").use {
20+
val job = GlobalScope.launch(it) {
21+
runBlocking(coroutineContext) {
22+
while (true) {
23+
yield()
24+
}
25+
}
26+
}
27+
28+
runBlocking {
29+
job.cancelAndJoin()
30+
}
31+
}
32+
}
33+

kotlinx-coroutines-core/native/src/Builders.kt

+16-1
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:OptIn(ExperimentalContracts::class)
56
package kotlinx.coroutines
67

78
import kotlinx.cinterop.*
89
import platform.posix.*
10+
import kotlin.contracts.*
911
import kotlin.coroutines.*
1012
import kotlin.native.concurrent.*
1113

@@ -32,6 +34,9 @@ import kotlin.native.concurrent.*
3234
* @param block the coroutine code.
3335
*/
3436
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
37+
contract {
38+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
39+
}
3540
val contextInterceptor = context[ContinuationInterceptor]
3641
val eventLoop: EventLoop?
3742
val newContext: CoroutineContext
@@ -55,8 +60,18 @@ private class BlockingCoroutine<T>(
5560
parentContext: CoroutineContext,
5661
private val eventLoop: EventLoop?
5762
) : AbstractCoroutine<T>(parentContext, true, true) {
63+
private val joinWorker = Worker.current
64+
5865
override val isScopedCoroutine: Boolean get() = true
5966

67+
override fun afterCompletion(state: Any?) {
68+
// wake up blocked thread
69+
if (joinWorker != Worker.current) {
70+
// Unpark waiting worker
71+
joinWorker.execute(TransferMode.SAFE, {}) {} // send an empty task to unpark the waiting event loop
72+
}
73+
}
74+
6075
@Suppress("UNCHECKED_CAST")
6176
fun joinBlocking(): T {
6277
try {
@@ -65,7 +80,7 @@ private class BlockingCoroutine<T>(
6580
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
6681
// note: process next even may loose unpark flag, so check if completed before parking
6782
if (isCompleted) break
68-
Worker.current.park(parkNanos / 1000L)
83+
joinWorker.park(parkNanos / 1000L)
6984
}
7085
} finally { // paranoia
7186
eventLoop?.decrementUseCount()

kotlinx-coroutines-core/native/src/CoroutineContext.kt

-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
2323
actual fun enqueue(task: Runnable): Unit = delegate.dispatch(EmptyCoroutineContext, task)
2424
}
2525

26-
internal fun loopWasShutDown(): Nothing = error("Cannot execute task because event loop was shut down")
27-
2826
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
2927
DefaultExecutor
3028

31-
@SharedImmutable
3229
internal actual val DefaultDelay: Delay = DefaultExecutor
3330

3431
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {

kotlinx-coroutines-core/native/src/EventLoop.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() {
2525
}
2626

2727
internal class EventLoopImpl: EventLoopImplBase() {
28-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
29-
scheduleInvokeOnTimeout(timeMillis, block)
28+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
29+
return DefaultDelay.invokeOnTimeout(timeMillis, block, context)
30+
}
3031
}
3132

3233
internal actual fun createEventLoop(): EventLoop = EventLoopImpl()

kotlinx-coroutines-core/native/src/SingleThread.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,22 @@ internal class SingleThreadDispatcherImpl(name: String) : SingleThreadDispatcher
2525
worker.executeAfter(0L) { block.run() }
2626

2727
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
28-
// TODO proper toMicros
29-
worker.executeAfter(timeMillis * 1000)
28+
worker.executeAfter(timeMillis.toMicrosSafe())
3029
{ with(continuation) { resumeUndispatched(Unit) } }
3130
}
3231

3332
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
3433
// No API to cancel on timeout
35-
worker.executeAfter(timeMillis * 1000) { block.run() }
34+
worker.executeAfter(timeMillis.toMicrosSafe()) { block.run() }
3635
return NonDisposableHandle
3736
}
3837

3938
override fun close() {
4039
worker.requestTermination().result // Note: calling "result" blocks
4140
}
41+
42+
private fun Long.toMicrosSafe(): Long {
43+
val result = this * 1000
44+
return if (result > this) result else Long.MAX_VALUE
45+
}
4246
}

kotlinx-coroutines-core/native/test/ConcurrentTestUtilities.kt

+1
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@ actual val Throwable.suppressed: Array<Throwable>
3232
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
3333
actual fun Throwable.printStackTrace() = printStackTrace()
3434

35+
actual fun currentThreadName(): String = Worker.current.name

kotlinx-coroutines-core/native/test/RunBlockingTest.kt

-18
This file was deleted.

0 commit comments

Comments
 (0)