Skip to content

Commit c414dce

Browse files
committed
~tmp
1 parent 049f0aa commit c414dce

File tree

9 files changed

+463
-15
lines changed

9 files changed

+463
-15
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ allprojects {
128128
google()
129129
mavenCentral()
130130
maven { url "https://maven.pkg.jetbrains.space/kotlin/p/kotlin/dev" }
131+
mavenLocal()
131132
}
132133
}
133134

buildSrc/settings.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pluginManagement {
1010
maven("https://cache-redirector.jetbrains.com/plugins.gradle.org/m2")
1111
} else {
1212
maven("https://plugins.gradle.org/m2")
13+
mavenLocal()
1314
}
1415
if (build_snapshot_train?.toBoolean() == true) {
1516
mavenLocal()

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ kotlin_version=1.7.10
1010
# Dependencies
1111
junit_version=4.12
1212
junit5_version=5.7.0
13-
atomicfu_version=0.18.2
13+
atomicfu_version=0.18.2-conditions
1414
knit_version=0.4.0
1515
html_version=0.7.2
1616
lincheck_version=2.14.1

kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt

+50-14
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.internal.*
810
import kotlin.coroutines.*
911
import kotlin.native.concurrent.*
1012

@@ -63,28 +65,62 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(),
6365
}
6466
}
6567

