Skip to content

Commit 0427205

Browse files
Incorporate new Native memory model into kotlinx-coroutines mainline (#2833)
* Support of new K/N memory model * Dispatchers.Default backed by a pool of workers on Linux and by global_queue on iOS-like * Implementation of Dispatchers.Main that uses the main queue on iOS and default dispatcher on other platforms (#2858) * Introduced newSingleThreadDispatcher and newFixedThreadPoolDispatcher * Use proper reentrant locking and CoW arrays on new memory model, make TestBase _almost_ race-free * More thread-safety in Native counterpart and one more test from native-mt * Source-set sharing for tests shared between JVM and K/N * Wrap Obj-C interop into autorelease pool to avoid memory leaks Fixes #2914 Co-authored-by: dkhalanskyjb <[email protected]>
1 parent 30b057e commit 0427205

File tree

71 files changed

+1299
-616
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1299
-616
lines changed

buildSrc/src/main/kotlin/SourceSets.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ fun KotlinSourceSet.configureMultiplatform() {
1616
optInAnnotations.forEach { optIn(it) }
1717
progressiveMode = true
1818
}
19-
}
19+
}

kotlinx-coroutines-core/build.gradle

+44-12
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,50 @@ kotlin {
7474
SourceSetsKt.configureMultiplatform(it)
7575
}
7676

