Skip to content

Commit 36a10b9

Browse files
committed
More thread-safety in Native counterpart and one more test from native-mt
1 parent 0f900de commit 36a10b9

File tree

8 files changed

+164
-6
lines changed

8 files changed

+164
-6
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.exceptions.*
8+
import kotlin.test.*
9+
10+
class ConcurrentExceptionsStressTest : TestBase() {
11+
private val nWorkers = 4
12+
private val nRepeat = 1000 * stressTestMultiplier
13+
14+
private val workers = Array(nWorkers) { index ->
15+
newSingleThreadContext("JobExceptionsStressTest-$index")
16+
}
17+
18+
@AfterTest
19+
fun tearDown() {
20+
workers.forEach {
21+
it.close()
22+
}
23+
}
24+
25+
@Test
26+
fun testStress() = runTest {
27+
repeat(nRepeat) {
28+
testOnce()
29+
}
30+
}
31+
32+
@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
33+
private suspend fun CoroutineScope.testOnce() {
34+
val deferred = async(NonCancellable) {
35+
repeat(nWorkers) { index ->
36+
// Always launch a coroutine even if parent job was already cancelled (atomic start)
37+
launch(workers[index], start = CoroutineStart.ATOMIC) {
38+
randomWait()
39+
throw StressException(index)
40+
}
41+
}
42+
}
43+
deferred.join()
44+
assertTrue(deferred.isCancelled)
45+
val completionException = deferred.getCompletionExceptionOrNull()
46+
val cause = completionException as? StressException
47+
?: unexpectedException("completion", completionException)
48+
val suppressed = cause.suppressed
49+
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
50+
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
51+
}
52+
repeat(nWorkers) { index ->
53+
assertTrue(index in indices, "Exception $index is missing: $indices")
54+
}
55+
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
56+
}
57+
58+
private fun unexpectedException(msg: String, e: Throwable?): Nothing {
59+
e?.printStackTrace()
60+
throw IllegalStateException("Unexpected $msg exception", e)
61+
}
62+
63+
private class StressException(val index: Int) : SuppressSupportingThrowable()
64+
}
65+
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
internal expect open class SuppressSupportingThrowable() : Throwable
8+
expect val Throwable.suppressed: Array<Throwable>
9+
expect fun Throwable.printStackTrace()
10+
11+
expect fun randomWait()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
import kotlin.random.*
8+
9+
actual fun randomWait() {
10+
val n = Random.nextInt(1000)
11+
if (n < 500) return // no wait 50% of time
12+
repeat(n) {
13+
BlackHole.sink *= 3
14+
}
15+
if (n > 900) Thread.yield()
16+
}
17+
18+
private object BlackHole {
19+
@Volatile
20+
var sink = 1
21+
}
22+
23+
24+
@Suppress("ACTUAL_WITHOUT_EXPECT")
25+
internal actual typealias SuppressSupportingThrowable = Throwable
26+
27+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
28+
actual fun Throwable.printStackTrace() = printStackTrace()
29+

kotlinx-coroutines-core/jvm/test/exceptions/Exceptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import kotlin.test.*
1515
* but run only under JDK 1.8
1616
*/
1717
@Suppress("ConflictingExtensionProperty")
18-
val Throwable.suppressed: Array<Throwable> get() {
18+
actual val Throwable.suppressed: Array<Throwable> get() {
1919
val method = this::class.java.getMethod("getSuppressed") ?: error("This test can only be run using JDK 1.7")
2020
@Suppress("UNCHECKED_CAST")
2121
return method.invoke(this) as Array<Throwable>

kotlinx-coroutines-core/native/src/Exceptions.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.internal.*
8+
import kotlinx.coroutines.internal.SuppressSupportingThrowableImpl
9+
710
/**
811
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
912
* It indicates _normal_ cancellation of a coroutine.
@@ -31,7 +34,9 @@ internal actual class JobCancellationException public actual constructor(
3134
}
3235

3336
@Suppress("NOTHING_TO_INLINE")
34-
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) { /* empty */ }
37+
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) {
38+
if (this is SuppressSupportingThrowableImpl) addSuppressed(other)
39+
}
3540

3641
// For use in tests
3742
internal actual val RECOVER_STACK_TRACES: Boolean = false

kotlinx-coroutines-core/native/src/internal/Concurrent.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.internal
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.atomicfu.locks.withLock as withLock2
89

910
@Suppress("ACTUAL_WITHOUT_EXPECT")
@@ -14,3 +15,17 @@ internal actual inline fun <T> ReentrantLock.withLock(action: () -> T): T = this
1415
internal actual fun <E> subscriberList(): MutableList<E> = CopyOnWriteList<E>()
1516

1617
internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet()
18+
19+
// "Suppress-supporting throwable" is currently used for tests only
20+
internal open class SuppressSupportingThrowableImpl : Throwable() {
21+
private val _suppressed = atomic<Array<Throwable>>(emptyArray())
22+
23+
val suppressed: Array<Throwable>
24+
get() = _suppressed.value
25+
26+
fun addSuppressed(other: Throwable) {
27+
_suppressed.update { current ->
28+
current + other
29+
}
30+
}
31+
}

kotlinx-coroutines-core/native/src/internal/Synchronized.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
package kotlinx.coroutines.internal
66

77
import kotlinx.coroutines.*
8-
8+
import kotlinx.atomicfu.locks.withLock as withLock2
99
/**
1010
* @suppress **This an internal API and should not be used from general code.**
1111
*/
1212
@InternalCoroutinesApi
13-
public actual typealias SynchronizedObject = Any
13+
public actual typealias SynchronizedObject = kotlinx.atomicfu.locks.SynchronizedObject
1414

1515
/**
1616
* @suppress **This an internal API and should not be used from general code.**
1717
*/
1818
@InternalCoroutinesApi
19-
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
20-
block()
19+
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.internal.*
9+
import platform.posix.*
10+
import kotlin.native.concurrent.*
11+
import kotlin.random.*
12+
13+
actual fun randomWait() {
14+
val n = Random.nextInt(1000)
15+
if (n < 500) return // no wait 50% of time
16+
repeat(n) {
17+
BlackHole.sink *= 3
18+
}
19+
if (n > 900) sched_yield()
20+
}
21+
22+
@ThreadLocal
23+
private object BlackHole {
24+
var sink = 1
25+
}
26+
27+
internal actual typealias SuppressSupportingThrowable = SuppressSupportingThrowableImpl
28+
29+
actual val Throwable.suppressed: Array<Throwable>
30+
get() = (this as? SuppressSupportingThrowableImpl)?.suppressed ?: emptyArray()
31+
32+
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
33+
actual fun Throwable.printStackTrace() = printStackTrace()
34+

0 commit comments

Comments
 (0)