Skip to content

Commit 298eb11

Browse files
New implementation of K/N multi-threaded dispatchers (#3421)
* Get rid of the race on LimitedDispatcher due to @Volatile being a no-op on K/N. * Improve newFixedThreadPoolContext on K/N: create workers lazily; properly handle rejection; await termination in parallel. * Extract the concurrent structure from MultithreadedDispatchers to test it with Lincheck. * Fix a race. Co-authored-by: dkhalanskyjb <[email protected]> Co-authored-by: Dmitry Khalanskiy <[email protected]>
1 parent 150f185 commit 298eb11

File tree

5 files changed

+208
-19
lines changed

5 files changed

+208
-19
lines changed

kotlinx-coroutines-core/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ jvmTest {
185185
}
186186
// 'stress' is required to be able to run all subpackage tests like ":jvmTests --tests "*channels*" -Pstress=true"
187187
if (!Idea.active && rootProject.properties['stress'] == null) {
188-
exclude '**/*LincheckTest.*'
188+
exclude '**/*LincheckTest*'
189189
exclude '**/*StressTest.*'
190190
}
191191
if (Idea.active) {
@@ -229,7 +229,7 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
229229
task jvmLincheckTest(type: Test, dependsOn: compileTestKotlinJvm) {
230230
classpath = files { jvmTest.classpath }
231231
testClassesDirs = files { jvmTest.testClassesDirs }
232-
include '**/*LincheckTest.*'
232+
include '**/*LincheckTest*'
233233
enableAssertions = true
234234
testLogging.showStandardStreams = true
235235
configureJvmForLincheck(jvmLincheckTest)

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+9-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.internal
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.*
89
import kotlin.coroutines.*
910
import kotlin.jvm.*
@@ -18,8 +19,9 @@ internal class LimitedDispatcher(
1819
private val parallelism: Int
1920
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
2021

21-
@Volatile
22-
private var runningWorkers = 0
22+
// Atomic is necessary here for the sake of K/N memory ordering,
23+
// there is no need in atomic operations for this property
24+
private val runningWorkers = atomic(0)
2325

2426
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
2527

@@ -54,9 +56,9 @@ internal class LimitedDispatcher(
5456
}
5557

5658
synchronized(workerAllocationLock) {
57-
--runningWorkers
59+
runningWorkers.decrementAndGet()
5860
if (queue.size == 0) return
59-
++runningWorkers
61+
runningWorkers.incrementAndGet()
6062
fairnessCounter = 0
6163
}
6264
}
@@ -90,15 +92,15 @@ internal class LimitedDispatcher(
9092

9193
private fun tryAllocateWorker(): Boolean {
9294
synchronized(workerAllocationLock) {
93-
if (runningWorkers >= parallelism) return false
94-
++runningWorkers
95+
if (runningWorkers.value >= parallelism) return false
96+
runningWorkers.incrementAndGet()
9597
return true
9698
}
9799
}
98100

99101
private fun addAndTryDispatching(block: Runnable): Boolean {
100102
queue.addLast(block)
101-
return runningWorkers >= parallelism
103+
return runningWorkers.value >= parallelism
102104
}
103105
}
104106

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
9+
/**
10+
* A thread-safe resource pool.
11+
*
12+
* [maxCapacity] is the maximum amount of elements.
13+
* [create] is the function that creates a new element.
14+
*
15+
* This is only used in the Native implementation,
16+
* but is part of the `concurrent` source set in order to test it on the JVM.
17+
*/
18+
internal class OnDemandAllocatingPool<T>(
19+
private val maxCapacity: Int,
20+
private val create: (Int) -> T
21+
) {
22+
/**
23+
* Number of existing elements + isClosed flag in the highest bit.
24+
* Once the flag is set, the value is guaranteed not to change anymore.
25+
*/
26+
private val controlState = atomic(0)
27+
private val elements = atomicArrayOfNulls<T>(maxCapacity)
28+
29+
/**
30+
* Returns the number of elements that need to be cleaned up due to the pool being closed.
31+
*/
32+
@Suppress("NOTHING_TO_INLINE")
33+
private inline fun tryForbidNewElements(): Int {
34+
controlState.loop {
35+
if (it.isClosed()) return 0 // already closed
36+
if (controlState.compareAndSet(it, it or IS_CLOSED_MASK)) return it
37+
}
38+
}
39+
40+
@Suppress("NOTHING_TO_INLINE")
41+
private inline fun Int.isClosed(): Boolean = this and IS_CLOSED_MASK != 0
42+
43+
/**
44+
* Request that a new element is created.
45+
*
46+
* Returns `false` if the pool is closed.
47+
*
48+
* Note that it will still return `true` even if an element was not created due to reaching [maxCapacity].
49+
*
50+
* Rethrows the exceptions thrown from [create]. In this case, this operation has no effect.
51+
*/
52+
fun allocate(): Boolean {
53+
controlState.loop { ctl ->
54+
if (ctl.isClosed()) return false
55+
if (ctl >= maxCapacity) return true
56+
if (controlState.compareAndSet(ctl, ctl + 1)) {
57+
elements[ctl].value = create(ctl)
58+
return true
59+
}
60+
}
61+
}
62+
63+
/**
64+
* Close the pool.
65+
*
66+
* This will prevent any new elements from being created.
67+
* All the elements present in the pool will be returned.
68+
*
69+
* The function is thread-safe.
70+
*
71+
* [close] can be called multiple times, but only a single call will return a non-empty list.
72+
* This is due to the elements being cleaned out from the pool on the first invocation to avoid memory leaks,
73+
* and no new elements being created after.
74+
*/
75+
fun close(): List<T> {
76+
val elementsExisting = tryForbidNewElements()
77+
return (0 until elementsExisting).map { i ->
78+
// we wait for the element to be created, because we know that eventually it is going to be there
79+
loop {
80+
val element = elements[i].getAndSet(null)
81+
if (element != null) {
82+
return@map element
83+
}
84+
}
85+
}
86+
}
87+
88+
// for tests
89+
internal fun stateRepresentation(): String {
90+
val ctl = controlState.value
91+
val elementsStr = (0 until (ctl and IS_CLOSED_MASK.inv())).map { elements[it].value }.toString()
92+
val closedStr = if (ctl.isClosed()) "[closed]" else ""
93+
return elementsStr + closedStr
94+
}
95+
96+
override fun toString(): String = "OnDemandAllocatingPool(${stateRepresentation()})"
97+
}
98+
99+
// KT-25023
100+
private inline fun loop(block: () -> Unit): Nothing {
101+
while (true) {
102+
block()
103+
}
104+
}
105+
106+
private const val IS_CLOSED_MASK = 1 shl 31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import org.jetbrains.kotlinx.lincheck.*
10+
import org.jetbrains.kotlinx.lincheck.annotations.*
11+
12+
/**
13+
* Test that:
14+
* * All elements allocated in [OnDemandAllocatingPool] get returned when [close] is invoked.
15+
* * After reaching the maximum capacity, new elements are not added.
16+
* * After [close] is invoked, [OnDemandAllocatingPool.allocate] returns `false`.
17+
* * [OnDemandAllocatingPool.close] will return an empty list after the first invocation.
18+
*/
19+
abstract class OnDemandAllocatingPoolLincheckTest(maxCapacity: Int) : AbstractLincheckTest() {
20+
private val counter = atomic(0)
21+
private val pool = OnDemandAllocatingPool(maxCapacity = maxCapacity, create = {
22+
counter.getAndIncrement()
23+
})
24+
25+
@Operation
26+
fun allocate(): Boolean = pool.allocate()
27+
28+
@Operation
29+
fun close(): String = pool.close().sorted().toString()
30+
31+
override fun extractState(): Any = pool.stateRepresentation()
32+
}
33+
34+
abstract class OnDemandAllocatingSequentialPool(private val maxCapacity: Int) {
35+
var closed = false
36+
var elements = 0
37+
38+
fun allocate() = if (closed) {
39+
false
40+
} else {
41+
if (elements < maxCapacity) {
42+
elements++
43+
}
44+
true
45+
}
46+
47+
fun close(): String = if (closed) {
48+
emptyList()
49+
} else {
50+
closed = true
51+
(0 until elements)
52+
}.sorted().toString()
53+
}
54+
55+
class OnDemandAllocatingPool3LincheckTest : OnDemandAllocatingPoolLincheckTest(3) {
56+
override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
57+
this.sequentialSpecification(OnDemandAllocatingSequentialPool3::class.java)
58+
}
59+
60+
class OnDemandAllocatingSequentialPool3 : OnDemandAllocatingSequentialPool(3)

kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt

+31-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines
66

77
import kotlinx.coroutines.channels.*
8+
import kotlinx.coroutines.internal.*
89
import kotlin.coroutines.*
910
import kotlin.native.concurrent.*
1011

@@ -14,7 +15,7 @@ public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispat
1415
}
1516

1617
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
17-
require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads"}
18+
require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
1819
return MultiWorkerDispatcher(name, nThreads)
1920
}
2021

@@ -67,28 +68,48 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(),
6768
}
6869
}
6970

