Skip to content

Commit 483aeb1

Browse files
committed
Extract the concurrent structure from MultithreadedDispatchers
Also, test it properly.
1 parent eda908c commit 483aeb1

File tree

3 files changed

+170
-40
lines changed

3 files changed

+170
-40
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
9+
/**
10+
* A thread-safe resource pool.
11+
*
12+
* [maxCapacity] is the maximum amount of elements.
13+
* [create] is the function that creates a new element.
14+
*/
15+
internal class OnDemandAllocatingPool<T>(
16+
private val maxCapacity: Int,
17+
private val create: (Int) -> T
18+
) {
19+
/** Number of existing elements + isClosed flag in the highest bit.
20+
* Once the flag is set, the value is guaranteed not to change anymore. */
21+
private val controlState = atomic(0)
22+
private val elements = atomicArrayOfNulls<T>(maxCapacity)
23+
24+
/**
25+
* Returns the number of elements that need to be cleaned up due to the pool being closed.
26+
*/
27+
@Suppress("NOTHING_TO_INLINE")
28+
private inline fun tryForbidNewElements(): Int {
29+
controlState.loop {
30+
if (it.isClosed()) return 0 // already closed
31+
if (controlState.compareAndSet(it, it or IS_CLOSED_MASK)) return it
32+
}
33+
}
34+
35+
@Suppress("NOTHING_TO_INLINE")
36+
private inline fun Int.isClosed(): Boolean = this and IS_CLOSED_MASK != 0
37+
38+
/**
39+
* Request that a new element is created.
40+
*
41+
* Returns `false` if the pool is closed.
42+
*
43+
* Note that it will still return `true` even if an element was not created due to reaching [maxCapacity].
44+
*/
45+
fun allocate(): Boolean {
46+
controlState.loop { ctl ->
47+
if (ctl.isClosed()) return false
48+
if (ctl >= maxCapacity) return true
49+
if (controlState.compareAndSet(ctl, ctl + 1)) {
50+
elements[ctl].value = create(ctl)
51+
return true
52+
}
53+
}
54+
}
55+
56+
/**
57+
* Close the pool.
58+
*
59+
* This will prevent any new elements from being created.
60+
* All the elements present in the pool will be returned.
61+
*
62+
* The function is thread-safe.
63+
*
64+
* [close] can be called multiple times, but only a single call will return a non-empty list.
65+
* This is due to the elements being cleaned out from the pool on the first invocation to avoid memory leaks,
66+
* and no new elements being created after.
67+
*/
68+
fun close(): List<T> {
69+
val elementsExisting = tryForbidNewElements()
70+
val result = (0 until elementsExisting).map { i ->
71+
// we wait for the element to be created, because we know that eventually it is going to be there
72+
loop {
73+
val element = elements[i].getAndSet(null)
74+
if (element != null) {
75+
return@map element
76+
}
77+
}
78+
}
79+
return result
80+
}
81+
82+
// for tests
83+
internal fun stateRepresentation(): String {
84+
val ctl = controlState.value
85+
val elementsStr = (0 until (ctl and IS_CLOSED_MASK.inv())).map { elements[it].value }.toString()
86+
val closedStr = if (ctl and IS_CLOSED_MASK != 0) "[closed]" else ""
87+
return elementsStr + closedStr
88+
}
89+
90+
override fun toString(): String = "OnDemandAllocatingPool(${stateRepresentation()})"
91+
}
92+
93+
// KT-25023
94+
private inline fun loop(block: () -> Unit): Nothing {
95+
while (true) {
96+
block()
97+
}
98+
}
99+
100+
private const val IS_CLOSED_MASK = 1 shl 31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import org.jetbrains.kotlinx.lincheck.*
10+
import org.jetbrains.kotlinx.lincheck.annotations.*
11+
12+
/**
13+
* Test that:
14+
* * All elements allocated in [OnDemandAllocatingPool] get returned when [close] is invoked.
15+
* * After reaching the maximum capacity, new elements are not added.
16+
* * After [close] is invoked, [OnDemandAllocatingPool.allocate] returns `false`.
17+
* * [OnDemandAllocatingPool.close] will return an empty list after the first invocation.
18+
*/
19+
abstract class OnDemandAllocatingPoolLincheckTest(maxCapacity: Int) : AbstractLincheckTest() {
20+
private val counter = atomic(0)
21+
private val pool = OnDemandAllocatingPool(maxCapacity = maxCapacity, create = {
22+
counter.getAndIncrement()
23+
})
24+
25+
@Operation
26+
fun allocate(): Boolean = pool.allocate()
27+
28+
@Operation
29+
fun close(): String = pool.close().sorted().toString()
30+
31+
override fun extractState(): Any = pool.stateRepresentation()
32+
}
33+
34+
abstract class OnDemandAllocatingSequentialPool(private val maxCapacity: Int) {
35+
var closed = false
36+
var elements = 0
37+
38+
fun allocate() = if (closed) {
39+
false
40+
} else {
41+
if (elements < maxCapacity) {
42+
elements++
43+
}
44+
true
45+
}
46+
47+
fun close(): String = if (closed) {
48+
emptyList()
49+
} else {
50+
closed = true
51+
(0 until elements)
52+
}.sorted().toString()
53+
}
54+
55+
class OnDemandAllocatingPoolLincheckTest3 : OnDemandAllocatingPoolLincheckTest(3) {
56+
override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
57+
this.sequentialSpecification(OnDemandAllocatingSequentialPool3::class.java)
58+
}
59+
60+
class OnDemandAllocatingSequentialPool3 : OnDemandAllocatingSequentialPool(3)

kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt

+10-40
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
package kotlinx.coroutines
66

7-
import kotlinx.atomicfu.*
87
import kotlinx.coroutines.channels.*
8+
import kotlinx.coroutines.internal.*
99
import kotlin.coroutines.*
1010
import kotlin.native.concurrent.*
1111

@@ -68,24 +68,15 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(),
6868
}
6969
}
7070

71-
private const val IS_CLOSED_MASK = 1 shl 31
72-
7371
private class MultiWorkerDispatcher(
7472
private val name: String,
75-
private val workersCount: Int
73+
workersCount: Int
7674
) : CloseableCoroutineDispatcher() {
7775
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
78-
private val workers = atomicArrayOfNulls<Worker>(workersCount)
79-
80-
// Number of active workers + isClosed flag in the highest bit
81-
private val controlState = atomic(0)
82-
private val activeWorkers: Int get() = controlState.value and (IS_CLOSED_MASK.inv())
83-
private inline fun forbidNewWorkers() {
84-
controlState.update { it or IS_CLOSED_MASK }
85-
}
86-
87-
private inline fun Int.isClosed(): Boolean {
88-
return this and IS_CLOSED_MASK != 0
76+
private val workerPool = OnDemandAllocatingPool(workersCount) {
77+
Worker.start(name = "$name-$it").apply {
78+
executeAfter { workerRunLoop() }
79+
}
8980
}
9081

9182
private fun workerRunLoop() = runBlocking {
@@ -104,42 +95,21 @@ private class MultiWorkerDispatcher(
10495
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
10596
}
10697

107-
if (activeWorkers != workersCount) {
108-
if (!tryAddWorker()) throwClosed(block) // Do not even try to send to avoid race
109-
}
98+
if (!workerPool.allocate()) throwClosed(block) // Do not even try to send to avoid race
11099

111100
tasksQueue.trySend(block).onClosed {
112101
throwClosed(block)
113102
}
114103
}
115104

116-
// Returns 'false' is the dispatcher was closed, 'true' otherwise
117-
private fun tryAddWorker(): Boolean {
118-
controlState.loop { ctl ->
119-
if (ctl.isClosed()) return false
120-
if (ctl >= workersCount) return true
121-
if (controlState.compareAndSet(ctl, ctl + 1)) {
122-
val worker = Worker.start(name = "$name-$ctl")
123-
worker.executeAfter { workerRunLoop() }
124-
workers[ctl].value = worker
125-
return true
126-
}
127-
}
128-
}
129-
130105
override fun close() {
131-
forbidNewWorkers()
106+
val workers = workerPool.close()
132107
tasksQueue.close()
133108
/*
134109
* Here we cannot avoid waiting on `.result`, otherwise it will lead
135110
* to a native memory leak, including a pthread handle.
136111
*/
137-
val requests = Array(activeWorkers) {
138-
while (workers[it].value == null) {
139-
// wait until tryAddWorker completes and sets the worker
140-
}
141-
workers[it].value?.requestTermination()
142-
}
143-
requests.map { it?.result }
112+
val requests = workers.map { it.requestTermination() }
113+
requests.map { it.result }
144114
}
145115
}

0 commit comments

Comments
 (0)