77-
configure(targets) {
78-
// Configure additional binaries and test runs -- one for each OS
79-
if (["macos", "linux", "mingw"].any { name.startsWith(it) }) {
80-
binaries {
81-
// Test for memory leaks using a special entry point that does not exit but returns from main
82-
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
83-
// Configure a separate test where code runs in background
84-
test("background", [org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.DEBUG]) {
85-
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
86-
}
77+
/*
78+
* Configure four test runs:
79+
* 1) Old memory model, Main thread
80+
* 2) New memory model, Main thread
81+
* 3) Old memory model, BG thread
82+
* 4) New memory model, BG thread (required for Dispatchers.Main tests on Darwin)
83+
*
84+
* All new MM targets are build with optimize = true to have stress tests properly run.
85+
*/
86+
targets.withType(org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTargetWithTests.class).configureEach {
87+
binaries {
88+
// Test for memory leaks using a special entry point that does not exit but returns from main
89+
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
90+
}
91+
92+
binaries.test("newMM", [DEBUG]) {
93+
def thisTest = it
94+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
95+
optimized = true
96+
binaryOptions["memoryModel"] = "experimental"
97+
testRuns.create("newMM") {
98+
setExecutionSourceFrom(thisTest)
99+
// A hack to get different suffixes in the aggregated report.
100+
executionTask.configure { targetName = "$targetName new MM" }
101+
}
102+
}
103+
104+
binaries.test("worker", [DEBUG]) {
105+
def thisTest = it
106+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
107+
testRuns.create("worker") {
108+
setExecutionSourceFrom(thisTest)
109+
executionTask.configure { targetName = "$targetName worker" }
87110
}
88-
testRuns {
89-
background { setExecutionSourceFrom(binaries.backgroundDebugTest) }
111+
}
112+
113+
binaries.test("workerWithNewMM", [DEBUG]) {
114+
def thisTest = it
115+
optimized = true
116+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
117+
binaryOptions["memoryModel"] = "experimental"
118+
testRuns.create("workerWithNewMM") {
119+
setExecutionSourceFrom(thisTest)
120+
executionTask.configure { targetName = "$targetName worker with new MM" }
90121
}
91122
}
92123
}
@@ -97,6 +128,7 @@ kotlin {
97128
}
98129
}
99130

131+
100132
configurations {
101133
configureKotlinJvmPlatform(kotlinCompilerPluginClasspath)
102134
}

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+12-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
276276
// then process one event from queue
277277
val task = dequeue()
278278
if (task != null) {
279-
task.run()
279+
platformAutoreleasePool { task.run() }
280280
return 0
281281
}
282282
return nextTime
@@ -530,3 +530,14 @@ internal expect fun nanoTime(): Long
530530
internal expect object DefaultExecutor {
531531
public fun enqueue(task: Runnable)
532532
}
533+
534+
/**
535+
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
536+
* non-Darwin native targets.
537+
*
538+
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
539+
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
540+
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
541+
* pool management, it must manage the pool creation and pool drainage manually.
542+
*/
543+
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

+6
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ internal class ArrayBroadcastChannel<E>(
3333
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
3434
}
3535

36+
/**
37+
* NB: prior to changing any logic of ArrayBroadcastChannel internals, please ensure that
38+
* you do not break internal invariants of the SubscriberList implementation on K/N and KJS
39+
*/
40+
3641
/*
3742
* Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
3843
* - Write element to buffer then write "tail" (volatile)
@@ -60,6 +65,7 @@ internal class ArrayBroadcastChannel<E>(
6065
get() = _size.value
6166
set(value) { _size.value = value }
6267

68+
@Suppress("DEPRECATION")
6369
private val subscribers = subscriberList<Subscriber<E>>()
6470

6571
override val isBufferAlwaysFull: Boolean get() = false

kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ package kotlinx.coroutines.internal
88
* Special kind of list intended to be used as collection of subscribers in `ArrayBroadcastChannel`
99
* On JVM it's CopyOnWriteList and on JS it's MutableList.
1010
*
11-
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
11+
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of the ArrayBroadcastChannel
1212
*/
1313
internal typealias SubscribersList<E> = MutableList<E>
1414

15+
@Deprecated(message = "Implementation of this primitive is tailored to specific ArrayBroadcastChannel usages on K/N " +
16+
"and K/JS platforms and it is unsafe to use it anywhere else")
1517
internal expect fun <E> subscriberList(): SubscribersList<E>
1618

1719
internal expect class ReentrantLock() {

kotlinx-coroutines-core/common/test/EmptyContext.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ package kotlinx.coroutines
77
import kotlinx.coroutines.intrinsics.*
88
import kotlin.coroutines.*
99

10-
suspend fun <T> withEmptyContext(block: suspend () -> T): T {
11-
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
12-
var result: Result<T> = baseline
13-
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
14-
while (result == baseline) yield()
15-
return result.getOrThrow()
10+
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
11+
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
1612
}

kotlinx-coroutines-core/common/test/TestBase.common.kt

+4
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
package kotlinx.coroutines
88

99
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.internal.*
1011
import kotlin.coroutines.*
1112
import kotlin.test.*
1213

1314
public expect val isStressTest: Boolean
1415
public expect val stressTestMultiplier: Int
16+
public expect val stressTestMultiplierSqrt: Int
1517

1618
/**
1719
* The result of a multiplatform asynchronous test.
@@ -20,6 +22,8 @@ public expect val stressTestMultiplier: Int
2022
@Suppress("NO_ACTUAL_FOR_EXPECT")
2123
public expect class TestResult
2224

25+
public expect val isNative: Boolean
26+
2327
public expect open class TestBase constructor() {
2428
/*
2529
* In common tests we emulate parameterized tests

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
235235
}
236236
expectUnreached()
237237
} catch (e: IllegalStateException) {
238-
assertTrue(e.message!!.contains("Flow invariant is violated"))
238+
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
239239
finish(2)
240240
}
241241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
9+
/**
10+
* Runs a new coroutine and **blocks** the current thread until its completion.
11+
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
12+
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
13+
*/
14+
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
@ExperimentalCoroutinesApi
8+
public expect fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher
9+
10+
@ExperimentalCoroutinesApi
11+
public expect fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt renamed to kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package kotlinx.coroutines.internal
77

88
import kotlinx.atomicfu.*
99
import kotlinx.coroutines.*
10+
import kotlin.jvm.*
11+
import kotlin.native.concurrent.*
1012

1113
private typealias Node = LockFreeLinkedListNode
1214

@@ -20,9 +22,11 @@ internal const val SUCCESS: Int = 1
2022
internal const val FAILURE: Int = 2
2123

2224
@PublishedApi
25+
@SharedImmutable
2326
internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")
2427

2528
@PublishedApi
29+
@SharedImmutable
2630
internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY")
2731

2832
/** @suppress **This is unstable API and it is subject to change.** */
@@ -616,7 +620,7 @@ public actual open class LockFreeLinkedListNode {
616620
assert { next === this._next.value }
617621
}
618622

619-
override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
623+
override fun toString(): String = "${this::classSimpleName}@${this.hexAddress}"
620624
}
621625

622626
private class Removed(@JvmField val ref: Node) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.coroutines
5+
6+
import kotlinx.coroutines.channels.*
7+
import kotlin.test.*
8+
9+
10+
abstract class AbstractDispatcherConcurrencyTest : TestBase() {
11+
12+
public abstract val dispatcher: CoroutineDispatcher
13+
14+
@Test
15+
fun testLaunchAndJoin() = runMtTest {
16+
expect(1)
17+
var capturedMutableState = 0
18+
val job = GlobalScope.launch(dispatcher) {
19+
++capturedMutableState
20+
expect(2)
21+
}
22+
runBlocking { job.join() }
23+
assertEquals(1, capturedMutableState)
24+
finish(3)
25+
}
26+
27+
@Test
28+
fun testDispatcherHasOwnThreads() = runMtTest {
29+
val channel = Channel<Int>()
30+
GlobalScope.launch(dispatcher) {
31+
channel.send(42)
32+
}
33+
34+
var result = ChannelResult.failure<Int>()
35+
while (!result.isSuccess) {
36+
result = channel.tryReceive()
37+
// Block the thread, wait
38+
}
39+
// Delivery was successful, let's check it
40+
assertEquals(42, result.getOrThrow())
41+
}
42+
43+
@Test
44+
fun testDelayInDispatcher() = runMtTest {
45+
expect(1)
46+
val job = GlobalScope.launch(dispatcher) {
47+
expect(2)
48+
delay(100)
49+
expect(3)
50+
}
51+
runBlocking { job.join() }
52+
finish(4)
53+
}
54+
}

kotlinx-coroutines-core/jvm/test/AtomicCancellationTest.kt renamed to kotlinx-coroutines-core/concurrent/test/AtomicCancellationTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines
@@ -142,4 +142,4 @@ class AtomicCancellationTest : TestBase() {
142142
yield() // to jobToJoin & canceller
143143
expect(6)
144144
}
145-
}
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.exceptions.*
8+
import kotlinx.coroutines.internal.*
9+
import kotlin.test.*
10+
11+
class ConcurrentExceptionsStressTest : TestBase() {
12+
private val nWorkers = 4
13+
private val nRepeat = 1000 * stressTestMultiplier
14+
15+
private var workers: Array<CloseableCoroutineDispatcher> = emptyArray()
16+
17+
@AfterTest
18+
fun tearDown() {
19+
workers.forEach {
20+
it.close()
21+
}
22+
}
23+
24+
@Test
25+
fun testStress() = runMtTest {
26+
workers = Array(nWorkers) { index ->
27+
newSingleThreadContext("JobExceptionsStressTest-$index")
28+
}
29+
30+
repeat(nRepeat) {
31+
testOnce()
32+
}
33+
}
34+
35+
@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
36+
private suspend fun CoroutineScope.testOnce() {
37+
val deferred = async(NonCancellable) {
38+
repeat(nWorkers) { index ->
39+
// Always launch a coroutine even if parent job was already cancelled (atomic start)
40+
launch(workers[index], start = CoroutineStart.ATOMIC) {
41+
randomWait()
42+
throw StressException(index)
43+
}
44+
}
45+
}
46+
deferred.join()
47+
assertTrue(deferred.isCancelled)
48+
val completionException = deferred.getCompletionExceptionOrNull()
49+
val cause = completionException as? StressException
50+
?: unexpectedException("completion", completionException)
51+
val suppressed = cause.suppressed
52+
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
53+
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
54+
}
55+
repeat(nWorkers) { index ->
56+
assertTrue(index in indices, "Exception $index is missing: $indices")
57+
}
58+
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
59+
}
60+
61+
private fun unexpectedException(msg: String, e: Throwable?): Nothing {
62+
throw IllegalStateException("Unexpected $msg exception", e)
63+
}
64+
65+
private class StressException(val index: Int) : SuppressSupportingThrowable()
66+
}
67+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.exceptions
6+
7+
import kotlinx.coroutines.*
8+
9+
internal expect open class SuppressSupportingThrowable() : Throwable
10+
expect val Throwable.suppressed: Array<Throwable>
11+
expect fun Throwable.printStackTrace()
12+
13+
expect fun randomWait()
14+
15+
expect fun currentThreadName(): String
16+
17+
inline fun CloseableCoroutineDispatcher.use(block: (CloseableCoroutineDispatcher) -> Unit) {
18+
try {
19+
block(this)
20+
} finally {
21+
close()
22+
}
23+
}

0 commit comments

Comments
 (0)