70-
private class MultiWorkerDispatcher(name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
71+
private class MultiWorkerDispatcher(
72+
private val name: String,
73+
workersCount: Int
74+
) : CloseableCoroutineDispatcher() {
7175
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
72-
private val workers = Array(workersCount) { Worker.start(name = "$name-$it") }
73-
74-
init {
75-
workers.forEach { w -> w.executeAfter(0L) { workerRunLoop() } }
76+
private val workerPool = OnDemandAllocatingPool(workersCount) {
77+
Worker.start(name = "$name-$it").apply {
78+
executeAfter { workerRunLoop() }
79+
}
7680
}
7781

7882
private fun workerRunLoop() = runBlocking {
83+
// NB: we leverage tail-call optimization in this loop, do not replace it with
84+
// .receive() without proper evaluation
7985
for (task in tasksQueue) {
80-
// TODO error handling
86+
/**
87+
* Any unhandled exception here will pass through worker's boundary and will be properly reported.
88+
*/
8189
task.run()
8290
}
8391
}
8492

8593
override fun dispatch(context: CoroutineContext, block: Runnable) {
86-
// TODO handle rejections
87-
tasksQueue.trySend(block)
94+
fun throwClosed(block: Runnable) {
95+
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
96+
}
97+
98+
if (!workerPool.allocate()) throwClosed(block) // Do not even try to send to avoid race
99+
100+
tasksQueue.trySend(block).onClosed {
101+
throwClosed(block)
102+
}
88103
}
89104

90105
override fun close() {
106+
val workers = workerPool.close()
91107
tasksQueue.close()
92-
workers.forEach { it.requestTermination().result }
108+
/*
109+
* Here we cannot avoid waiting on `.result`, otherwise it will lead
110+
* to a native memory leak, including a pthread handle.
111+
*/
112+
val requests = workers.map { it.requestTermination() }
113+
requests.map { it.result }
93114
}
94115
}

0 commit comments

Comments (0)