66-
private class MultiWorkerDispatcher(name: String, workersCount: Int) : CloseableCoroutineDispatcher() {
67-
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
68-
private val workers = Array(workersCount) { Worker.start(name = "$name-$it") }
68+
public class MultiWorkerDispatcher(
69+
private val name: String, private val workersCount: Int
70+
) : CloseableCoroutineDispatcher() {
71+
private val runningWorkers = atomic(0)
72+
private val queue = DispatcherQueue()
73+
private val workers = atomicArrayOfNulls<Worker>(workersCount)
74+
private val isTerminated = atomic(false)
6975

70-
init {
71-
workers.forEach { w -> w.executeAfter(0L) { workerRunLoop() } }
76+
override fun dispatch(context: CoroutineContext, block: Runnable) {
77+
if (runningWorkers.value != workersCount) {
78+
tryAddWorker()
79+
}
80+
queue.put(block)
7281
}
7382

74-
private fun workerRunLoop() = runBlocking {
75-
for (task in tasksQueue) {
76-
// TODO error handling
77-
task.run()
83+
private fun tryAddWorker() {
84+
runningWorkers.loop {
85+
if (it == workersCount) return
86+
if (runningWorkers.compareAndSet(it, it + 1)) {
87+
addWorker(it)
88+
return
89+
}
7890
}
7991
}
8092

81-
override fun dispatch(context: CoroutineContext, block: Runnable) {
82-
// TODO handle rejections
83-
tasksQueue.trySend(block)
93+
private fun addWorker(sequenceNumber: Int) {
94+
val worker = Worker.start(name = "$name-#$sequenceNumber")
95+
workers[sequenceNumber].value = worker
96+
worker.executeAfter(0L) {
97+
workerLoop()
98+
}
99+
}
100+
101+
private fun workerLoop() {
102+
while (!isTerminated.value) {
103+
val runnable = queue.take()
104+
runnable.run()
105+
}
84106
}
85107

86108
override fun close() {
87-
tasksQueue.close()
88-
workers.forEach { it.requestTermination().result }
109+
// TODO it races with worker creation
110+
if (!isTerminated.compareAndSet(false, true)) return
111+
repeat(workersCount) {
112+
queue.put(Runnable {}) // Empty poison pill to wakeup workers and make them check isTerminated
113+
}
114+
115+
val requests = ArrayList<Future<Unit>>()
116+
for (i in 0 until workers.size) {
117+
val worker = workers[i].value ?: continue
118+
requests += worker.requestTermination(false)
119+
}
120+
for (request in requests) {
121+
request.result // Wait for workers termination
122+
}
123+
124+
// queue.close()
89125
}
90126
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
/**
8+
* A domain-specific bounded array-backed blocking queue used
9+
* in multithreaded dispatchers.
10+
*
11+
* It is written in the most straightforward way and is not expected to be a bottleneck.
12+
*
13+
* NB: this implementation used POSIX mutexes and should be explicitly [closed][close]
14+
* in order to release underlying POSIX-specific primitives.
15+
*/
16+
internal class BlockingQueue<T: Any>(capacity: Int) {
17+
18+
init {
19+
// Prior to all resource allocations
20+
require(capacity >= 1) { "Capacity should be positive, but had $capacity" }
21+
}
22+
23+
private val lock = SpinLockWithCondition()
24+
private val notEmptyCondition = lock.Condition()
25+
private val notFullCondition = lock.Condition()
26+
27+
private val elements = arrayOfNulls<Any?>(capacity)
28+
private var tailIndex = 0
29+
private var headIndex = 0
30+
private var size = 0
31+
32+
fun put(value: T) = lock.withLock {
33+
while (size == elements.size) {
34+
notFullCondition.wait()
35+
}
36+
elements[headIndex] = value
37+
if (++headIndex == elements.size) headIndex = 0
38+
++size
39+
notEmptyCondition.notifyOne()
40+
}
41+
42+
fun take(): T = lock.withLock {
43+
while (size == 0) {
44+
notEmptyCondition.wait()
45+
}
46+
val result = elements[tailIndex]
47+
elements[tailIndex] = null
48+
if (++tailIndex == elements.size) tailIndex = 0
49+
--size
50+
notFullCondition.notifyOne()
51+
@Suppress("UNCHECKED_CAST")
52+
return result as T
53+
}
54+
55+
fun close() {
56+
// TODO
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
10+
internal class DispatcherQueue {
11+
12+
private val queue = LockFreeTaskQueue<Runnable>(false)
13+
private val lock = SpinLockWithCondition()
14+
private val nonEmpty = lock.Condition()
15+
private val nonEmptyWaiters = atomic(0)
16+
17+
fun put(value: Runnable) {
18+
queue.addLast(value)
19+
notifyNonEmpty()
20+
}
21+
22+
private fun notifyNonEmpty() {
23+
if (nonEmptyWaiters.value == 0) return
24+
lock.withLock {
25+
nonEmpty.notifyOne()
26+
}
27+
}
28+
29+
fun take(): Runnable {
30+
takeFastPath()?.let { return it }
31+
return takeSlowPath()
32+
}
33+
34+
private fun takeFastPath(): Runnable? {
35+
val fastPathResult = queue.removeFirstOrNull()
36+
if (fastPathResult !== null) {
37+
return fastPathResult
38+
}
39+
return null
40+
}
41+
42+
private fun takeSlowPath(): Runnable = lock.withLock {
43+
nonEmptyWaiters.incrementAndGet()
44+
while (true) {
45+
// Check if `put` missed our increment
46+
val result = takeFastPath()
47+
if (result !== null) {
48+
nonEmptyWaiters.decrementAndGet()
49+
return result
50+
}
51+
nonEmpty.wait()
52+
}
53+
TODO() // Hack for some reason required for this code to compile
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import kotlinx.atomicfu.*
8+
import kotlin.native.concurrent.*
9+
import kotlin.test.*
10+
11+
private val UNPARK_TASK = {}
12+
private const val DEFAULT = 0
13+
private const val PARKED = 1
14+
private const val UNPARKED = 2
15+
16+
17+
internal class SpinLockWithCondition {
18+
19+
private val isLocked = atomic(false)
20+
21+
private class ParkSupport(private val worker: Worker) {
22+
private val parkingState = atomic(0)
23+
24+
fun park() {
25+
parkingState.loop { state ->
26+
when (state) {
27+
DEFAULT -> {
28+
if (parkingState.compareAndSet(DEFAULT, PARKED)) {
29+
worker.park(-1)
30+
return
31+
}
32+
}
33+
UNPARKED -> {
34+
return
35+
}
36+
PARKED -> {
37+
fail("Unexpected state: worker is already parked")
38+
}
39+
}
40+
}
41+
}
42+
43+
fun unpark() {
44+
parkingState.loop { state ->
45+
when (state) {
46+
DEFAULT -> {
47+
if (parkingState.compareAndSet(DEFAULT, UNPARKED)) {
48+
return
49+
}
50+
}
51+
UNPARKED -> {
52+
fail("Unexpected state: worker is already unparked")
53+
}
54+
PARKED -> {
55+
parkingState.compareAndSet(DEFAULT, UNPARKED)
56+
worker.executeAfter(0L, UNPARK_TASK)
57+
return
58+
}
59+
}
60+
}
61+
}
62+
}
63+
64+
inner class Condition {
65+
private val waitingWorkers = ArrayList<ParkSupport>()
66+
67+
fun wait() {
68+
assertLocked()
69+
val worker = ParkSupport(Worker.current)
70+
waitingWorkers += worker
71+
unlock() // Release before parking
72+
worker.park()
73+
lock() // Immediately lock
74+
}
75+
76+
fun notifyOne() {
77+
assertLocked()
78+
waitingWorkers.removeLastOrNull()?.unpark()
79+
}
80+
}
81+
82+
public fun lock() {
83+
isLocked.loop { locked ->
84+
if (locked) return@loop
85+
else if (isLocked.compareAndSet(false, true)) return
86+
}
87+
}
88+
89+
90+
public fun unlock() {
91+
require(isLocked.compareAndSet(true, false)) { "Lock release should always succeed" }
92+
}
93+
94+
private fun assertLocked() {
95+
require(isLocked.value) { "Lock should be locked" }
96+
}
97+
}
98+
99+
internal inline fun <T> SpinLockWithCondition.withLock(block: () -> T): T {
100+
try {
101+
lock()
102+
return block()
103+
} finally {
104+
unlock()
105+
}
106+
}

0 commit comments

Comments
 (0)